跳到内容

如何使用 NodeInterrupt 添加动态断点

注意

对于人工介入 (human-in-the-loop)工作流程,请使用新的interrupt()函数。有关使用interrupt的设计模式的更多信息,请查阅人工介入概念指南

前提条件

本指南假设您熟悉以下概念

人工介入 (Human-in-the-loop, HIL) 交互对于代理系统至关重要。断点是一种常见的 HIL 交互模式,允许图在特定步骤停止并寻求人工批准后再继续(例如,对于敏感操作)。

在 LangGraph 中,您可以在节点执行之前/之后添加断点。但通常情况下,根据某些条件从给定节点内部动态地中断图可能很有帮助。这样做时,提供有关中断发生原因的信息也可能很有帮助。

本指南展示了如何使用NodeInterrupt(一种可以在节点内部引发的特殊异常)动态中断图。让我们看看它的实际应用!

设置

首先,让我们安装所需的包

pip install -U langgraph

为 LangGraph 开发设置 LangSmith

注册 LangSmith,以便快速发现问题并提高 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据调试、测试和监控您使用 LangGraph 构建的 LLM 应用程序——在此处阅读更多关于如何入门的信息here

定义图

API 参考: StateGraph | START | END | MemorySaver

from typing_extensions import TypedDict
from IPython.display import Image, display

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt


class State(TypedDict):
    input: str


def step_1(state: State) -> State:
    print("---Step 1---")
    return state


def step_2(state: State) -> State:
    # Let's optionally raise a NodeInterrupt
    # if the length of the input is longer than 5 characters
    if len(state["input"]) > 5:
        raise NodeInterrupt(
            f"Received input that is longer than 5 characters: {state['input']}"
        )

    print("---Step 2---")
    return state


def step_3(state: State) -> State:
    print("---Step 3---")
    return state


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with memory
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

使用动态中断运行图

首先,让我们使用一个长度小于等于 5 个字符的输入来运行图。这应该安全地忽略我们定义的中断条件,并在图执行结束时返回原始输入。

initial_input = {"input": "hello"}
thread_config = {"configurable": {"thread_id": "1"}}

for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello'}
---Step 1---
{'input': 'hello'}
---Step 2---
{'input': 'hello'}
---Step 3---
{'input': 'hello'}
如果我们此时检查图,可以看到没有剩余任务需要运行,并且图确实已完成执行。

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
()
()
现在,让我们使用一个长度大于 5 个字符的输入来运行图。这应该会触发我们在step_2节点内部通过引发NodeInterrupt错误定义的动态中断。

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "2"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}
我们可以看到图在执行step_2时停止了。如果此时检查图状态,我们可以看到关于接下来将执行哪个节点(step_2)、哪个节点引发了中断(也是step_2)以及关于中断的额外信息。

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
('step_2',)
(PregelTask(id='365d4518-bcff-5abd-8ef5-8a0de7f510b0', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),)),)
如果我们尝试从断点处恢复图,由于输入和图状态没有改变,图会再次中断。

# NOTE: to resume the graph from a dynamic interrupt we use the same syntax as with regular interrupts -- we pass None as the input
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
('step_2',)
(PregelTask(id='365d4518-bcff-5abd-8ef5-8a0de7f510b0', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),)),)

更新图状态

要解决这个问题,我们可以做几件事。

首先,我们可以像开始时那样,用一个更短的输入在不同的线程上简单地运行图。或者,如果想从断点处恢复图的执行,我们可以更新状态,使其输入长度小于 5 个字符(这是我们中断的条件)。

# NOTE: this update will be applied as of the last successful node before the interrupt, i.e. `step_1`, right before the node with an interrupt
graph.update_state(config=thread_config, values={"input": "foo"})
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.values)
---Step 2---
{'input': 'foo'}
---Step 3---
{'input': 'foo'}
()
{'input': 'foo'}
您还可以作为节点step_2(被中断的节点)更新状态,这将完全跳过该节点。

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "3"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

# NOTE: this update will skip the node `step_2` altogether
graph.update_state(config=thread_config, values=None, as_node="step_2")
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.values)
---Step 3---
{'input': 'hello world'}
()
{'input': 'hello world'}

评论