人机环路¶
本指南使用新的 interrupt
函数。
从 LangGraph 0.2.31 版本开始,设置断点的推荐方法是使用 interrupt
函数,因为它简化了人机环路模式。
如果您正在寻找此概念指南的先前版本,该版本依赖于静态断点和 NodeInterrupt
异常,请点击此处查看。
人机环路(或 “on-the-loop”)工作流程将人工输入集成到自动化流程中,允许在关键阶段进行决策、验证或更正。这在基于 LLM 的应用程序中尤其有用,因为底层模型可能会偶尔产生不准确之处。在合规性、决策制定或内容生成等低容错场景中,人工参与通过启用模型输出的审查、更正或覆盖来确保可靠性。
用例¶
在基于 LLM 的应用程序中,人机环路工作流程的主要用例包括
-
🛠️ 审查工具调用:人工可以在工具执行之前审查、编辑或批准 LLM 请求的工具调用。
-
✅ 验证 LLM 输出:人工可以审查、编辑或批准 LLM 生成的内容。
-
💡 提供上下文:使 LLM 能够显式请求人工输入以进行澄清或补充细节,或支持多轮对话。
interrupt
¶
LangGraph 中的 interrupt
函数通过在特定节点暂停图,向人工呈现信息,并使用他们的输入恢复图,从而实现人机环路工作流程。此函数对于批准、编辑或收集其他输入等任务非常有用。interrupt
函数与 Command
对象结合使用,以使用人工提供的值恢复图。
import { interrupt } from "@langchain/langgraph";
function humanNode(state: typeof GraphAnnotation.State) {
const value = interrupt(
// Any JSON serializable value to surface to the human.
// For example, a question or a piece of text or a set of keys in the state
{
text_to_revise: state.some_text,
}
);
// Update the state with the human's input or route the graph based on the input
return {
some_text: value,
};
}
const graph = workflow.compile({
checkpointer, // Required for `interrupt` to work
});
// Run the graph until the interrupt
const threadConfig = { configurable: { thread_id: "some_id" } };
await graph.invoke(someInput, threadConfig);
// Below code can run some amount of time later and/or in a different process
// Human input
const valueFromHuman = "...";
// Resume the graph with the human's input
await graph.invoke(new Command({ resume: valueFromHuman }), threadConfig);
完整代码
这是一个如何在图中使用 interrupt
的完整示例,如果您想查看实际代码。
import { MemorySaver, Annotation, interrupt, Command, StateGraph } from "@langchain/langgraph";
// Define the graph state
const StateAnnotation = Annotation.Root({
some_text: Annotation<string>()
});
function humanNode(state: typeof StateAnnotation.State) {
const value = interrupt(
// Any JSON serializable value to surface to the human.
// For example, a question or a piece of text or a set of keys in the state
{
text_to_revise: state.some_text
}
);
return {
// Update the state with the human's input
some_text: value
};
}
// Build the graph
const workflow = new StateGraph(StateAnnotation)
// Add the human-node to the graph
.addNode("human_node", humanNode)
.addEdge("__start__", "human_node")
// A checkpointer is required for `interrupt` to work.
const checkpointer = new MemorySaver();
const graph = workflow.compile({
checkpointer
});
// Using stream() to directly surface the `__interrupt__` information.
for await (const chunk of await graph.stream(
{ some_text: "Original text" },
threadConfig
)) {
console.log(chunk);
}
// Resume using Command
for await (const chunk of await graph.stream(
new Command({ resume: "Edited text" }),
threadConfig
)) {
console.log(chunk);
}
前提条件¶
要在您的图中使用 interrupt
,您需要
-
指定一个检查点以在每个步骤后保存图状态。
-
在适当的位置调用
interrupt()
。有关示例,请参阅设计模式部分。 -
使用线程 ID运行图,直到命中
interrupt
。 -
使用
invoke
/stream
恢复执行(请参阅Command 原语)。
设计模式¶
通常,您可以使用人机环路工作流程执行三种不同的操作
-
批准或拒绝:在关键步骤(例如 API 调用)之前暂停图,以审查和批准该操作。如果该操作被拒绝,您可以阻止图执行该步骤,并可能采取替代操作。此模式通常涉及基于人工输入路由图。
-
编辑图状态:暂停图以审查和编辑图状态。这对于纠正错误或使用其他信息更新状态非常有用。此模式通常涉及使用人工输入更新状态。
-
获取输入:在图的特定步骤中显式请求人工输入。这对于收集其他信息或上下文以告知代理的决策过程或支持多轮对话非常有用。
下面我们展示可以使用这些操作实现的不同设计模式。
注意: interrupt
函数通过抛出特殊的 GraphInterrupt
错误来传播。因此,您应避免在 interrupt
函数周围使用 try/catch
块 - 或者,如果您这样做,请确保在您的 catch
块中再次抛出 GraphInterrupt
错误。
批准或拒绝¶
在关键步骤(例如 API 调用)之前暂停图,以审查和批准该操作。如果该操作被拒绝,您可以阻止图执行该步骤,并可能采取替代操作。
import { interrupt, Command } from "@langchain/langgraph";
function humanApproval(state: typeof GraphAnnotation.State): Command {
const isApproved = interrupt({
question: "Is this correct?",
// Surface the output that should be
// reviewed and approved by the human.
llm_output: state.llm_output,
});
if (isApproved) {
return new Command({ goto: "some_node" });
} else {
return new Command({ goto: "another_node" });
}
}
// Add the node to the graph in an appropriate location
// and connect it to the relevant nodes.
const graph = graphBuilder
.addNode("human_approval", humanApproval)
.compile({ checkpointer });
// After running the graph and hitting the interrupt, the graph will pause.
// Resume it with either an approval or rejection.
const threadConfig = { configurable: { thread_id: "some_id" } };
await graph.invoke(new Command({ resume: true }), threadConfig);
有关更详细的示例,请参阅如何审查工具调用。
审查和编辑状态¶
import { interrupt } from "@langchain/langgraph";
function humanEditing(state: typeof GraphAnnotation.State): Command {
const result = interrupt({
// Interrupt information to surface to the client.
// Can be any JSON serializable value.
task: "Review the output from the LLM and make any necessary edits.",
llm_generated_summary: state.llm_generated_summary,
});
// Update the state with the edited text
return {
llm_generated_summary: result.edited_text,
};
}
// Add the node to the graph in an appropriate location
// and connect it to the relevant nodes.
const graph = graphBuilder
.addNode("human_editing", humanEditing)
.compile({ checkpointer });
// After running the graph and hitting the interrupt, the graph will pause.
// Resume it with the edited text.
const threadConfig = { configurable: { thread_id: "some_id" } };
await graph.invoke(
new Command({ resume: { edited_text: "The edited text" } }),
threadConfig
);
有关更详细的示例,请参阅如何使用 interrupt 等待用户输入。
审查工具调用¶
import { interrupt, Command } from "@langchain/langgraph";
function humanReviewNode(state: typeof GraphAnnotation.State): Command {
// This is the value we'll be providing via Command.resume(<human_review>)
const humanReview = interrupt({
question: "Is this correct?",
// Surface tool calls for review
tool_call: toolCall,
});
const [reviewAction, reviewData] = humanReview;
// Approve the tool call and continue
if (reviewAction === "continue") {
return new Command({ goto: "run_tool" });
}
// Modify the tool call manually and then continue
else if (reviewAction === "update") {
const updatedMsg = getUpdatedMsg(reviewData);
// Remember that to modify an existing message you will need
// to pass the message with a matching ID.
return new Command({
goto: "run_tool",
update: { messages: [updatedMsg] },
});
}
// Give natural language feedback, and then pass that back to the agent
else if (reviewAction === "feedback") {
const feedbackMsg = getFeedbackMsg(reviewData);
return new Command({
goto: "call_llm",
update: { messages: [feedbackMsg] },
});
}
}
有关更详细的示例,请参阅如何审查工具调用。
多轮对话¶
多轮对话涉及代理和人工之间的多次来回交互,这可以使代理以对话方式从人工那里收集更多信息。
这种设计模式在由多个代理组成的 LLM 应用程序中非常有用。一个或多个代理可能需要与人工进行多轮对话,其中人工在对话的不同阶段提供输入或反馈。为简单起见,下面的代理实现被图示为一个单节点,但实际上它可能是由多个节点组成的较大图的一部分,并且包括条件边。
在这种模式下,每个代理都有自己的人工节点用于收集用户输入。
这可以通过使用唯一名称命名人工节点(例如,“代理 1 的人工节点”、“代理 2 的人工节点”)或通过使用子图来实现,其中子图包含人工节点和代理节点。
import { interrupt } from "@langchain/langgraph";
function humanInput(state: typeof GraphAnnotation.State) {
const humanMessage = interrupt("human_input");
return {
messages: [
{
role: "human",
content: humanMessage
}
]
};
}
function agent(state: typeof GraphAnnotation.State) {
// Agent logic
// ...
}
const graph = graphBuilder
.addNode("human_input", humanInput)
.addEdge("human_input", "agent")
.compile({ checkpointer });
// After running the graph and hitting the interrupt, the graph will pause.
// Resume it with the human's input.
await graph.invoke(
new Command({ resume: "hello!" }),
threadConfig
);
在这种模式下,单个人工节点用于收集多个代理的用户输入。活动代理从状态确定,因此在收集人工输入后,图可以路由到正确的代理。
import { interrupt, Command, MessagesAnnotation } from "@langchain/langgraph";
function humanNode(state: typeof MessagesAnnotation.State): Command {
/**
* A node for collecting user input.
*/
const userInput = interrupt("Ready for user input.");
// Determine the **active agent** from the state, so
// we can route to the correct agent after collecting input.
// For example, add a field to the state or use the last active agent.
// or fill in `name` attribute of AI messages generated by the agents.
const activeAgent = ...;
return new Command({
goto: activeAgent,
update: {
messages: [{
role: "human",
content: userInput,
}]
}
});
}
有关更详细的示例,请参阅如何实现多轮对话。
验证人工输入¶
如果您需要在图本身内(而不是在客户端)验证人工提供的输入,则可以通过在单个节点内使用多个 interrupt 调用来实现。
import { interrupt } from "@langchain/langgraph";
function humanNode(state: typeof GraphAnnotation.State) {
/**
* Human node with validation.
*/
let question = "What is your age?";
while (true) {
const answer = interrupt(question);
// Validate answer, if the answer isn't valid ask for input again.
if (typeof answer !== "number" || answer < 0) {
question = `'${answer}' is not a valid age. What is your age?`;
continue;
} else {
// If the answer is valid, we can proceed.
break;
}
}
console.log(`The human in the loop is ${answer} years old.`);
return {
age: answer,
};
}
Command 原语¶
当使用 interrupt
函数时,图将在 interrupt 处暂停并等待用户输入。
可以使用 Command 原语恢复图执行,该原语可以通过 invoke
或 stream
方法传递。
Command
原语提供多个选项来控制和修改恢复期间的图状态
- 将值传递给
interrupt
:使用new Command({ resume: value })
向图提供数据,例如用户响应。执行从使用interrupt
的节点开头恢复,但是,这次interrupt(...)
调用将返回在new Command({ resume: value })
中传递的值,而不是暂停图。
// Resume graph execution with the user's input.
await graph.invoke(new Command({ resume: { age: "25" } }), threadConfig);
- 更新图状态:使用
Command({ goto: ..., update: ... })
修改图状态。请注意,恢复从使用interrupt
的节点开头开始。执行从使用interrupt
的节点开头恢复,但使用更新后的状态。
// Update the graph state and resume.
// You must provide a `resume` value if using an `interrupt`.
await graph.invoke(
new Command({ resume: "Let's go!!!", update: { foo: "bar" } }),
threadConfig
);
通过利用 Command
,您可以恢复图执行,处理用户输入,并动态调整图的状态。
与 invoke
一起使用¶
当您使用 stream
运行图时,您将收到一个 Interrupt
事件,让您知道 interrupt
已被触发。
invoke
不返回 interrupt 信息。要访问此信息,您必须使用 getState 方法 在调用 invoke
后检索图状态。
// Run the graph up to the interrupt
const result = await graph.invoke(inputs, threadConfig);
// Get the graph state to get interrupt information.
const state = await graph.getState(threadConfig);
// Print the state values
console.log(state.values);
// Print the pending tasks
console.log(state.tasks);
// Resume the graph with the user's input.
await graph.invoke(new Command({ resume: { age: "25" } }), threadConfig);
{
foo: "bar";
} // State values
[
{
id: "5d8ffc92-8011-0c9b-8b59-9d3545b7e553",
name: "node_foo",
path: ["__pregel_pull", "node_foo"],
error: null,
interrupts: [
{
value: "value_in_interrupt",
resumable: true,
ns: ["node_foo:5d8ffc92-8011-0c9b-8b59-9d3545b7e553"],
when: "during",
},
],
state: null,
result: null,
},
]; // Pending tasks. interrupts
如何从 interrupt 恢复工作?¶
使用 interrupt
的一个关键方面是了解恢复如何工作。当您在 interrupt
后恢复执行时,图执行从触发最后一个 interrupt
的图节点的开头开始。
从节点开头到 interrupt
的所有代码都将重新执行。
let counter = 0;
function node(state: State) {
// All the code from the beginning of the node to the interrupt will be re-executed
// when the graph resumes.
counter += 1;
console.log(`> Entered the node: ${counter} # of times`);
// Pause the graph and wait for user input.
const answer = interrupt();
console.log("The value of counter is:", counter);
// ...
}
在恢复图时,计数器将再次递增,从而产生以下输出
常见陷阱¶
副作用¶
将具有副作用的代码(例如 API 调用)放在 interrupt
之后,以避免重复,因为每次节点恢复时都会重新触发这些代码。
当节点从 interrupt
恢复时,此代码将再次重新执行 API 调用。如果 API 调用不是幂等的或者只是成本很高,这可能会有问题。
import { interrupt } from "@langchain/langgraph";
function humanNode(state: typeof GraphAnnotation.State) {
/**
* Human node with validation.
*/
const answer = interrupt(question);
return {
answer
};
}
function apiCallNode(state: typeof GraphAnnotation.State) {
apiCall(); // OK as it's in a separate node
}
作为函数调用的子图¶
当作为函数调用子图时,父图将从调用子图(以及触发 interrupt
的位置)的节点开头恢复执行。类似地,子图将从调用 interrupt()
函数的节点开头恢复。
例如,
async function nodeInParentGraph(state: typeof GraphAnnotation.State) {
someCode(); // <-- This will re-execute when the subgraph is resumed.
// Invoke a subgraph as a function.
// The subgraph contains an `interrupt` call.
const subgraphResult = await subgraph.invoke(someInput);
...
}
示例:父图和子图执行流程
假设我们有一个包含 3 个节点的父图
父图:node_1
→ node_2
(子图调用) → node_3
子图有 3 个节点,其中第二个节点包含 interrupt
子图:sub_node_1
→ sub_node_2
(interrupt
) → sub_node_3
当恢复图时,执行将按如下方式进行
- 跳过父图中的
node_1
(已执行,图状态已保存在快照中)。 - 从头重新执行父图中的
node_2
。 - 跳过子图中的
sub_node_1
(已执行,图状态已保存在快照中)。 - 从头重新执行子图中的
sub_node_2
。 - 继续
sub_node_3
和后续节点。
这是一个简短的示例代码,您可以使用它来了解子图如何与 interrupts 一起工作。它计算每个节点被输入的次数并打印计数。
import {
StateGraph,
START,
interrupt,
Command,
MemorySaver,
Annotation
} from "@langchain/langgraph";
const GraphAnnotation = Annotation.Root({
stateCounter: Annotation<number>({
reducer: (a, b) => a + b,
default: () => 0
})
})
let counterNodeInSubgraph = 0;
function nodeInSubgraph(state: typeof GraphAnnotation.State) {
counterNodeInSubgraph += 1; // This code will **NOT** run again!
console.log(`Entered 'nodeInSubgraph' a total of ${counterNodeInSubgraph} times`);
return {};
}
let counterHumanNode = 0;
async function humanNode(state: typeof GraphAnnotation.State) {
counterHumanNode += 1; // This code will run again!
console.log(`Entered humanNode in sub-graph a total of ${counterHumanNode} times`);
const answer = await interrupt("what is your name?");
console.log(`Got an answer of ${answer}`);
return {};
}
const checkpointer = new MemorySaver();
const subgraphBuilder = new StateGraph(GraphAnnotation)
.addNode("some_node", nodeInSubgraph)
.addNode("human_node", humanNode)
.addEdge(START, "some_node")
.addEdge("some_node", "human_node")
const subgraph = subgraphBuilder.compile({ checkpointer });
let counterParentNode = 0;
async function parentNode(state: typeof GraphAnnotation.State) {
counterParentNode += 1; // This code will run again on resuming!
console.log(`Entered 'parentNode' a total of ${counterParentNode} times`);
// Please note that we're intentionally incrementing the state counter
// in the graph state as well to demonstrate that the subgraph update
// of the same key will not conflict with the parent graph (until
const subgraphState = await subgraph.invoke(state);
return subgraphState;
}
const builder = new StateGraph(GraphAnnotation)
.addNode("parent_node", parentNode)
.addEdge(START, "parent_node")
// A checkpointer must be enabled for interrupts to work!
const graph = builder.compile({ checkpointer });
const config = {
configurable: {
thread_id: crypto.randomUUID(),
}
};
for await (const chunk of await graph.stream({ stateCounter: 1 }, config)) {
console.log(chunk);
}
console.log('--- Resuming ---');
for await (const chunk of await graph.stream(new Command({ resume: "35" }), config)) {
console.log(chunk);
}
这将打印出
--- First invocation ---
In parent node: { foo: 'bar' }
Entered 'parentNode' a total of 1 times
Entered 'nodeInSubgraph' a total of 1 times
Entered humanNode in sub-graph a total of 1 times
{ __interrupt__: [{ value: 'what is your name?', resumable: true, ns: ['parent_node:0b23d72f-aaba-0329-1a59-ca4f3c8bad3b', 'human_node:25df717c-cb80-57b0-7410-44e20aac8f3c'], when: 'during' }] }
--- Resuming ---
In parent node: { foo: 'bar' }
Entered 'parentNode' a total of 2 times
Entered humanNode in sub-graph a total of 2 times
Got an answer of 35
{ parent_node: null }
使用多个 interrupts¶
在单个节点中使用多个 interrupts 对于诸如验证人工输入之类的模式可能很有帮助。但是,如果在同一节点中使用多个 interrupts 处理不当,可能会导致意外行为。
当节点包含多个 interrupt 调用时,LangGraph 会保留特定于执行该节点的任务的恢复值列表。每当执行恢复时,它都从节点开头开始。对于遇到的每个 interrupt,LangGraph 都会检查任务的恢复列表中是否存在匹配的值。匹配是严格基于索引的,因此节点内 interrupt 调用的顺序至关重要。
为避免出现问题,请避免在执行之间动态更改节点的结构。这包括添加、删除或重新排序 interrupt 调用,因为此类更改可能会导致索引不匹配。这些问题通常源于非常规模式,例如通过 Command.resume(...).update(SOME_STATE_MUTATION)
改变状态或依赖全局变量来动态修改节点的结构。
不正确代码的示例
import { v4 as uuidv4 } from "uuid";
import {
StateGraph,
MemorySaver,
START,
interrupt,
Command,
Annotation
} from "@langchain/langgraph";
const GraphAnnotation = Annotation.Root({
name: Annotation<string>(),
age: Annotation<string>()
});
function humanNode(state: typeof GraphAnnotation.State) {
let name;
if (!state.name) {
name = interrupt("what is your name?");
} else {
name = "N/A";
}
let age;
if (!state.age) {
age = interrupt("what is your age?");
} else {
age = "N/A";
}
console.log(`Name: ${name}. Age: ${age}`);
return {
age,
name,
};
}
const builder = new StateGraph(GraphAnnotation)
.addNode("human_node", humanNode);
.addEdge(START, "human_node");
// A checkpointer must be enabled for interrupts to work!
const checkpointer = new MemorySaver();
const graph = builder.compile({ checkpointer });
const config = {
configurable: {
thread_id: uuidv4(),
}
};
for await (const chunk of await graph.stream({ age: undefined, name: undefined }, config)) {
console.log(chunk);
}
for await (const chunk of await graph.stream(
new Command({ resume: "John", update: { name: "foo" } }),
config
)) {
console.log(chunk);
}