流式输出¶
您可以从 LangGraph Agent 或工作流中流式输出。
支持的流模式¶
将以下一种或多种流模式作为列表传递给 stream()
或 astream()
方法
模式 | 描述 |
---|---|
values |
在图的每一步之后,流式传输状态的完整值。 |
updates |
在图的每个步骤后流式传输状态更新。如果同一步骤中发生多次更新(例如,运行多个节点),则这些更新会单独流式传输。 |
custom |
从图节点内部流式传输自定义数据。 |
messages |
在任何调用 LLM 的图节点中,流式传输二元组(LLM 令牌,元数据)。 |
debug |
在图执行的整个过程中流式传输尽可能多的信息。 |
从 Agent 流式传输¶
Agent 进度¶
要流式传输 Agent 进度,请使用带有 stream_mode="updates"
的 stream()
或 astream()
方法。这会在每个 Agent 步骤后发出一个事件。
例如,如果您有一个调用工具一次的 Agent,您应该会看到以下更新
- LLM 节点:带有工具调用请求的 AI 消息
- 工具节点:带有执行结果的工具消息
- LLM 节点:最终的 AI 响应
LLM 令牌¶
要在 LLM 生成令牌时流式传输它们,请使用 stream_mode="messages"
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
async for token, metadata in agent.astream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
stream_mode="messages"
):
print("Token", token)
print("Metadata", metadata)
print("\n")
工具更新¶
要在工具执行时流式传输其更新,您可以使用 get_stream_writer。
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
"""Get weather for a given city."""
writer = get_stream_writer()
# stream any arbitrary data
writer(f"Looking up data for city: {city}")
return f"It's always sunny in {city}!"
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
stream_mode="custom"
):
print(chunk)
print("\n")
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
"""Get weather for a given city."""
writer = get_stream_writer()
# stream any arbitrary data
writer(f"Looking up data for city: {city}")
return f"It's always sunny in {city}!"
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
)
async for chunk in agent.astream(
{"messages": [{"role": "user", "content": "what is the weather in sf"}]},
stream_mode="custom"
):
print(chunk)
print("\n")
注意
如果您在工具内部添加了 get_stream_writer
,您将无法在 LangGraph 执行上下文之外调用该工具。
流式传输多种模式¶
您可以通过将流模式作为列表传递来指定多种流模式:stream_mode=["updates", "messages", "custom"]
禁用流式传输¶
在某些应用程序中,您可能需要为给定模型禁用单个令牌的流式传输。这在多 Agent 系统中很有用,可以控制哪些 Agent 流式传输其输出。
请参阅模型指南,了解如何禁用流式传输。
从工作流流式传输¶
基本用法示例¶
LangGraph 图提供了 .stream()
(同步) 和 .astream()
(异步) 方法,以迭代器的形式产生流式输出。
扩展示例:流式更新
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
for chunk in graph.stream( # (1)!
{"topic": "ice cream"},
stream_mode="updates", # (2)!
):
print(chunk)
stream()
方法返回一个迭代器,该迭代器产生流式输出。- 设置
stream_mode="updates"
以仅流式传输每个节点后图状态的更新。也支持其他流模式。有关详细信息,请参阅支持的流模式。
output {'refineTopic': {'topic': 'ice cream and cats'}} {'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}
|
流式传输多种模式¶
您可以将列表作为 stream_mode
参数传递,以同时流式传输多种模式。
流式输出将是 (mode, chunk)
元组,其中 mode
是流模式的名称,chunk
是该模式流式传输的数据。
流式传输图状态¶
使用 updates
和 values
流模式,以在图执行时流式传输其状态。
updates
在图的每个步骤后流式传输状态的**更新**。values
在图的每个步骤后流式传输状态的**完整值**。
API 参考: StateGraph | START | END
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
流式传输子图输出¶
要在流式输出中包含子图的输出,您可以在父图的 .stream()
方法中设置 subgraphs=True
。这将同时流式传输父图和任何子图的输出。
输出将以元组 (namespace, data)
的形式流式传输,其中 namespace
是一个包含调用子图的节点路径的元组,例如 ("parent_node:<task_id>", "child_node:<task_id>")
。
for chunk in graph.stream(
{"foo": "foo"},
subgraphs=True, # (1)!
stream_mode="updates",
):
print(chunk)
- 设置
subgraphs=True
以流式传输子图的输出。
扩展示例:从子图流式传输
from langgraph.graph import START, StateGraph
from typing import TypedDict
# Define subgraph
class SubgraphState(TypedDict):
foo: str # note that this key is shared with the parent graph state
bar: str
def subgraph_node_1(state: SubgraphState):
return {"bar": "bar"}
def subgraph_node_2(state: SubgraphState):
return {"foo": state["foo"] + state["bar"]}
subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()
# Define parent graph
class ParentState(TypedDict):
foo: str
def node_1(state: ParentState):
return {"foo": "hi! " + state["foo"]}
builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
for chunk in graph.stream(
{"foo": "foo"},
stream_mode="updates",
subgraphs=True, # (1)!
):
print(chunk)
- 设置
subgraphs=True
以流式传输子图的输出。
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})
注意,我们不仅接收节点更新,还接收命名空间,它们告诉我们正在从哪个图(或子图)流式传输。
调试¶
使用 debug
流模式,以在图执行的整个过程中流式传输尽可能多的信息。流式输出包括节点的名称和完整状态。
LLM 令牌¶
使用 messages
流模式,可以从图的任何部分(包括节点、工具、子图或任务)逐个令牌地流式传输大型语言模型 (LLM) 的输出。
来自messages
模式的流式输出是一个元组 (message_chunk, metadata)
,其中
message_chunk
:来自 LLM 的令牌或消息段。metadata
:一个字典,包含有关图节点和 LLM 调用的详细信息。
如果您的 LLM 未作为 LangChain 集成提供,您可以使用
custom
模式来流式传输其输出。详情请参阅与任何 LLM 配合使用。
在 Python < 3.11 中使用异步需要手动配置
在 Python < 3.11 中使用异步代码时,您必须显式地将 RunnableConfig
传递给 ainvoke()
以启用正常的流式传输。详情请参阅Python < 3.11 的异步处理或升级到 Python 3.11+。
API 参考: init_chat_model | StateGraph | START
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
@dataclass
class MyState:
topic: str
joke: str = ""
llm = init_chat_model(model="openai:gpt-4o-mini")
def call_model(state: MyState):
"""Call the LLM to generate a joke about a topic"""
llm_response = llm.invoke( # (1)!
[
{"role": "user", "content": f"Generate a joke about {state.topic}"}
]
)
return {"joke": llm_response.content}
graph = (
StateGraph(MyState)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
for message_chunk, metadata in graph.stream( # (2)!
{"topic": "ice cream"},
stream_mode="messages",
):
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)
- 请注意,即使 LLM 是使用
.invoke
而不是.stream
运行的,也会发出消息事件。 - “messages” 流模式返回一个元组
(message_chunk, metadata)
的迭代器,其中message_chunk
是 LLM 流式传输的令牌,metadata
是一个包含有关调用 LLM 的图节点信息和其他信息的字典。
按 LLM 调用筛选¶
您可以将 tags
与 LLM 调用关联,以按 LLM 调用筛选流式传输的令牌。
API 参考: init_chat_model
from langchain.chat_models import init_chat_model
llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke']) # (1)!
llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem']) # (2)!
graph = ... # define a graph that uses these LLMs
async for msg, metadata in graph.astream( # (3)!
{"topic": "cats"},
stream_mode="messages",
):
if metadata["tags"] == ["joke"]: # (4)!
print(msg.content, end="|", flush=True)
- llm_1 被标记为“joke”。
- llm_2 被标记为“poem”。
stream_mode
设置为“messages”以流式传输 LLM 令牌。metadata
包含有关 LLM 调用的信息,包括标签。- 按元数据中的
tags
字段筛选流式传输的令牌,只包括来自带有“joke”标签的 LLM 调用的令牌。
扩展示例:按标签筛选
from typing import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph
joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"]) # (1)!
poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"]) # (2)!
class State(TypedDict):
topic: str
joke: str
poem: str
async def call_model(state, config):
topic = state["topic"]
print("Writing joke...")
# Note: Passing the config through explicitly is required for python < 3.11
# Since context var support wasn't added before then: https://docs.pythonlang.cn/3/library/asyncio-task.html#creating-tasks
joke_response = await joke_model.ainvoke(
[{"role": "user", "content": f"Write a joke about {topic}"}],
config, # (3)!
)
print("\n\nWriting poem...")
poem_response = await poem_model.ainvoke(
[{"role": "user", "content": f"Write a short poem about {topic}"}],
config, # (3)!
)
return {"joke": joke_response.content, "poem": poem_response.content}
graph = (
StateGraph(State)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
async for msg, metadata in graph.astream(
{"topic": "cats"},
stream_mode="messages", # (4)!
):
if metadata["tags"] == ["joke"]: # (4)!
print(msg.content, end="|", flush=True)
joke_model
被标记为“joke”。poem_model
被标记为“poem”。config
被显式传递以确保上下文变量正确传播。这在使用异步代码的 Python < 3.11 中是必需的。更多详情请参阅异步部分。stream_mode
设置为“messages”以流式传输 LLM 令牌。metadata
包含有关 LLM 调用的信息,包括标签。
按节点筛选¶
要仅从特定节点流式传输令牌,请使用 stream_mode="messages"
并按流式元数据中的 langgraph_node
字段筛选输出
for msg, metadata in graph.stream( # (1)!
inputs,
stream_mode="messages",
):
if msg.content and metadata["langgraph_node"] == "some_node_name": # (2)!
...
- “messages” 流模式返回一个
(message_chunk, metadata)
元组,其中message_chunk
是 LLM 流式传输的令牌,metadata
是一个包含有关调用 LLM 的图节点信息和其他信息的字典。 - 按元数据中的
langgraph_node
字段筛选流式传输的令牌,只包括来自write_poem
节点的令牌。
扩展示例:从特定节点流式传输 LLM 令牌
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")
class State(TypedDict):
topic: str
joke: str
poem: str
def write_joke(state: State):
topic = state["topic"]
joke_response = model.invoke(
[{"role": "user", "content": f"Write a joke about {topic}"}]
)
return {"joke": joke_response.content}
def write_poem(state: State):
topic = state["topic"]
poem_response = model.invoke(
[{"role": "user", "content": f"Write a short poem about {topic}"}]
)
return {"poem": poem_response.content}
graph = (
StateGraph(State)
.add_node(write_joke)
.add_node(write_poem)
# write both the joke and the poem concurrently
.add_edge(START, "write_joke")
.add_edge(START, "write_poem")
.compile()
)
for msg, metadata in graph.stream( # (1)!
{"topic": "cats"},
stream_mode="messages",
):
if msg.content and metadata["langgraph_node"] == "write_poem": # (2)!
print(msg.content, end="|", flush=True)
- “messages” 流模式返回一个
(message_chunk, metadata)
元组,其中message_chunk
是 LLM 流式传输的令牌,metadata
是一个包含有关调用 LLM 的图节点信息和其他信息的字典。 - 按元数据中的
langgraph_node
字段筛选流式传输的令牌,只包括来自write_poem
节点的令牌。
流式传输自定义数据¶
要从 LangGraph 节点或工具内部发送自定义用户定义数据,请遵循以下步骤
- 使用
get_stream_writer()
访问流写入器并发出自定义数据。 - 在调用
.stream()
或.astream()
时设置stream_mode="custom"
,以在流中获取自定义数据。您可以组合多种模式(例如,["updates", "custom"]
),但至少必须有一种是"custom"
。
在 Python < 3.11 的异步模式下没有 get_stream_writer()
在 Python < 3.11 上运行的异步代码中,get_stream_writer()
将无法工作。相反,应向您的节点或工具添加一个 writer
参数并手动传递它。请参阅Python < 3.11 的异步处理以获取用法示例。
from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
class State(TypedDict):
query: str
answer: str
def node(state: State):
writer = get_stream_writer() # (1)!
writer({"custom_key": "Generating custom data inside node"}) # (2)!
return {"answer": "some data"}
graph = (
StateGraph(State)
.add_node(node)
.add_edge(START, "node")
.compile()
)
inputs = {"query": "example"}
# Usage
for chunk in graph.stream(inputs, stream_mode="custom"): # (3)!
print(chunk)
- 获取流写入器以发送自定义数据。
- 发出一个自定义键值对(例如,进度更新)。
- 设置
stream_mode="custom"
以在流中接收自定义数据。
from langchain_core.tools import tool
from langgraph.config import get_stream_writer
@tool
def query_database(query: str) -> str:
"""Query the database."""
writer = get_stream_writer() # (1)!
writer({"data": "Retrieved 0/100 records", "type": "progress"}) # (2)!
# perform query
writer({"data": "Retrieved 100/100 records", "type": "progress"}) # (3)!
return "some-answer"
graph = ... # define a graph that uses this tool
for chunk in graph.stream(inputs, stream_mode="custom"): # (4)!
print(chunk)
- 访问流写入器以发送自定义数据。
- 发出一个自定义键值对(例如,进度更新)。
- 发出另一个自定义键值对。
- 设置
stream_mode="custom"
以在流中接收自定义数据。
与任何 LLM 配合使用¶
您可以使用 stream_mode="custom"
从任何 LLM API 流式传输数据——即使该 API 没有实现 LangChain 聊天模型接口。
这使您可以集成原始的 LLM 客户端或提供自有流接口的外部服务,使 LangGraph 在自定义设置方面具有高度灵活性。
API 参考: get_stream_writer
from langgraph.config import get_stream_writer
def call_arbitrary_model(state):
"""Example node that calls an arbitrary model and streams the output"""
writer = get_stream_writer() # (1)!
# Assume you have a streaming client that yields chunks
for chunk in your_custom_streaming_client(state["topic"]): # (2)!
writer({"custom_llm_chunk": chunk}) # (3)!
return {"result": "completed"}
graph = (
StateGraph(State)
.add_node(call_arbitrary_model)
# Add other nodes and edges as needed
.compile()
)
for chunk in graph.stream(
{"topic": "cats"},
stream_mode="custom", # (4)!
):
# The chunk will contain the custom data streamed from the llm
print(chunk)
- 获取流写入器以发送自定义数据。
- 使用您的自定义流客户端生成 LLM 令牌。
- 使用写入器将自定义数据发送到流中。
- 设置
stream_mode="custom"
以在流中接收自定义数据。
扩展示例:流式传输任意聊天模型
import operator
import json
from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START
from openai import AsyncOpenAI
openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"
async def stream_tokens(model_name: str, messages: list[dict]):
response = await openai_client.chat.completions.create(
messages=messages, model=model_name, stream=True
)
role = None
async for chunk in response:
delta = chunk.choices[0].delta
if delta.role is not None:
role = delta.role
if delta.content:
yield {"role": role, "content": delta.content}
# this is our tool
async def get_items(place: str) -> str:
"""Use this tool to list items one might find in a place you're asked about."""
writer = get_stream_writer()
response = ""
async for msg_chunk in stream_tokens(
model_name,
[
{
"role": "user",
"content": (
"Can you tell me what kind of items "
f"i might find in the following place: '{place}'. "
"List at least 3 such items separating them by a comma. "
"And include a brief description of each item."
),
}
],
):
response += msg_chunk["content"]
writer(msg_chunk)
return response
class State(TypedDict):
messages: Annotated[list[dict], operator.add]
# this is the tool-calling graph node
async def call_tool(state: State):
ai_message = state["messages"][-1]
tool_call = ai_message["tool_calls"][-1]
function_name = tool_call["function"]["name"]
if function_name != "get_items":
raise ValueError(f"Tool {function_name} not supported")
function_arguments = tool_call["function"]["arguments"]
arguments = json.loads(function_arguments)
function_response = await get_items(**arguments)
tool_message = {
"tool_call_id": tool_call["id"],
"role": "tool",
"name": function_name,
"content": function_response,
}
return {"messages": [tool_message]}
graph = (
StateGraph(State)
.add_node(call_tool)
.add_edge(START, "call_tool")
.compile()
)
让我们用一个包含工具调用的 AI 消息来调用该图
inputs = {
"messages": [
{
"content": None,
"role": "assistant",
"tool_calls": [
{
"id": "1",
"function": {
"arguments": '{"place":"bedroom"}',
"name": "get_items",
},
"type": "function",
}
],
}
]
}
async for chunk in graph.astream(
inputs,
stream_mode="custom",
):
print(chunk["content"], end="|", flush=True)
为特定聊天模型禁用流式传输¶
如果您的应用程序混合使用了支持流式传输的模型和不支持的模型,您可能需要为不支持流式传输的模型明确禁用它。
在初始化模型时设置 disable_streaming=True
。
Python < 3.11 的异步处理¶
在 Python < 3.11 版本中,asyncio 任务不支持 context
参数。
这限制了 LangGraph 自动传播上下文的能力,并以两种关键方式影响了 LangGraph 的流式传输机制
- 您必须将
RunnableConfig
显式传递到异步 LLM 调用中(例如,ainvoke()
),因为回调不会自动传播。 - 您不能在异步节点或工具中使用
get_stream_writer()
——您必须直接传递一个writer
参数。
扩展示例:使用手动配置的异步 LLM 调用
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model
llm = init_chat_model(model="openai:gpt-4o-mini")
class State(TypedDict):
topic: str
joke: str
async def call_model(state, config): # (1)!
topic = state["topic"]
print("Generating joke...")
joke_response = await llm.ainvoke(
[{"role": "user", "content": f"Write a joke about {topic}"}],
config, # (2)!
)
return {"joke": joke_response.content}
graph = (
StateGraph(State)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
async for chunk, metadata in graph.astream(
{"topic": "ice cream"},
stream_mode="messages", # (3)!
):
if chunk.content:
print(chunk.content, end="|", flush=True)
- 在异步节点函数中接受
config
作为参数。 - 将
config
传递给llm.ainvoke()
以确保正确的上下文传播。 - 设置
stream_mode="messages"
以流式传输 LLM 令牌。
扩展示例:使用流写入器的异步自定义流式传输
from typing import TypedDict
from langgraph.types import StreamWriter
class State(TypedDict):
topic: str
joke: str
async def generate_joke(state: State, writer: StreamWriter): # (1)!
writer({"custom_key": "Streaming custom data while generating a joke"})
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(generate_joke)
.add_edge(START, "generate_joke")
.compile()
)
async for chunk in graph.astream(
{"topic": "ice cream"},
stream_mode="custom", # (2)!
):
print(chunk)
- 在异步节点或工具的函数签名中添加
writer
作为参数。LangGraph 将自动将流写入器传递给该函数。 - 设置
stream_mode="custom"
以在流中接收自定义数据。