持久化执行¶
持久化执行是一种技术,在这种技术中,进程或工作流会在关键点保存其进度,从而允许它暂停并在稍后从中断处精确地恢复。这在需要人工介入的场景中特别有用,用户可以在继续之前检查、验证或修改流程;在可能遇到中断或错误(例如,调用LLM超时)的长时间运行任务中也很有用。通过保留已完成的工作,持久化执行使进程能够在不重新处理先前步骤的情况下恢复——即使在长时间延迟(例如,一周后)之后也是如此。
LangGraph 的内置持久化层为工作流提供了持久化执行,确保每个执行步骤的状态都保存到持久存储中。此功能保证了如果工作流被中断——无论是由于系统故障还是为了人工介入交互——它都可以从其最后记录的状态恢复。
提示
如果您正在使用带有检查点(checkpointer)的 LangGraph,那么您已经启用了持久化执行。您可以在任何时候暂停和恢复工作流,即使在中断或失败之后也是如此。为了充分利用持久化执行,请确保您的工作流被设计为确定性的和幂等的,并将任何副作用或非确定性操作包装在任务中。您可以从StateGraph (Graph API) 和 函数式 API 中使用任务。
要求¶
要在 LangGraph 中利用持久化执行,您需要:
- 通过指定一个将保存工作流进度的检查点(checkpointer),在您的工作流中启用持久化。
-
在执行工作流时指定一个线程标识符。这将跟踪工作流特定实例的执行历史。
-
将任何非确定性操作(例如,随机数生成)或有副作用的操作(例如,文件写入、API 调用)包装在任务中,以确保当工作流恢复时,这些操作不会在特定运行中重复,而是从持久化层检索其结果。有关更多信息,请参阅确定性与一致性重放。
确定性与一致性重放¶
当您恢复一个工作流运行时,代码并不会从执行停止的同一行代码处恢复;相反,它会确定一个合适的起点,从那里继续执行。这意味着工作流将从起点开始重放所有步骤,直到达到它被停止的位置。
因此,当您为持久化执行编写工作流时,必须将任何非确定性操作(例如,随机数生成)和任何有副作用的操作(例如,文件写入、API调用)包装在任务或节点中。
为确保您的工作流是确定性的并且可以一致地重放,请遵循以下准则:
- 避免重复工作:如果一个节点包含多个有副作用的操作(例如,日志记录、文件写入或网络调用),请将每个操作包装在单独的任务中。这确保了当工作流恢复时,这些操作不会重复,并且它们的结果会从持久化层中检索。
- 封装非确定性操作: 将任何可能产生非确定性结果的代码(例如,随机数生成)包装在任务或节点中。这确保了在恢复时,工作流会以相同的输出遵循完全相同的已记录步骤序列。
- 使用幂等操作:尽可能确保副作用(例如,API调用、文件写入)是幂等的。这意味着如果一个操作在工作流中失败后重试,它将具有与第一次执行时相同的效果。这对于导致数据写入的操作尤其重要。如果一个任务开始但未能成功完成,工作流的恢复将重新运行该任务,并依赖于已记录的结果来保持一致性。使用幂等性密钥或验证现有结果以避免意外的重复,确保工作流执行平稳且可预测。
有关一些需要避免的陷阱示例,请参阅函数式 API 中的常见陷阱部分,该部分展示了如何使用任务来构建代码以避免这些问题。同样的原则也适用于StateGraph (Graph API)。
持久化模式¶
LangGraph 支持三种持久化模式,允许您根据应用程序的要求在性能和数据一致性之间进行平衡。持久化模式从最不持久到最持久如下:
更高的持久化模式会给工作流执行增加更多的开销。
在 v0.6.0 中添加
使用 durability
参数代替 checkpoint_during
(在 v0.6.0 中已弃用)进行持久化策略管理
durability="async"
替代checkpoint_during=True
durability="exit"
替代checkpoint_during=False
进行持久化策略管理,映射关系如下:
checkpoint_during=True
->durability="async"
checkpoint_during=False
->durability="exit"
"exit"
¶
仅当图执行完成时(无论是成功还是出错)才会持久化更改。这为长时间运行的图提供了最佳性能,但意味着中间状态不会被保存,因此您无法从执行中的故障中恢复或中断图的执行。
"async"
¶
在下一步执行的同时异步持久化更改。这提供了良好的性能和持久性,但存在一个小风险,即如果进程在执行期间崩溃,检查点可能无法写入。
"sync"
¶
在下一步开始之前同步持久化更改。这确保了每个检查点在继续执行之前都被写入,以一些性能开销为代价提供了高持久性。
您可以在调用任何图执行方法时指定持久化模式:
在节点中使用任务¶
如果一个节点包含多个操作,您可能会发现将每个操作转换为一个任务比将操作重构为单独的节点更容易。
from typing import NotRequired
from typing_extensions import TypedDict
import uuid
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests
# Define a TypedDict to represent the state
class State(TypedDict):
url: str
result: NotRequired[str]
def call_api(state: State):
"""Example node that makes an API request."""
result = requests.get(state['url']).text[:100] # Side-effect
return {
"result": result
}
# Create a StateGraph builder and add a node for the call_api function
builder = StateGraph(State)
builder.add_node("call_api", call_api)
# Connect the start and end nodes to the call_api node
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)
# Specify a checkpointer
checkpointer = InMemorySaver()
# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)
# Define a config with a thread ID.
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}
# Invoke the graph
graph.invoke({"url": "https://www.example.com"}, config)
from typing import NotRequired
from typing_extensions import TypedDict
import uuid
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
import requests
# Define a TypedDict to represent the state
class State(TypedDict):
urls: list[str]
result: NotRequired[list[str]]
@task
def _make_request(url: str):
"""Make a request."""
return requests.get(url).text[:100]
def call_api(state: State):
"""Example node that makes an API request."""
requests = [_make_request(url) for url in state['urls']]
results = [request.result() for request in requests]
return {
"results": results
}
# Create a StateGraph builder and add a node for the call_api function
builder = StateGraph(State)
builder.add_node("call_api", call_api)
# Connect the start and end nodes to the call_api node
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)
# Specify a checkpointer
checkpointer = InMemorySaver()
# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)
# Define a config with a thread ID.
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}
# Invoke the graph
graph.invoke({"urls": ["https://www.example.com"]}, config)
恢复工作流¶
一旦您在工作流中启用了持久化执行,您可以在以下场景中恢复执行:
- 暂停和恢复工作流: 使用 interrupt 函数在特定点暂停工作流,并使用 Command 原语以更新后的状态恢复它。有关更多详细信息,请参阅人工介入。
- 从故障中恢复: 在发生异常(例如,LLM 提供商中断)后,自动从最后一个成功的检查点恢复工作流。这涉及使用相同的线程标识符执行工作流,并为其提供一个
None
作为输入值(请参阅这个使用函数式 API 的示例)。
恢复工作流的起点¶
- 如果您使用的是 StateGraph (Graph API),起点是执行停止的节点的开始处。
- 如果您在节点内调用子图,起点将是调用已停止子图的父节点。在子图内部,起点将是执行停止的特定节点。
- 如果您使用的是函数式 API,起点是执行停止的入口点(entrypoint)的开始处。