LangGraph 运行时¶
Pregel 实现了 LangGraph 的运行时,负责管理 LangGraph 应用程序的执行。
编译一个 StateGraph 或创建一个 entrypoint 都会产生一个 Pregel 实例,该实例可以用输入来调用。
本指南从高层次解释了运行时,并提供了直接使用 Pregel 实现应用程序的说明。
注意: Pregel 运行时以谷歌的 Pregel 算法命名,该算法描述了一种使用图进行大规模并行计算的高效方法。
概述¶
在 LangGraph 中,Pregel 将行动者 (actors) 和通道 (channels) 组合成一个单一的应用程序。行动者从通道读取数据并向通道写入数据。Pregel 遵循 Pregel 算法/批量同步并行模型,将应用程序的执行组织成多个步骤。
每个步骤包含三个阶段
- 计划 (Plan):确定在此步骤中要执行哪些行动者。例如,在第一步中,选择订阅特殊输入通道的行动者;在后续步骤中,选择订阅上一步中更新过的通道的行动者。
- 执行 (Execution):并行执行所有选定的行动者,直到全部完成、其中一个失败或达到超时。在此阶段,通道的更新对行动者不可见,直到下一步骤。
- 更新 (Update):用此步骤中行动者写入的值更新通道。
重复以上步骤,直到没有行动者被选中执行,或达到最大步骤数。
行动者 (Actors)¶
一个行动者是一个 PregelNode
。它订阅通道,从中读取数据,并向其写入数据。可以将其视为 Pregel 算法中的一个行动者。PregelNodes
实现了 LangChain 的 Runnable 接口。
通道 (Channels)¶
通道用于在行动者 (PregelNodes) 之间进行通信。每个通道都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来的步骤中将数据从一个链发送给它自己。LangGraph 提供了许多内置通道:
- LastValue:默认通道,存储发送到通道的最后一个值,适用于输入和输出值,或用于将数据从一个步骤发送到下一个步骤。
- Topic:一个可配置的发布/订阅主题 (PubSub Topic),适用于在行动者之间发送多个值,或用于累积输出。可以配置为对值进行去重,或在多个步骤中累积值。
- BinaryOperatorAggregate:存储一个持久值,通过将一个二元操作符应用于当前值和发送到通道的每个更新来更新该值,适用于在多个步骤中计算聚合值;例如,
total = BinaryOperatorAggregate(int, operator.add)
示例¶
虽然大多数用户会通过 StateGraph API 或 entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
以下是几个不同的示例,让您了解 Pregel API。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
app = Pregel(
nodes={"node1": node1},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b"],
)
app.invoke({"a": "foo"})
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": LastValue(str),
"c": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b", "c"],
)
app.invoke({"a": "foo"})
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_to("b")
.do(lambda x: x["b"] + x["b"])
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": Topic(str, accumulate=True),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
这个示例演示了如何使用 BinaryOperatorAggregate 通道来实现一个归约器 (reducer)。
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
def reducer(current, update):
if current:
return current + " | " + update
else:
return update
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": BinaryOperatorAggregate(str, operator=reducer),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
这个示例演示了如何在图中引入一个循环,即让一个链向其订阅的通道写入数据。执行将持续进行,直到向通道写入一个 None
值。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry
example_node = (
NodeBuilder().subscribe_only("value")
.do(lambda x: x + x if len(x) < 10 else None)
.write_to(ChannelWriteEntry("value", skip_none=True))
)
app = Pregel(
nodes={"example_node": example_node},
channels={
"value": EphemeralValue(str),
},
input_channels=["value"],
output_channels=["value"],
)
app.invoke({"value": "a"})
高阶 API¶
LangGraph 提供了两种用于创建 Pregel 应用程序的高阶 API:StateGraph (图 API) 和 函数式 API。
StateGraph (图 API) 是一个更高层次的抽象,它简化了 Pregel 应用程序的创建。它允许您定义一个由节点和边组成的图。当您编译该图时,StateGraph API 会自动为您创建 Pregel 应用程序。
from typing import TypedDict, Optional
from langgraph.constants import START
from langgraph.graph import StateGraph
class Essay(TypedDict):
topic: str
content: Optional[str]
score: Optional[float]
def write_essay(essay: Essay):
return {
"content": f"Essay about {essay['topic']}",
}
def score_essay(essay: Essay):
return {
"score": 10
}
builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()
编译后的 Pregel 实例将与一个节点和通道列表相关联。您可以通过打印它们来检查节点和通道。
您将会看到类似这样的内容
{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
您应该会看到类似这样的内容
{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
'__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}
在函数式 API中,您可以使用 entrypoint
来创建 Pregel 应用程序。entrypoint
装饰器允许您定义一个接收输入并返回输出的函数。
from typing import TypedDict, Optional
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint
class Essay(TypedDict):
topic: str
content: Optional[str]
score: Optional[float]
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):
return {
"content": f"Essay about {essay['topic']}",
}
print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)
Nodes:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
Channels:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}