跳到内容

人机环路

本指南使用新的 interrupt 函数。

从 LangGraph 0.2.57 版本开始,推荐使用 interrupt 函数 来设置断点,因为它简化了人机环路模式。

如果您正在寻找先前版本的概念指南,该指南依赖于静态断点和 NodeInterrupt 异常,请点击此处

人机环路(或“人在环路中”)工作流程将人工输入集成到自动化流程中,从而允许在关键阶段进行决策、验证或更正。这在基于 LLM 的应用程序中尤其有用,因为底层模型可能会产生偶尔的不准确性。在低容错率的场景(如合规性、决策制定或内容生成)中,人工参与通过支持审查、更正或覆盖模型输出,确保了可靠性。

用例

人机环路工作流程在基于 LLM 的应用程序中的主要用例包括

  1. 🛠️ 审查工具调用:人工可以在工具执行之前审查、编辑或批准 LLM 请求的工具调用。
  2. ✅ 验证 LLM 输出:人工可以审查、编辑或批准 LLM 生成的内容。
  3. 💡 提供上下文:使 LLM 能够显式请求人工输入以进行澄清或补充细节,或支持多轮对话。

interrupt

LangGraph 中的 interrupt 函数 通过在特定节点暂停图的执行,向人工呈现信息,并使用他们的输入恢复图的执行,从而实现人机环路工作流程。此函数对于批准、编辑或收集额外输入等任务非常有用。interrupt 函数Command 对象结合使用,以使用人工提供的值恢复图的执行。

from langgraph.types import interrupt

def human_node(state: State):
    value = interrupt(
        # Any JSON serializable value to surface to the human.
        # For example, a question or a piece of text or a set of keys in the state
       {
          "text_to_revise": state["some_text"]
       }
    )
    # Update the state with the human's input or route the graph based on the input.
    return {
        "some_text": value
    }

graph = graph_builder.compile(
    checkpointer=checkpointer # Required for `interrupt` to work
)

# Run the graph until the interrupt
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(some_input, config=thread_config)

# Resume the graph with the human's input
graph.invoke(Command(resume=value_from_human), config=thread_config)

API 参考: interrupt

{'some_text': 'Edited text'}

警告

中断功能强大且符合人体工程学。但是,虽然它们在开发者体验方面可能类似于 Python 的 input() 函数,但重要的是要注意,它们不会自动从中断点恢复执行。相反,它们会重新运行使用中断的整个节点。因此,中断通常最好放置在节点的开头或专用节点中。请阅读从中断处恢复部分以了解更多详情。

完整代码

这是一个如何在图中使用 interrupt 的完整示例,如果您想查看实际代码。

from typing import TypedDict
import uuid

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command

class State(TypedDict):
   """The graph state."""
   some_text: str

def human_node(state: State):
   value = interrupt(
      # Any JSON serializable value to surface to the human.
      # For example, a question or a piece of text or a set of keys in the state
      {
         "text_to_revise": state["some_text"]
      }
   )
   return {
      # Update the state with the human's input
      "some_text": value
   }


# Build the graph
graph_builder = StateGraph(State)
# Add the human-node to the graph
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")

# A checkpointer is required for `interrupt` to work.
checkpointer = MemorySaver()
graph = graph_builder.compile(
   checkpointer=checkpointer
)

# Pass a thread ID to the graph to run it.
thread_config = {"configurable": {"thread_id": uuid.uuid4()}}

# Using stream() to directly surface the `__interrupt__` information.
for chunk in graph.stream({"some_text": "Original text"}, config=thread_config):
   print(chunk)

# Resume using Command
for chunk in graph.stream(Command(resume="Edited text"), config=thread_config):
   print(chunk)
{'__interrupt__': (
      Interrupt(
         value={'question': 'Please revise the text', 'some_text': 'Original text'}, 
         resumable=True, 
         ns=['human_node:10fe492f-3688-c8c6-0d0a-ec61a43fecd6'], 
         when='during'
      ),
   )
}
{'human_node': {'some_text': 'Edited text'}}

要求

要在图中使用 interrupt,您需要

  1. 指定检查点 以在每个步骤后保存图的状态。
  2. 在适当的位置调用 interrupt()。有关示例,请参阅设计模式部分。
  3. 使用 线程 ID运行图,直到命中 interrupt
  4. 使用 invoke/ainvoke/stream/astream恢复执行(请参阅Command 原语)。

设计模式

通常,您可以使用人机环路工作流程执行三种不同的操作

  1. 批准或拒绝:在关键步骤(例如 API 调用)之前暂停图的执行,以审查和批准该操作。如果该操作被拒绝,您可以阻止图执行该步骤,并可能采取替代操作。此模式通常涉及根据人工输入路由图的执行。
  2. 编辑图状态:暂停图的执行以审查和编辑图状态。这对于纠正错误或使用其他信息更新状态非常有用。此模式通常涉及使用人工输入更新状态。
  3. 获取输入:在图中的特定步骤显式请求人工输入。这对于收集额外信息或上下文以告知 Agent 的决策过程或支持多轮对话非常有用。

下面我们展示了可以使用这些操作实现的不同设计模式。

批准或拒绝

image

根据人工的批准或拒绝,图可以继续执行该操作或采取替代路径。

在关键步骤(例如 API 调用)之前暂停图的执行,以审查和批准该操作。如果该操作被拒绝,您可以阻止图执行该步骤,并可能采取替代操作。

from typing import Literal
from langgraph.types import interrupt, Command

def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
    is_approved = interrupt(
        {
            "question": "Is this correct?",
            # Surface the output that should be
            # reviewed and approved by the human.
            "llm_output": state["llm_output"]
        }
    )

    if is_approved:
        return Command(goto="some_node")
    else:
        return Command(goto="another_node")

# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with either an approval or rejection.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)

API 参考: interrupt | Command

有关更详细的示例,请参阅如何审查工具调用

审查和编辑状态

image

人工可以审查和编辑图的状态。这对于纠正错误或使用其他信息更新状态非常有用。
from langgraph.types import interrupt

def human_editing(state: State):
    ...
    result = interrupt(
        # Interrupt information to surface to the client.
        # Can be any JSON serializable value.
        {
            "task": "Review the output from the LLM and make any necessary edits.",
            "llm_generated_summary": state["llm_generated_summary"]
        }
    )

    # Update the state with the edited text
    return {
        "llm_generated_summary": result["edited_text"] 
    }

# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)

...

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with the edited text.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
    Command(resume={"edited_text": "The edited text"}), 
    config=thread_config
)

API 参考: interrupt

有关更详细的示例,请参阅如何使用 interrupt 等待用户输入

审查工具调用

image

人工可以在继续之前审查和编辑 LLM 的输出。这在 LLM 请求的工具调用可能敏感或需要人工监督的应用程序中尤其重要。
def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:
    # This is the value we'll be providing via Command(resume=<human_review>)
    human_review = interrupt(
        {
            "question": "Is this correct?",
            # Surface tool calls for review
            "tool_call": tool_call
        }
    )

    review_action, review_data = human_review

    # Approve the tool call and continue
    if review_action == "continue":
        return Command(goto="run_tool")

    # Modify the tool call manually and then continue
    elif review_action == "update":
        ...
        updated_msg = get_updated_msg(review_data)
        # Remember that to modify an existing message you will need
        # to pass the message with a matching ID.
        return Command(goto="run_tool", update={"messages": [updated_message]})

    # Give natural language feedback, and then pass that back to the agent
    elif review_action == "feedback":
        ...
        feedback_msg = get_feedback_msg(review_data)
        return Command(goto="call_llm", update={"messages": [feedback_msg]})

有关更详细的示例,请参阅如何审查工具调用

多轮对话

image

多轮对话架构,其中 agent人工节点来回循环,直到 agent 决定将对话移交给另一个 agent 或系统的另一部分。

多轮对话涉及 agent 和人工之间的多次来回交互,这可以使 agent 以对话方式从人工那里收集更多信息。

此设计模式在由多个 agent组成的 LLM 应用程序中非常有用。一个或多个 agent 可能需要与人工进行多轮对话,其中人工在对话的不同阶段提供输入或反馈。为了简单起见,下面的 agent 实现被图示为一个单节点,但实际上它可能是由多个节点组成的更大图的一部分,并且包括条件边。

在此模式中,每个 agent 都有自己的人机节点来收集用户输入。这可以通过使用唯一名称命名人机节点(例如,“agent 1 的人机节点”、“agent 2 的人机节点”)或通过使用子图来实现,其中子图包含人机节点和 agent 节点。

from langgraph.types import interrupt

def human_input(state: State):
    human_message = interrupt("human_input")
    return {
        "messages": [
            {
                "role": "human",
                "content": human_message
            }
        ]
    }

def agent(state: State):
    # Agent logic
    ...

graph_builder.add_node("human_input", human_input)
graph_builder.add_edge("human_input", "agent")
graph = graph_builder.compile(checkpointer=checkpointer)

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with the human's input.
graph.invoke(
    Command(resume="hello!"),
    config=thread_config
)

在此模式中,单个人机节点用于收集多个 agent 的用户输入。活动 agent 是从状态确定的,因此在收集人工输入后,图可以路由到正确的 agent。

from langgraph.types import interrupt

def human_node(state: MessagesState) -> Command[Literal["agent_1", "agent_2", ...]]:
    """A node for collecting user input."""
    user_input = interrupt(value="Ready for user input.")

    # Determine the **active agent** from the state, so 
    # we can route to the correct agent after collecting input.
    # For example, add a field to the state or use the last active agent.
    # or fill in `name` attribute of AI messages generated by the agents.
    active_agent = ... 

    return Command(
        update={
            "messages": [{
                "role": "human",
                "content": user_input,
            }]
        },
        goto=active_agent,
    )

有关更详细的示例,请参阅如何实现多轮对话

验证人工输入

如果您需要在图本身内验证人工提供的输入(而不是在客户端),您可以通过在单个节点内使用多个 interrupt 调用来实现。

from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""
    question = "What is your age?"

    while True:
        answer = interrupt(question)

        # Validate answer, if the answer isn't valid ask for input again.
        if not isinstance(answer, int) or answer < 0:
            question = f"'{answer} is not a valid age. What is your age?"
            answer = None
            continue
        else:
            # If the answer is valid, we can proceed.
            break

    print(f"The human in the loop is {answer} years old.")
    return {
        "age": answer
    }

API 参考: interrupt

Command 原语

当使用 interrupt 函数时,图将在中断处暂停并等待用户输入。

可以使用 Command 原语恢复图的执行,该原语可以通过 invokeainvokestreamastream 方法传递。

Command 原语提供了几个选项来控制和修改恢复期间的图状态

  1. 将值传递给 interrupt:使用 Command(resume=value) 向图提供数据,例如用户的响应。执行从使用 interrupt 的节点开始处恢复,但是,这次 interrupt(...) 调用将返回在 Command(resume=value) 中传递的值,而不是暂停图的执行。

    # Resume graph execution with the user's input.
    graph.invoke(Command(resume={"age": "25"}), thread_config)
    
  2. 更新图状态:使用 Command(update=update) 修改图状态。请注意,恢复从使用 interrupt 的节点开始处开始。执行从使用 interrupt 的节点开始处恢复,但使用更新后的状态。

    # Update the graph state and resume.
    # You must provide a `resume` value if using an `interrupt`.
    graph.invoke(Command(update={"foo": "bar"}, resume="Let's go!!!"), thread_config)
    

通过利用 Command,您可以恢复图的执行,处理用户输入,并动态调整图的状态。

invokeainvoke 一起使用

当您使用 streamastream 运行图时,您将收到一个 Interrupt 事件,告知您 interrupt 已被触发。

invokeainvoke 不返回中断信息。要访问此信息,您必须使用 get_state 方法在调用 invokeainvoke 后检索图状态。

# Run the graph up to the interrupt 
result = graph.invoke(inputs, thread_config)
# Get the graph state to get interrupt information.
state = graph.get_state(thread_config)
# Print the state values
print(state.values)
# Print the pending tasks
print(state.tasks)
# Resume the graph with the user's input.
graph.invoke(Command(resume={"age": "25"}), thread_config)
{'foo': 'bar'} # State values
(
    PregelTask(
        id='5d8ffc92-8011-0c9b-8b59-9d3545b7e553', 
        name='node_foo', 
        path=('__pregel_pull', 'node_foo'), 
        error=None, 
        interrupts=(Interrupt(value='value_in_interrupt', resumable=True, ns=['node_foo:5d8ffc92-8011-0c9b-8b59-9d3545b7e553'], when='during'),), state=None, 
        result=None
    ),
) # Pending tasks. interrupts 

如何从中断处恢复?

警告

interrupt 恢复不同于 Python 的 input() 函数,在 Python 的 input() 函数中,执行从调用 input() 函数的确切位置恢复。

使用 interrupt 的一个关键方面是理解恢复的工作原理。当您在 interrupt 后恢复执行时,图执行从触发最后一个 interrupt图节点开头开始。

从节点开头到 interrupt所有代码都将重新执行。

counter = 0
def node(state: State):
    # All the code from the beginning of the node to the interrupt will be re-executed
    # when the graph resumes.
    global counter
    counter += 1
    print(f"> Entered the node: {counter} # of times")
    # Pause the graph and wait for user input.
    answer = interrupt()
    print("The value of counter is:", counter)
    ...

恢复图时,计数器将再次递增,从而产生以下输出

> Entered the node: 2 # of times
The value of counter is: 2

常见陷阱

副作用

将具有副作用的代码(例如 API 调用)放置在 interrupt之后,以避免重复,因为每次节点从中断处恢复时都会重新触发这些代码。

当节点从 interrupt 恢复时,此代码将再次重新执行 API 调用。

如果 API 调用不是幂等的或只是成本很高,则这可能会成为问题。

from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""
    api_call(...) # This code will be re-executed when the node is resumed.
    answer = interrupt(question)
from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""

    answer = interrupt(question)

    api_call(answer) # OK as it's after the interrupt
from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""

    answer = interrupt(question)

    return {
        "answer": answer
    }

def api_call_node(state: State):
    api_call(...) # OK as it's in a separate node

作为函数调用的子图

作为函数调用子图时,父图将从调用子图的节点开头(以及触发 interrupt 的位置)恢复执行。同样,子图将从调用 interrupt() 函数的节点开头恢复。

例如,

def node_in_parent_graph(state: State):
    some_code()  # <-- This will re-execute when the subgraph is resumed.
    # Invoke a subgraph as a function.
    # The subgraph contains an `interrupt` call.
    subgraph_result = subgraph.invoke(some_input)
    ...
示例:父图和子图执行流程

假设我们有一个包含 3 个节点的父图

父图node_1node_2(子图调用)→ node_3

子图有 3 个节点,其中第二个节点包含 interrupt

子图sub_node_1sub_node_2interrupt)→ sub_node_3

当恢复图时,执行将按如下方式进行

  1. 跳过父图中的 node_1(已执行,图状态已保存在快照中)。
  2. 从头开始重新执行父图中的 node_2
  3. 跳过子图中的 sub_node_1(已执行,图状态已保存在快照中)。
  4. 从头开始重新执行子图中的 sub_node_2
  5. 继续执行 sub_node_3 和后续节点。

这是一个缩写示例代码,您可以使用它来了解子图如何与中断一起工作。它计算每个节点被输入的次数并打印计数。

import uuid
from typing import TypedDict

from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
   """The graph state."""
   state_counter: int


counter_node_in_subgraph = 0

def node_in_subgraph(state: State):
   """A node in the sub-graph."""
   global counter_node_in_subgraph
   counter_node_in_subgraph += 1  # This code will **NOT** run again!
   print(f"Entered `node_in_subgraph` a total of {counter_node_in_subgraph} times")

counter_human_node = 0

def human_node(state: State):
   global counter_human_node
   counter_human_node += 1 # This code will run again!
   print(f"Entered human_node in sub-graph a total of {counter_human_node} times")
   answer = interrupt("what is your name?")
   print(f"Got an answer of {answer}")


checkpointer = MemorySaver()

subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)


counter_parent_node = 0

def parent_node(state: State):
   """This parent node will invoke the subgraph."""
   global counter_parent_node

   counter_parent_node += 1 # This code will run again on resuming!
   print(f"Entered `parent_node` a total of {counter_parent_node} times")

   # Please note that we're intentionally incrementing the state counter
   # in the graph state as well to demonstrate that the subgraph update
   # of the same key will not conflict with the parent graph (until
   subgraph_state = subgraph.invoke(state)
   return subgraph_state


builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
   "configurable": {
      "thread_id": uuid.uuid4(),
   }
}

for chunk in graph.stream({"state_counter": 1}, config):
   print(chunk)

print('--- Resuming ---')

for chunk in graph.stream(Command(resume="35"), config):
   print(chunk)

这将打印输出

Entered `parent_node` a total of 1 times
Entered `node_in_subgraph` a total of 1 times
Entered human_node in sub-graph a total of 1 times
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['parent_node:4c3a0248-21f0-1287-eacf-3002bc304db4', 'human_node:2fe86d52-6f70-2a3f-6b2f-b1eededd6348'], when='during'),)}
--- Resuming ---
Entered `parent_node` a total of 2 times
Entered human_node in sub-graph a total of 2 times
Got an answer of 35
{'parent_node': {'state_counter': 1}}

使用多个中断

单个节点中使用多个中断对于诸如验证人工输入之类的模式可能很有用。但是,如果在同一节点中使用多个中断,如果处理不当,可能会导致意外行为。

当节点包含多个中断调用时,LangGraph 会保留特定于执行该节点的任务的恢复值列表。每当执行恢复时,它都会从节点开头开始。对于遇到的每个中断,LangGraph 都会检查任务的恢复列表中是否存在匹配的值。匹配是严格基于索引的,因此节点内中断调用的顺序至关重要。

为避免问题,请避免在执行之间动态更改节点的结构。这包括添加、删除或重新排序中断调用,因为此类更改可能会导致索引不匹配。这些问题通常源于非常规模式,例如通过 Command(resume=..., update=SOME_STATE_MUTATION) 改变状态或依赖全局变量来动态修改节点的结构。

不正确的代码示例
import uuid
from typing import TypedDict, Optional

from langgraph.graph import StateGraph
from langgraph.constants import START 
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
    """The graph state."""

    age: Optional[str]
    name: Optional[str]


def human_node(state: State):
    if not state.get('name'):
        name = interrupt("what is your name?")
    else:
        name = "N/A"

    if not state.get('age'):
        age = interrupt("what is your age?")
    else:
        age = "N/A"

    print(f"Name: {name}. Age: {age}")

    return {
        "age": age,
        "name": name,
    }


builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
        "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"age": None, "name": None}, config):
    print(chunk)

for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
    print(chunk)
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['human_node:3a007ef9-c30d-c357-1ec1-86a1a70d8fba'], when='during'),)}
Name: N/A. Age: John
{'human_node': {'age': 'John', 'name': 'N/A'}}

其他资源 📚

评论