跳到内容

流式处理

LangGraph 内置了对流式处理的一流支持。有几种不同的方式可以从图运行中流式传回输出。

流式处理图输出 (.stream)

.stream 是一种用于从图运行中流式传回输出的异步方法。调用这些方法时,您可以指定几种不同的模式(例如 await graph.stream(..., { ...config, streamMode: "values" }))

  • "values": 这会在图的每个步骤后流式传输状态的完整值。
  • "updates": 这会在图的每个步骤后流式传输状态的更新。如果在同一步骤中进行了多次更新(例如运行了多个节点),则这些更新会分开流式传输。
  • "custom": 这会流式传输图节点内部的自定义数据。
  • "messages": 这会流式传输调用 LLM 的图节点的 LLM 令牌和元数据。
  • "debug": 这会在整个图执行过程中流式传输尽可能多的信息。

下面的可视化展示了 valuesupdates 模式之间的区别。

values vs updates

流式处理 LLM 令牌和事件 (.streamEvents)

此外,您可以使用 streamEvents 方法流式传回在节点内部发生的事件。这对于流式处理 LLM 调用的令牌很有用。

这是所有 LangChain 对象的标准方法。这意味着在执行图时,会沿途发出某些事件,如果您使用 .streamEvents 运行图,则可以看到这些事件。

所有事件都包含(除其他外)eventnamedata 字段。这些是什么意思?

  • event: 这是正在发出的事件类型。您可以在此处找到所有回调事件和触发器的详细表格。
  • name: 这是事件的名称。
  • data: 这是与事件相关的数据。

哪些类型的事物会导致事件发出?

  • 每个节点(可运行对象)在开始执行时发出 on_chain_start 事件,在节点执行期间发出 on_chain_stream 事件,在节点完成时发出 on_chain_end 事件。节点事件的 name 字段中将包含节点名称。
  • 图将在图执行开始时发出 on_chain_start 事件,在每个节点执行后发出 on_chain_stream 事件,并在图完成时发出 on_chain_end 事件。图事件的 name 字段中将包含 LangGraph
  • 对状态通道的任何写入(即任何时候更新状态键的值)都将发出 on_chain_starton_chain_end 事件。

此外,节点内部创建的任何事件(LLM 事件、工具事件、手动发出的事件等)也将在 .streamEvents 的输出中可见。

为了更具体地说明并看看这看起来如何,让我们看看运行一个简单图时返回的事件。

import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, MessagesAnnotation } from "langgraph";

const model = new ChatOpenAI({ model: "gpt-4-turbo-preview" });

function callModel(state: typeof MessagesAnnotation.State) {
  const response = model.invoke(state.messages);
  return { messages: response };
}

const workflow = new StateGraph(MessagesAnnotation)
  .addNode("callModel", callModel)
  .addEdge("start", "callModel")
  .addEdge("callModel", "end");
const app = workflow.compile();

const inputs = [{ role: "user", content: "hi!" }];

for await (const event of app.streamEvents(
  { messages: inputs },
  { version: "v2" }
)) {
  const kind = event.event;
  console.log(`${kind}: ${event.name}`);
}
on_chain_start: LangGraph
on_chain_start: __start__
on_chain_end: __start__
on_chain_start: callModel
on_chat_model_start: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_end: ChatOpenAI
on_chain_start: ChannelWrite<callModel,messages>
on_chain_end: ChannelWrite<callModel,messages>
on_chain_stream: callModel
on_chain_end: callModel
on_chain_stream: LangGraph
on_chain_end: LangGraph

我们从整体图开始 (on_chain_start: LangGraph)。然后我们写入 __start__ 节点(这是一个处理输入的特殊节点)。然后我们开始 callModel 节点 (on_chain_start: callModel)。然后我们开始聊天模型调用 (on_chat_model_start: ChatOpenAI),逐个令牌流式传回 (on_chat_model_stream: ChatOpenAI),然后完成聊天模型 (on_chat_model_end: ChatOpenAI)。从那里,我们将结果写回通道 (ChannelWrite),然后完成 callModel 节点,最后完成整个图。

希望这能让您很好地了解在简单图中发出的事件。但是这些事件包含什么数据呢?每种类型的事件都包含不同格式的数据。让我们看看 on_chat_model_stream 事件是什么样的。这是一种重要的事件类型,因为它用于流式处理来自 LLM 响应的令牌。

这些事件看起来像

{'event': 'on_chat_model_stream',
 'name': 'ChatOpenAI',
 'run_id': '3fdbf494-acce-402e-9b50-4eab46403859',
 'tags': ['seq:step:1'],
 'metadata': {'langgraph_step': 1,
  'langgraph_node': 'callModel',
  'langgraph_triggers': ['start:callModel'],
  'langgraph_task_idx': 0,
  'checkpoint_id': '1ef657a0-0f9d-61b8-bffe-0c39e4f9ad6c',
  'checkpoint_ns': 'callModel',
  'ls_provider': 'openai',
  'ls_model_name': 'gpt-4o-mini',
  'ls_model_type': 'chat',
  'ls_temperature': 0.7},
 'data': {'chunk': AIMessageChunk({ content: 'Hello', id: 'run-3fdbf494-acce-402e-9b50-4eab46403859' })},
 'parent_ids': []}

我们可以看到我们有事件类型和名称(这我们之前已经知道)。

我们在元数据中也有很多东西。值得注意的是,'langgraph_node': 'callModel', 是一些非常有用的信息,它告诉我们这个模型是在哪个节点内部被调用的。

最后,data 是一个非常重要的字段。它包含此事件的实际数据!在这种情况下是一个 AIMessageChunk。它包含消息的 content,以及一个 id。这是整个 AIMessage 的 ID(不仅仅是这个块),并且非常有用——它帮助我们跟踪哪些块属于同一条消息(这样我们就可以在 UI 中将它们一起显示)。

这些信息包含了创建用于流式处理 LLM 令牌的 UI 所需的一切。