函数式 API¶
概述¶
函数式 API 允许您将 LangGraph 的关键功能(持久化、内存、人工参与 和 流式处理)添加到您的应用程序中,而只需对现有代码进行最少的更改。
它旨在将这些功能集成到可能使用标准语言原语进行分支和控制流的现有代码中,例如 if
语句、for
循环和函数调用。与许多需要将代码重组为显式管道或 DAG 的数据编排框架不同,函数式 API 允许您在不强制执行严格的执行模型的情况下合并这些功能。
函数式 API 使用两个关键构建块
@entrypoint
– 将函数标记为工作流的起点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。@task
– 表示一个离散的工作单元,例如 API 调用或数据处理步骤,可以在入口点内异步执行。任务返回一个类似 future 的对象,可以等待或同步解析。
这为构建具有状态管理和流式处理的工作流提供了最小的抽象。
提示
对于喜欢更声明式方法的用户,LangGraph 的 图 API 允许您使用图范式定义工作流。两个 API 共享相同的底层运行时,因此您可以在同一应用程序中一起使用它们。请参阅 函数式 API vs. 图 API 部分,了解两种范式的比较。
示例¶
下面我们演示一个简单的应用程序,它编写一篇关于“猫”的文章,并中断以请求人工审核。
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
@task
def write_essay(topic: str) -> str:
"""Write an essay about the given topic."""
time.sleep(1) # A placeholder for a long-running task.
return f"An essay about topic: {topic}"
@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
"""A simple workflow that writes an essay and asks for a review."""
essay = write_essay("cat").result()
is_approved = interrupt({
# Any json-serializable payload provided to interrupt as argument.
# It will be surfaced on the client side as an Interrupt when streaming data
# from the workflow.
"essay": essay, # The essay we want reviewed.
# We can add any additional information that we need.
# For example, introduce a key called "action" with some instructions.
"action": "Please approve/reject the essay",
})
return {
"essay": essay, # The essay that was generated
"is_approved": is_approved, # Response from HIL
}
API 参考:entrypoint | task | interrupt
详细解释
此工作流将撰写一篇关于主题“猫”的文章,然后暂停以获取人工审核。工作流可能会无限期地中断,直到提供审核为止。
当工作流恢复时,它将从头开始执行,但由于 write_essay
任务的结果已保存,因此任务结果将从检查点加载,而不是重新计算。
import time
import uuid
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
@task
def write_essay(topic: str) -> str:
"""Write an essay about the given topic."""
time.sleep(1) # This is a placeholder for a long-running task.
return f"An essay about topic: {topic}"
@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
"""A simple workflow that writes an essay and asks for a review."""
essay = write_essay("cat").result()
is_approved = interrupt({
# Any json-serializable payload provided to interrupt as argument.
# It will be surfaced on the client side as an Interrupt when streaming data
# from the workflow.
"essay": essay, # The essay we want reviewed.
# We can add any additional information that we need.
# For example, introduce a key called "action" with some instructions.
"action": "Please approve/reject the essay",
})
return {
"essay": essay, # The essay that was generated
"is_approved": is_approved, # Response from HIL
}
thread_id = str(uuid.uuid4())
config = {
"configurable": {
"thread_id": thread_id
}
}
for item in workflow.stream("cat", config):
print(item)
{'write_essay': 'An essay about topic: cat'}
{'__interrupt__': (Interrupt(value={'essay': 'An essay about topic: cat', 'action': 'Please approve/reject the essay'}, resumable=True, ns=['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'], when='during'),)}
文章已撰写完成,可以进行审核。一旦提供审核,我们就可以恢复工作流
from langgraph.types import Command
# Get review from a user (e.g., via a UI)
# In this case, we're using a bool, but this can be any json-serializable value.
human_review = True
for item in workflow.stream(Command(resume=human_review), config):
print(item)
工作流已完成,审核已添加到文章中。
入口点¶
@entrypoint
装饰器可用于从函数创建工作流。它封装了工作流逻辑并管理执行流程,包括处理长时间运行的任务和 中断。
定义¶
入口点是通过使用 @entrypoint
装饰器装饰函数来定义的。
该函数必须接受单个位置参数,该参数充当工作流输入。如果您需要传递多条数据,请使用字典作为第一个参数的输入类型。
使用 entrypoint
装饰函数会生成一个 Pregel
实例,该实例有助于管理工作流的执行(例如,处理流式处理、恢复和检查点)。
您通常需要将 检查点 传递给 @entrypoint
装饰器,以启用持久性并使用 人工参与 等功能。
序列化
入口点的 输入 和 输出 必须是 JSON 可序列化的,以支持检查点。请参阅 序列化 部分了解更多详细信息。
可注入参数¶
在声明 entrypoint
时,您可以请求访问将在运行时自动注入的其他参数。这些参数包括
参数 | 描述 |
---|---|
previous | 访问与给定线程的先前 checkpoint 关联的状态。请参阅 状态管理。 |
store | BaseStore 的实例。对于 长期记忆 很有用。 |
writer | 用于流式传输自定义数据,将自定义数据写入 custom 流。对于 流式传输自定义数据 很有用。 |
config | 用于访问运行时配置。有关信息,请参阅 RunnableConfig。 |
重要提示
使用适当的名称和类型注解声明参数。
请求可注入参数
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore(...) # An instance of InMemoryStore for long-term memory
@entrypoint(
checkpointer=checkpointer, # Specify the checkpointer
store=in_memory_store # Specify the store
)
def my_workflow(
some_input: dict, # The input (e.g., passed via `invoke`)
*,
previous: Any = None, # For short-term memory
store: BaseStore, # For long-term memory
writer: StreamWriter, # For streaming custom data
config: RunnableConfig # For accessing the configuration passed to the entrypoint
) -> ...:
执行¶
使用 @entrypoint
会产生一个 Pregel
对象,可以使用 invoke
、ainvoke
、stream
和 astream
方法执行该对象。
恢复¶
在 中断 后恢复执行可以通过将 resume 值传递给 Command 原始类型来完成。
错误后恢复
要在错误后恢复,请使用 None
和相同的 线程 ID(配置)运行 entrypoint
。
这假设底层的 错误 已解决,并且可以成功进行执行。
状态管理¶
当使用 checkpointer
定义 entrypoint
时,它会在 检查点 中存储同一 线程 ID 上连续调用之间的信息。
这允许使用 previous
参数从先前的调用中访问状态。
默认情况下,previous
参数是先前调用的返回值。
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> int:
previous = previous or 0
return number + previous
config = {
"configurable": {
"thread_id": "some_thread_id"
}
}
my_workflow.invoke(1, config) # 1 (previous was None)
my_workflow.invoke(2, config) # 3 (previous was 1 from the previous invocation)
entrypoint.final
¶
entrypoint.final 是一个特殊的原始类型,可以从入口点返回,并允许将 保存在检查点中的值 与 入口点的返回值 解耦。
第一个值是入口点的返回值,第二个值是将保存在检查点中的值。类型注解为 entrypoint.final[return_type, save_type]
。
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
previous = previous or 0
# This will return the previous value to the caller, saving
# 2 * number to the checkpoint, which will be used in the next invocation
# for the `previous` parameter.
return entrypoint.final(value=previous, save=2 * number)
config = {
"configurable": {
"thread_id": "1"
}
}
my_workflow.invoke(3, config) # 0 (previous was None)
my_workflow.invoke(1, config) # 6 (previous was 3 * 2 from the previous invocation)
任务¶
任务表示一个离散的工作单元,例如 API 调用或数据处理步骤。它具有两个关键特性
- 异步执行:任务旨在异步执行,允许多个操作并发运行而不会阻塞。
- 检查点:任务结果将保存到检查点,从而可以从上次保存的状态恢复工作流。(有关更多详细信息,请参阅 持久性)。
定义¶
任务是使用 @task
装饰器定义的,该装饰器包装了一个常规 Python 函数。
from langgraph.func import task
@task()
def slow_computation(input_value):
# Simulate a long-running operation
...
return result
API 参考:task
序列化
任务的 输出 必须是 JSON 可序列化的,以支持检查点。
执行¶
任务只能从 入口点、另一个 任务 或 状态图节点 中调用。
不能 从主要应用程序代码中直接调用任务。
当您调用 任务 时,它会立即返回一个 future 对象。future 是稍后可用的结果的占位符。
要获取 任务 的结果,您可以同步等待它(使用 result()
)或异步等待它(使用 await
)。
何时使用任务¶
在以下场景中,任务很有用
- 检查点:当您需要将长时间运行的操作的结果保存到检查点时,这样您在恢复工作流时就不需要重新计算它。
- 人工参与:如果您正在构建需要人工干预的工作流,则必须使用 任务 来封装任何随机性(例如,API 调用),以确保可以正确恢复工作流。请参阅 确定性 部分了解更多详细信息。
- 并行执行:对于 I/O 密集型任务,任务 启用并行执行,允许多个操作并发运行而不会阻塞(例如,调用多个 API)。
- 可观察性:将操作包装在 任务 中提供了一种跟踪工作流进度和使用 LangSmith 监控单个操作的执行的方法。
- 可重试的工作:当工作需要重试以处理失败或不一致时,任务 提供了一种封装和管理重试逻辑的方法。
序列化¶
LangGraph 中的序列化有两个关键方面
@entrypoint
输入和输出必须是 JSON 可序列化的。@task
输出必须是 JSON 可序列化的。
这些要求对于启用检查点和工作流恢复是必要的。使用 Python 原始类型(如字典、列表、字符串、数字和布尔值)以确保您的输入和输出是可序列化的。
序列化确保可以可靠地保存和恢复工作流状态,例如任务结果和中间值。这对于启用人工参与交互、容错和并行执行至关重要。
如果提供不可序列化的输入或输出,则当工作流配置了检查点时,将导致运行时错误。
确定性¶
要利用 人工参与 等功能,任何随机性都应封装在 任务 内部。这保证了当执行暂停(例如,对于人工参与)然后恢复时,即使 任务 结果是非确定性的,它也会遵循相同的步骤顺序。
LangGraph 通过持久化 任务 和 子图 结果来实现此行为。设计良好的工作流可确保恢复执行遵循相同的步骤顺序,从而允许正确检索先前计算的结果,而无需重新执行它们。这对于长时间运行的 任务 或具有非确定性结果的 任务 特别有用,因为它避免了重复先前完成的工作,并允许从基本相同的状态恢复
虽然工作流的不同运行可能会产生不同的结果,但恢复 特定 运行应始终遵循相同的记录步骤顺序。这使 LangGraph 能够有效地查找在图中断之前执行的 任务 和 子图 结果,并避免重新计算它们。
幂等性¶
幂等性确保多次运行同一操作会产生相同的结果。这有助于防止因步骤因故障而重新运行时重复 API 调用和冗余处理。始终将 API 调用放在 任务 函数内部以进行检查点,并将它们设计为幂等的,以防重新执行。如果 任务 开始但未成功完成,则可能会发生重新执行。然后,如果恢复工作流,则 任务 将再次运行。使用幂等性键或验证现有结果以避免重复。
函数式 API vs. 图 API¶
函数式 API 和 图 API(状态图) 提供了两种不同的范例来创建具有 LangGraph 的应用程序。以下是一些主要区别
- 控制流:函数式 API 不需要考虑图结构。您可以使用标准 Python 构造来定义工作流。这通常会减少您需要编写的代码量。
- 状态管理:GraphAPI 需要声明 状态,并且可能需要定义 reducers 来管理对图状态的更新。
@entrypoint
和@tasks
不需要显式状态管理,因为它们的状态作用域限定在函数内,并且不会在函数之间共享。 - 检查点:两个 API 都生成和使用检查点。在 图 API 中,每次 超步 后都会生成新的检查点。在 函数式 API 中,当执行任务时,其结果将保存到与给定入口点关联的现有检查点,而不是创建新的检查点。
- 可视化:图 API 可以轻松地将工作流可视化为图形,这对于调试、理解工作流以及与他人共享非常有用。函数式 API 不支持可视化,因为图是在运行时动态生成的。
常见陷阱¶
处理副作用¶
将副作用(例如,写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时不会多次执行它们。
在此示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时将再次执行它。
在此示例中,副作用封装在任务中,确保了恢复时执行的一致性。
from langgraph.func import task
@task
def write_to_file():
with open("output.txt", "w") as f:
f.write("Side effect executed")
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
# The side effect is now encapsulated in a task.
write_to_file().result()
value = interrupt("question")
return value
非确定性控制流¶
可能会每次给出不同结果的操作(例如,获取当前时间或随机数)应封装在任务中,以确保在恢复时返回相同的结果。
- 在任务中:获取随机数 (5) → 中断 → 恢复 → (再次返回 5) → ...
- 不在任务中:获取随机数 (5) → 中断 → 恢复 → 获取新的随机数 (7) → ...
当使用具有多个中断调用的人工参与工作流时,这一点尤其重要。LangGraph 保留每个任务/入口点的恢复值列表。当遇到中断时,它会与相应的恢复值匹配。此匹配严格基于索引,因此恢复值的顺序应与中断的顺序匹配。
如果在恢复时未维护执行顺序,则一个 interrupt
调用可能会与错误的 resume
值匹配,从而导致不正确的结果。
请阅读有关 确定性 的部分以了解更多详细信息。
在此示例中,工作流使用当前时间来确定要执行哪个任务。这是非确定性的,因为工作流的结果取决于执行时的时间。
from langgraph.func import entrypoint
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
t0 = inputs["t0"]
t1 = time.time()
delta_t = t1 - t0
if delta_t > 1:
result = slow_task(1).result()
value = interrupt("question")
else:
result = slow_task(2).result()
value = interrupt("question")
return {
"result": result,
"value": value
}
在此示例中,工作流使用输入 t0
来确定要执行哪个任务。这是确定性的,因为工作流的结果仅取决于输入。
import time
from langgraph.func import task
@task
def get_time() -> float:
return time.time()
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
t0 = inputs["t0"]
t1 = get_time().result()
delta_t = t1 - t0
if delta_t > 1:
result = slow_task(1).result()
value = interrupt("question")
else:
result = slow_task(2).result()
value = interrupt("question")
return {
"result": result,
"value": value
}
模式¶
以下是一些简单的模式,显示了如何使用函数式 API 的示例。
定义 entrypoint
时,输入仅限于函数的第一个参数。要传递多个输入,您可以使用字典。
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
value = inputs["value"]
another_value = inputs["another_value"]
...
my_workflow.invoke({"value": 1, "another_value": 2})
并行执行¶
可以通过并发调用任务并等待结果来并行执行任务。这对于提高 IO 密集型任务(例如,为 LLM 调用 API)的性能非常有用。
@task
def add_one(number: int) -> int:
return number + 1
@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
futures = [add_one(i) for i in numbers]
return [f.result() for f in futures]
调用子图¶
函数式 API 和 图 API 可以在同一应用程序中一起使用,因为它们共享相同的底层运行时。
from langgraph.func import entrypoint
from langgraph.graph import StateGraph
builder = StateGraph()
...
some_graph = builder.compile()
@entrypoint()
def some_workflow(some_input: dict) -> int:
# Call a graph defined using the graph API
result_1 = some_graph.invoke(...)
# Call another graph defined using the graph API
result_2 = another_graph.invoke(...)
return {
"result_1": result_1,
"result_2": result_2
}
API 参考:entrypoint | StateGraph
调用其他入口点¶
您可以从 入口点 或 任务 中调用其他 入口点。
@entrypoint() # Will automatically use the checkpointer from the parent entrypoint
def some_other_workflow(inputs: dict) -> int:
return inputs["value"]
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
value = some_other_workflow.invoke({"value": 1})
return value
流式传输自定义数据¶
您可以使用 StreamWriter
类型从 入口点 流式传输自定义数据。这允许您将自定义数据写入 custom
流。
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter
@task
def add_one(x):
return x + 1
@task
def add_two(x):
return x + 2
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter) -> int:
"""A simple workflow that adds one and two to a number."""
writer("hello") # Write some data to the `custom` stream
add_one(inputs['number']).result() # Will write data to the `updates` stream
writer("world") # Write some more data to the `custom` stream
add_two(inputs['number']).result() # Will write data to the `updates` stream
return 5
config = {
"configurable": {
"thread_id": "1"
}
}
for chunk in main.stream({"number": 1}, stream_mode=["custom", "updates"], config=config):
print(chunk)
API 参考:MemorySaver | entrypoint | task | StreamWriter
('updates', {'add_one': 2})
('updates', {'add_two': 3})
('custom', 'hello')
('custom', 'world')
('updates', {'main': 5})
重要提示
writer
参数在运行时自动注入。仅当参数名称以该确切名称出现在函数签名中时,才会注入该参数。
重试策略¶
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy
attempts = 0
# Let's configure the RetryPolicy to retry on ValueError.
# The default RetryPolicy is optimized for retrying specific network errors.
retry_policy = RetryPolicy(retry_on=ValueError)
@task(retry=retry_policy)
def get_info():
global attempts
attempts += 1
if attempts < 2:
raise ValueError('Failure')
return "OK"
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
return get_info().result()
config = {
"configurable": {
"thread_id": "1"
}
}
main.invoke({'any_input': 'foobar'}, config=config)
API 参考:MemorySaver | entrypoint | task | RetryPolicy
错误后恢复¶
import time
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter
# This variable is just used for demonstration purposes to simulate a network failure.
# It's not something you will have in your actual code.
attempts = 0
@task()
def get_info():
"""
Simulates a task that fails once before succeeding.
Raises an exception on the first attempt, then returns "OK" on subsequent tries.
"""
global attempts
attempts += 1
if attempts < 2:
raise ValueError("Failure") # Simulate a failure on the first attempt
return "OK"
# Initialize an in-memory checkpointer for persistence
checkpointer = MemorySaver()
@task
def slow_task():
"""
Simulates a slow-running task by introducing a 1-second delay.
"""
time.sleep(1)
return "Ran slow task."
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
"""
Main workflow function that runs the slow_task and get_info tasks sequentially.
Parameters:
- inputs: Dictionary containing workflow input values.
- writer: StreamWriter for streaming custom data.
The workflow first executes `slow_task` and then attempts to execute `get_info`,
which will fail on the first invocation.
"""
slow_task_result = slow_task().result() # Blocking call to slow_task
get_info().result() # Exception will be raised here on the first attempt
return slow_task_result
# Workflow execution configuration with a unique thread identifier
config = {
"configurable": {
"thread_id": "1" # Unique identifier to track workflow execution
}
}
# This invocation will take ~1 second due to the slow_task execution
try:
# First invocation will raise an exception due to the `get_info` task failing
main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
pass # Handle the failure gracefully
API 参考:MemorySaver | entrypoint | task | StreamWriter
当我们恢复执行时,我们将不需要重新运行 slow_task
,因为其结果已保存在检查点中。
人工参与¶
函数式 API 使用 interrupt
函数和 Command
原始类型支持 人工参与 工作流。
请参阅以下示例以获取更多详细信息
- 如何等待用户输入(函数式 API):展示如何使用函数式 API 实现简单的人工参与工作流。
- 如何审查工具调用(函数式 API):指南演示了如何在使用 LangGraph 函数式 API 的 ReAct Agent 中实现人工参与工作流。
短期记忆¶
使用 previous 参数和可选地使用 entrypoint.final
原始类型的 状态管理 可用于实现 短期记忆。
请参阅以下操作指南以获取更多详细信息
- 如何添加线程级持久性(函数式 API):展示如何将线程级持久性添加到函数式 API 工作流并实现一个简单的聊天机器人。
长期记忆¶
长期记忆 允许跨不同 线程 ID 存储信息。这有助于在一个对话中学习有关给定用户的信息并在另一个对话中使用它。
请参阅以下操作指南以获取更多详细信息
- 如何添加跨线程持久性(函数式 API):展示如何将跨线程持久性添加到函数式 API 工作流并实现一个简单的聊天机器人。
工作流¶
- 工作流和 Agent 指南,其中包含更多关于如何使用函数式 API 构建工作流的示例。
Agents¶
- 如何从头开始创建 React Agent(函数式 API):展示如何使用函数式 API 从头开始创建一个简单的 React Agent。
- 如何构建多 Agent 网络:展示如何使用函数式 API 构建多 Agent 网络。
- 如何在多 Agent 应用程序中添加多轮对话(函数式 API):允许最终用户与一个或多个 Agent 进行多轮对话。