How to stream from the final node
A common pattern in graphs is to stream LLM tokens only from inside the final node. This guide demonstrates how to do that.
Define the model and tools
First, set up a chat model and a tool to call within your graph:
npm install @langchain/langgraph @langchain/anthropic @langchain/core
import { z } from "zod";
import { tool } from "@langchain/core/tools";
import { ChatAnthropic } from "@langchain/anthropic";
const getWeather = tool(async ({ city }) => {
if (city === "nyc") {
return "It might be cloudy in nyc";
} else if (city === "sf") {
return "It's always sunny in sf";
} else {
throw new Error("Unknown city.");
}
}, {
name: "get_weather",
schema: z.object({
city: z.enum(["nyc", "sf"]),
}),
description: "Use this to get weather information",
});
const tools = [getWeather];
const model = new ChatAnthropic({
model: "claude-3-5-sonnet-20240620",
}).bindTools(tools);
// We add a tag that we'll be using later to filter outputs
const finalModel = new ChatAnthropic({
model: "claude-3-5-sonnet-20240620",
}).withConfig({
tags: ["final_node"],
});
Define the graph
Now, lay out your graph:
import { StateGraph, MessagesAnnotation } from "@langchain/langgraph";
import { ToolNode } from "@langchain/langgraph/prebuilt";
import { AIMessage, HumanMessage, SystemMessage } from "@langchain/core/messages";
const shouldContinue = async (state: typeof MessagesAnnotation.State) => {
const messages = state.messages;
const lastMessage = messages[messages.length - 1] as AIMessage;
// If the LLM makes a tool call, then we route to the "tools" node
if (lastMessage.tool_calls?.length) {
return "tools";
}
// Otherwise, we stop (reply to the user)
return "final";
};
const callModel = async (state: typeof MessagesAnnotation.State) => {
const messages = state.messages;
const response = await model.invoke(messages);
// We return a list, because this will get added to the existing list
return { messages: [response] };
};
const callFinalModel = async (state: typeof MessagesAnnotation.State) => {
const messages = state.messages;
const lastAIMessage = messages[messages.length - 1];
const response = await finalModel.invoke([
new SystemMessage("Rewrite this in the voice of Al Roker"),
new HumanMessage({ content: lastAIMessage.content })
]);
// MessagesAnnotation allows you to overwrite messages from the agent
// by returning a message with the same id
response.id = lastAIMessage.id;
return { messages: [response] };
}
const toolNode = new ToolNode<typeof MessagesAnnotation.State>(tools);
const graph = new StateGraph(MessagesAnnotation)
.addNode("agent", callModel)
.addNode("tools", toolNode)
// add a separate final node
.addNode("final", callFinalModel)
.addEdge("__start__", "agent")
// Third parameter is optional and only here to draw a diagram of the graph
.addConditionalEdges("agent", shouldContinue, {
tools: "tools",
final: "final",
})
.addEdge("tools", "agent")
.addEdge("final", "__end__")
.compile();
import * as tslab from "tslab";
const diagram = graph.getGraph();
const image = await diagram.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();
tslab.display.png(new Uint8Array(arrayBuffer));
Stream outputs from the final node
const inputs = { messages: [new HumanMessage("What's the weather in nyc?")] };
const eventStream = await graph.streamEvents(inputs, { version: "v2"});
for await (const { event, tags, data } of eventStream) {
if (event === "on_chat_model_stream" && tags.includes("final_node")) {
if (data.chunk.content) {
// Empty content in the context of OpenAI or Anthropic usually means
// that the model is asking for a tool to be invoked.
// So we only print non-empty content
console.log(data.chunk.content, "|");
}
}
}
Hey | there, folks | ! Al | Roker here with | your weather update. | Well | , well | , well, it seems | like | the | Big | Apple might | be getting | a little over | cast today. That | 's right | , we | 're | looking | at some | cloud cover moving in over | New | York City. But hey | , don't let that | dampen your spirits! | A | little clou | d never | hurt anybody | , | right? Now | , I | ' | d love | to give | you more | details, | but Mother | Nature can | be as | unpredictable as | a game | of chance sometimes | . So | , if | you want | the full | scoop on NYC | 's weather | or | if | you're | curious | about conditions | in any other city across | this | great nation of ours | , just give | me a ho | ller! I'm here | to keep | you in the know, | whether | it's sunshine | , | rain, or anything | in between. Remember | , a clou | dy day is | just | the | sun | 's | way of letting | you know it's still | there, even if you | can't see it. | Stay | weather | -aware | , | an | d don | 't forget your | umbrella... | just in case! |
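The filtering logic used above can be sketched standalone against a mock event stream. This is a minimal sketch with no model calls: the event objects are simplified stand-ins for what `streamEvents` emits, and the `"final_node"` tag mimics the one attached with `withConfig` earlier in this guide.

```typescript
// Simplified stand-in for the streamEvents event shape (assumption: only the
// fields the filter actually reads are modeled here).
type MockEvent = {
  event: string;
  tags: string[];
  data: { chunk: { content: string } };
};

// A hypothetical stream mixing agent-node and final-node token events.
async function* mockEventStream(): AsyncGenerator<MockEvent> {
  // Agent-node token: same event type, but no "final_node" tag, so it's skipped
  yield { event: "on_chat_model_stream", tags: ["seq:step:1"], data: { chunk: { content: "ignored" } } };
  // Final-node tokens carry the tag attached via withConfig
  yield { event: "on_chat_model_stream", tags: ["final_node"], data: { chunk: { content: "Hey " } } };
  // Empty chunks (e.g. tool-call scaffolding) are also skipped
  yield { event: "on_chat_model_stream", tags: ["final_node"], data: { chunk: { content: "" } } };
  yield { event: "on_chat_model_stream", tags: ["final_node"], data: { chunk: { content: "there!" } } };
}

// Same three-part filter as the loop above: event type, tag, non-empty content.
export async function collectFinalTokens(): Promise<string[]> {
  const out: string[] = [];
  for await (const { event, tags, data } of mockEventStream()) {
    if (event === "on_chat_model_stream" && tags.includes("final_node") && data.chunk.content) {
      out.push(data.chunk.content);
    }
  }
  return out;
}
```

Running `collectFinalTokens` here yields only the two non-empty final-node chunks, which is exactly why the real loop prints nothing while the agent node is calling tools.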