How to stream data from within a tool

Prerequisites

This guide assumes familiarity with the following: Streaming, Tools

If your graph calls tools that use LLMs or any other streaming APIs, you might want to surface partial results while the tool is executing, especially if the tool takes longer to run.

  1. To stream arbitrary data from within a tool, you can use stream_mode="custom" and get_stream_writer():

    from langgraph.config import get_stream_writer
    
    def tool(tool_arg: str):
        writer = get_stream_writer()
        for chunk in custom_data_stream():
            # stream any arbitrary data
            writer(chunk)
        ...
    
    for chunk in graph.stream(
        inputs,
        stream_mode="custom"
    ):
        print(chunk)
    
  2. To stream LLM tokens generated by an LLM called inside a tool, you can use stream_mode="messages":

    from langgraph.graph import StateGraph, MessagesState
    from langchain_openai import ChatOpenAI
    
    model = ChatOpenAI()
    
    def tool(tool_arg: str):
        model.invoke(tool_arg)
        ...
    
    def call_tools(state: MessagesState):
        tool_call = get_tool_call(state)
        tool_result = tool(**tool_call["args"])
        ...
    
    graph = (
        StateGraph(MessagesState)
        .add_node(call_tools)
        ...
        .compile()
    )
    
    for msg, metadata in graph.stream(
        inputs,
        stream_mode="messages"
    ):
        print(msg)
    

Without LangChain

If you need to stream data from within a tool without using LangChain, you can use stream_mode="custom". See the example below to learn more.

Async with Python < 3.11

When using Python < 3.11 with async code, make sure to manually pass the RunnableConfig to the chat model when invoking it, like so: model.ainvoke(..., config). The stream method collects all events from your nested code using a streaming tracer passed as a callback. In Python 3.11 and above, this is handled automatically via contextvars; before 3.11, asyncio's tasks lacked proper contextvar support, which means the callbacks will only propagate if you manually pass the config through. We do this in the get_items tool below.
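As a minimal sketch of what this looks like (assuming model is a chat model such as ChatOpenAI, and call_model is one of your graph nodes):

from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.graph import MessagesState

model = ChatOpenAI()

async def call_model(state: MessagesState, config: RunnableConfig):
    # On Python < 3.11, forward the config explicitly so that the
    # streaming callbacks propagate into the nested ainvoke call
    response = await model.ainvoke(state["messages"], config)
    return {"messages": [response]}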

Setup

First, let's install the required packages and set our API keys:

%%capture --no-stderr
%pip install -U langgraph langchain-openai

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
OPENAI_API_KEY:  ········

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph. Read more about how to get started here.

Streaming custom data

We'll use a prebuilt ReAct agent for this guide:

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer


@tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()

    # this can be replaced with any actual streaming logic that you might have
    items = ["books", "penciles", "pictures"]
    for chunk in items:
        writer({"custom_tool_data": chunk})

    return ", ".join(items)


llm = ChatOpenAI(model_name="gpt-4o-mini")
tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)

API Reference: tool | ChatOpenAI | create_react_agent

Let's now invoke our agent with an input that requires a tool call:

inputs = {
    "messages": [  
        {"role": "user", "content": "what items are in the office?"}
    ]
}
async for chunk in agent.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk)
{'custom_tool_data': 'books'}
{'custom_tool_data': 'pencils'}
{'custom_tool_data': 'pictures'}
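If you also want to see the agent's progress alongside the custom tool data, you can pass a list of stream modes, in which case LangGraph yields (mode, chunk) tuples. A short sketch reusing the agent and inputs from above:

async for mode, chunk in agent.astream(
    inputs,
    stream_mode=["updates", "custom"],
):
    # `mode` tells you which stream each chunk came from
    print(mode, chunk)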

Streaming LLM tokens

from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import RunnableConfig


@tool
async def get_items(
    place: str,
    # Manually accept config (needed for Python <= 3.10)
    config: RunnableConfig,
) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    # Attention: when using async, you should be invoking the LLM using ainvoke!
    # If you fail to do so, streaming will NOT work.
    response = await llm.ainvoke(
        [
            {
                "role": "user",
                "content": (
                    f"Can you tell me what kind of items i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. And include a brief description of each item."
                ),
            }
        ],
        config,
    )
    return response.content


tools = [get_items]
# contains `agent` (tool-calling LLM) and `tools` (tool executor) nodes
agent = create_react_agent(llm, tools=tools)

API Reference: AIMessageChunk | RunnableConfig

inputs = {
    "messages": [  
        {"role": "user", "content": "what items are in the bedroom?"}
    ]
}
async for msg, metadata in agent.astream(
    inputs,
    stream_mode="messages",
):
    if (
        isinstance(msg, AIMessageChunk)
        and msg.content
        # Stream all messages from the tool node
        and metadata["langgraph_node"] == "tools"
    ):
        print(msg.content, end="|", flush=True)
Certainly|!| Here| are| three| items| you| might| find| in| a| bedroom|:

|1|.| **|Bed|**|:| The| central| piece| of| furniture| in| a| bedroom|,| typically| consisting| of| a| mattress| supported| by| a| frame|.| It| is| designed| for| sleeping| and| can| vary| in| size| from| twin| to| king|.| Beds| often| have| bedding|,| including| sheets|,| pillows|,| and| comfort|ers|,| to| enhance| comfort|.

|2|.| **|D|resser|**|:| A| piece| of| furniture| with| drawers| used| for| storing| clothing| and| personal| items|.| Dress|ers| often| have| a| flat| surface| on| top|,| which| can| be| used| for| decorative| items|,| a| mirror|,| or| personal| accessories|.| They| help| keep| the| bedroom| organized| and| clutter|-free|.

|3|.| **|Night|stand|**|:| A| small| table| or| cabinet| placed| beside| the| bed|,| used| for| holding| items| such| as| a| lamp|,| alarm| clock|,| books|,| or| personal| items|.| Night|stands| provide| convenience| for| easy| access| to| essentials| during| the| night|,| adding| functionality| and| style| to| the| bedroom| decor|.|
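Conversely, if you only want the tokens of the final answer produced by the agent node (rather than the tool's internal LLM call), filter on that node's name instead. A sketch under the same setup:

async for msg, metadata in agent.astream(
    inputs,
    stream_mode="messages",
):
    if (
        isinstance(msg, AIMessageChunk)
        and msg.content
        # only stream tokens from the agent node
        and metadata["langgraph_node"] == "agent"
    ):
        print(msg.content, end="|", flush=True)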

Example without LangChain

You can also stream data from within tool invocations without using LangChain. The example below demonstrates how to do it for a graph with a single tool-executing node. We'll leave implementing a ReAct agent from scratch, without LangChain, as an exercise for the reader.

import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# this is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# this is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)  
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)

API Reference: StateGraph | START

Let's now invoke our graph with an AI message that contains a tool call:

inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)
Sure|!| Here| are| three| common| items| you| might| find| in| a| bedroom|:

|1|.| **|Bed|**|:| The| focal| point| of| the| bedroom|,| a| bed| typically| consists| of| a| mattress| resting| on| a| frame|,| and| it| may| include| pillows| and| bedding|.| It| provides| a| comfortable| place| for| sleeping| and| resting|.

|2|.| **|D|resser|**|:| A| piece| of| furniture| with| multiple| drawers|,| a| dresser| is| used| for| storing| clothes|,| accessories|,| and| personal| items|.| It| often| has| a| flat| surface| that| may| be| used| to| display| decorative| items| or| a| mirror|.

|3|.| **|Night|stand|**|:| Also| known| as| a| bedside| table|,| a| night|stand| is| placed| next| to| the| bed| and| typically| holds| items| like| lamps|,| books|,| alarm| clocks|,| and| personal| belongings| for| convenience| during| the| night|.

|These| items| contribute| to| the| functionality| and| comfort| of| the| bedroom| environment|.|
