跳到内容

如何流式传输自定义数据

先决条件

本指南假定您熟悉以下内容

从节点内部进行流式传输最常见的用例是流式传输 LLM 令牌,但您可能也希望流式传输自定义数据。

例如,如果您有一个长时间运行的工具调用,您可以在步骤之间调度自定义事件,并使用这些自定义事件来监控进度。您还可以将这些自定义事件呈现给应用程序的最终用户,以向他们展示当前任务的进展情况。

您可以通过两种方式执行此操作

  • 使用图的 .stream 方法和 streamMode: "custom"
  • 使用 dispatchCustomEventsstreamEvents 发射自定义事件。

下面我们将了解如何使用这两个 API。

设置

首先,让我们安装所需的软件包

npm install @langchain/langgraph @langchain/core

设置 LangSmith 以进行 LangGraph 开发

注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 使您可以使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用程序 — 阅读此处了解更多关于如何开始的信息。

使用 .stream 流式传输自定义数据

兼容性

本节需要 @langchain/langgraph>=0.2.20。有关升级帮助,请参阅本指南

定义图

import {
  StateGraph,
  MessagesAnnotation,
  LangGraphRunnableConfig,
} from "@langchain/langgraph";

const myNode = async (
  _state: typeof MessagesAnnotation.State,
  config: LangGraphRunnableConfig
) => {
  const chunks = [
    "Four",
    "score",
    "and",
    "seven",
    "years",
    "ago",
    "our",
    "fathers",
    "...",
  ];
  for (const chunk of chunks) {
    // write the chunk to be streamed using streamMode=custom
    // Only populated if one of the passed stream modes is "custom".
    config.writer?.(chunk);
  }
  return {
    messages: [{
      role: "assistant",
      content: chunks.join(" "),
    }],
  };
};

const graph = new StateGraph(MessagesAnnotation)
  .addNode("model", myNode)
  .addEdge("__start__", "model")
  .compile();

流式传输内容

const inputs = [{
  role: "user",
  content: "What are you thinking about?",
}];

const stream = await graph.stream(
  { messages: inputs },
  { streamMode: "custom" }
);

for await (const chunk of stream) {
  console.log(chunk);
}
Four
score
and
seven
years
ago
our
fathers
...
您可能需要使用多种流式传输模式,因为您将需要访问自定义数据和状态更新。

const streamMultiple = await graph.stream(
  { messages: inputs },
  { streamMode: ["custom", "updates"] }
);

for await (const chunk of streamMultiple) {
  console.log(chunk);
}
[ 'custom', 'Four' ]
[ 'custom', 'score' ]
[ 'custom', 'and' ]
[ 'custom', 'seven' ]
[ 'custom', 'years' ]
[ 'custom', 'ago' ]
[ 'custom', 'our' ]
[ 'custom', 'fathers' ]
[ 'custom', '...' ]
[ 'updates', { model: { messages: [Array] } } ]

使用 .streamEvents 流式传输自定义数据

如果您已经在工作流程中使用图的 .streamEvents 方法,您还可以通过使用 dispatchCustomEvents 发射自定义事件来流式传输自定义数据

定义图

import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";

const graphNode = async (_state: typeof MessagesAnnotation.State) => {
  const chunks = [
    "Four",
    "score",
    "and",
    "seven",
    "years",
    "ago",
    "our",
    "fathers",
    "...",
  ];
  for (const chunk of chunks) {
    await dispatchCustomEvent("my_custom_event", { chunk });
  }
  return {
    messages: [{
      role: "assistant",
      content: chunks.join(" "),
    }],
  };
};

const graphWithDispatch = new StateGraph(MessagesAnnotation)
  .addNode("model", graphNode)
  .addEdge("__start__", "model")
  .compile();

流式传输内容

const eventStream = await graphWithDispatch.streamEvents(
  {
    messages: [{
      role: "user",
      content: "What are you thinking about?",
    }]
  },
  {
    version: "v2",
  },
);

for await (const { event, name, data } of eventStream) {
  if (event === "on_custom_event" && name === "my_custom_event") {
    console.log(`${data.chunk}|`);
  }
}
Four|
score|
and|
seven|
years|
ago|
our|
fathers|
...|