跳到内容

如何等待用户输入(函数式 API)

先决条件

本指南假设您熟悉以下内容

人机协作 (HIL) 交互对于 智能体系统至关重要。等待人工输入是一种常见的人机协作交互模式,允许智能体向用户提出澄清问题并在继续之前等待输入。

我们可以在 LangGraph 中使用 interrupt() 函数来实现这一点。interrupt 允许我们停止图执行以收集用户输入,并使用收集到的输入继续执行。

本指南演示了如何使用 LangGraph 的 函数式 API 实现人机协作工作流。具体来说,我们将演示

  1. 一个简单的用法示例
  2. 如何与 ReAct 代理一起使用

设置

注意

本指南需要 @langchain/langgraph>=0.2.42

首先,安装此示例所需的依赖项

npm install @langchain/langgraph @langchain/openai @langchain/core zod

接下来,我们需要为 OpenAI 设置 API 密钥(我们将使用的 LLM)

process.env.OPENAI_API_KEY = "YOUR_API_KEY";

为 LangGraph 开发设置 LangSmith

注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用程序 —— 在此处阅读更多关于如何开始的信息

简单用法

让我们演示一个简单的用法示例。我们将创建三个 任务

  1. 追加 “bar”。
  2. 暂停等待人工输入。恢复时,追加人工输入。
  3. 追加 “qux”。
import { task, interrupt } from "@langchain/langgraph";

const step1 = task("step1", async (inputQuery: string) => {
  return `${inputQuery} bar`;
});

const humanFeedback = task(
  "humanFeedback",
  async (inputQuery: string) => {
    const feedback = interrupt(`Please provide feedback: ${inputQuery}`);
    return `${inputQuery} ${feedback}`;
  });

const step3 = task("step3", async (inputQuery: string) => {
  return `${inputQuery} qux`;
});

我们现在可以在一个简单的 入口点中组合这些任务

import { MemorySaver, entrypoint } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const graph = entrypoint({
  name: "graph",
  checkpointer,
}, async (inputQuery: string) => {
  const result1 = await step1(inputQuery);
  const result2 = await humanFeedback(result1);
  const result3 = await step3(result2);
  return result3;
});

我们为启用人机协作工作流所做的一切就是在任务内部调用 interrupt()

提示

先前任务的结果(在本例中为 step1)会被持久化,以便在 interrupt 之后不会再次运行。

让我们发送一个查询字符串

const config = {
  configurable: {
    thread_id: "1"
  }
};

const stream = await graph.stream("foo", config);

for await (const event of stream) {
  console.log(event);
}
{ step1: 'foo bar' }
{
  __interrupt__: [
    {
      value: 'Please provide feedback: foo bar',
      when: 'during',
      resumable: true,
      ns: [Array]
    }
  ]
}
请注意,我们在 step1 之后通过 interrupt 暂停了。interrupt 提供了恢复运行的指令。要恢复,我们发出一个 Command,其中包含 humanFeedback 任务期望的数据。

import { Command } from "@langchain/langgraph";

const resumeStream = await graph.stream(new Command({
  resume: "baz"
}), config);

// Continue execution
for await (const event of resumeStream) {
  if (event.__metadata__?.cached) {
    continue;
  }
  console.log(event);
}
{ humanFeedback: 'foo bar baz' }
{ step3: 'foo bar baz qux' }
{ graph: 'foo bar baz qux' }
恢复后,运行将继续执行剩余步骤并按预期终止。

代理

我们将基于“如何使用函数式 API 创建 ReAct 代理”指南中创建的代理进行构建。

在这里,我们将扩展代理,使其在需要时可以向人类寻求帮助。

定义模型和工具

首先,让我们定义我们将在示例中使用的工具和模型。与 ReAct 代理指南中一样,我们将使用一个占位符工具,用于获取某个位置的天气描述。

在这个例子中,我们将使用 OpenAI 聊天模型,但任何支持工具调用的模型都足够了。

import { ChatOpenAI } from "@langchain/openai";
import { tool } from "@langchain/core/tools";
import { z } from "zod";

const model = new ChatOpenAI({
  model: "gpt-4o-mini",
});

const getWeather = tool(async ({ location }) => {
  // This is a placeholder for the actual implementation
  const lowercaseLocation = location.toLowerCase();
  if (lowercaseLocation.includes("sf") || lowercaseLocation.includes("san francisco")) {
    return "It's sunny!";
  } else if (lowercaseLocation.includes("boston")) {
    return "It's rainy!";
  } else {
    return `I am not sure what the weather is in ${location}`;
  }
}, {
  name: "getWeather",
  schema: z.object({
    location: z.string().describe("Location to get the weather for"),
  }),
  description: "Call to get the weather from a specific location.",
});

为了向人类寻求帮助,我们可以简单地添加一个调用 interrupt 的工具

import { interrupt } from "@langchain/langgraph";
import { z } from "zod";

const humanAssistance = tool(async ({ query }) => {
  const humanResponse = interrupt({ query });
  return humanResponse.data;
}, {
  name: "humanAssistance",
  description: "Request assistance from a human.",
  schema: z.object({
    query: z.string().describe("Human readable question for the human")
  })
});

const tools = [getWeather, humanAssistance];

定义任务

我们的任务与 ReAct 代理指南中的任务基本相同

  1. 调用模型:我们希望使用消息列表查询我们的聊天模型。
  2. 调用工具:如果我们的模型生成工具调用,我们希望执行它们。

我们只是为模型添加了一个可访问的工具。

import {
  type BaseMessageLike,
  AIMessage,
  ToolMessage,
} from "@langchain/core/messages";
import { type ToolCall } from "@langchain/core/messages/tool";
import { task } from "@langchain/langgraph";

const toolsByName = Object.fromEntries(tools.map((tool) => [tool.name, tool]));

const callModel = task("callModel", async (messages: BaseMessageLike[]) => {
  const response = await model.bindTools(tools).invoke(messages);
  return response;
});

const callTool = task(
  "callTool",
  async (toolCall: ToolCall): Promise<AIMessage> => {
    const tool = toolsByName[toolCall.name];
    const observation = await tool.invoke(toolCall.args);
    return new ToolMessage({ content: observation, tool_call_id: toolCall.id });
    // Can also pass toolCall directly into the tool to return a ToolMessage
    // return tool.invoke(toolCall);
  });

定义入口点

我们的 入口点也与 ReAct 代理指南中的入口点相同

import { entrypoint, addMessages, MemorySaver } from "@langchain/langgraph";

const agent = entrypoint({
  name: "agent",
  checkpointer: new MemorySaver(),
}, async (messages: BaseMessageLike[]) => {
  let currentMessages = messages;
  let llmResponse = await callModel(currentMessages);
  while (true) {
    if (!llmResponse.tool_calls?.length) {
      break;
    }

    // Execute tools
    const toolResults = await Promise.all(
      llmResponse.tool_calls.map((toolCall) => {
        return callTool(toolCall);
      })
    );

    // Append to message list
    currentMessages = addMessages(currentMessages, [llmResponse, ...toolResults]);

    // Call model again
    llmResponse = await callModel(currentMessages);
  }

  return llmResponse;
});

用法

让我们用一个需要人工协助的问题来调用我们的模型。我们的问题还需要调用 getWeather 工具

import { BaseMessage, isAIMessage } from "@langchain/core/messages";

const prettyPrintMessage = (message: BaseMessage) => {
  console.log("=".repeat(30), `${message.getType()} message`, "=".repeat(30));
  console.log(message.content);
  if (isAIMessage(message) && message.tool_calls?.length) {
    console.log(JSON.stringify(message.tool_calls, null, 2));
  }
}

const prettyPrintStep = (step: Record<string, any>) => {
  if (step.__metadata__?.cached) {
    return;
  }
  for (const [taskName, update] of Object.entries(step)) {
    const message = update as BaseMessage;
    // Only print task updates
    if (taskName === "agent") continue;
    console.log(`\n${taskName}:`);
    if (taskName === "__interrupt__") {
      console.log(update);
    } else {
      prettyPrintMessage(message);
    }
  }
}

const userMessage = {
  role: "user",
  content: [
    "Can you reach out for human assistance: what should I feed my cat?",
    "Separately, can you check the weather in San Francisco?"
  ].join(" "),
};
console.log(userMessage);

const agentStream = await agent.stream([userMessage], {
  configurable: {
    thread_id: "1",
  }
});

let lastStep;

for await (const step of agentStream) {
  prettyPrintStep(step);
  lastStep = step;
}
{
  role: 'user',
  content: 'Can you reach out for human assistance: what should I feed my cat? Separately, can you check the weather in San Francisco?'
}

callModel:
============================== ai message ==============================

[
  {
    "name": "humanAssistance",
    "args": {
      "query": "What should I feed my cat?"
    },
    "type": "tool_call",
    "id": "call_TwrNq6tGI61cDCJEpj175h7J"
  },
  {
    "name": "getWeather",
    "args": {
      "location": "San Francisco"
    },
    "type": "tool_call",
    "id": "call_fMzUBvc0SpZpXxM2LQLXfbke"
  }
]

callTool:
============================== tool message ==============================
It's sunny!

__interrupt__:
[
  {
    value: { query: 'What should I feed my cat?' },
    when: 'during',
    resumable: true,
    ns: [ 'callTool:2e0c6c40-9541-57ef-a7af-24213a10d5a4' ]
  }
]
请注意,我们生成了两个工具调用,虽然我们的运行被中断了,但我们没有阻止 get_weather 工具的执行。

让我们检查一下我们在哪里被中断了

console.log(JSON.stringify(lastStep));
{"__interrupt__":[{"value":{"query":"What should I feed my cat?"},"when":"during","resumable":true,"ns":["callTool:2e0c6c40-9541-57ef-a7af-24213a10d5a4"]}]}
我们可以通过发出 Command 来恢复执行。请注意,我们在 Command 中提供的数据可以根据 humanAssistance 的实现进行自定义以满足您的需求。

import { Command } from "@langchain/langgraph";

const humanResponse = "You should feed your cat a fish.";
const humanCommand = new Command({
  resume: { data: humanResponse },
});

const resumeStream2 = await agent.stream(humanCommand, config);

for await (const step of resumeStream2) {
  prettyPrintStep(step);
}
callTool:
============================== tool message ==============================
You should feed your cat a fish.

callModel:
============================== ai message ==============================
For your cat, it is suggested that you feed it fish. As for the weather in San Francisco, it's currently sunny!
在上面,当我们恢复时,我们提供了最终的工具消息,允许模型生成其响应。查看 LangSmith 跟踪以查看运行的完整细分

  1. 来自初始查询的跟踪
  2. 恢复后的跟踪

注意: interrupt 函数通过抛出特殊的 GraphInterrupt 错误来传播。因此,您应该避免在 interrupt 函数周围使用 try/catch 块 - 或者如果您这样做,请确保 GraphInterrupt 错误在您的 catch 块中再次抛出。