How to stream LLM tokens (without LangChain LLMs)¶
In this example, we will stream tokens from the language model powering an agent. We will use the OpenAI client library directly rather than a LangChain chat model, and we will use a ReAct agent as an example.
Setup¶
First, let's install the required packages and set our API keys
In [1]
%%capture --no-stderr
%pip install -U langgraph openai
In [ ]
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
Define the model, tools, and graph¶
Define a node that will call the OpenAI API¶
In [7]
from openai import AsyncOpenAI
from langchain_core.language_models.chat_models import ChatGenerationChunk
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables.config import (
    ensure_config,
    get_callback_manager_for_config,
)

openai_client = AsyncOpenAI()

# define tool schema for openai tool calling
tool = {
    "type": "function",
    "function": {
        "name": "get_items",
        "description": "Use this tool to look up which items are in the given place.",
        "parameters": {
            "type": "object",
            "properties": {"place": {"type": "string"}},
            "required": ["place"],
        },
    },
}


async def call_model(state, config=None):
    config = ensure_config(config | {"tags": ["agent_llm"]})
    callback_manager = get_callback_manager_for_config(config)
    messages = state["messages"]

    llm_run_manager = callback_manager.on_chat_model_start({}, [messages])[0]
    response = await openai_client.chat.completions.create(
        messages=messages, model="gpt-3.5-turbo", tools=[tool], stream=True
    )

    response_content = ""
    role = None

    tool_call_id = None
    tool_call_function_name = None
    tool_call_function_arguments = ""

    async for chunk in response:
        delta = chunk.choices[0].delta
        if delta.role is not None:
            role = delta.role

        if delta.content:
            response_content += delta.content
            # note: we're wrapping the response in ChatGenerationChunk so that we can stream this back using stream_mode="messages"
            chunk = ChatGenerationChunk(
                message=AIMessageChunk(
                    content=delta.content,
                )
            )
            llm_run_manager.on_llm_new_token(delta.content, chunk=chunk)

        if delta.tool_calls:
            # note: for simplicity we're only handling a single tool call here
            if delta.tool_calls[0].function.name is not None:
                tool_call_function_name = delta.tool_calls[0].function.name
                tool_call_id = delta.tool_calls[0].id

            # note: we're wrapping the tool calls in ChatGenerationChunk so that we can stream this back using stream_mode="messages"
            tool_call_chunk = ChatGenerationChunk(
                message=AIMessageChunk(
                    content="",
                    additional_kwargs={"tool_calls": [delta.tool_calls[0].dict()]},
                )
            )
            llm_run_manager.on_llm_new_token("", chunk=tool_call_chunk)
            tool_call_function_arguments += delta.tool_calls[0].function.arguments

    if tool_call_function_name is not None:
        tool_calls = [
            {
                "id": tool_call_id,
                "function": {
                    "name": tool_call_function_name,
                    "arguments": tool_call_function_arguments,
                },
                "type": "function",
            }
        ]
    else:
        tool_calls = None

    response_message = {
        "role": role,
        "content": response_content,
        "tool_calls": tool_calls,
    }
    return {"messages": [response_message]}
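Since this node works with plain OpenAI-style message dicts rather than LangChain message objects, it can also be exercised outside the graph. The following is a small, hypothetical sanity check that is not part of the original notebook; it assumes OPENAI_API_KEY is set, and it passes an empty config dict because call_model merges its "agent_llm" tag into whatever config it receives.

# hypothetical standalone check of the node above (requires OPENAI_API_KEY)
async def _check_call_model():
    state = {"messages": [{"role": "user", "content": "what's under the bed?"}]}
    result = await call_model(state, config={})
    # the node returns an OpenAI-style assistant dict, possibly including tool_calls
    print(result["messages"][0])

# in a notebook cell: await _check_call_model()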
Define our tools and a tool-calling node¶
In [8]
import json


async def get_items(place: str) -> str:
    """Use this tool to look up which items are in the given place."""
    if "bed" in place:  # For under the bed
        return "socks, shoes and dust bunnies"
    if "shelf" in place:  # For 'shelf'
        return "books, pencils and pictures"
    else:  # if the agent decides to ask about a different place
        return "cat snacks"


# define mapping to look up functions when running tools
function_name_to_function = {"get_items": get_items}


async def call_tools(state):
    messages = state["messages"]

    tool_call = messages[-1]["tool_calls"][0]
    function_name = tool_call["function"]["name"]
    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await function_name_to_function[function_name](**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}
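To see the shape of the message this tool node produces, here is a small, hypothetical example (not in the original notebook) that feeds it a hand-written OpenAI-style tool call. Note that the "arguments" field must be a JSON string, exactly as the streaming node above accumulates it.

# hypothetical input: an assistant message carrying one OpenAI-style tool call
example_state = {
    "messages": [
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {
                    "id": "call_123",  # made-up id for illustration
                    "type": "function",
                    "function": {"name": "get_items", "arguments": '{"place": "shelf"}'},
                }
            ],
        }
    ]
}

# in a notebook cell: await call_tools(example_state)
# -> {'messages': [{'tool_call_id': 'call_123', 'role': 'tool', 'name': 'get_items', 'content': 'books, pencils and pictures'}]}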
Define our graph¶
In [9]
import operator
from typing import Annotated, Literal

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, END, START


class State(TypedDict):
    messages: Annotated[list, operator.add]


def should_continue(state) -> Literal["tools", END]:
    messages = state["messages"]
    last_message = messages[-1]
    if last_message["tool_calls"]:
        return "tools"
    return END


workflow = StateGraph(State)
workflow.add_edge(START, "model")
workflow.add_node("model", call_model)  # i.e. our "agent"
workflow.add_node("tools", call_tools)
workflow.add_conditional_edges("model", should_continue)
workflow.add_edge("tools", "model")
graph = workflow.compile()
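Before streaming, you can run the compiled graph in the usual way. The sketch below is not part of the original notebook and assumes OPENAI_API_KEY is set; it invokes the graph once and prints the agent's final answer.

# plain, non-streaming invocation of the compiled graph for comparison
final_state = await graph.ainvoke(
    {"messages": [{"role": "user", "content": "what's on the shelf?"}]}
)
# the last message is the final OpenAI-style assistant dict produced by call_model
print(final_state["messages"][-1]["content"])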
Stream tokens¶
In [10]
from langchain_core.messages import AIMessageChunk

first = True

async for msg, metadata in graph.astream(
    {"messages": [{"role": "user", "content": "what's in the bedroom"}]},
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)

    if isinstance(msg, AIMessageChunk):
        if first:
            gathered = msg
            first = False
        else:
            gathered = gathered + msg

        if msg.tool_call_chunks:
            print(gathered.tool_calls)
[{'name': 'get_items', 'args': {}, 'id': 'call_h7g3jsgeRXIOUiaEC0VtM4EI', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {}, 'id': 'call_h7g3jsgeRXIOUiaEC0VtM4EI', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {}, 'id': 'call_h7g3jsgeRXIOUiaEC0VtM4EI', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {'place': ''}, 'id': 'call_h7g3jsgeRXIOUiaEC0VtM4EI', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {'place': 'bed'}, 'id': 'call_h7g3jsgeRXIOUiaEC0VtM4EI', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {'place': 'bedroom'}, 'id': 'call_h7g3jsgeRXIOUiaEC0VtM4EI', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {'place': 'bedroom'}, 'id': 'call_h7g3jsgeRXIOUiaEC0VtM4EI', 'type': 'tool_call'}]
In| the| bedroom|,| you| have| socks|,| shoes|,| and| some| dust| b|unn|ies|.|
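Because the stream yields (message chunk, metadata) pairs, you can also filter the tokens to a single node. The sketch below is not from the original notebook; it assumes the metadata emitted in stream_mode="messages" carries the node name under "langgraph_node" (as in recent langgraph versions) and only prints tokens produced by the "model" node.

# sketch: print only the tokens emitted from the "model" node
async for msg, metadata in graph.astream(
    {"messages": [{"role": "user", "content": "what's in the bedroom"}]},
    stream_mode="messages",
):
    if msg.content and metadata.get("langgraph_node") == "model":
        print(msg.content, end="|", flush=True)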