
How to stream LLM tokens (without LangChain models)

In this guide, we will stream tokens from the language model powering an agent without using a LangChain chat model. As an example, we'll use the OpenAI client library directly within a ReAct agent.

Setup

First, install the openai package, along with @langchain/langgraph and @langchain/core:

$ npm install openai @langchain/langgraph @langchain/core

Compatibility

This guide requires @langchain/core>=0.2.19, and if you are using LangSmith, langsmith>=0.1.39. For help upgrading, see this guide.

You will also need to make sure you have your OpenAI key set as process.env.OPENAI_API_KEY.
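For quick local experimentation, you can also set the key in-process before initializing the client (a sketch only; the key value below is a placeholder, and a real environment variable is preferable in practice):

// Placeholder key for illustration; set a real environment variable
// in any non-throwaway environment.
process.env.OPENAI_API_KEY = "sk-...";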

Defining the model and tool schema

First, initialize the OpenAI SDK and define a tool schema for the model to populate using OpenAI's format:

import OpenAI from "openai";

const openaiClient = new OpenAI({});

const toolSchema: OpenAI.ChatCompletionTool = {
  type: "function",
  function: {
    name: "get_items",
    description: "Use this tool to look up which items are in the given place.",
    parameters: {
      type: "object",
      properties: {
        place: {
          type: "string",
        },
      },
      required: ["place"],
    }
  }
};
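For reference, once the model decides to use this tool, the completed (non-streamed) tool call it produces will look something like the following sketch. The id here is hypothetical; real ids are generated by the API:

// Hypothetical example of a completed tool call matching the schema above.
const exampleToolCall: OpenAI.ChatCompletionMessageToolCall = {
  id: "call_abc123",
  type: "function",
  function: {
    name: "get_items",
    // Arguments arrive as a JSON string; when streaming, it is sent in chunks.
    arguments: `{"place":"shelf"}`,
  },
};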

Calling the model

Now, define a method for a LangGraph node that calls the model. It will handle formatting tool calls to and from the model, as well as streaming via custom callback events.

If you are using LangSmith, you can also wrap the OpenAI client to get the same nice tracing you'd see with a LangChain chat model.

Here's what that looks like:

import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";
import { wrapOpenAI } from "langsmith/wrappers/openai";
import { Annotation } from "@langchain/langgraph";

const StateAnnotation = Annotation.Root({
  messages: Annotation<OpenAI.ChatCompletionMessageParam[]>({
    reducer: (x, y) => x.concat(y),
  }),
});

// If using LangSmith, use "wrapOpenAI" on the whole client or
// "traceable" to wrap a single method for nicer tracing:
// https://langsmith.langchain.ac.cn/how_to_guides/tracing/annotate_code
const wrappedClient = wrapOpenAI(openaiClient);

const callModel = async (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const stream = await wrappedClient.chat.completions.create({
    messages,
    model: "gpt-4o-mini",
    tools: [toolSchema],
    stream: true,
  });
  let responseContent = "";
  let role: string = "assistant";
  let toolCallId: string | undefined;
  let toolCallName: string | undefined;
  let toolCallArgs = "";
  for await (const chunk of stream) {
    const delta = chunk.choices[0].delta;
    if (delta.role !== undefined) {
      role = delta.role;
    }
    if (delta.content) {
      responseContent += delta.content;
      await dispatchCustomEvent("streamed_token", {
        content: delta.content,
      });
    }
    if (delta.tool_calls !== undefined && delta.tool_calls.length > 0) {
      // note: for simplicity we're only handling a single tool call here
      const toolCall = delta.tool_calls[0];
      if (toolCall.function?.name !== undefined) {
        toolCallName = toolCall.function.name;
      }
      if (toolCall.id !== undefined) {
        toolCallId = toolCall.id;
      }
      await dispatchCustomEvent("streamed_tool_call_chunk", toolCall);
      toolCallArgs += toolCall.function?.arguments ?? "";
    }
  }
  let finalToolCalls;
  if (toolCallName !== undefined && toolCallId !== undefined) {
    finalToolCalls = [{
      id: toolCallId,
      function: {
        name: toolCallName,
        arguments: toolCallArgs
      },
      type: "function" as const,
    }];
  }

  const responseMessage = {
    role: role as any,
    content: responseContent,
    tool_calls: finalToolCalls,
  };
  return { messages: [responseMessage] };
}

Note that you can't call this method outside of a LangGraph node, since dispatchCustomEvent will fail if it is called outside the proper context.
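For illustration, a top-level call like the following, made while no graph is running, fails because there is no active callback context to attach the event to (a minimal sketch of the failure mode described above):

try {
  // No graph or runnable is executing here, so dispatchCustomEvent has
  // no parent run to dispatch into and will throw.
  await dispatchCustomEvent("standalone_event", { content: "hi" });
} catch (e) {
  console.log("Failed as expected:", e);
}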

Defining tools and a tool-calling node

Next, set up the actual tool function and the node that will call that function when the model populates a tool call:

const getItems = async ({ place }: { place: string }) => {
  if (place.toLowerCase().includes("bed")) {  // For under the bed
    return "socks, shoes and dust bunnies";
  } else if (place.toLowerCase().includes("shelf")) {  // For 'shelf'
    return "books, pencils and pictures";
  } else {  // if the agent decides to ask about a different place
    return "cat snacks";
  }
};

const callTools = async (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const mostRecentMessage = messages[messages.length - 1];
  const toolCalls = (mostRecentMessage as OpenAI.ChatCompletionAssistantMessageParam).tool_calls;
  if (toolCalls === undefined || toolCalls.length === 0) {
    throw new Error("No tool calls passed to node.");
  }
  const toolNameMap = {
    get_items: getItems,
  };
  const functionName = toolCalls[0].function.name;
  const functionArguments = JSON.parse(toolCalls[0].function.arguments);
  const response = await toolNameMap[functionName](functionArguments);
  const toolMessage = {
    tool_call_id: toolCalls[0].id,
    role: "tool" as const,
    name: functionName,
    content: response,
  }
  return { messages: [toolMessage] };
}
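Since the tool function itself is just a plain async function, you can sanity-check it directly outside the graph:

// Direct invocation of the tool function for testing; unlike callModel,
// it does not depend on any callback context.
console.log(await getItems({ place: "shelf" }));
// -> "books, pencils and pictures"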

Building the graph

Finally, it's time to build your graph:

import { StateGraph } from "@langchain/langgraph";
import OpenAI from "openai";

// We can reuse the same `StateAnnotation` from above as it has not changed.
const shouldContinue = (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const lastMessage =
    messages[messages.length - 1] as OpenAI.ChatCompletionAssistantMessageParam;
  if (lastMessage?.tool_calls !== undefined && lastMessage?.tool_calls.length > 0) {
    return "tools";
  }
  return "__end__";
}

const graph = new StateGraph(StateAnnotation)
  .addNode("model", callModel)
  .addNode("tools", callTools)
  .addEdge("__start__", "model")
  .addConditionalEdges("model", shouldContinue, {
    tools: "tools",
    __end__: "__end__",
  })
  .addEdge("tools", "model")
  .compile();

import * as tslab from "tslab";

const representation = graph.getGraph();
const image = await representation.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();

await tslab.display.png(new Uint8Array(arrayBuffer));
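The tslab call above is notebook-specific. In a plain Node.js script, a minimal sketch for saving the rendered graph to disk instead might look like this:

import * as fs from "node:fs/promises";

// drawMermaidPng returns a Blob; convert it to a Buffer and write it out.
const pngBlob = await graph.getGraph().drawMermaidPng();
await fs.writeFile("graph.png", Buffer.from(await pngBlob.arrayBuffer()));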

Streaming tokens

And now we can use the .streamEvents method to get the streamed tokens and tool calls from the OpenAI model:

const eventStream = await graph.streamEvents(
  { messages: [{ role: "user", content: "what's in the bedroom?" }] },
  { version: "v2" },
);

for await (const { event, name, data } of eventStream) {
  if (event === "on_custom_event") {
    console.log(name, data);
  }
}
streamed_tool_call_chunk {
  index: 0,
  id: 'call_v99reml4gZvvUypPgOpLgxM2',
  type: 'function',
  function: { name: 'get_items', arguments: '' }
}
streamed_tool_call_chunk { index: 0, function: { arguments: '{"' } }
streamed_tool_call_chunk { index: 0, function: { arguments: 'place' } }
streamed_tool_call_chunk { index: 0, function: { arguments: '":"' } }
streamed_tool_call_chunk { index: 0, function: { arguments: 'bed' } }
streamed_tool_call_chunk { index: 0, function: { arguments: 'room' } }
streamed_tool_call_chunk { index: 0, function: { arguments: '"}' } }
streamed_token { content: 'In' }
streamed_token { content: ' the' }
streamed_token { content: ' bedroom' }
streamed_token { content: ',' }
streamed_token { content: ' you' }
streamed_token { content: ' can' }
streamed_token { content: ' find' }
streamed_token { content: ' socks' }
streamed_token { content: ',' }
streamed_token { content: ' shoes' }
streamed_token { content: ',' }
streamed_token { content: ' and' }
streamed_token { content: ' dust' }
streamed_token { content: ' b' }
streamed_token { content: 'unnies' }
streamed_token { content: '.' }
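If you only want the tokens of the final answer, you can filter on the custom event name and accumulate the content (a small variation on the loop above):

let finalText = "";
const tokenStream = await graph.streamEvents(
  { messages: [{ role: "user", content: "what's on the shelf?" }] },
  { version: "v2" },
);
for await (const { event, name, data } of tokenStream) {
  if (event === "on_custom_event" && name === "streamed_token") {
    finalText += data.content;
  }
}
console.log(finalText);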
And if you have LangSmith tracing set up, you'll also see a trace like this one.