跳到内容

启用人工干预

要审查、编辑和批准智能体或工作流中的工具调用,请使用中断来暂停图并等待人工输入。中断使用 LangGraph 的持久化层,它会保存图的状态,从而无限期地暂停图的执行,直到您恢复为止。

信息

有关“人在环路”(human-in-the-loop)工作流的更多信息,请参阅“人在环路”概念指南

使用 interrupt 暂停

动态中断(也称为动态断点)是根据图的当前状态触发的。您可以通过在适当的位置调用 interrupt 函数来设置动态中断。图将暂停,从而允许人工干预,然后用他们的输入恢复图的执行。这对于批准、编辑或收集额外上下文等任务非常有用。

注意

自 v1.0 起,interrupt 是推荐的暂停图的方式。NodeInterrupt 已被弃用,并将在 v2.0 中移除。

要在您的图中使用 interrupt,您需要:

  1. 指定一个检查点(checkpointer)以在每一步后保存图的状态。
  2. 在适当的位置调用 interrupt()。示例请参见常见模式部分。
  3. 使用线程 ID 运行图,直到遇到 interrupt
  4. 使用 invoke/stream 恢复执行(请参阅 Command 原语)。

API 参考:interrupt | Command

from langgraph.types import interrupt, Command

def human_node(state: State):
    value = interrupt( # (1)!
        {
            "text_to_revise": state["some_text"] # (2)!
        }
    )
    return {
        "some_text": value # (3)!
    }


graph = graph_builder.compile(checkpointer=checkpointer) # (4)!

# Run the graph until the interrupt is hit.
config = {"configurable": {"thread_id": "some_id"}}
result = graph.invoke({"some_text": "original text"}, config=config) # (5)!
print(result['__interrupt__']) # (6)!
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},
# >       resumable=True,
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# >    )
# > ]

print(graph.invoke(Command(resume="Edited text"), config=config)) # (7)!
# > {'some_text': 'Edited text'}
  1. interrupt(...) 会在 human_node 暂停执行,将给定的有效负载呈现给人工。
  2. 任何可 JSON 序列化的值都可以传递给 interrupt 函数。这里是一个包含要修改文本的字典。
  3. 一旦恢复,interrupt(...) 的返回值是人工提供的输入,用于更新状态。
  4. 需要检查点来持久化图的状态。在生产环境中,这应该是持久的(例如,由数据库支持)。
  5. 图表以某些初始状态被调用。
  6. 当图遇到中断时,它会返回一个包含有效负载和元数据的 Interrupt 对象。
  7. 图表通过 Command(resume=...) 恢复,注入人工输入并继续执行。
扩展示例:使用 interrupt
from typing import TypedDict
import uuid
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph

from langgraph.types import interrupt, Command


class State(TypedDict):
    some_text: str


def human_node(state: State):
    value = interrupt(  # (1)!
        {
            "text_to_revise": state["some_text"]  # (2)!
        }
    )
    return {
        "some_text": value  # (3)!
    }


# Build the graph
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")
checkpointer = InMemorySaver()  # (4)!
graph = graph_builder.compile(checkpointer=checkpointer)
# Pass a thread ID to the graph to run it.
config = {"configurable": {"thread_id": uuid.uuid4()}}
# Run the graph until the interrupt is hit.
result = graph.invoke({"some_text": "original text"}, config=config)  # (5)!

print(result['__interrupt__']) # (6)!
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},
# >       resumable=True,
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# >    )
# > ]
print(result["__interrupt__"])  # (6)!
# > [Interrupt(value={'text_to_revise': 'original text'}, id='6d7c4048049254c83195429a3659661d')]

print(graph.invoke(Command(resume="Edited text"), config=config)) # (7)!
# > {'some_text': 'Edited text'}
  1. interrupt(...) 会在 human_node 暂停执行,将给定的有效负载呈现给人工。
  2. 任何可 JSON 序列化的值都可以传递给 interrupt 函数。这里是一个包含要修改文本的字典。
  3. 一旦恢复,interrupt(...) 的返回值是人工提供的输入,用于更新状态。
  4. 需要检查点来持久化图的状态。在生产环境中,这应该是持久的(例如,由数据库支持)。
  5. 图表以某些初始状态被调用。
  6. 当图遇到中断时,它会返回一个包含有效负载和元数据的 Interrupt 对象。
  7. 图表通过 Command(resume=...) 恢复,注入人工输入并继续执行。

0.4.0 版本新增

如果图被中断,__interrupt__ 是一个在运行图时将返回的特殊键。在 0.4.0 版本中已添加对 invokeainvoke__interrupt__ 的支持。如果您使用的是旧版本,只有在使用 streamastream 时才能在结果中看到 __interrupt__。您也可以使用 graph.get_state(thread_id) 来获取中断值。

警告

在开发者体验方面,中断类似于 Python 的 input() 函数,但它们不会自动从中断点恢复执行。相反,它们会重新运行使用了中断的整个节点。因此,中断通常最好放在节点的开头或专用节点中。

使用 Command 原语恢复

警告

interrupt 恢复与 Python 的 input() 函数不同,后者会从调用 input() 函数的确切点恢复执行。

当在图中使用 interrupt 函数时,执行会在此处暂停并等待用户输入。

要恢复执行,请使用 Command 原语,它可以通过 invokestream 方法提供。图会从最初调用 interrupt(...) 的节点的开头恢复执行。这一次,interrupt 函数将返回在 Command(resume=value) 中提供的值,而不会再次暂停。从节点开头到 interrupt 的所有代码都将被重新执行。

# Resume graph execution by providing the user's input.
graph.invoke(Command(resume={"age": "25"}), thread_config)

一次调用恢复多个中断

当带有中断条件的节点并行运行时,任务队列中可能会有多个中断。例如,下面的图有两个并行运行的节点需要人工输入:

image

一旦您的图被中断并停滞,您可以使用 Command.resume 一次性恢复所有中断,传入一个将中断 ID 映射到恢复值的字典。

API 参考:RunnableConfig | InMemorySaver | START | StateGraph | interrupt | Command

from typing import TypedDict
import uuid
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command


class State(TypedDict):
    text_1: str
    text_2: str


def human_node_1(state: State):
    value = interrupt({"text_to_revise": state["text_1"]})
    return {"text_1": value}


def human_node_2(state: State):
    value = interrupt({"text_to_revise": state["text_2"]})
    return {"text_2": value}


graph_builder = StateGraph(State)
graph_builder.add_node("human_node_1", human_node_1)
graph_builder.add_node("human_node_2", human_node_2)

# Add both nodes in parallel from START
graph_builder.add_edge(START, "human_node_1")
graph_builder.add_edge(START, "human_node_2")

checkpointer = InMemorySaver()
graph = graph_builder.compile(checkpointer=checkpointer)

thread_id = str(uuid.uuid4())
config: RunnableConfig = {"configurable": {"thread_id": thread_id}}
result = graph.invoke(
    {"text_1": "original text 1", "text_2": "original text 2"}, config=config
)

# Resume with mapping of interrupt IDs to values
resume_map = {
    i.interrupt_id: f"human input for prompt {i.value}"
    for i in parent.get_state(thread_config).interrupts
}
print(graph.invoke(Command(resume=resume_map), config=config))
# > {'text_1': 'edited text for original text 1', 'text_2': 'edited text for original text 2'}

常见模式

下面我们展示了可以使用 interruptCommand 实现的不同设计模式。

批准或拒绝

image

根据人工的批准或拒绝,图可以继续执行操作或采取替代路径。

在关键步骤(如 API 调用)之前暂停图,以审查和批准操作。如果操作被拒绝,您可以阻止图执行该步骤,并可能采取替代操作。

API 参考:interrupt | Command

from typing import Literal
from langgraph.types import interrupt, Command

def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
    is_approved = interrupt(
        {
            "question": "Is this correct?",
            # Surface the output that should be
            # reviewed and approved by the human.
            "llm_output": state["llm_output"]
        }
    )

    if is_approved:
        return Command(goto="some_node")
    else:
        return Command(goto="another_node")

# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with either an approval or rejection.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)
扩展示例:使用中断批准或拒绝
from typing import Literal, TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

# Define the shared graph state
class State(TypedDict):
    llm_output: str
    decision: str

# Simulate an LLM output node
def generate_llm_output(state: State) -> State:
    return {"llm_output": "This is the generated output."}

# Human approval node
def human_approval(state: State) -> Command[Literal["approved_path", "rejected_path"]]:
    decision = interrupt({
        "question": "Do you approve the following output?",
        "llm_output": state["llm_output"]
    })

    if decision == "approve":
        return Command(goto="approved_path", update={"decision": "approved"})
    else:
        return Command(goto="rejected_path", update={"decision": "rejected"})

# Next steps after approval
def approved_node(state: State) -> State:
    print("✅ Approved path taken.")
    return state

# Alternative path after rejection
def rejected_node(state: State) -> State:
    print("❌ Rejected path taken.")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("generate_llm_output", generate_llm_output)
builder.add_node("human_approval", human_approval)
builder.add_node("approved_path", approved_node)
builder.add_node("rejected_path", rejected_node)

builder.set_entry_point("generate_llm_output")
builder.add_edge("generate_llm_output", "human_approval")
builder.add_edge("approved_path", END)
builder.add_edge("rejected_path", END)

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Run until interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])
# Output:
# Interrupt(value={'question': 'Do you approve the following output?', 'llm_output': 'This is the generated output.'}, ...)

# Simulate resuming with human input
# To test rejection, replace resume="approve" with resume="reject"
final_result = graph.invoke(Command(resume="approve"), config=config)
print(final_result)

审查和编辑状态

image

人工可以审查和编辑图的状态。这对于纠正错误或用附加信息更新状态很有用。

API 参考:interrupt

from langgraph.types import interrupt

def human_editing(state: State):
    ...
    result = interrupt(
        # Interrupt information to surface to the client.
        # Can be any JSON serializable value.
        {
            "task": "Review the output from the LLM and make any necessary edits.",
            "llm_generated_summary": state["llm_generated_summary"]
        }
    )

    # Update the state with the edited text
    return {
        "llm_generated_summary": result["edited_text"]
    }

# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)

...

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with the edited text.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
    Command(resume={"edited_text": "The edited text"}),
    config=thread_config
)
扩展示例:使用中断编辑状态
from typing import TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

# Define the graph state
class State(TypedDict):
    summary: str

# Simulate an LLM summary generation
def generate_summary(state: State) -> State:
    return {
        "summary": "The cat sat on the mat and looked at the stars."
    }

# Human editing node
def human_review_edit(state: State) -> State:
    result = interrupt({
        "task": "Please review and edit the generated summary if necessary.",
        "generated_summary": state["summary"]
    })
    return {
        "summary": result["edited_summary"]
    }

# Simulate downstream use of the edited summary
def downstream_use(state: State) -> State:
    print(f"✅ Using edited summary: {state['summary']}")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("generate_summary", generate_summary)
builder.add_node("human_review_edit", human_review_edit)
builder.add_node("downstream_use", downstream_use)

builder.set_entry_point("generate_summary")
builder.add_edge("generate_summary", "human_review_edit")
builder.add_edge("human_review_edit", "downstream_use")
builder.add_edge("downstream_use", END)

# Set up in-memory checkpointing for interrupt support
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Invoke the graph until it hits the interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)

# Output interrupt payload
print(result["__interrupt__"])
# Example output:
# > [
# >     Interrupt(
# >         value={
# >             'task': 'Please review and edit the generated summary if necessary.',
# >             'generated_summary': 'The cat sat on the mat and looked at the stars.'
# >         },
# >         id='...'
# >     )
# > ]

# Resume the graph with human-edited input
edited_summary = "The cat lay on the rug, gazing peacefully at the night sky."
resumed_result = graph.invoke(
    Command(resume={"edited_summary": edited_summary}),
    config=config
)
print(resumed_result)

审查工具调用

image

人工可以在继续之前审查和编辑来自 LLM 的输出。这在 LLM 请求的工具调用可能敏感或需要人工监督的应用中尤其关键。

为工具添加人工审批步骤:

  1. 在工具中使用 interrupt() 暂停执行。
  2. 使用 Command 根据人工输入继续执行。

API 参考:InMemorySaver | interrupt | create_react_agent

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
from langgraph.prebuilt import create_react_agent

# An example of a sensitive tool that requires human review / approval
def book_hotel(hotel_name: str):
    """Book a hotel"""
    response = interrupt(  # (1)!
        f"Trying to call `book_hotel` with args {{'hotel_name': {hotel_name}}}. "
        "Please approve or suggest edits."
    )
    if response["type"] == "accept":
        pass
    elif response["type"] == "edit":
        hotel_name = response["args"]["hotel_name"]
    else:
        raise ValueError(f"Unknown response type: {response['type']}")
    return f"Successfully booked a stay at {hotel_name}."

checkpointer = InMemorySaver() # (2)!

agent = create_react_agent(
    model="anthropic:claude-3-5-sonnet-latest",
    tools=[book_hotel],
    checkpointer=checkpointer, # (3)!
)
  1. interrupt 函数在特定节点处暂停智能体图。在这种情况下,我们在工具函数的开头调用 interrupt(),这会暂停执行该工具的节点处的图。interrupt() 内部的信息(例如,工具调用)可以呈现给人工,然后图可以用用户输入(工具调用的批准、编辑或反馈)来恢复。
  2. InMemorySaver 用于在工具调用循环的每一步存储智能体状态。这启用了短期记忆“人在环路”功能。在此示例中,我们使用 InMemorySaver 将智能体状态存储在内存中。在生产应用程序中,智能体状态将存储在数据库中。
  3. 使用 checkpointer 初始化智能体。

使用 stream() 方法运行智能体,传入 config 对象以指定线程 ID。这允许智能体在将来的调用中恢复相同的对话。

config = {
   "configurable": {
      "thread_id": "1"
   }
}

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "book a stay at McKittrick hotel"}]},
    config
):
    print(chunk)
    print("\n")

您应该会看到智能体运行直到遇到 interrupt() 调用,此时它会暂停并等待人工输入。

使用 Command 根据人工输入恢复智能体。

API 参考:Command

from langgraph.types import Command

for chunk in agent.stream(
    Command(resume={"type": "accept"}),  # (1)!
    # Command(resume={"type": "edit", "args": {"hotel_name": "McKittrick Hotel"}}),
    config
):
    print(chunk)
    print("\n")
  1. interrupt 函数Command 对象结合使用,以使用人工提供的值恢复图。

为任何工具添加中断

您可以创建一个包装器,为*任何*工具添加中断。下面的示例提供了一个与 Agent Inbox UIAgent Chat UI 兼容的参考实现。

为任何工具添加“人在环路”的包装器
from typing import Callable
from langchain_core.tools import BaseTool, tool as create_tool
from langchain_core.runnables import RunnableConfig
from langgraph.types import interrupt
from langgraph.prebuilt.interrupt import HumanInterruptConfig, HumanInterrupt

def add_human_in_the_loop(
    tool: Callable | BaseTool,
    *,
    interrupt_config: HumanInterruptConfig = None,
) -> BaseTool:
    """Wrap a tool to support human-in-the-loop review."""
    if not isinstance(tool, BaseTool):
        tool = create_tool(tool)

    if interrupt_config is None:
        interrupt_config = {
            "allow_accept": True,
            "allow_edit": True,
            "allow_respond": True,
        }

    @create_tool(  # (1)!
        tool.name,
        description=tool.description,
        args_schema=tool.args_schema
    )
    def call_tool_with_interrupt(config: RunnableConfig, **tool_input):
        request: HumanInterrupt = {
            "action_request": {
                "action": tool.name,
                "args": tool_input
            },
            "config": interrupt_config,
            "description": "Please review the tool call"
        }
        response = interrupt([request])[0]  # (2)!
        # approve the tool call
        if response["type"] == "accept":
            tool_response = tool.invoke(tool_input, config)
        # update tool call args
        elif response["type"] == "edit":
            tool_input = response["args"]["args"]
            tool_response = tool.invoke(tool_input, config)
        # respond to the LLM with user feedback
        elif response["type"] == "response":
            user_feedback = response["args"]
            tool_response = user_feedback
        else:
            raise ValueError(f"Unsupported interrupt response type: {response['type']}")

        return tool_response

    return call_tool_with_interrupt
  1. 这个包装器创建了一个新工具,它在执行被包装的工具**之前**调用 interrupt()
  2. interrupt() 正在使用 Agent Inbox UI 所期望的特殊输入和输出格式: - 一个 HumanInterrupt 对象列表被发送到 AgentInbox 以向最终用户呈现中断信息 - 恢复值由 AgentInbox 以列表形式提供(即 Command(resume=[...])

您可以使用此包装器为任何工具添加 interrupt(),而无需在工具*内部*添加它。

API 参考:InMemorySaver | create_react_agent

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent

checkpointer = InMemorySaver()

def book_hotel(hotel_name: str):
   """Book a hotel"""
   return f"Successfully booked a stay at {hotel_name}."


agent = create_react_agent(
    model="anthropic:claude-3-5-sonnet-latest",
    tools=[
        add_human_in_the_loop(book_hotel), # (1)!
    ],
    checkpointer=checkpointer,
)

config = {"configurable": {"thread_id": "1"}}

# Run the agent
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "book a stay at McKittrick hotel"}]},
    config
):
    print(chunk)
    print("\n")
  1. add_human_in_the_loop 包装器用于向工具添加 interrupt()。这允许智能体在继续进行工具调用之前暂停执行并等待人工输入。

您应该会看到智能体运行直到遇到 interrupt() 调用,此时它会暂停并等待人工输入。

使用 Command 根据人工输入恢复智能体。

API 参考:Command

from langgraph.types import Command

for chunk in agent.stream(
    Command(resume=[{"type": "accept"}]),
    # Command(resume=[{"type": "edit", "args": {"args": {"hotel_name": "McKittrick Hotel"}}}]),
    config
):
    print(chunk)
    print("\n")

验证人工输入

如果您需要在图本身内部(而不是在客户端)验证人工提供的输入,您可以通过在单个节点内使用多个中断调用来实现这一点。

API 参考:interrupt

from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""
    question = "What is your age?"

    while True:
        answer = interrupt(question)

        # Validate answer, if the answer isn't valid ask for input again.
        if not isinstance(answer, int) or answer < 0:
            question = f"'{answer} is not a valid age. What is your age?"
            answer = None
            continue
        else:
            # If the answer is valid, we can proceed.
            break

    print(f"The human in the loop is {answer} years old.")
    return {
        "age": answer
    }
扩展示例:验证用户输入
from typing import TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

# Define graph state
class State(TypedDict):
    age: int

# Node that asks for human input and validates it
def get_valid_age(state: State) -> State:
    prompt = "Please enter your age (must be a non-negative integer)."

    while True:
        user_input = interrupt(prompt)

        # Validate the input
        try:
            age = int(user_input)
            if age < 0:
                raise ValueError("Age must be non-negative.")
            break  # Valid input received
        except (ValueError, TypeError):
            prompt = f"'{user_input}' is not valid. Please enter a non-negative integer for age."

    return {"age": age}

# Node that uses the valid input
def report_age(state: State) -> State:
    print(f"✅ Human is {state['age']} years old.")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("get_valid_age", get_valid_age)
builder.add_node("report_age", report_age)

builder.set_entry_point("get_valid_age")
builder.add_edge("get_valid_age", "report_age")
builder.add_edge("report_age", END)

# Create the graph with a memory checkpointer
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Run the graph until the first interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])  # First prompt: "Please enter your age..."

# Simulate an invalid input (e.g., string instead of integer)
result = graph.invoke(Command(resume="not a number"), config=config)
print(result["__interrupt__"])  # Follow-up prompt with validation message

# Simulate a second invalid input (e.g., negative number)
result = graph.invoke(Command(resume="-10"), config=config)
print(result["__interrupt__"])  # Another retry

# Provide valid input
final_result = graph.invoke(Command(resume="25"), config=config)
print(final_result)  # Should include the valid age

使用中断进行调试

要调试和测试图,请使用静态中断(也称为静态断点)来逐个节点地逐步执行图,或在特定节点处暂停图的执行。静态中断在节点执行之前或之后定义的点触发。您可以在编译时或运行时通过指定 interrupt_beforeinterrupt_after 来设置静态中断。

警告

静态中断**不**推荐用于“人在环路”工作流。请改用动态中断

graph = graph_builder.compile( # (1)!
    interrupt_before=["node_a"], # (2)!
    interrupt_after=["node_b", "node_c"], # (3)!
    checkpointer=checkpointer, # (4)!
)

config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(inputs, config=thread_config) # (5)!

# Resume the graph
graph.invoke(None, config=thread_config) # (6)!
  1. 断点是在 compile 时设置的。
  2. interrupt_before 指定了在节点执行前应该暂停执行的节点。
  3. interrupt_after 指定了在节点执行后应该暂停执行的节点。
  4. 需要一个检查点来启用断点。
  5. 图表运行直到遇到第一个断点。
  6. 通过为输入传入 None 来恢复图表。这将使图表运行直到遇到下一个断点。
graph.invoke( # (1)!
    inputs,
    interrupt_before=["node_a"], # (2)!
    interrupt_after=["node_b", "node_c"] # (3)!
    config={
        "configurable": {"thread_id": "some_thread"}
    },
)

config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(inputs, config=config) # (4)!

# Resume the graph
graph.invoke(None, config=config) # (5)!
  1. graph.invoke 使用 interrupt_beforeinterrupt_after 参数调用。这是一个运行时配置,可以为每次调用更改。
  2. interrupt_before 指定了在节点执行前应该暂停执行的节点。
  3. interrupt_after 指定了在节点执行后应该暂停执行的节点。
  4. 图表运行直到遇到第一个断点。
  5. 通过为输入传入 None 来恢复图表。这将使图表运行直到遇到下一个断点。

注意

您不能在运行时为**子图**设置静态断点。如果您有子图,则必须在编译时设置断点。

设置静态断点
from IPython.display import Image, display
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    input: str


def step_1(state):
    print("---Step 1---")
    pass


def step_2(state):
    print("---Step 2---")
    pass


def step_3(state):
    print("---Step 3---")
    pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up a checkpointer
checkpointer = InMemorySaver() # (1)!

graph = builder.compile(
    checkpointer=checkpointer, # (2)!
    interrupt_before=["step_3"] # (3)!
)

# View
display(Image(graph.get_graph().draw_mermaid_png()))


# Input
initial_input = {"input": "hello world"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
    print(event)

# This will run until the breakpoint
# You can get the state of the graph at this point
print(graph.get_state(config))

# You can continue the graph execution by passing in `None` for the input
for event in graph.stream(None, thread, stream_mode="values"):
    print(event)

在 LangGraph Studio 中使用静态中断

您可以使用 LangGraph Studio 来调试您的图。您可以在 UI 中设置静态断点,然后运行图。您还可以使用 UI 在执行的任何点检查图的状态。

image

LangGraph Studio 对使用 langgraph dev 本地部署的应用程序是免费的。

使用中断进行调试

要调试和测试图,请使用静态中断(也称为静态断点)来逐个节点地逐步执行图,或在特定节点处暂停图的执行。静态中断在节点执行之前或之后定义的点触发。您可以在编译时或运行时通过指定 interrupt_beforeinterrupt_after 来设置静态中断。

警告

静态中断**不**推荐用于“人在环路”工作流。请改用动态中断

graph = graph_builder.compile( # (1)!
    interrupt_before=["node_a"], # (2)!
    interrupt_after=["node_b", "node_c"], # (3)!
    checkpointer=checkpointer, # (4)!
)

config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(inputs, config=thread_config) # (5)!

# Resume the graph
graph.invoke(None, config=thread_config) # (6)!
  1. 断点是在 compile 时设置的。
  2. interrupt_before 指定了在节点执行前应该暂停执行的节点。
  3. interrupt_after 指定了在节点执行后应该暂停执行的节点。
  4. 需要一个检查点来启用断点。
  5. 图表运行直到遇到第一个断点。
  6. 通过为输入传入 None 来恢复图表。这将使图表运行直到遇到下一个断点。
graph.invoke( # (1)!
    inputs,
    interrupt_before=["node_a"], # (2)!
    interrupt_after=["node_b", "node_c"] # (3)!
    config={
        "configurable": {"thread_id": "some_thread"}
    },
)

config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(inputs, config=config) # (4)!

# Resume the graph
graph.invoke(None, config=config) # (5)!
  1. graph.invoke 使用 interrupt_beforeinterrupt_after 参数调用。这是一个运行时配置,可以为每次调用更改。
  2. interrupt_before 指定了在节点执行前应该暂停执行的节点。
  3. interrupt_after 指定了在节点执行后应该暂停执行的节点。
  4. 图表运行直到遇到第一个断点。
  5. 通过为输入传入 None 来恢复图表。这将使图表运行直到遇到下一个断点。

注意

您不能在运行时为**子图**设置静态断点。如果您有子图,则必须在编译时设置断点。

设置静态断点
from IPython.display import Image, display
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    input: str


def step_1(state):
    print("---Step 1---")
    pass


def step_2(state):
    print("---Step 2---")
    pass


def step_3(state):
    print("---Step 3---")
    pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up a checkpointer
checkpointer = InMemorySaver() # (1)!

graph = builder.compile(
    checkpointer=checkpointer, # (2)!
    interrupt_before=["step_3"] # (3)!
)

# View
display(Image(graph.get_graph().draw_mermaid_png()))


# Input
initial_input = {"input": "hello world"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
    print(event)

# This will run until the breakpoint
# You can get the state of the graph at this point
print(graph.get_state(config))

# You can continue the graph execution by passing in `None` for the input
for event in graph.stream(None, thread, stream_mode="values"):
    print(event)

在 LangGraph Studio 中使用静态中断

您可以使用 LangGraph Studio 来调试您的图。您可以在 UI 中设置静态断点,然后运行图。您还可以使用 UI 在执行的任何点检查图的状态。

image

LangGraph Studio 对使用 langgraph dev 本地部署的应用程序是免费的。

注意事项

在使用“人在环路”时,有一些注意事项需要牢记。

与有副作用的代码一起使用

将具有副作用的代码(例如 API 调用)放在 interrupt 之后或单独的节点中,以避免重复,因为每次恢复节点时都会重新触发这些代码。

from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""

    answer = interrupt(question)

    api_call(answer) # OK as it's after the interrupt
from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""

    answer = interrupt(question)

    return {
        "answer": answer
    }

def api_call_node(state: State):
    api_call(...) # OK as it's in a separate node

与作为函数调用的子图一起使用

当将子图作为函数调用时,父图将从触发 interrupt 的子图被调用的**节点的开头**恢复执行。类似地,**子图**将从调用 interrupt() 函数的**节点的开头**恢复。

def node_in_parent_graph(state: State):
    some_code()  # <-- This will re-execute when the subgraph is resumed.
    # Invoke a subgraph as a function.
    # The subgraph contains an `interrupt` call.
    subgraph_result = subgraph.invoke(some_input)
    ...
扩展示例:父图和子图的执行流程

假设我们有一个包含 3 个节点的父图:

父图: node_1node_2 (子图调用) → node_3

子图有 3 个节点,其中第二个节点包含一个 interrupt

子图: sub_node_1sub_node_2 (interrupt) → sub_node_3

当恢复图时,执行将按以下方式进行:

  1. 在父图中**跳过 node_1**(已执行,图状态已保存在快照中)。
  2. 从头开始**重新执行父图中的 node_2**。
  3. 在子图中**跳过 sub_node_1**(已执行,图状态已保存在快照中)。
  4. 从头开始**重新执行子图中的 sub_node_2**。
  5. 继续执行 sub_node_3 及后续节点。

这是一个简化的示例代码,您可以用它来理解子图如何与中断一起工作。它会计算每个节点被进入的次数并打印计数。

import uuid
from typing import TypedDict

from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver


class State(TypedDict):
    """The graph state."""
    state_counter: int


counter_node_in_subgraph = 0

def node_in_subgraph(state: State):
    """A node in the sub-graph."""
    global counter_node_in_subgraph
    counter_node_in_subgraph += 1  # This code will **NOT** run again!
    print(f"Entered `node_in_subgraph` a total of {counter_node_in_subgraph} times")

counter_human_node = 0

def human_node(state: State):
    global counter_human_node
    counter_human_node += 1 # This code will run again!
    print(f"Entered human_node in sub-graph a total of {counter_human_node} times")
    answer = interrupt("what is your name?")
    print(f"Got an answer of {answer}")


checkpointer = InMemorySaver()

subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)


counter_parent_node = 0

def parent_node(state: State):
    """This parent node will invoke the subgraph."""
    global counter_parent_node

    counter_parent_node += 1 # This code will run again on resuming!
    print(f"Entered `parent_node` a total of {counter_parent_node} times")

    # Please note that we're intentionally incrementing the state counter
    # in the graph state as well to demonstrate that the subgraph update
    # of the same key will not conflict with the parent graph (until
    subgraph_state = subgraph.invoke(state)
    return subgraph_state


builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
      "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"state_counter": 1}, config):
    print(chunk)

print('--- Resuming ---')

for chunk in graph.stream(Command(resume="35"), config):
    print(chunk)

这将打印出:

Entered `parent_node` a total of 1 times
Entered `node_in_subgraph` a total of 1 times
Entered human_node in sub-graph a total of 1 times
{'__interrupt__': (Interrupt(value='what is your name?', id='...'),)}
--- Resuming ---
Entered `parent_node` a total of 2 times
Entered human_node in sub-graph a total of 2 times
Got an answer of 35
{'parent_node': {'state_counter': 1}}

在单个节点中使用多个中断

在**单个**节点内使用多个中断对于像验证人工输入这样的模式可能很有帮助。但是,如果不小心处理,在同一节点中使用多个中断可能会导致意外行为。

当一个节点包含多个中断调用时,LangGraph 会为执行该节点的任务维护一个恢复值列表。每当执行恢复时,它都从节点的开头开始。对于遇到的每个中断,LangGraph 会检查任务的恢复列表中是否存在匹配的值。匹配是**严格基于索引的**,因此节点内中断调用的顺序至关重要。

为避免问题,请不要在执行之间动态更改节点的结构。这包括添加、删除或重新排序中断调用,因为这些更改可能导致索引不匹配。这些问题通常源于非常规模式,例如通过 Command(resume=..., update=SOME_STATE_MUTATION) 改变状态或依赖全局变量来动态修改节点结构。

扩展示例:引入非确定性的不正确代码
import uuid
from typing import TypedDict, Optional

from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver


class State(TypedDict):
    """The graph state."""

    age: Optional[str]
    name: Optional[str]


def human_node(state: State):
    if not state.get('name'):
        name = interrupt("what is your name?")
    else:
        name = "N/A"

    if not state.get('age'):
        age = interrupt("what is your age?")
    else:
        age = "N/A"

    print(f"Name: {name}. Age: {age}")

    return {
        "age": age,
        "name": name,
    }


builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
        "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"age": None, "name": None}, config):
    print(chunk)

for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
    print(chunk)
{'__interrupt__': (Interrupt(value='what is your name?', id='...'),)}
Name: N/A. Age: John
{'human_node': {'age': 'John', 'name': 'N/A'}}