如何从您的图中流式传输 LLM 令牌¶
在本示例中,我们将从为代理提供支持的语言模型中流式传输令牌。我们将使用 ReAct 代理作为示例。
本操作指南与本目录中的其他指南非常类似,因此我们将在下面使用 STREAMING 标记来突出显示与其他指南的差异(如果您只想搜索这些差异)。
注意
在本操作指南中,我们将从头开始创建代理以提高透明度(但会很冗长)。您可以使用 create_react_agent(model, tools=tool)
(API 文档) 构造函数来实现类似的功能。如果您习惯于 LangChain 的 AgentExecutor 类,这可能更合适。
关于 Python < 3.11 的注意
在使用 python 3.8、3.9 或 3.10 时,请确保您手动将 RunnableConfig 传递给调用时的 llm,如下所示:llm.ainvoke(..., config)
。流式方法使用作为回调传递的流式跟踪器收集来自嵌套代码的所有事件。在 3.11 及更高版本中,这是通过 contextvar 自动处理的;在 3.11 之前,asyncio 的任务 缺乏适当的 contextvar 支持,这意味着回调将仅在您手动传递配置时才会传播。我们在下面的 call_model
方法中执行此操作。
设置¶
首先,我们需要安装所需的软件包。
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langsmith
接下来,我们需要为 OpenAI(我们将使用的 LLM)设置 API 密钥。
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
设置状态¶
langgraph
中的主要图形类型是 StateGraph。此图由它传递给每个节点的 State
对象参数化。然后,每个节点都会返回图形用来 update
该状态的操作。这些操作可以设置状态上的特定属性(例如,覆盖现有值)或添加到现有属性。是否设置或添加取决于您用来构造图形的 State
对象的注释。
对于本示例,我们将跟踪的状态只是一系列消息。我们希望每个节点都只是向该列表添加消息。因此,我们将使用一个带有单个键(messages
)的 TypedDict
并对其进行注释,以便 messages
属性是“仅追加”的。
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
# Add messages essentially does this with more
# robust handling
# def add_messages(left: list, right: list):
# return left + right
class State(TypedDict):
messages: Annotated[list, add_messages]
from langchain_core.tools import tool
@tool
def search(query: str):
"""Call to surf the web."""
# This is a placeholder, but don't tell the LLM that...
return ["Cloudy with a chance of hail."]
tools = [search]
现在,我们可以将这些工具包装在一个简单的 ToolNode 中。这是一个简单的类,它接受包含 带有 tool_calls 的 AIMessages 的消息列表,运行工具,并将输出作为 ToolMessage 返回。
from langgraph.prebuilt import ToolNode
tool_node = ToolNode(tools)
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-3.5-turbo")
完成此操作后,我们应该确保模型知道它可以使用这些工具。我们可以通过将 LangChain 工具转换为函数调用的格式,然后将它们绑定到模型类来做到这一点。
model = model.bind_tools(tools)
定义节点¶
现在,我们需要在图中定义几个不同的节点。在 langgraph
中,节点可以是函数或 可运行。对于本示例,我们需要两个主要的节点。
- 代理:负责决定采取什么(如果有的话)操作。
- 用于调用工具的函数:如果代理决定采取操作,此节点将执行该操作。
我们还需要定义一些边。其中一些边可能是条件性的。它们是条件性的原因是,根据节点的输出,可能会走多条路径之一。哪条路径将被走直到该节点运行才知道(LLM 决定)。
- 条件边:调用代理后,我们应该:
- a. 如果代理说要采取操作,则应调用用于调用工具的函数
b. 如果代理说它已完成,则应完成
正常边:调用工具后,它应该始终返回到代理以决定下一步该做什么
让我们定义节点以及一个函数来决定如何决定使用哪条条件边。
STREAMING
我们将每个节点定义为一个异步函数。
from typing import Literal
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraph
# Define the function that determines whether to continue or not
def should_continue(state: State):
messages = state["messages"]
last_message = messages[-1]
# If there is no function call, then we finish
if not last_message.tool_calls:
return END
# Otherwise if there is, we continue
else:
return "tools"
# Define the function that calls the model
async def call_model(state: State, config: RunnableConfig):
messages = state["messages"]
# Note: Passing the config through explicitly is required for python < 3.11
# Since context var support wasn't added before then: https://docs.pythonlang.cn/3/library/asyncio-task.html#creating-tasks
response = await model.ainvoke(messages, config)
# We return a list, because this will get added to the existing list
return {"messages": response}
call_model(state: State, config: RunnableConfig):
中,我们 a) 在节点中接受 RunnableConfig,以及 b) 将其作为 llm.ainvoke(..., config)
的第二个参数传递。对于 python 3.11 及更高版本,这是可选的。In [6]
from typing import Literal from langchain_core.runnables import RunnableConfig from langgraph.graph import END, START, StateGraph # 定义一个函数来确定是否继续 def should_continue(state: State): messages = state["messages"] last_message = messages[-1] # 如果没有函数调用,则我们完成 if not last_message.tool_calls: return END # 否则,如果有,我们继续 else: return "tools" # 定义调用模型的函数 async def call_model(state: State, config: RunnableConfig): messages = state["messages"] # 注意:对于 python < 3.11,显式地传递配置是必需的 # 因为 contextvar 支持在 3.11 之前没有添加:https://docs.pythonlang.cn/3/library/asyncio-task.html#creating-tasks response = await model.ainvoke(messages, config) # 我们返回一个列表,因为这将被添加到现有列表中 return {"messages": response}
# Define a new graph
workflow = StateGraph(State)
# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")
# We now add a conditional edge
workflow.add_conditional_edges(
# First, we define the start node. We use `agent`.
# This means these are the edges taken after the `agent` node is called.
"agent",
# Next, we pass in the function that will determine which node is called next.
should_continue,
# Next we pass in the path map - all the nodes this edge could go to
["tools", END],
)
workflow.add_edge("tools", "agent")
# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))
In [8]
from IPython.display import Image, display display(Image(app.get_graph().draw_mermaid_png()))
from langchain_core.messages import AIMessageChunk, HumanMessage
inputs = [HumanMessage(content="what is the weather in sf")]
first = True
async for msg, metadata in app.astream({"messages": inputs}, stream_mode="messages"):
if msg.content and not isinstance(msg, HumanMessage):
print(msg.content, end="|", flush=True)
if isinstance(msg, AIMessageChunk):
if first:
gathered = msg
first = False
else:
gathered = gathered + msg
if msg.tool_call_chunks:
print(gathered.tool_calls)
ChatOpenAI(model="gpt-3.5-turbo-1106", streaming=True)
)[{'name': 'search', 'args': {}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] [{'name': 'search', 'args': {}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] [{'name': 'search', 'args': {}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] [{'name': 'search', 'args': {'query': ''}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] [{'name': 'search', 'args': {'query': 'weather'}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] [{'name': 'search', 'args': {'query': 'weather in'}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] [{'name': 'search', 'args': {'query': 'weather in San'}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] [{'name': 'search', 'args': {'query': 'weather in San Francisco'}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] [{'name': 'search', 'args': {'query': 'weather in San Francisco'}, 'id': 'call_lfwgOci165GXplBjSDBeD4sE', 'type': 'tool_call'}] ["Cloudy with a chance of hail."]|The| weather| in| San| Francisco| is| currently| cloudy| with| a| chance| of| hail|.|