从基础到高级探索 LangGraph

从基础到高级探索 LangGraph

Barry Lv6

构建具有人机交互的单代理和多代理工作流

LangChain 是构建由大型语言模型驱动的应用程序的领先框架之一。借助 LangChain 表达语言 (LCEL),定义和执行逐步的动作序列——也称为链——变得更加简单。从更技术的角度来看,LangChain 允许我们创建 DAG(有向无环图)。

随着 LLM 应用程序,特别是 LLM 代理的发展,我们开始不仅将 LLM 用于执行,还将其作为推理引擎。这一转变引入了频繁涉及重复(循环)和复杂条件的交互。在这种情况下,LCEL 不够用,因此 LangChain 实现了一个新模块——LangGraph

LangGraph(正如你从名称中可能猜到的)将所有交互建模为循环图。这些图使得开发具有多个循环和条件语句的高级工作流和交互成为可能,使其成为创建代理和多代理工作流的便捷工具。

在本文中,我将探讨 LangGraph 的关键特性和能力,包括多代理应用程序。我们将构建一个能够回答不同类型问题的系统,并深入了解如何实现人机交互设置。

上一篇文章 中,我们尝试使用 CrewAI,这是另一个流行的多代理系统框架。然而,LangGraph 采取了不同的方法。虽然 CrewAI 是一个具有许多预定义功能和现成组件的高级框架,但 LangGraph 在较低的层面上运行,提供广泛的自定义和控制。

通过这段介绍,让我们深入了解 LangGraph 的基本概念。

LangGraph 基础

LangGraph 是 LangChain 生态系统的一部分,因此我们将继续使用一些众所周知的概念,如提示模板、工具等。然而,LangGraph 引入了一些 额外的概念。 让我们来讨论一下它们。

LangGraph 的创建是为了定义循环图。图由以下元素组成:

  • 节点代表实际的操作,可以是 LLM、代理或函数。此外,一个特殊的 END 节点标记执行的结束。
  • 边连接节点并确定图的执行流程。有基本边,它们只是简单地将一个节点链接到另一个节点,还有条件边,它们包含 if 语句和额外的逻辑。

另一个重要的概念是图的状态。状态作为图组件之间协作的基础元素。它代表图的快照,任何部分——无论是节点还是边——都可以在执行期间访问和修改,以检索或更新信息。

此外,状态在持久性中也起着至关重要的作用。它在每一步之后自动保存,允许您在任何时刻暂停和恢复执行。此功能支持开发更复杂的应用程序,例如需要错误修正或包含人机交互的应用程序。

单代理工作流程

从零开始构建代理

让我们从简单的开始,尝试使用 LangGraph 进行一个基本用例——一个带有工具的代理。

我将尝试构建类似于我们在 上一篇文章 中使用 CrewAI 的应用程序。然后,我们将能够比较这两个框架。在这个例子中,让我们创建一个可以根据数据库中的表自动生成文档的应用程序。当我们为数据源创建文档时,这可以节省我们相当多的时间。

和往常一样,我们将首先定义代理的工具。由于我将在这个例子中使用 ClickHouse 数据库,因此我定义了一个执行任何查询的函数。如果你愿意,可以使用不同的数据库,因为我们不会依赖任何特定于数据库的功能。

1
2
3
4
5
6
7
8
9
10
CH_HOST = 'http://localhost:8123' # 默认地址 
import requests

def get_clickhouse_data(query, host = CH_HOST, connection_timeout = 1500):
r = requests.post(host, params = {'query': query},
timeout = connection_timeout)
if r.status_code == 200:
return r.text
else:
return '数据库返回以下错误:\n' + r.text

使 LLM 工具可靠且不易出错至关重要。如果数据库返回错误,我会将此反馈提供给 LLM,而不是抛出异常并停止执行。然后,LLM 代理将有机会修复错误并再次调用该函数。

让我们定义一个名为 execute_sql 的工具,它可以执行任何 SQL 查询。我们使用 pydantic 来指定工具的结构,确保 LLM 代理拥有有效使用该工具所需的所有信息。

1
2
3
4
5
6
7
8
9
10
11
from langchain_core.tools import tool
from pydantic.v1 import BaseModel, Field
from typing import Optional

class SQLQuery(BaseModel):
query: str = Field(description="要执行的 SQL 查询")

@tool(args_schema = SQLQuery)
def execute_sql(query: str) -> str:
"""返回 SQL 查询执行的结果"""
return get_clickhouse_data(query)

我们可以打印创建的工具的参数,以查看传递给 LLM 的信息。

1
2
3
4
5
6
7
8
9
10
print(f'''
name: {execute_sql.name}
description: {execute_sql.description}
arguments: {execute_sql.args}
''')

# name: execute_sql
# description: 返回 SQL 查询执行的结果
# arguments: {'query': {'title': 'Query', 'description':
# '要执行的 SQL 查询', 'type': 'string'}}

一切看起来都很好。我们已经设置了必要的工具,现在可以继续定义 LLM 代理。正如我们上面讨论的,LangGraph 中代理的基石是其状态,这使得我们图的不同部分之间能够共享信息。

我们当前的例子相对简单。因此,我们只需要存储消息的历史记录。让我们定义代理状态。

1
2
3
4
5
6
7
8
9
# 有用的导入
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage

# 定义代理状态
class AgentState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add]

我们在 AgentState 中定义了一个参数——messages——它是 AnyMessage 类对象的列表。此外,我们用 operator.add(归约器)对其进行了注解。这个注解确保每次节点返回消息时,它都会附加到状态中的现有列表中。如果没有这个操作符,每条新消息将替换先前的值,而不是添加到列表中。

下一步是定义代理本身。让我们从 __init__ 函数开始。我们将为代理指定三个参数:模型、工具列表和系统提示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class SQLAgent:
# 初始化对象
def __init__(self, model, tools, system_prompt = ""):
self.system_prompt = system_prompt

# 用状态初始化图
graph = StateGraph(AgentState)

# 添加节点
graph.add_node("llm", self.call_llm)
graph.add_node("function", self.execute_function)
graph.add_conditional_edges(
"llm",
self.exists_function_calling,
{True: "function", False: END}
)
graph.add_edge("function", "llm")

# 设置起始点
graph.set_entry_point("llm")

self.graph = graph.compile()
self.tools = {t.name: t for t in tools}
self.model = model.bind_tools(tools)

在初始化函数中,我们概述了图的结构,其中包括两个节点:llmaction。节点是实际的操作,因此我们有与之关联的函数。我们稍后将定义这些函数。

此外,我们有一个条件边,决定是否需要执行函数或生成最终答案。对于这个边,我们需要指定前一个节点(在我们的例子中是 llm)、一个决定下一步的函数,以及基于函数输出的后续步骤映射(格式为字典)。如果 exists_function_calling 返回 True,我们将继续到函数节点。否则,执行将在特殊的 END 节点结束,标志着过程的结束。

我们在 functionllm 之间添加了一条边。它只是将这两个步骤链接在一起,并将在没有任何条件的情况下执行。

在定义了主要结构后,现在是时候创建上述所有函数。第一个是 call_llm。这个函数将执行 LLM 并返回结果。

代理状态将自动传递给该函数,因此我们可以使用保存的系统提示和模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
class SQLAgent:
<...>

def call_llm(self, state: AgentState):
messages = state['messages']
# 如果定义了系统提示,则添加系统提示
if self.system_prompt:
messages = [SystemMessage(content=self.system_prompt)] + messages

# 调用 LLM
message = self.model.invoke(messages)

return {'messages': [message]}

因此,我们的函数返回一个字典,将用于更新代理状态。由于我们将 operator.add 用作状态的归约器,返回的消息将附加到存储在状态中的消息列表中。

我们需要的下一个函数是 execute_function,它将运行我们的工具。如果 LLM 代理决定调用一个工具,我们将在 message.tool_calls 参数中看到它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class SQLAgent:
<...>

def execute_function(self, state: AgentState):
tool_calls = state['messages'][-1].tool_calls

results = []
for tool in tool_calls:
# 检查工具名称是否正确
if not t['name'] in self.tools:
# 将错误返回给代理
result = "错误: 没有这样的工具,请重试"
else:
# 从工具获取结果
result = self.tools[t['name']].invoke(t['args'])

results.append(
ToolMessage(
tool_call_id=t['id'],
name=t['name'],
content=str(result)
)
)
return {'messages': results}

在这个函数中,我们遍历 LLM 返回的工具调用,并调用这些工具或返回错误消息。最后,我们的函数返回一个字典,包含一个键 messages,将用于更新图状态。

只剩下一个函数——用于条件边的函数,它定义我们是否需要执行工具或提供最终结果。这非常简单。我们只需要检查最后一条消息是否包含任何工具调用。

1
2
3
4
5
6
class SQLAgent:
<...>

def exists_function_calling(self, state: AgentState):
result = state['messages'][-1]
return len(result.tool_calls) > 0

现在是时候为代理和 LLM 模型创建它。我将使用新的 OpenAI GPT 4o mini 模型(文档 ),因为它比 GPT 3.5 更便宜且性能更好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os

# 设置凭证
os.environ["OPENAI_MODEL_NAME"]='gpt-4o-mini'
os.environ["OPENAI_API_KEY"] = '<your_api_key>'

# 系统提示
prompt = '''您是 SQL 和数据分析方面的高级专家。
因此,您可以帮助团队收集所需的数据以支持他们的决策。
您非常准确,并考虑到数据中的所有细微差别。
您的目标是为数据库中的表提供详细的文档
以帮助用户。'''

model = ChatOpenAI(model="gpt-4o-mini")
doc_agent = SQLAgent(model, [execute_sql], system=prompt)

LangGraph 为我们提供了一个相当方便的功能来可视化图形。要使用它,您需要安装 pygraphviz

对于 M1/M2 芯片的 Mac 来说,这有点棘手,所以这里有一个小窍门(来源 ):

1
2
3
4
5
6
! brew install graphviz
! python3 -m pip install -U --no-cache-dir \
--config-settings="--global-option=build_ext" \
--config-settings="--global-option=-I$(brew --prefix graphviz)/include/" \
--config-settings="--global-option=-L$(brew --prefix graphviz)/lib/" \
pygraphviz

在解决安装问题后,这就是我们的图。

1
2
from IPython.display import Image
Image(doc_agent.graph.get_graph().draw_png())

如您所见,我们的图有循环。用 LCEL 实现这样的东西将相当具有挑战性。

最后,是时候执行我们的代理了。我们需要将初始消息集与我们的提问作为 HumanMessage 传递。

1
2
messages = [HumanMessage(content="我们在 ecommerce_db.users 表中有什么信息?")]
result = doc_agent.graph.invoke({"messages": messages})

result 变量中,我们可以观察到执行过程中生成的所有消息。该过程按预期工作:

  • 代理决定调用查询 describe ecommerce.db_users 的函数。
  • 然后 LLM 处理工具中的信息并提供用户友好的答案。
1
2
3
4
5
6
7
8
result['messages']

# [
# HumanMessage(content='我们在 ecommerce_db.users 表中有什么信息?'),
# AIMessage(content='', tool_calls=[{'name': 'execute_sql', 'args': {'query': 'DESCRIBE ecommerce_db.users;'}, 'id': 'call_qZbDU9Coa2tMjUARcX36h0ax', 'type': 'tool_call'}]),
# ToolMessage(content='user_id\tUInt64\t\t\t\t\t\ncountry\tString\t\t\t\t\t\nis_active\tUInt8\t\t\t\t\t\nage\tUInt64\t\t\t\t\t\n', name='execute_sql', tool_call_id='call_qZbDU9Coa2tMjUARcX36h0ax'),
# AIMessage(content='`ecommerce_db.users` 表包含以下列:<...>')
# ]

Here’s the final result. It looks pretty decent.

1
2
3
4
5
6
7
print(result['messages'][-1].content)

# `ecommerce_db.users` 表包含以下列:
# 1. **user_id**: `UInt64` - 每个用户的唯一标识符。
# 2. **country**: `String` - 用户所在的国家。
# 3. **is_active**: `UInt8` - 指示用户是否活跃(1)或不活跃(0)。
# 4. **age**: `UInt64` - 用户的年龄。

使用预构建代理

我们已经学习了如何从头开始构建代理。然而,我们可以利用 LangGraph 的内置功能来处理像这样的简单任务。

我们可以使用一个 预构建的 ReAct 代理 来获得类似的结果:一个可以与工具协作的代理。

1
2
3
from langgraph.prebuilt import create_react_agent
prebuilt_doc_agent = create_react_agent(model, [execute_sql],
state_modifier = system_prompt)

它与我们之前构建的代理是相同的。我们稍后会尝试一下,但首先,我们需要理解另外两个重要概念:持久性和流式处理。

持久性和流式处理

持久性是指在不同交互之间保持上下文的能力。当应用程序可以从用户获取额外输入时,这对于自主使用案例至关重要。

LangGraph 在每一步之后自动保存状态,允许您暂停或恢复执行。此功能支持实现高级业务逻辑,例如错误恢复或人机交互。

添加持久性最简单的方法是使用内存中的 SQLite 数据库。

1
2
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")

对于现成的代理,我们可以在创建代理时将内存作为参数传递。

1
2
prebuilt_doc_agent = create_react_agent(model, [execute_sql], 
checkpointer=memory)

如果您正在使用自定义代理,则需要在编译图形时将内存作为检查点传递。

1
2
3
4
5
class SQLAgent:
def __init__(self, model, tools, system_prompt = ""):
<...>
self.graph = graph.compile(checkpointer=memory)
<...>

让我们执行代理并探索 LangGraph 的另一个特性:流式处理。通过流式处理,我们可以将每一步执行的结果作为流中的单独事件接收。此功能对于生产应用程序至关重要,因为需要同时处理多个对话(或线程)。

LangGraph 不仅支持事件流式处理,还支持令牌级流式处理。我想到的令牌流式处理的唯一用例是逐字实时显示答案(类似于 ChatGPT 的实现)。

让我们尝试使用流式处理与我们的新预构建代理。我还将使用 pretty_print 函数来格式化消息,使结果更易读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# defining thread
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="What info do we have in ecommerce_db.users table?")]

for event in prebuilt_doc_agent.stream({"messages": messages}, thread):
for v in event.values():
v['messages'][-1].pretty_print()

# ================================== Ai Message ==================================
# Tool Calls:
# execute_sql (call_YieWiChbFuOlxBg8G1jDJitR)
# Call ID: call_YieWiChbFuOlxBg8G1jDJitR
# Args:
# query: SELECT * FROM ecommerce_db.users LIMIT 1;
# ================================= Tool Message =================================
# Name: execute_sql
# 1000001 United Kingdom 0 70
#
# ================================== Ai Message ==================================
#
# The `ecommerce_db.users` table contains at least the following information for users:
#
# - **User ID** (e.g., `1000001`)
# - **Country** (e.g., `United Kingdom`)
# - **Some numerical value** (e.g., `0`)
# - **Another numerical value** (e.g., `70`)
#
# The specific meaning of the numerical values and additional columns
# is not clear from the single row retrieved. Would you like more details
# or a broader query?

有趣的是,代理未能提供足够好的结果。由于代理没有查找表模式,因此很难猜测所有列的含义。我们可以通过在同一线程中使用后续问题来改善结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
followup_messages = [HumanMessage(content="I would like to know the column names and types. Maybe you could look it up in database using describe.")]

for event in prebuilt_doc_agent.stream({"messages": followup_messages}, thread):
for v in event.values():
v['messages'][-1].pretty_print()

# ================================== Ai Message ==================================
# Tool Calls:
# execute_sql (call_sQKRWtG6aEB38rtOpZszxTVs)
# Call ID: call_sQKRWtG6aEB38rtOpZszxTVs
# Args:
# query: DESCRIBE ecommerce_db.users;
# ================================= Tool Message =================================
# Name: execute_sql
#
# user_id UInt64
# country String
# is_active UInt8
# age UInt64
#
# ================================== Ai Message ==================================
#
# The `ecommerce_db.users` table has the following columns along with their data types:
#
# | Column Name | Data Type |
# |-------------|-----------|
# | user_id | UInt64 |
# | country | String |
# | is_active | UInt8 |
# | age | UInt64 |
#
# If you need further information or assistance, feel free to ask!

这次,我们从代理那里得到了完整的答案。由于我们提供了相同的线程,代理能够从之前的讨论中获取上下文。这就是持久性的工作原理。

让我们尝试更改线程并询问相同的后续问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
new_thread = {"configurable": {"thread_id": "42"}}
followup_messages = [HumanMessage(content="I would like to know the column names and types. Maybe you could look it up in database using describe.")]

for event in prebuilt_doc_agent.stream({"messages": followup_messages}, new_thread):
for v in event.values():
v['messages'][-1].pretty_print()

# ================================== Ai Message ==================================
# Tool Calls:
# execute_sql (call_LrmsOGzzusaLEZLP9hGTBGgo)
# Call ID: call_LrmsOGzzusaLEZLP9hGTBGgo
# Args:
# query: DESCRIBE your_table_name;
# ================================= Tool Message =================================
# Name: execute_sql
#
# Database returned the following error:
# Code: 60. DB::Exception: Table default.your_table_name does not exist. (UNKNOWN_TABLE) (version 23.12.1.414 (official build))
#
# ================================== Ai Message ==================================
#
# It seems that the table `your_table_name` does not exist in the database.
# Could you please provide the actual name of the table you want to describe?

代理缺乏回答我们问题所需的上下文并不令人惊讶。线程旨在隔离不同的对话,确保每个线程保持自己的上下文。

在实际应用中,管理内存至关重要。对话可能会变得相当冗长,在某些时候,每次都将整个历史记录传递给 LLM 并不实用。因此,值得修剪或过滤消息。我们在这里不会深入具体细节,但您可以在 LangGraph 文档 中找到相关指导。压缩对话历史的另一种选择是使用摘要(示例 )。

我们已经学习了如何使用 LangGraph 构建单代理系统。下一步是将多个代理组合到一个应用程序中。

多智能体系统

作为多智能体工作流的一个例子,我想构建一个能够处理来自各个领域问题的应用程序。我们将拥有一组专家代理,每个代理专注于不同类型的问题,以及一个路由代理,它将找到最合适的专家来处理每个查询。这样的应用程序有许多潜在的用例:从自动化客户支持到回答同事在内部聊天中的问题。

首先,我们需要创建代理状态——将帮助代理共同解决问题的信息。我将使用以下字段:

  • question — 初始客户请求;
  • question_type — 定义哪个代理将处理请求的类别;
  • answer — 对问题的建议答案;
  • feedback — 一个未来使用的字段,用于收集反馈。
1
2
3
4
5
class MultiAgentState(TypedDict):
question: str
question_type: str
answer: str
feedback: str

我不使用任何 reducers,因此我们的状态将仅存储每个字段的最新版本。

接下来,让我们创建一个路由节点。它将是一个简单的 LLM 模型,用于定义问题的类别(数据库、LangChain 或一般问题)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
question_category_prompt = '''You are a senior specialist of analytical support. Your task is to classify the incoming questions. 
Depending on your answer, question will be routed to the right team, so your task is crucial for our team.
There are 3 possible question types:
- DATABASE - questions related to our database (tables or fields)
- LANGCHAIN- questions related to LangGraph or LangChain libraries
- GENERAL - general questions
Return in the output only one word (DATABASE, LANGCHAIN or GENERAL).
'''

def router_node(state: MultiAgentState):
messages = [
SystemMessage(content=question_category_prompt),
HumanMessage(content=state['question'])
]
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke(messages)
return {"question_type": response.content}

现在我们有了第一个节点——路由器——让我们构建一个简单的图来测试工作流。

1
2
3
4
5
6
7
8
9
memory = SqliteSaver.from_conn_string(":memory:")

builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)

builder.set_entry_point("router")
builder.add_edge('router', END)

graph = builder.compile(checkpointer=memory)

让我们用不同类型的问题测试我们的工作流,看看它在实际操作中的表现。这将帮助我们评估路由代理是否正确地将问题分配给适当的专家代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
thread = {"configurable": {"thread_id": "1"}}
for s in graph.stream({
'question': "Does LangChain support Ollama?",
}, thread):
print(s)

# {'router': {'question_type': 'LANGCHAIN'}}

thread = {"configurable": {"thread_id": "2"}}
for s in graph.stream({
'question': "What info do we have in ecommerce_db.users table?",
}, thread):
print(s)
# {'router': {'question_type': 'DATABASE'}}

thread = {"configurable": {"thread_id": "3"}}
for s in graph.stream({
'question': "How are you?",
}, thread):
print(s)

# {'router': {'question_type': 'GENERAL'}}

它运行良好。我建议您逐步构建复杂图形,并独立测试每个步骤。通过这种方法,您可以确保每次迭代都按预期工作,并可以节省大量调试时间。

接下来,让我们为我们的专家代理创建节点。我们将使用之前构建的 SQL 工具的 ReAct 代理作为数据库代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# database expert
sql_expert_system_prompt = '''
You are an expert in SQL, so you can help the team
to gather needed data to power their decisions.
You are very accurate and take into account all the nuances in data.
You use SQL to get the data before answering the question.
'''

def sql_expert_node(state: MultiAgentState):
model = ChatOpenAI(model="gpt-4o-mini")
sql_agent = create_react_agent(model, [execute_sql],
state_modifier = sql_expert_system_prompt)
messages = [HumanMessage(content=state['question'])]
result = sql_agent.invoke({"messages": messages})
return {'answer': result['messages'][-1].content}

对于与 LangChain 相关的问题,我们将使用 ReAct 代理。为了使代理能够回答有关该库的问题,我们将为其配备一个搜索引擎工具。我选择了 Tavily 作为此目的,因为它提供了针对 LLM 应用程序优化的搜索结果。

如果您没有帐户,可以注册以免费使用 Tavily(每月最多 1K 请求)。要开始使用,您需要在环境变量中指定 Tavily API 密钥。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# search expert 
from langchain_community.tools.tavily_search import TavilySearchResults
os.environ["TAVILY_API_KEY"] = 'tvly-...'
tavily_tool = TavilySearchResults(max_results=5)

search_expert_system_prompt = '''
You are an expert in LangChain and other technologies.
Your goal is to answer questions based on results provided by search.
You don't add anything yourself and provide only information baked by other sources.
'''

def search_expert_node(state: MultiAgentState):
model = ChatOpenAI(model="gpt-4o-mini")
sql_agent = create_react_agent(model, [tavily_tool],
state_modifier = search_expert_system_prompt)
messages = [HumanMessage(content=state['question'])]
result = sql_agent.invoke({"messages": messages})
return {'answer': result['messages'][-1].content}

对于一般问题,我们将利用一个没有特定工具的简单 LLM 模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
# general model
general_prompt = '''You're a friendly assistant and your goal is to answer general questions.
Please, don't provide any unchecked information and just tell that you don't know if you don't have enough info.
'''

def general_assistant_node(state: MultiAgentState):
messages = [
SystemMessage(content=general_prompt),
HumanMessage(content=state['question'])
]
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke(messages)
return {"answer": response.content}

最后缺少的是一个用于路由的条件函数。这将非常简单——我们只需从路由节点定义的状态中传播问题类型。

1
2
def route_question(state: MultiAgentState):
return state['question_type']

现在,是时候创建我们的图形了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node('database_expert', sql_expert_node)
builder.add_node('langchain_expert', search_expert_node)
builder.add_node('general_assistant', general_assistant_node)
builder.add_conditional_edges(
"router",
route_question,
{'DATABASE': 'database_expert',
'LANGCHAIN': 'langchain_expert',
'GENERAL': 'general_assistant'}
)

builder.set_entry_point("router")
builder.add_edge('database_expert', END)
builder.add_edge('langchain_expert', END)
builder.add_edge('general_assistant', END)
graph = builder.compile(checkpointer=memory)

现在,我们可以在几个问题上测试设置,以查看它的表现如何。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
thread = {"configurable": {"thread_id": "2"}}
results = []
for s in graph.stream({
'question': "What info do we have in ecommerce_db.users table?",
}, thread):
print(s)
results.append(s)
print(results[-1]['database_expert']['answer'])

# The `ecommerce_db.users` table contains the following columns:
# 1. **User ID**: A unique identifier for each user.
# 2. **Country**: The country where the user is located.
# 3. **Is Active**: A flag indicating whether the user is active (1 for active, 0 for inactive).
# 4. **Age**: The age of the user.
# Here are some sample entries from the table:
#
# | User ID | Country | Is Active | Age |
# |---------|----------------|-----------|-----|
# | 1000001 | United Kingdom | 0 | 70 |
# | 1000002 | France | 1 | 87 |
# | 1000003 | France | 1 | 88 |
# | 1000004 | Germany | 1 | 25 |
# | 1000005 | Germany | 1 | 48 |
#
# This gives an overview of the user data available in the table.

干得好!它为与数据库相关的问题提供了相关结果。让我们尝试询问有关 LangChain 的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
thread = {"configurable": {"thread_id": "42"}}
results = []
for s in graph.stream({
'question': "Does LangChain support Ollama?",
}, thread):
print(s)
results.append(s)

print(results[-1]['langchain_expert']['answer'])

# Yes, LangChain supports Ollama. Ollama allows you to run open-source
# large language models, such as Llama 2, locally, and LangChain provides
# a flexible framework for integrating these models into applications.
# You can interact with models run by Ollama using LangChain, and there are
# specific wrappers and tools available for this integration.
#
# For more detailed information, you can visit the following resources:
# - [LangChain and Ollama Integration](https://js.langchain.com/v0.1/docs/integrations/llms/ollama/)
# - [ChatOllama Documentation](https://js.langchain.com/v0.2/docs/integrations/chat/ollama/)
# - [Medium Article on Ollama and LangChain](https://readmedium.com/ollama-and-langchain-run-llms-locally-900931914a46)

太棒了!一切都运作良好,显然 Tavily 的搜索对于 LLM 应用程序是有效的。

添加人机交互

我们在创建一个回答问题的工具方面做得非常出色。然而,在许多情况下,保持人类参与以批准建议的行动或提供额外反馈是有益的。让我们添加一个步骤,在返回最终结果给用户之前收集人类的反馈。

最简单的方法是添加两个额外的节点:

  • 一个 human 节点来收集反馈,
  • 一个 editor 节点来重新审视答案,考虑到反馈。

让我们创建这些节点:

  • Human 节点: 这将是一个虚拟节点,不会执行任何操作。
  • Editor 节点: 这将是一个 LLM 模型,接收所有相关信息(客户问题、草拟答案和提供的反馈)并修订最终答案。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def human_feedback_node(state: MultiAgentState):
pass

editor_prompt = '''You're an editor and your goal is to provide the final answer to the customer, taking into account the feedback.
You don't add any information on your own. You use friendly and professional tone.
In the output please provide the final answer to the customer without additional comments.
Here's all the information you need.

Question from customer:
----
{question}
----
Draft answer:
----
{answer}
----
Feedback:
----
{feedback}
----
'''

def editor_node(state: MultiAgentState):
messages = [
SystemMessage(content=editor_prompt.format(question = state['question'], answer = state['answer'], feedback = state['feedback']))
]
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke(messages)
return {"answer": response.content}

让我们将这些节点添加到我们的图中。此外,我们需要在 human 节点之前引入一个中断,以确保流程暂停以获取人类反馈。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node('database_expert', sql_expert_node)
builder.add_node('langchain_expert', search_expert_node)
builder.add_node('general_assistant', general_assistant_node)
builder.add_node('human', human_feedback_node)
builder.add_node('editor', editor_node)

builder.add_conditional_edges(
"router",
route_question,
{'DATABASE': 'database_expert',
'LANGCHAIN': 'langchain_expert',
'GENERAL': 'general_assistant'}
)

builder.set_entry_point("router")

builder.add_edge('database_expert', 'human')
builder.add_edge('langchain_expert', 'human')
builder.add_edge('general_assistant', 'human')
builder.add_edge('human', 'editor')
builder.add_edge('editor', END)
graph = builder.compile(checkpointer=memory, interrupt_before = ['human'])

现在,当我们运行图时,执行将在 human 节点之前停止。

1
2
3
4
5
6
7
8
9
10
11
thread = {"configurable": {"thread_id": "2"}}

for event in graph.stream({
'question': "What are the types of fields in ecommerce_db.users table?",
}, thread):
print(event)


# {'question_type': 'DATABASE', 'question': 'What are the types of fields in ecommerce_db.users table?'}
# {'router': {'question_type': 'DATABASE'}}
# {'database_expert': {'answer': 'The `ecommerce_db.users` table has the following fields:\n\n1. **user_id**: UInt64\n2. **country**: String\n3. **is_active**: UInt8\n4. **age**: UInt64'}}

让我们获取客户输入并用反馈更新状态。

1
2
3
4
5
user_input = input("Do I need to change anything in the answer?")
# Do I need to change anything in the answer?
# It looks wonderful. Could you only make it a bit friendlier please?

graph.update_state(thread, {"feedback": user_input}, as_node="human")

我们可以检查状态以确认反馈已被填充,并且序列中的下一个节点是 editor

1
2
3
4
5
print(graph.get_state(thread).values['feedback'])
# It looks wonderful. Could you only make it a bit friendlier please?

print(graph.get_state(thread).next)
# ('editor',)

我们可以继续执行。传递 None 作为输入将从暂停的地方恢复流程。

1
2
3
4
5
6
7
8
9
10
11
for event in graph.stream(None, thread, stream_mode="values"):
print(event)

print(event['answer'])

# Hello! The `ecommerce_db.users` table has the following fields:
# 1. **user_id**: UInt64
# 2. **country**: String
# 3. **is_active**: UInt8
# 4. **age**: UInt64
# Have a nice day!

编辑器考虑了我们的反馈,并在最终消息中添加了一些礼貌用语。这是一个很棒的结果!

我们可以通过为我们的编辑器配备 Human 工具,以更具主动性的方式实现人机交互。

让我们调整我们的编辑器。我稍微修改了提示并将工具添加到代理中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from langchain_community.tools import HumanInputRun
human_tool = HumanInputRun()

editor_agent_prompt = '''You're an editor and your goal is to provide the final answer to the customer, taking into the initial question.
If you need any clarifications or need feedback, please, use human. Always reach out to human to get the feedback before final answer.
You don't add any information on your own. You use friendly and professional tone.
In the output please provide the final answer to the customer without additional comments.
Here's all the information you need.

Question from customer:
----
{question}
----
Draft answer:
----
{answer}
----
'''

model = ChatOpenAI(model="gpt-4o-mini")
editor_agent = create_react_agent(model, [human_tool])
messages = [SystemMessage(content=editor_agent_prompt.format(question = state['question'], answer = state['answer']))]
editor_result = editor_agent.invoke({"messages": messages})

# Is the draft answer complete and accurate for the customer's question about the types of fields in the ecommerce_db.users table?
# Yes, but could you please make it friendlier.

print(editor_result['messages'][-1].content)
# The `ecommerce_db.users` table has the following fields:
# 1. **user_id**: UInt64
# 2. **country**: String
# 3. **is_active**: UInt8
# 4. **age**: UInt64
#
# If you have any more questions, feel free to ask!

因此,编辑器向人类提出了问题:“草拟答案是否完整且准确地回答了客户关于 ecommerce_db.users 表中字段类型的问题?”在收到反馈后,编辑器修订了答案,使其更具用户友好性。

让我们更新我们的主图,以整合新的代理,而不是使用两个单独的节点。通过这种方法,我们不再需要中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def editor_agent_node(state: MultiAgentState):
model = ChatOpenAI(model="gpt-4o-mini")
editor_agent = create_react_agent(model, [human_tool])
messages = [SystemMessage(content=editor_agent_prompt.format(question = state['question'], answer = state['answer']))]
result = editor_agent.invoke({"messages": messages})
return {'answer': result['messages'][-1].content}

builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node('database_expert', sql_expert_node)
builder.add_node('langchain_expert', search_expert_node)
builder.add_node('general_assistant', general_assistant_node)
builder.add_node('editor', editor_agent_node)

builder.add_conditional_edges(
"router",
route_question,
{'DATABASE': 'database_expert',
'LANGCHAIN': 'langchain_expert',
'GENERAL': 'general_assistant'}
)

builder.set_entry_point("router")

builder.add_edge('database_expert', 'editor')
builder.add_edge('langchain_expert', 'editor')
builder.add_edge('general_assistant', 'editor')
builder.add_edge('editor', END)

graph = builder.compile(checkpointer=memory)

thread = {"configurable": {"thread_id": "42"}}
results = []

for event in graph.stream({
'question': "What are the types of fields in ecommerce_db.users table?",
}, thread):
print(event)
results.append(event)

这个图将与之前的图类似。我个人更喜欢这种方法,因为它利用了工具,使解决方案更加灵活。例如,代理可以多次联系人工并根据需要细化问题。

就是这样。我们构建了一个多代理系统,可以回答来自不同领域的问题,并考虑人类反馈。

您可以在 GitHub 上找到完整代码。

摘要

在本文中,我们探讨了 LangGraph 库及其在构建单代理和多代理工作流中的应用。我们考察了它的一系列功能,现在是总结其优缺点的时候了。此外,将 LangGraph 与我们在 我之前的文章 中讨论的 CrewAI 进行比较也将是有益的。

总体而言,我认为 LangGraph 是一个相当强大的框架,用于构建复杂的 LLM 应用:

  • LangGraph 是一个低级框架,提供广泛的自定义选项,使您能够构建所需的精确功能。
  • 由于 LangGraph 构建在 LangChain 之上,它与其生态系统无缝集成,使得利用现有工具和组件变得简单。

然而,LangGraph 还有一些可以改进的地方:

  • LangGraph 的灵活性伴随着更高的入门门槛。虽然您可以在 15-30 分钟内理解 CrewAI 的概念,但熟悉和掌握 LangGraph 需要一些时间。
  • LangGraph 提供了更高的控制水平,但缺少 CrewAI 一些很酷的预构建功能,例如 协作 或现成的 RAG 工具。
  • LangGraph 不像 CrewAI 那样强制执行最佳实践(例如角色扮演或保护措施),这可能导致较差的结果。

我认为 CrewAI 是新手和常见用例的更好框架,因为它可以帮助您快速获得良好结果,并提供指导以防止错误。

如果您想构建高级应用并需要更多控制,LangGraph 是一个不错的选择。请记住,您需要投入时间学习 LangGraph,并对最终解决方案负全责,因为该框架不会提供指导来帮助您避免常见错误。

非常感谢您阅读本文。希望这篇文章对您有所启发。如果您有任何后续问题或评论,请在评论区留言。

参考

本文灵感来源于 DeepLearning.AI 的 “LangGraph 中的 AI 代理” 短期课程。

  • 标题: 从基础到高级探索 LangGraph
  • 作者: Barry
  • 创建于 : 2024-08-16 02:36:10
  • 更新于 : 2024-08-31 06:59:45
  • 链接: https://wx.role.fun/2024/08/16/ef6f3d52b81c4f9fa54d072e33ba6c11/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。