How to stream custom data¶
The most common use case for streaming from inside a node is to stream LLM tokens, but you may also want to stream custom data.
For example, if you have a long-running tool call, you can dispatch custom events between the steps and use those custom events to monitor progress. You can also surface these custom events to the end users of your application to show them how the current task is progressing.
There are two ways to do this:

- using the graph's .stream / .astream methods with stream_mode="custom"
- emitting custom events using adispatch_custom_event
Below we'll see how to use both of these APIs.
Setup¶
First, let's install the required packages:
%%capture --no-stderr
%pip install -U langgraph
Stream custom data using .stream / .astream¶
Define the graph¶
from langchain_core.messages import AIMessage
from langgraph.graph import START, StateGraph, MessagesState, END
from langgraph.types import StreamWriter
async def my_node(
    state: MessagesState,
    writer: StreamWriter,  # <-- provide StreamWriter to write chunks to be streamed
):
    chunks = [
        "Four",
        "score",
        "and",
        "seven",
        "years",
        "ago",
        "our",
        "fathers",
        "...",
    ]
    for chunk in chunks:
        # write the chunk to be streamed using stream_mode=custom
        writer(chunk)
    return {"messages": [AIMessage(content=" ".join(chunks))]}
# Define a new graph
workflow = StateGraph(MessagesState)
workflow.add_node("model", my_node)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)
app = workflow.compile()
Stream content¶
from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="What are you thinking about?")]
async for chunk in app.astream({"messages": inputs}, stream_mode="custom"):
    print(chunk, flush=True)
Four score and seven years ago our fathers ...
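Under stream_mode="custom", the writer is simply a callable: each value you pass to it is surfaced as one item in the stream. Here is a minimal pure-Python sketch of that contract (hypothetical, not LangGraph's actual implementation; a list's append stands in for the writer):

```python
from typing import Any, Callable, List

def run_node(writer: Callable[[Any], None]) -> str:
    # Mirrors my_node above: emit chunks one at a time, then return the joined text
    chunks = ["Four", "score", "and", "seven"]
    for chunk in chunks:
        writer(chunk)  # each call surfaces one item to the stream consumer
    return " ".join(chunks)

received: List[str] = []
result = run_node(received.append)  # a list's append satisfies the writer contract
print(received)  # ['Four', 'score', 'and', 'seven']
print(result)    # Four score and seven
```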
You might want to use multiple streaming modes, since you will often need access to both the custom data and the state updates.
from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="What are you thinking about?")]
async for chunk in app.astream({"messages": inputs}, stream_mode=["custom", "updates"]):
    print(chunk, flush=True)
('custom', 'Four')
('custom', 'score')
('custom', 'and')
('custom', 'seven')
('custom', 'years')
('custom', 'ago')
('custom', 'our')
('custom', 'fathers')
('custom', '...')
('updates', {'model': {'messages': [AIMessage(content='Four score and seven years ago our fathers ...', additional_kwargs={}, response_metadata={})]}})
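With multiple modes enabled, each streamed item is a (mode, payload) tuple, so consumers typically dispatch on the first element. A minimal pure-Python sketch of that demultiplexing (the sample tuples below are shaped like the output above):

```python
from typing import Any, Iterable, List, Tuple

def demux(stream: Iterable[Tuple[str, Any]]) -> Tuple[List[Any], List[Any]]:
    """Split a multi-mode stream into custom chunks and state updates."""
    custom: List[Any] = []
    updates: List[Any] = []
    for mode, payload in stream:
        if mode == "custom":
            custom.append(payload)
        elif mode == "updates":
            updates.append(payload)
    return custom, updates

sample = [
    ("custom", "Four"),
    ("custom", "score"),
    ("updates", {"model": {"messages": ["..."]}}),
]
custom, updates = demux(sample)
print(custom)   # ['Four', 'score']
print(updates)  # [{'model': {'messages': ['...']}}]
```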
Stream custom data using .astream_events¶
If you are already using the graph's .astream_events method in your workflow, you can also stream custom data by emitting custom events with adispatch_custom_event:
ASYNC IN PYTHON<=3.10
If you are running async code in python<=3.10, LangChain cannot automatically propagate the configuration, including the callbacks necessary for astream_events(), to child runnables. This is a common reason why you may fail to see events emitted from custom runnables or tools.

If you are running python<=3.10, you will need to manually propagate the RunnableConfig object to child runnables in async environments. For an example of how to manually propagate the config, see the implementation of the node below that uses adispatch_custom_event.

If you are running python>=3.11, the RunnableConfig will automatically propagate to child runnables in async environments. However, it is still a good idea to propagate the RunnableConfig manually if your code may run in other Python versions.
Define the graph¶
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.callbacks.manager import adispatch_custom_event
async def my_node(state: MessagesState, config: RunnableConfig):
    chunks = [
        "Four",
        "score",
        "and",
        "seven",
        "years",
        "ago",
        "our",
        "fathers",
        "...",
    ]
    for chunk in chunks:
        await adispatch_custom_event(
            "my_custom_event",
            {"chunk": chunk},
            config=config,  # <-- propagate config
        )
    return {"messages": [AIMessage(content=" ".join(chunks))]}
# Define a new graph
workflow = StateGraph(MessagesState)
workflow.add_node("model", my_node)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)
app = workflow.compile()
Stream content¶
from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="What are you thinking about?")]
async for event in app.astream_events({"messages": inputs}, version="v2"):
    tags = event.get("tags", [])
    if event["event"] == "on_custom_event" and event["name"] == "my_custom_event":
        data = event["data"]
        if data:
            print(data["chunk"], end="|", flush=True)
Four|score|and|seven|years|ago|our|fathers|...|