How to stream from the final node¶
In [1]
%%capture --no-stderr
%pip install -U langgraph langchain-openai langchain-community
In [3]
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
Define the model and tools¶
In [4]
from typing import Literal

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.runnables import ConfigurableField
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.prebuilt import ToolNode


@tool
def get_weather(city: Literal["nyc", "sf"]):
    """Use this to get weather information."""
    if city == "nyc":
        return "It might be cloudy in nyc"
    elif city == "sf":
        return "It's always sunny in sf"
    else:
        raise AssertionError("Unknown city")


tools = [get_weather]
model = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
final_model = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)

model = model.bind_tools(tools)
# NOTE: this is where we're adding a tag that we'll use later to filter the model stream events
# to only the model called in the final node. This is not necessary if you call a single LLM,
# but it might be important if you call multiple models within the node and want to filter events
# from only one of them.
final_model = final_model.with_config(tags=["final_node"])
tool_node = ToolNode(tools=tools)
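As an optional sanity check (not part of the graph itself), you can call the tool directly with a dict of arguments before wiring it into a node:

# Optional: verify the tool behaves as expected on its own.
print(get_weather.invoke({"city": "sf"}))   # "It's always sunny in sf"
print(get_weather.invoke({"city": "nyc"}))  # "It might be cloudy in nyc"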
Define the graph¶
In [5]
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import MessagesState
from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage


def should_continue(state: MessagesState) -> Literal["tools", "final"]:
    messages = state["messages"]
    last_message = messages[-1]
    # If the LLM makes a tool call, then we route to the "tools" node
    if last_message.tool_calls:
        return "tools"
    # Otherwise, we stop (reply to the user)
    return "final"


def call_model(state: MessagesState):
    messages = state["messages"]
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


def call_final_model(state: MessagesState):
    messages = state["messages"]
    last_ai_message = messages[-1]
    response = final_model.invoke(
        [
            SystemMessage("Rewrite this in the voice of Al Roker"),
            HumanMessage(last_ai_message.content),
        ]
    )
    # overwrite the last AI message from the agent
    response.id = last_ai_message.id
    return {"messages": [response]}


builder = StateGraph(MessagesState)

builder.add_node("agent", call_model)
builder.add_node("tools", tool_node)
# add a separate final node
builder.add_node("final", call_final_model)

builder.add_edge(START, "agent")
builder.add_conditional_edges(
    "agent",
    should_continue,
)

builder.add_edge("tools", "agent")
builder.add_edge("final", END)

graph = builder.compile()
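Before streaming, you can confirm the compiled graph runs end to end with a plain invoke. This is a minimal sketch (the question string is just an example); the last message should be the Al Roker-style rewrite produced by the "final" node:

from langchain_core.messages import HumanMessage

# Run the graph once without streaming and inspect the last message,
# which is produced by the "final" node.
result = graph.invoke({"messages": [HumanMessage(content="what is the weather in sf")]})
print(result["messages"][-1].content)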
In [6]
from IPython.display import display, Image
display(Image(graph.get_graph().draw_mermaid_png()))
Stream outputs from the final node¶
Filter on event metadata¶
A first option for getting the LLM events from inside a specific node (in this case the "final" node) is to filter on the "langgraph_node" field in the event metadata. This is sufficient if you want to stream events from all LLM calls within that node. It also means that if several different LLMs are invoked inside the node, this filter will include events from all of them.
In [7]
from langchain_core.messages import HumanMessage

inputs = {"messages": [HumanMessage(content="what is the weather in sf")]}
for msg, metadata in graph.stream(inputs, stream_mode="messages"):
    if (
        msg.content
        and not isinstance(msg, HumanMessage)
        and metadata["langgraph_node"] == "final"
    ):
        print(msg.content, end="|", flush=True)
Well| folks|,| let| me| tell| you|,| the| weather| in| San| Francisco| is| always| sunny|!| That|'s| right|,| you| can| expect| clear| skies| and| plenty| of| sunshine| when| you|'re| in| the| City| by| the| Bay|.| So| grab| your| sunglasses| and| get| ready| to| enjoy| some| beautiful| weather| in| San| Francisco|!|
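If you want to use the tokens programmatically instead of printing them, the same filter can accumulate them into a single string. This is a small sketch of that pattern, reusing only the calls shown above:

# Collect the streamed tokens from the "final" node into one string
# rather than printing them as they arrive.
final_chunks = []
for msg, metadata in graph.stream(inputs, stream_mode="messages"):
    if (
        msg.content
        and not isinstance(msg, HumanMessage)
        and metadata["langgraph_node"] == "final"
    ):
        final_chunks.append(msg.content)

final_answer = "".join(final_chunks)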
Filter on custom tags¶
Alternatively, you can add configuration with custom tags to your LLM, as we did at the beginning, by adding final_model.with_config(tags=["final_node"]). This allows us to filter the events more precisely and keep only the ones coming from this model.
In [8]
inputs = {"messages": [HumanMessage(content="what's the weather in nyc?")]}
async for event in graph.astream_events(inputs, version="v2"):
    kind = event["event"]
    tags = event.get("tags", [])
    # filter on the custom tag
    if kind == "on_chat_model_stream" and "final_node" in event.get("tags", []):
        data = event["data"]
        if data["chunk"].content:
            # Empty content in the context of OpenAI or Anthropic usually means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(data["chunk"].content, end="|", flush=True)
Looks| like| we|'ve| got| some| clouds| roll|in|'| in| over| the| Big| Apple| today|,| folks|!| Keep| an| eye| out| for| some| over|cast| skies| in| NYC|.|
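If you need to hand the filtered tokens to something else (for example, a web response), it can be convenient to wrap the same event filtering in an async generator. This is only a sketch built from the calls shown above; stream_final_tokens is a hypothetical helper name:

async def stream_final_tokens(question: str):
    """Yield only the tokens produced by the tagged model in the "final" node."""
    inputs = {"messages": [HumanMessage(content=question)]}
    async for event in graph.astream_events(inputs, version="v2"):
        if (
            event["event"] == "on_chat_model_stream"
            and "final_node" in event.get("tags", [])
            and event["data"]["chunk"].content
        ):
            yield event["data"]["chunk"].content


# Example usage (inside an async context):
# async for token in stream_final_tokens("what's the weather in nyc?"):
#     print(token, end="", flush=True)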