跳到内容

如何等待用户输入 (函数式 API)

前提条件

本指南假设您熟悉以下内容

人机协作 (HIL) 交互对于 代理系统 至关重要。等待人工输入是一种常见的人机协作交互模式,允许代理询问用户澄清问题并在继续之前等待输入。

我们可以在 LangGraph 中使用 interrupt() 函数来实现这一点。 interrupt 允许我们停止图的执行以收集用户的输入,并使用收集到的输入继续执行。

本指南演示了如何使用 LangGraph 的 函数式 API 实现人机协作工作流程。具体来说,我们将演示

  1. 一个简单的用法示例
  2. 如何与 ReAct 代理一起使用

设置

首先,让我们安装所需的软件包并设置我们的 API 密钥

%%capture --no-stderr
%pip install -U langgraph langchain-openai
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")

设置 LangSmith 以获得更好的调试体验

注册 LangSmith 以快速发现问题并提高您的 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用程序——阅读 文档 以了解更多关于如何开始的信息。

简单用法

让我们演示一个简单的用法示例。我们将创建三个 任务

  1. 追加 "bar"
  2. 暂停等待人工输入。恢复时,追加人工输入。
  3. 追加 "qux"
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt


@task
def step_1(input_query):
    """Append bar."""
    return f"{input_query} bar"


@task
def human_feedback(input_query):
    """Append user input."""
    feedback = interrupt(f"Please provide feedback: {input_query}")
    return f"{input_query} {feedback}"


@task
def step_3(input_query):
    """Append qux."""
    return f"{input_query} qux"

API 参考:entrypoint | task | Command | interrupt

我们现在可以在一个简单的 入口点 中组合这些任务

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()


@entrypoint(checkpointer=checkpointer)
def graph(input_query):
    result_1 = step_1(input_query).result()
    result_2 = human_feedback(result_1).result()
    result_3 = step_3(result_2).result()

    return result_3

API 参考:MemorySaver

我们为启用人机协作工作流程所做的全部工作就是在任务内部调用 interrupt()

提示

先前任务的结果——在本例中为 step_1——被持久化,因此在 interrupt 之后不会再次运行。

让我们发送一个查询字符串

config = {"configurable": {"thread_id": "1"}}

for event in graph.stream("foo", config):
    print(event)
    print("\n")
{'step_1': 'foo bar'}


{'__interrupt__': (Interrupt(value='Please provide feedback: foo bar', resumable=True, ns=['graph:d66b2e35-0ee3-d8d6-1a22-aec9d58f13b9', 'human_feedback:e0cd4ee2-b874-e1d2-8bc4-3f7ddc06bcc2'], when='during'),)}
请注意,我们在 step_1 之后使用 interrupt 暂停了。interrupt 提供了恢复运行的指令。要恢复,我们发出一个 Command,其中包含 human_feedback 任务期望的数据。

# Continue execution
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
{'human_feedback': 'foo bar baz'}


{'step_3': 'foo bar baz qux'}


{'graph': 'foo bar baz qux'}
恢复后,运行将继续执行剩余步骤并按预期终止。

代理

我们将以 如何使用函数式 API 创建 ReAct 代理 指南中创建的代理为基础进行构建。

在这里,我们将扩展代理,允许它在需要时联系人工寻求帮助。

定义模型和工具

让我们首先定义我们将用于示例的工具和模型。与 ReAct 代理指南 中一样,我们将使用一个简单的占位工具来获取某个地点的天气描述。

在本示例中,我们将使用 OpenAI 聊天模型,但任何 支持工具调用 的模型都足够了。

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

model = ChatOpenAI(model="gpt-4o-mini")


@tool
def get_weather(location: str):
    """Call to get the weather from a specific location."""
    # This is a placeholder for the actual implementation
    if any([city in location.lower() for city in ["sf", "san francisco"]]):
        return "It's sunny!"
    elif "boston" in location.lower():
        return "It's rainy!"
    else:
        return f"I am not sure what the weather is in {location}"

API 参考:ChatOpenAI | tool

为了联系人工寻求帮助,我们可以简单地添加一个调用 interrupt 的工具

from langgraph.types import Command, interrupt


@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt({"query": query})
    return human_response["data"]


tools = [get_weather, human_assistance]

API 参考:Command | interrupt

定义任务

我们的任务与 ReAct 代理指南 中的任务基本没有变化

  1. 调用模型:我们希望使用消息列表查询我们的聊天模型。
  2. 调用工具:如果我们的模型生成工具调用,我们希望执行它们。

我们只有一个模型可访问的工具。

from langchain_core.messages import ToolMessage
from langgraph.func import entrypoint, task

tools_by_name = {tool.name: tool for tool in tools}


@task
def call_model(messages):
    """Call model with a sequence of messages."""
    response = model.bind_tools(tools).invoke(messages)
    return response


@task
def call_tool(tool_call):
    tool = tools_by_name[tool_call["name"]]
    observation = tool.invoke(tool_call)
    return ToolMessage(content=observation, tool_call_id=tool_call["id"])

API 参考:ToolMessage | entrypoint | task

定义入口点

我们的 入口点 也与 ReAct 代理指南 中的入口点相同

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph.message import add_messages

checkpointer = MemorySaver()


@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
    if previous is not None:
        messages = add_messages(previous, messages)

    llm_response = call_model(messages).result()
    while True:
        if not llm_response.tool_calls:
            break

        # Execute tools
        tool_result_futures = [
            call_tool(tool_call) for tool_call in llm_response.tool_calls
        ]
        tool_results = [fut.result() for fut in tool_result_futures]

        # Append to message list
        messages = add_messages(messages, [llm_response, *tool_results])

        # Call model again
        llm_response = call_model(messages).result()

    # Generate final response
    messages = add_messages(messages, llm_response)
    return entrypoint.final(value=llm_response, save=messages)

API 参考:MemorySaver | add_messages

用法

让我们使用一个需要人工协助的问题来调用我们的模型。我们的问题还需要调用 get_weather 工具

def _print_step(step: dict) -> None:
    for task_name, result in step.items():
        if task_name == "agent":
            continue  # just stream from tasks
        print(f"\n{task_name}:")
        if task_name == "__interrupt__":
            print(result)
        else:
            result.pretty_print()
config = {"configurable": {"thread_id": "1"}}

user_message = {
    "role": "user",
    "content": (
        "Can you reach out for human assistance: what should I feed my cat? "
        "Separately, can you check the weather in San Francisco?"
    ),
}
print(user_message)

for step in agent.stream([user_message], config):
    _print_step(step)
{'role': 'user', 'content': 'Can you reach out for human assistance: what should I feed my cat? Separately, can you check the weather in San Francisco?'}

call_model:
================================== Ai Message ==================================
Tool Calls:
  human_assistance (call_joAEBVX7Abfm7TsZ0k95ZkVx)
 Call ID: call_joAEBVX7Abfm7TsZ0k95ZkVx
  Args:
    query: What should I feed my cat?
  get_weather (call_ut7zfHFCcms63BOZLrRHszGH)
 Call ID: call_ut7zfHFCcms63BOZLrRHszGH
  Args:
    location: San Francisco

call_tool:
================================= Tool Message =================================

content="It's sunny!" name='get_weather' tool_call_id='call_ut7zfHFCcms63BOZLrRHszGH'

__interrupt__:
(Interrupt(value={'query': 'What should I feed my cat?'}, resumable=True, ns=['agent:aa676ccc-b038-25e3-9c8a-18e81d4e1372', 'call_tool:059d53d2-3344-13bc-e170-48b632c2dd97'], when='during'),)
请注意,我们生成了两个工具调用,虽然我们的运行被中断了,但我们并没有阻止 get_weather 工具的执行。

让我们检查一下我们在哪里被中断了

print(step)
{'__interrupt__': (Interrupt(value={'query': 'What should I feed my cat?'}, resumable=True, ns=['agent:aa676ccc-b038-25e3-9c8a-18e81d4e1372', 'call_tool:059d53d2-3344-13bc-e170-48b632c2dd97'], when='during'),)}
我们可以通过发出 Command 来恢复执行。请注意,我们在 Command 中提供的数据可以根据您对 human_assistance 的实现进行定制。

human_response = "You should feed your cat a fish."
human_command = Command(resume={"data": human_response})

for step in agent.stream(human_command, config):
    _print_step(step)
call_tool:
================================= Tool Message =================================

content='You should feed your cat a fish.' name='human_assistance' tool_call_id='call_joAEBVX7Abfm7TsZ0k95ZkVx'

call_model:
================================== Ai Message ==================================

For human assistance, you should feed your cat fish. 

Regarding the weather in San Francisco, it's sunny!
在上面,当我们恢复时,我们提供了最终的工具消息,允许模型生成其响应。查看 LangSmith 追踪以查看运行的完整分解

  1. 来自初始查询的追踪
  2. 恢复后的追踪

评论