How to stream LLM tokens from your graph

In this example, we will stream tokens from the language model powering an agent. We will use a ReAct agent as an example.

Note

If you are using a version of @langchain/core < 0.2.3, then when calling chat models or LLMs you need to call await model.stream() within your nodes to get token-by-token streaming events, and aggregate final outputs if needed to update the graph state. In later versions of @langchain/core, this occurs automatically, and you can simply call await model.invoke(). For more on how to upgrade @langchain/core, check out the instructions here.
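For those older versions, a node's model call can aggregate the streamed chunks itself. Below is a minimal sketch, not the library's canonical pattern: the node name and inline model are placeholders, and the state shape mirrors the one defined later in this guide.

import { ChatOpenAI } from "@langchain/openai";
import type { AIMessageChunk, BaseMessageLike } from "@langchain/core/messages";

const legacyModel = new ChatOpenAI({ model: "gpt-4o-mini", temperature: 0 });

// Hypothetical node for @langchain/core < 0.2.3: stream the model call and
// merge the chunks into a single message before updating the graph state.
const callModelLegacy = async (state: { messages: BaseMessageLike[] }) => {
  const chunkStream = await legacyModel.stream(state.messages);
  let aggregated: AIMessageChunk | undefined;
  for await (const chunk of chunkStream) {
    // AIMessageChunk#concat merges content and tool call chunks as they arrive.
    aggregated = aggregated === undefined ? chunk : aggregated.concat(chunk);
  }
  return { messages: aggregated ? [aggregated] : [] };
};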

This how-to guide closely follows the others in this directory, showing how to incorporate the functionality into a prototypical agent in LangGraph.

Streaming Support

Token streaming is supported by many, but not all, chat models. Check whether your LLM integration supports token streaming here (doc). Note that some integrations may support general token streaming but lack support for streaming tool calls.

Note

In this how-to, we will create our agent from scratch to be transparent (but verbose). You can accomplish similar functionality using the createReactAgent({ llm, tools }) (API doc) constructor. This may be more appropriate if you are used to LangChain's AgentExecutor class.
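For reference, the prebuilt-constructor version might look roughly like the following sketch, assuming a model and tools array like the ones defined later in this guide:

import { ChatOpenAI } from "@langchain/openai";
import { createReactAgent } from "@langchain/langgraph/prebuilt";

// Prebuilt equivalent of the agent we build by hand below.
const prebuiltAgent = createReactAgent({
  llm: new ChatOpenAI({ model: "gpt-4o-mini", temperature: 0 }),
  tools: [], // e.g. the [searchTool] array from the "Set up the tools" section
});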

Setup

This guide will use OpenAI's GPT-4o model. We will optionally set our API key for LangSmith tracing, which will give us best-in-class observability.


// process.env.OPENAI_API_KEY = "sk_...";

// Optional, add tracing in LangSmith
// process.env.LANGCHAIN_API_KEY = "ls__...";
// process.env.LANGCHAIN_CALLBACKS_BACKGROUND = "true";
// process.env.LANGCHAIN_TRACING = "true";
// process.env.LANGCHAIN_PROJECT = "Stream Tokens: LangGraphJS";

Define the state

The state is the interface for all of the nodes in our graph.

import { Annotation } from "@langchain/langgraph";
import type { BaseMessageLike } from "@langchain/core/messages";

const StateAnnotation = Annotation.Root({
  messages: Annotation<BaseMessageLike[]>({
    reducer: (x, y) => x.concat(y),
  }),
});

Set up the tools

First define the tools you want to use. For this simple example, we will create a placeholder search engine, but see the documentation here on how to create your own custom tools.

import { tool } from "@langchain/core/tools";
import { z } from "zod";

const searchTool = tool((_) => {
  // This is a placeholder for the actual implementation
  return "Cold, with a low of 3℃";
}, {
  name: "search",
  description:
    "Use to surf the web, fetch current information, check the weather, and retrieve other information.",
  schema: z.object({
    query: z.string().describe("The query to use in your search."),
  }),
});

await searchTool.invoke({ query: "What's the weather like?" });

const tools = [searchTool];

We can now wrap these tools in the prebuilt ToolNode. This object will actually run the tools (functions) whenever they are invoked by our LLM.

import { ToolNode } from "@langchain/langgraph/prebuilt";

const toolNode = new ToolNode(tools);

Set up the model

Now we will load the chat model. It needs to satisfy two requirements:

  1. It should work with messages. We will represent all agent state in the form of messages, so it needs to be able to work well with them.
  2. It should work with tool calling, meaning it can return function arguments in its response.

Note

These model requirements are not general requirements for using LangGraph - they are just the requirements for this one example.

import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0,
});

After we've done this, we should make sure the model knows that it has these tools available to call. We can do this by calling bindTools.

const boundModel = model.bindTools(tools);

Define the graph

We can now put it all together.

import { StateGraph, END } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";

const routeMessage = (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const lastMessage = messages[messages.length - 1] as AIMessage;
  // If no tools are called, we can finish (respond to the user)
  if (!lastMessage?.tool_calls?.length) {
    return END;
  }
  // Otherwise if there is, we continue and call the tools
  return "tools";
};

const callModel = async (
  state: typeof StateAnnotation.State,
) => {
  // For versions of @langchain/core < 0.2.3, you must call `.stream()`
  // and aggregate the message from chunks instead of calling `.invoke()`.
  const { messages } = state;
  const responseMessage = await boundModel.invoke(messages);
  return { messages: [responseMessage] };
};

const workflow = new StateGraph(StateAnnotation)
  .addNode("agent", callModel)
  .addNode("tools", toolNode)
  .addEdge("__start__", "agent")
  .addConditionalEdges("agent", routeMessage)
  .addEdge("tools", "agent");

const agent = workflow.compile();

import * as tslab from "tslab";

const runnableGraph = agent.getGraph();
const image = await runnableGraph.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();

await tslab.display.png(new Uint8Array(arrayBuffer));

Streaming LLM Tokens

You can access the LLM tokens as they are produced by each node using two methods:

  • The `stream` method along with streamMode: "messages"
  • The `streamEvents` method

The `stream` method

Compatibility

This section requires @langchain/langgraph>=0.2.20. For help upgrading, see this guide.

For this method, you must be using an LLM that supports streaming as well (e.g. new ChatOpenAI({ model: "gpt-4o-mini" })), or call .stream on the internal LLM call:

import { isAIMessageChunk } from "@langchain/core/messages";

const stream = await agent.stream(
  { messages: [{ role: "user", content: "What's the current weather in Nepal?" }] },
  { streamMode: "messages" },
);

for await (const [message, _metadata] of stream) {
  if (isAIMessageChunk(message) && message.tool_call_chunks?.length) {
    console.log(`${message.getType()} MESSAGE TOOL CALL CHUNK: ${message.tool_call_chunks[0].args}`);
  } else {
    console.log(`${message.getType()} MESSAGE CONTENT: ${message.content}`);
  }
}
ai MESSAGE TOOL CALL CHUNK: 
ai MESSAGE TOOL CALL CHUNK: {"
ai MESSAGE TOOL CALL CHUNK: query
ai MESSAGE TOOL CALL CHUNK: ":"
ai MESSAGE TOOL CALL CHUNK: current
ai MESSAGE TOOL CALL CHUNK:  weather
ai MESSAGE TOOL CALL CHUNK:  in
ai MESSAGE TOOL CALL CHUNK:  Nepal
ai MESSAGE TOOL CALL CHUNK: "}
ai MESSAGE CONTENT: 
tool MESSAGE CONTENT: Cold, with a low of 3℃
ai MESSAGE CONTENT: 
ai MESSAGE CONTENT: The
ai MESSAGE CONTENT:  current
ai MESSAGE CONTENT:  weather
ai MESSAGE CONTENT:  in
ai MESSAGE CONTENT:  Nepal
ai MESSAGE CONTENT:  is
ai MESSAGE CONTENT:  cold
ai MESSAGE CONTENT: ,
ai MESSAGE CONTENT:  with
ai MESSAGE CONTENT:  a
ai MESSAGE CONTENT:  low
ai MESSAGE CONTENT:  temperature
ai MESSAGE CONTENT:  of
ai MESSAGE CONTENT:  
ai MESSAGE CONTENT: 3
ai MESSAGE CONTENT: ℃
ai MESSAGE CONTENT: .
ai MESSAGE CONTENT:

Disabling streaming

If you wish to disable streaming for a given node or model call, you can add a "nostream" tag. Here's an example where we add an initial node with an LLM call that will not be streamed in the final output:

import { RunnableLambda } from "@langchain/core/runnables";

const unstreamed = async (_: typeof StateAnnotation.State) => {
  const model = new ChatOpenAI({
    model: "gpt-4o-mini",
    temperature: 0,
  });
  const res = await model.invoke("How are you?");
  console.log("LOGGED UNSTREAMED MESSAGE", res.content);
  // Don't update the state, this is just to show a call that won't be streamed
  return {};
}

const agentWithNoStream = new StateGraph(StateAnnotation)
  .addNode("unstreamed",
    // Add a "nostream" tag to the entire node
    RunnableLambda.from(unstreamed).withConfig({
      tags: ["nostream"]
    })
  )
  .addNode("agent", callModel)
  .addNode("tools", toolNode)
  // Run the unstreamed node before the agent
  .addEdge("__start__", "unstreamed")
  .addEdge("unstreamed", "agent")
  .addConditionalEdges("agent", routeMessage)
  .addEdge("tools", "agent")
  .compile();

const stream = await agentWithNoStream.stream(
  { messages: [{ role: "user", content: "What's the current weather in Nepal?" }] },
  { streamMode: "messages" },
);

for await (const [message, _metadata] of stream) {
  if (isAIMessageChunk(message) && message.tool_call_chunks?.length) {
    console.log(`${message.getType()} MESSAGE TOOL CALL CHUNK: ${message.tool_call_chunks[0].args}`);
  } else {
    console.log(`${message.getType()} MESSAGE CONTENT: ${message.content}`);
  }
}
LOGGED UNSTREAMED MESSAGE I'm just a computer program, so I don't have feelings, but I'm here and ready to help you! How can I assist you today?
ai MESSAGE TOOL CALL CHUNK: 
ai MESSAGE TOOL CALL CHUNK: {"
ai MESSAGE TOOL CALL CHUNK: query
ai MESSAGE TOOL CALL CHUNK: ":"
ai MESSAGE TOOL CALL CHUNK: current
ai MESSAGE TOOL CALL CHUNK:  weather
ai MESSAGE TOOL CALL CHUNK:  in
ai MESSAGE TOOL CALL CHUNK:  Nepal
ai MESSAGE TOOL CALL CHUNK: "}
ai MESSAGE CONTENT: 
tool MESSAGE CONTENT: Cold, with a low of 3℃
ai MESSAGE CONTENT: 
ai MESSAGE CONTENT: The
ai MESSAGE CONTENT:  current
ai MESSAGE CONTENT:  weather
ai MESSAGE CONTENT:  in
ai MESSAGE CONTENT:  Nepal
ai MESSAGE CONTENT:  is
ai MESSAGE CONTENT:  cold
ai MESSAGE CONTENT: ,
ai MESSAGE CONTENT:  with
ai MESSAGE CONTENT:  a
ai MESSAGE CONTENT:  low
ai MESSAGE CONTENT:  temperature
ai MESSAGE CONTENT:  of
ai MESSAGE CONTENT:  
ai MESSAGE CONTENT: 3
ai MESSAGE CONTENT: ℃
ai MESSAGE CONTENT: .
ai MESSAGE CONTENT:

If you removed the tag from the "unstreamed" node, the result of the model call within it would also appear in the final stream.
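Since the tag works at the level of a node or a model call, you could also attach it to just the model instead of the whole node. Here is a sketch of that variation, assuming the tag is picked up the same way when set directly on the model's config:

// Tag only the model call rather than the entire node.
const taggedModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0,
}).withConfig({ tags: ["nostream"] });

const unstreamedViaModelTag = async (_: typeof StateAnnotation.State) => {
  // Tokens from this call carry the "nostream" tag and are filtered out of
  // `streamMode: "messages"` output, while other model calls in the same
  // node would still stream normally.
  const res = await taggedModel.invoke("How are you?");
  console.log("LOGGED UNSTREAMED MESSAGE", res.content);
  return {};
};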

The `streamEvents` method

You can also use the streamEvents method like this:

const eventStream = await agent.streamEvents(
  { messages: [{ role: "user", content: "What's the weather like today?" }] },
  {
    version: "v2",
  }
);

for await (const { event, data } of eventStream) {
  if (event === "on_chat_model_stream" && isAIMessageChunk(data.chunk)) {
    if (data.chunk.tool_call_chunks !== undefined && data.chunk.tool_call_chunks.length > 0) {
      console.log(data.chunk.tool_call_chunks);
    }
  }
}
[
  {
    name: 'search',
    args: '',
    id: 'call_Qpd6frHt0yUYWynRbZEXF3le',
    index: 0,
    type: 'tool_call_chunk'
  }
]
[
  {
    name: undefined,
    args: '{"',
    id: undefined,
    index: 0,
    type: 'tool_call_chunk'
  }
]
[
  {
    name: undefined,
    args: 'query',
    id: undefined,
    index: 0,
    type: 'tool_call_chunk'
  }
]
[
  {
    name: undefined,
    args: '":"',
    id: undefined,
    index: 0,
    type: 'tool_call_chunk'
  }
]
[
  {
    name: undefined,
    args: 'current',
    id: undefined,
    index: 0,
    type: 'tool_call_chunk'
  }
]
[
  {
    name: undefined,
    args: ' weather',
    id: undefined,
    index: 0,
    type: 'tool_call_chunk'
  }
]
[
  {
    name: undefined,
    args: ' today',
    id: undefined,
    index: 0,
    type: 'tool_call_chunk'
  }
]
[
  {
    name: undefined,
    args: '"}',
    id: undefined,
    index: 0,
    type: 'tool_call_chunk'
  }
]
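
If you also want the plain content tokens from the same event type, you can extend the filter. Below is a small, hypothetical variation on the loop above; it creates a fresh event stream, since the previous one has already been consumed.

const contentEventStream = await agent.streamEvents(
  { messages: [{ role: "user", content: "What's the weather like today?" }] },
  { version: "v2" }
);

for await (const { event, data } of contentEventStream) {
  if (event === "on_chat_model_stream" && isAIMessageChunk(data.chunk)) {
    if (data.chunk.tool_call_chunks?.length) {
      // Streamed tool call arguments, as shown above.
      console.log(data.chunk.tool_call_chunks);
    } else if (data.chunk.content) {
      // Plain text tokens, e.g. from the model's final answer.
      console.log(data.chunk.content);
    }
  }
}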