设置¶
首先,让我们安装所需的包
在 [1]
已复制!
%%capture --no-stderr
%pip install -U langgraph
%%capture --no-stderr %pip install -U langgraph
并行节点扇出和扇入¶
在此示例中,我们从节点 A 扇出到 B 和 C,然后扇入到 D。在状态定义中,我们为键指定了归约器(reducer)operator.add。它会合并或累积状态中特定键的值,而不是简单地覆盖现有值。对于列表,这意味着将新列表与现有列表连接起来。
请注意,LangGraph 使用 Annotated 类型来为状态中的特定键指定归约器函数:它保留原始类型 (list) 用于类型检查,同时允许将归约器函数 (operator.add) 附加到该类型上,而不会改变类型本身。
在 [3]
已复制!
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
    """Graph state shared by every node in the fan-out / fan-in example."""

    # operator.add is attached as the reducer for this key, so a node's
    # returned list is concatenated onto the stored list instead of
    # replacing it (append-only).
    aggregate: Annotated[list, operator.add]
class ReturnNodeValue:
    """Callable graph node that contributes one fixed value to ``aggregate``.

    Args:
        node_secret: The value this node emits every time it is called.
    """

    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        """Print the current ``aggregate`` and return this node's state update.

        Args:
            state: The current graph state; only ``state["aggregate"]`` is read.

        Returns:
            A dict with a one-element list under ``"aggregate"``. The value is
            wrapped in a list because ``aggregate`` is declared with the
            ``operator.add`` reducer, which concatenates lists.
        """
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}
# Assemble the fan-out / fan-in graph: START -> a -> (b, c) -> d -> END.
builder = StateGraph(State)

# Register every node first; each one is a ReturnNodeValue emitting its label.
for node_name, label in (
    ("a", "I'm A"),
    ("b", "I'm B"),
    ("c", "I'm C"),
    ("d", "I'm D"),
):
    builder.add_node(node_name, ReturnNodeValue(label))

# "a" fans out to "b" and "c"; both of those fan back in to "d".
for source, target in (
    (START, "a"),
    ("a", "b"),
    ("a", "c"),
    ("b", "d"),
    ("c", "d"),
    ("d", END),
):
    builder.add_edge(source, target)

graph = builder.compile()
import operator from typing import Annotated, Any from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END class State(TypedDict): # operator.add 归约器函数使这成为仅追加的聚合: Annotated[list, operator.add] class ReturnNodeValue: def __init__(self, node_secret: str): self._value = node_secret def __call__(self, state: State) -> Any: print(f"Adding {self._value} to {state['aggregate']}") return {"aggregate": [self._value]} builder = StateGraph(State) builder.add_node("a", ReturnNodeValue("I'm A")) builder.add_edge(START, "a") builder.add_node("b", ReturnNodeValue("I'm B")) builder.add_node("c", ReturnNodeValue("I'm C")) builder.add_node("d", ReturnNodeValue("I'm D")) builder.add_edge("a", "b") builder.add_edge("a", "c") builder.add_edge("b", "d") builder.add_edge("c", "d") builder.add_edge("d", END) graph = builder.compile()
在 [4]
已复制!
from IPython.display import Image, display
# Render the compiled graph's structure to a PNG via Mermaid and show it.
display(Image(graph.get_graph().draw_mermaid_png()))
from IPython.display import Image, display display(Image(graph.get_graph().draw_mermaid_png()))
使用归约器,您可以看到每个节点中添加的值都被累积起来。
在 [5]
已复制!
# Run the graph starting from an empty "aggregate" list. The second argument
# is a config dict carrying a thread_id of "foo".
graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding I'm A to [] Adding I'm B to ["I'm A"] Adding I'm C to ["I'm A"] Adding I'm D to ["I'm A", "I'm B", "I'm C"]
Out[5]
{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm D"]}
带额外步骤的并行节点扇出和扇入¶
上面的示例展示了如何在每条路径只有一步的情况下进行扇出和扇入。但是,如果一条路径有多个步骤呢?
在 [6]
已复制!
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
class State(TypedDict):
    """Graph state for the fan-out example with an extra step on one branch."""

    # operator.add is attached as the reducer for this key, so a node's
    # returned list is concatenated onto the stored list instead of
    # replacing it (append-only).
    aggregate: Annotated[list, operator.add]
# Assemble a graph whose "b" branch has an extra step:
# START -> a -> (b -> b2, c) -> d -> END.
# NOTE: START, END and ReturnNodeValue are not defined in this cell; they are
# expected to be in scope from the earlier cell.
builder = StateGraph(State)

# Register every node first; each one is a ReturnNodeValue emitting its label.
for node_name, label in (
    ("a", "I'm A"),
    ("b", "I'm B"),
    ("b2", "I'm B2"),
    ("c", "I'm C"),
    ("d", "I'm D"),
):
    builder.add_node(node_name, ReturnNodeValue(label))

# The ["b2", "c"] entry is a single edge with two start nodes, so "d" is
# reached from both branches through one joined edge.
for source, target in (
    (START, "a"),
    ("a", "b"),
    ("a", "c"),
    ("b", "b2"),
    (["b2", "c"], "d"),
    ("d", END),
):
    builder.add_edge(source, target)

graph = builder.compile()
import operator from typing import Annotated from typing_extensions import TypedDict from langgraph.graph import StateGraph class State(TypedDict): # operator.add 归约器函数使这成为仅追加的聚合: Annotated[list, operator.add] builder = StateGraph(State) builder.add_node("a", ReturnNodeValue("I'm A")) builder.add_edge(START, "a") builder.add_node("b", ReturnNodeValue("I'm B")) builder.add_node("b2", ReturnNodeValue("I'm B2")) builder.add_node("c", ReturnNodeValue("I'm C")) builder.add_node("d", ReturnNodeValue("I'm D")) builder.add_edge("a", "b") builder.add_edge("a", "c") builder.add_edge("b", "b2") builder.add_edge(["b2", "c"], "d") builder.add_edge("d", END) graph = builder.compile()
在 [7]
已复制!
from IPython.display import Image, display
# Render the compiled graph's structure to a PNG via Mermaid and show it.
display(Image(graph.get_graph().draw_mermaid_png()))
from IPython.display import Image, display display(Image(graph.get_graph().draw_mermaid_png()))
在 [8]
已复制!
# Run the graph starting from an empty "aggregate" list.
graph.invoke({"aggregate": []})
graph.invoke({"aggregate": []})
Adding I'm A to [] Adding I'm B to ["I'm A"] Adding I'm C to ["I'm A"] Adding I'm B2 to ["I'm A", "I'm B", "I'm C"] Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
Out[8]
{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}
条件分支¶
如果您的扇出不是确定性的,您可以直接使用 add_conditional_edges。
如果您有一个已知的 "接收器"(sink)节点,并且希望条件分支执行完后都路由到该节点,可以在创建条件边时提供 then=<最终节点名称>。
在 [9]
已复制!
import operator
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.graph import END, START, StateGraph
class State(TypedDict):
    """Graph state for the conditional-branching example."""

    # operator.add is attached as the reducer for this key, so a node's
    # returned list is concatenated onto the stored list instead of
    # replacing it (append-only).
    aggregate: Annotated[list, operator.add]
    # Selector read by the routing function to pick which branches run.
    which: str
# Start a new builder for the conditional-branching graph and register its
# nodes; each one is a ReturnNodeValue emitting its label.
builder = StateGraph(State)
for node_name, label in (
    ("a", "I'm A"),
    ("b", "I'm B"),
    ("c", "I'm C"),
    ("d", "I'm D"),
    ("e", "I'm E"),
):
    builder.add_node(node_name, ReturnNodeValue(label))

# "a" is the entry node.
builder.add_edge(START, "a")
def route_bc_or_cd(state: State) -> Sequence[str]:
    """Choose which pair of nodes runs after ``a``.

    Args:
        state: The current graph state; only ``state["which"]`` is read.

    Returns:
        ``["c", "d"]`` when ``state["which"]`` is ``"cd"``; ``["b", "c"]``
        for any other value.
    """
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]
# Every node the router may select. The same list is passed as the third
# argument of add_conditional_edges and reused to wire the fan-in edges.
intermediates = ["b", "c", "d"]
builder.add_conditional_edges(
    "a",
    route_bc_or_cd,
    intermediates,
)
# Whichever intermediates run, each one continues to the sink node "e".
for node in intermediates:
    builder.add_edge(node, "e")
builder.add_edge("e", END)
graph = builder.compile()
import operator from typing import Annotated, Sequence from typing_extensions import TypedDict from langgraph.graph import END, START, StateGraph class State(TypedDict): # operator.add 归约器函数使这成为仅追加的聚合: Annotated[list, operator.add] which: str builder = StateGraph(State) builder.add_node("a", ReturnNodeValue("I'm A")) builder.add_edge(START, "a") builder.add_node("b", ReturnNodeValue("I'm B")) builder.add_node("c", ReturnNodeValue("I'm C")) builder.add_node("d", ReturnNodeValue("I'm D")) builder.add_node("e", ReturnNodeValue("I'm E")) def route_bc_or_cd(state: State) -> Sequence[str]: if state["which"] == "cd": return ["c", "d"] return ["b", "c"] intermediates = ["b", "c", "d"] builder.add_conditional_edges( "a", route_bc_or_cd, intermediates, ) for node in intermediates: builder.add_edge(node, "e") builder.add_edge("e", END) graph = builder.compile()
在 [10]
已复制!
from IPython.display import Image, display
# Render the compiled graph's structure to a PNG via Mermaid and show it.
display(Image(graph.get_graph().draw_mermaid_png()))
from IPython.display import Image, display display(Image(graph.get_graph().draw_mermaid_png()))
在 [11]
已复制!
# which="bc" makes route_bc_or_cd return ["b", "c"].
graph.invoke({"aggregate": [], "which": "bc"})
graph.invoke({"aggregate": [], "which": "bc"})
Adding I'm A to [] Adding I'm B to ["I'm A"] Adding I'm C to ["I'm A"] Adding I'm E to ["I'm A", "I'm B", "I'm C"]
Out[11]
{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm E"], 'which': 'bc'}
在 [12]
已复制!
# which="cd" makes route_bc_or_cd return ["c", "d"].
graph.invoke({"aggregate": [], "which": "cd"})
graph.invoke({"aggregate": [], "which": "cd"})
Adding I'm A to [] Adding I'm C to ["I'm A"] Adding I'm D to ["I'm A"] Adding I'm E to ["I'm A", "I'm C", "I'm D"]
Out[12]
{'aggregate': ["I'm A", "I'm C", "I'm D", "I'm E"], 'which': 'cd'}
稳定排序¶
扇出时,各节点作为同一个 "超级步骤"(superstep)并行运行。每个超级步骤产生的更新会在该超级步骤完成后按顺序应用到状态上。
如果您需要并行超级步骤的更新具有一致的、预先确定的顺序,应将各节点的输出(连同一个用于标识或排序的键)写入状态中的一个单独字段,再从每个扇出节点向汇合点添加常规 edge,并在 "接收器" 节点中将它们合并。
例如,假设我想按 "可靠性" 对并行步骤的输出进行排序。
在 [13]
已复制!
import operator
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
def reduce_fanouts(left, right):
    """Reducer for ``fanout_values``: accumulate updates, or reset on an empty one.

    Args:
        left: The value accumulated so far; ``None`` is treated as ``[]``.
        right: The incoming update.

    Returns:
        ``[]`` when ``right`` is falsy (for example ``[]`` or ``None``),
        otherwise ``left + right``.
    """
    if left is None:
        left = []
    if not right:
        # An empty update overwrites (clears) everything collected so far.
        return []
    return left + right
class State(TypedDict):
    """Graph state for the stable-sorting example."""

    # operator.add is attached as the reducer for this key, so a node's
    # returned list is concatenated onto the stored list instead of
    # replacing it (append-only).
    aggregate: Annotated[list, operator.add]
    # Staging area for the parallel nodes' outputs; reduce_fanouts appends
    # non-empty updates and clears the list on an empty one.
    fanout_values: Annotated[list, reduce_fanouts]
    # Selector read by the routing function to pick which branches run.
    which: str
# Start a new builder for the stable-sorting graph; "a" is the entry node.
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
class ParallelReturnNodeValue:
    """Callable graph node that stages its value, tagged with a reliability score.

    Instead of writing to ``aggregate`` directly, the node writes to
    ``fanout_values`` so a later node can order the parallel results.

    Args:
        node_secret: The value this node emits every time it is called.
        reliability: Score stored alongside the value under ``"reliability"``.
    """

    def __init__(
        self,
        node_secret: str,
        reliability: float,
    ):
        self._value = node_secret
        self._reliability = reliability

    def __call__(self, state: State) -> Any:
        """Print the current ``aggregate`` and return this node's staged update.

        Args:
            state: The current graph state; only ``state["aggregate"]`` is read.

        Returns:
            A dict whose ``"fanout_values"`` list holds one record with the
            value (wrapped in a list) and this node's reliability.
        """
        print(f"Adding {self._value} to {state['aggregate']} in parallel.")
        return {
            "fanout_values": [
                {
                    "value": [self._value],
                    "reliability": self._reliability,
                }
            ]
        }
# The three fan-out nodes, each constructed with its own reliability score.
builder.add_node("b", ParallelReturnNodeValue("I'm B", reliability=0.9))
builder.add_node("c", ParallelReturnNodeValue("I'm C", reliability=0.1))
builder.add_node("d", ParallelReturnNodeValue("I'm D", reliability=0.3))
def aggregate_fanout_values(state: State) -> Any:
    """Merge the staged fan-out results into ``aggregate`` in a fixed order.

    Args:
        state: The current graph state; only ``state["fanout_values"]`` is read.

    Returns:
        A dict with two updates:

        * ``"aggregate"``: each record's ``"value"`` ordered by descending
          ``"reliability"``, followed by ``"I'm E"``. Each ``"value"`` is
          appended as-is, so list values appear as nested lists.
        * ``"fanout_values"``: an empty list.
    """
    # Highest reliability first; sorted() keeps the original order for ties.
    ranked_values = sorted(
        state["fanout_values"], key=lambda x: x["reliability"], reverse=True
    )
    return {
        "aggregate": [x["value"] for x in ranked_values] + ["I'm E"],
        "fanout_values": [],
    }
# "e" is the sink node: it ranks the staged fan-out values and merges them.
builder.add_node("e", aggregate_fanout_values)
def route_bc_or_cd(state: State) -> Sequence[str]:
    """Choose which pair of nodes runs after ``a``.

    Args:
        state: The current graph state; only ``state["which"]`` is read.

    Returns:
        ``["c", "d"]`` when ``state["which"]`` is ``"cd"``; ``["b", "c"]``
        for any other value.
    """
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]
# Every node the router may select. The same list is passed as the third
# argument of add_conditional_edges and reused to wire the fan-in edges.
intermediates = ["b", "c", "d"]
builder.add_conditional_edges("a", route_bc_or_cd, intermediates)
# Whichever intermediates run, each one continues to the sink node "e".
for node in intermediates:
    builder.add_edge(node, "e")
builder.add_edge("e", END)
graph = builder.compile()
import operator from typing import Annotated, Sequence from typing_extensions import TypedDict from langgraph.graph import StateGraph def reduce_fanouts(left, right): if left is None: left = [] if not right: # 覆盖返回 [] return left + right class State(TypedDict): # operator.add 归约器函数使这成为仅追加的聚合: Annotated[list, operator.add] fanout_values: Annotated[list, reduce_fanouts] which: str builder = StateGraph(State) builder.add_node("a", ReturnNodeValue("I'm A")) builder.add_edge(START, "a") class ParallelReturnNodeValue: def __init__( self, node_secret: str, reliability: float, ): self._value = node_secret self._reliability = reliability def __call__(self, state: State) -> Any: print(f"Adding {self._value} to {state['aggregate']} in parallel.") return { "fanout_values": [ { "value": [self._value], "reliability": self._reliability, } ] } builder.add_node("b", ParallelReturnNodeValue("I'm B", reliability=0.9)) builder.add_node("c", ParallelReturnNodeValue("I'm C", reliability=0.1)) builder.add_node("d", ParallelReturnNodeValue("I'm D", reliability=0.3)) def aggregate_fanout_values(state: State) -> Any: # 按可靠性排序 ranked_values = sorted( state["fanout_values"], key=lambda x: x["reliability"], reverse=True ) return { "aggregate": [x["value"] for x in ranked_values] + ["I'm E"], "fanout_values": [], } builder.add_node("e", aggregate_fanout_values) def route_bc_or_cd(state: State) -> Sequence[str]: if state["which"] == "cd": return ["c", "d"] return ["b", "c"] intermediates = ["b", "c", "d"] builder.add_conditional_edges("a", route_bc_or_cd, intermediates) for node in intermediates: builder.add_edge(node, "e") builder.add_edge("e", END) graph = builder.compile()
在 [14]
已复制!
from IPython.display import Image, display
# Render the compiled graph's structure to a PNG via Mermaid and show it.
display(Image(graph.get_graph().draw_mermaid_png()))
from IPython.display import Image, display display(Image(graph.get_graph().draw_mermaid_png()))
在 [15]
已复制!
# which="bc" makes route_bc_or_cd return ["b", "c"]; fanout_values is passed
# explicitly as an empty list.
graph.invoke({"aggregate": [], "which": "bc", "fanout_values": []})
graph.invoke({"aggregate": [], "which": "bc", "fanout_values": []})
Adding I'm A to [] Adding I'm B to ["I'm A"] in parallel. Adding I'm C to ["I'm A"] in parallel.
Out[15]
{'aggregate': ["I'm A", ["I'm B"], ["I'm C"], "I'm E"], 'fanout_values': [], 'which': 'bc'}
在 [16]
已复制!
# which="cd" makes route_bc_or_cd return ["c", "d"]. fanout_values is omitted
# here; presumably this is why reduce_fanouts guards against left being None.
graph.invoke({"aggregate": [], "which": "cd"})
graph.invoke({"aggregate": [], "which": "cd"})
Adding I'm A to [] Adding I'm C to ["I'm A"] in parallel. Adding I'm D to ["I'm A"] in parallel.
Out[16]
{'aggregate': ["I'm A", ["I'm D"], ["I'm C"], "I'm E"], 'fanout_values': [], 'which': 'cd'}