跳过到内容

流式

LangGraph 构建了对流式的原生支持。有多种不同的方式可以从图运行中流式返回输出

流式输出图 (.stream.astream)

.stream.astream 是同步和异步方法,用于从图运行中流式返回输出。在调用这些方法时,可以指定多种不同的模式(例如 `graph.stream(..., mode="..."))

  • "values": 这会在图的每一步之后流式传输状态的完整值。
  • "updates": 这会在图的每一步之后流式传输状态的更新。如果在同一步骤中进行了多个更新(例如运行多个节点),则这些更新将单独流式传输。
  • "debug": 这在整个图执行过程中流式传输尽可能多的信息。

下面的可视化展示了 valuesupdates 模式之间的区别

values vs updates

流式 LLM 标记和事件 (.astream_events)

此外,可以使用 astream_events 方法流式传输节点内部发生的事件。这对于 流式传输 LLM 调用的标记非常有用。

这是所有 LangChain 对象的标准方法。这意味着当图执行时,某些事件会沿途发出,如果你使用 .astream_events 运行图,你就可以看到这些事件。

所有事件都具有(除其他外)eventnamedata 字段。这些是什么意思?

  • event: 这是正在发出的事件类型。你可以在 这里找到所有回调事件和触发器的详细表格。
  • name: 这是事件的名称。
  • data: 这是与事件相关联的数据。

什么类型的事件会导致事件被发出?

  • 每个节点(可运行的)都会在开始执行时发出 on_chain_start,在节点执行期间发出 on_chain_stream,并在节点完成时发出 on_chain_end。节点事件将具有事件的 name 字段中的节点名称
  • 该图将在图执行开始时发出 on_chain_start,在每个节点执行后发出 on_chain_stream,并在图完成时发出 on_chain_end。图事件将在事件的 name 字段中具有 LangGraph
  • 对状态通道的任何写入(即任何时候更新其中一个状态键的值)都会发出 on_chain_starton_chain_end 事件

此外,在你的节点内创建的任何事件(LLM 事件、工具事件、手动发出的事件等)也将在 .astream_events 的输出中可见。

为了使这个更具体,并看看它的样子,让我们看看在运行一个简单的图时返回了哪些事件

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END

model = ChatOpenAI(model="gpt-4o-mini")


def call_model(state: MessagesState):
    response = model.invoke(state['messages'])
    return {"messages": response}

workflow = StateGraph(MessagesState)
workflow.add_node(call_model)
workflow.add_edge(START, "call_model")
workflow.add_edge("call_model", END)
app = workflow.compile()

inputs = [{"role": "user", "content": "hi!"}]
async for event in app.astream_events({"messages": inputs}, version="v1"):
    kind = event["event"]
    print(f"{kind}: {event['name']}")
on_chain_start: LangGraph
on_chain_start: __start__
on_chain_end: __start__
on_chain_start: call_model
on_chat_model_start: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_end: ChatOpenAI
on_chain_start: ChannelWrite<call_model,messages>
on_chain_end: ChannelWrite<call_model,messages>
on_chain_stream: call_model
on_chain_end: call_model
on_chain_stream: LangGraph
on_chain_end: LangGraph

我们从整体图开始(on_chain_start: LangGraph)。然后我们写入 __start__ 节点(这是一个处理输入的特殊节点)。然后我们启动 call_model 节点(on_chain_start: call_model)。然后我们启动聊天模型调用(on_chat_model_start: ChatOpenAI),按标记流式返回(on_chat_model_stream: ChatOpenAI),然后完成聊天模型(on_chat_model_end: ChatOpenAI)。从那里,我们将结果写回通道(ChannelWrite<call_model,messages>),然后完成 call_model 节点,最后完成整个图。

这应该可以让你很好地了解在一个简单的图中发出的事件。但是这些事件包含什么数据?每种类型的事件都包含不同格式的数据。让我们看看 on_chat_model_stream 事件是什么样的。这是一种重要的事件类型,因为它需要从 LLM 响应中流式传输标记。

这些事件看起来像

{'event': 'on_chat_model_stream',
 'name': 'ChatOpenAI',
 'run_id': '3fdbf494-acce-402e-9b50-4eab46403859',
 'tags': ['seq:step:1'],
 'metadata': {'langgraph_step': 1,
  'langgraph_node': 'call_model',
  'langgraph_triggers': ['start:call_model'],
  'langgraph_task_idx': 0,
  'checkpoint_id': '1ef657a0-0f9d-61b8-bffe-0c39e4f9ad6c',
  'checkpoint_ns': 'call_model',
  'ls_provider': 'openai',
  'ls_model_name': 'gpt-4o-mini',
  'ls_model_type': 'chat',
  'ls_temperature': 0.7},
 'data': {'chunk': AIMessageChunk(content='Hello', id='run-3fdbf494-acce-402e-9b50-4eab46403859')},
 'parent_ids': []}
我们可以看到我们有事件类型和名称(我们之前已经知道这些了)。

我们还在元数据中有一堆东西。值得注意的是,'langgraph_node': 'call_model', 是一些非常有用的信息,它告诉我们这个模型是在哪个节点内调用的。

最后,data 是一个非常重要的字段。这包含了此事件的实际数据!在这种情况下,它是一个 AIMessageChunk。它包含消息的 content 以及一个 id。这是整个 AIMessage 的 ID(不仅仅是这个片段),非常有用 - 它可以帮助我们跟踪哪些片段是同一个消息的一部分(这样我们就可以在 UI 中将它们显示在一起)。

此信息包含创建流式 LLM 标记 UI 所需的一切。你可以在 这里看到一个指南。

PYTHON<=3.10 中的 ASYNC

在 Python <= 3.10 中使用 .astream_events 时,你可能看不到从节点内部发出的事件。如果你在你的节点内异步使用 Langchain RunnableLambda、RunnableGenerator 或 Tool,你将不得不手动将回调传播到这些对象。这是因为在这种情况下,LangChain 无法自动将回调传播到子对象。请参阅 这里这里 的示例。

评论