跳到内容

Pregel

Pregel

基类: PregelProtocol

Pregel 管理 LangGraph 应用程序的运行时行为。

概述

Pregel 将 actor通道 组合成一个单独的应用程序。Actor 从通道读取数据并将数据写入通道。Pregel 将应用程序的执行组织成多个步骤,遵循 Pregel 算法/Bulk Synchronous Parallel 模型。

每个步骤包含三个阶段

  • 计划:确定在此步骤中要执行哪些 actor。例如,在第一步中,选择订阅特殊 输入 通道的 actor;在后续步骤中,选择订阅在上一步中更新的通道的 actor
  • 执行:并行执行所有选定的 actor,直到全部完成,或一个失败,或达到超时。在此阶段,通道更新对 actor 不可见,直到下一步。
  • 更新:使用此步骤中 actor 写入的值更新通道。

重复此过程,直到没有 actor 被选择执行,或达到最大步骤数。

Actor

Actor 是一个 PregelNode。它订阅通道,从中读取数据,并将数据写入其中。它可以被认为是 Pregel 算法中的一个 actorPregelNode 实现了 LangChain 的 Runnable 接口。

通道

通道用于在 actor (PregelNode) 之间进行通信。每个通道都有一个值类型、一个更新类型和一个更新函数 —— 它接受一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或将数据从一个链发送到自身以供后续步骤使用。LangGraph 提供了许多内置通道

基本通道:LastValue 和 Topic
  • LastValue:默认通道,存储发送到通道的最后一个值,适用于输入和输出值,或用于在步骤之间发送数据
  • Topic:可配置的 PubSub Topic,适用于在 actor 之间发送多个值,或用于累积输出。可以配置为去重值,和/或在多个步骤的过程中累积值。
高级通道:Context 和 BinaryOperatorAggregate
  • Context:公开上下文管理器的值,管理其生命周期。适用于访问需要设置和/或拆卸的外部资源。例如:client = Context(httpx.Client)
  • BinaryOperatorAggregate:存储一个持久值,通过将当前值和发送到通道的每个更新应用二元运算符来更新,适用于计算多个步骤的聚合。例如:total = BinaryOperatorAggregate(int, operator.add)

示例

大多数用户将通过 StateGraph (Graph API) 或通过 entrypoint (Functional API) 与 Pregel 交互。

但是,对于 高级 用例,可以直接使用 Pregel。如果您不确定是否需要直接使用 Pregel,那么答案可能是否定的 —— 您应该使用 Graph API 或 Functional API 代替。这些是更高级别的接口,将在底层编译为 Pregel。

以下是一些示例,让您了解它的工作原理

单节点应用程序
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWriteEntry

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | Channel.write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}
使用多节点和多输出通道
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWriteEntry

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | Channel.write_to("b")
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | Channel.write_to("c")
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo', 'c': 'foofoofoofoo'}
使用 Topic 通道
from langgraph.channels import LastValue, EphemeralValue, Topic
from langgraph.pregel import Pregel, Channel, ChannelWriteEntry

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | {
        "b": Channel.write_to("b"),
        "c": Channel.write_to("c")
    }
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | {
        "c": Channel.write_to("c"),
    }
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{'c': ['foofoo', 'foofoofoofoo']}
使用 BinaryOperatorAggregate 通道
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, Channel


node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | {
        "b": Channel.write_to("b"),
        "c": Channel.write_to("c")
    }
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | {
        "c": Channel.write_to("c"),
    }
)


def reducer(current, update):
    if current:
        return current + " | " + "update"
    else:
        return update

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"]
)

app.invoke({"a": "foo"})
{'c': 'foofoo | foofoofoofoo'}
引入循环

此示例演示了如何通过让链写入其订阅的通道来在图中引入循环。执行将继续,直到将 None 值写入通道。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntry

example_node = (
    Channel.subscribe_to("value")
    | (lambda x: x + x if len(x) < 10 else None)
    | ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)])
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"]
)

app.invoke({"value": "a"})
{'value': 'aaaaaaaaaaaaaaaa'}

stream_mode: StreamMode = stream_mode class-attribute instance-attribute

流式输出的模式,默认为 'values'。

stream_eager: bool = stream_eager class-attribute instance-attribute

是否强制急切地发出流事件,对于 stream_mode "messages" 和 "custom" 自动开启。

stream_channels: Optional[Union[str, Sequence[str]]] = stream_channels class-attribute instance-attribute

要流式传输的通道,默认为不在保留通道中的所有通道

step_timeout: Optional[float] = step_timeout class-attribute instance-attribute

等待步骤完成的最大时间,以秒为单位。默认为 None。

debug: bool = debug if debug is not None else get_debug() instance-attribute

是否在执行期间打印调试信息。默认为 False。

checkpointer: Checkpointer = checkpointer class-attribute instance-attribute

用于保存和加载图状态的检查点。默认为 None。

store: Optional[BaseStore] = store class-attribute instance-attribute

用于 SharedValues 的内存存储。默认为 None。

retry_policy: Optional[RetryPolicy] = retry_policy class-attribute instance-attribute

运行任务时使用的重试策略。设置为 None 以禁用。

get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot

获取图的当前状态。

aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot async

获取图的当前状态。

update_state(config: RunnableConfig, values: Optional[Union[dict[str, Any], Any]], as_node: Optional[str] = None) -> RunnableConfig

使用给定的值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,则会将其设置为上次更新状态的节点(如果不是模棱两可)。

aupdate_state(config: RunnableConfig, values: dict[str, Any] | Any, as_node: Optional[str] = None) -> RunnableConfig async

使用给定的值异步更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,则会将其设置为上次更新状态的节点(如果不是模棱两可)。

stream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> Iterator[Union[dict[str, Any], Any]]

流式传输单个输入的图步骤。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入。

  • config (Optional[RunnableConfig], default: None ) –

    运行要使用的配置。

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    流式输出的模式,默认为 self.stream_mode。选项包括

    • "values": 在每个步骤后发出状态中的所有值。当与函数式 API 一起使用时,值在工作流结束时发出一次。
    • "updates": 仅在每个步骤后发出节点或任务名称以及节点或任务返回的更新。如果在同一步骤中进行多次更新(例如,运行多个节点),则这些更新将单独发出。
    • "custom": 使用 StreamWriter 从节点或任务内部发出自定义数据。
    • "messages": 发出 LLM 消息的 token-by-token,以及节点或任务内部任何 LLM 调用相关的元数据。
    • "debug": 发出调试事件,其中包含每个步骤尽可能多的信息。
  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    要流式传输的键,默认为所有非上下文通道。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    要中断之前的节点,默认为图中的所有节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    要中断之后的节点,默认为图中的所有节点。

  • debug (Optional[bool], default: None ) –

    是否在执行期间打印调试信息,默认为 False。

  • subgraphs (bool, default: False ) –

    是否流式传输子图,默认为 False。

Yields

  • Union[dict[str, Any], Any]

    图中每个步骤的输出。输出形状取决于 stream_mode。

示例

使用不同流模式的图

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
使用 stream_mode="values"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
使用 stream_mode="updates"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
使用 stream_mode="debug"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}

使用 stream_mode="custom"

>>> from langgraph.types import StreamWriter
...
>>> def node_a(state: State, writer: StreamWriter):
...     writer({"custom_data": "foo"})
...     return {"alist": ["hi"]}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
...
>>> for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
...     print(event)
{'custom_data': 'foo'}

使用 stream_mode="messages"

>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
>>> from langchain_openai import ChatOpenAI
...
>>> llm = ChatOpenAI(model="gpt-4o-mini")
...
>>> class State(TypedDict):
...     question: str
...     answer: str
...
>>> def node_a(state: State):
...     response = llm.invoke(state["question"])
...     return {"answer": response.content}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()

>>> for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
...     print(event)
(AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
(AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
(AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

astream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> AsyncIterator[Union[dict[str, Any], Any]] async

流式传输单个输入的图步骤。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入。

  • config (Optional[RunnableConfig], default: None ) –

    运行要使用的配置。

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    流式输出的模式,默认为 self.stream_mode。选项包括

    • "values": 在每个步骤后发出状态中的所有值。当与函数式 API 一起使用时,值在工作流结束时发出一次。
    • "updates": 仅在每个步骤后发出节点或任务名称以及节点或任务返回的更新。如果在同一步骤中进行多次更新(例如,运行多个节点),则这些更新将单独发出。
    • "custom": 使用 StreamWriter 从节点或任务内部发出自定义数据。
    • "messages": 发出 LLM 消息的 token-by-token,以及节点或任务内部任何 LLM 调用相关的元数据。
    • "debug": 发出调试事件,其中包含每个步骤尽可能多的信息。
  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    要流式传输的键,默认为所有非上下文通道。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    要中断之前的节点,默认为图中的所有节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    要中断之后的节点,默认为图中的所有节点。

  • debug (Optional[bool], default: None ) –

    是否在执行期间打印调试信息,默认为 False。

  • subgraphs (bool, default: False ) –

    是否流式传输子图,默认为 False。

Yields

  • AsyncIterator[Union[dict[str, Any], Any]]

    图中每个步骤的输出。输出形状取决于 stream_mode。

示例

使用不同流模式的图

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
使用 stream_mode="values"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
使用 stream_mode="updates"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
使用 stream_mode="debug"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}

使用 stream_mode="custom"

>>> from langgraph.types import StreamWriter
...
>>> async def node_a(state: State, writer: StreamWriter):
...     writer({"custom_data": "foo"})
...     return {"alist": ["hi"]}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
...
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
...     print(event)
{'custom_data': 'foo'}

使用 stream_mode="messages"

>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
>>> from langchain_openai import ChatOpenAI
...
>>> llm = ChatOpenAI(model="gpt-4o-mini")
...
>>> class State(TypedDict):
...     question: str
...     answer: str
...
>>> async def node_a(state: State):
...     response = await llm.ainvoke(state["question"])
...     return {"answer": response.content}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()

>>> for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
...     print(event)
(AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
(AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
(AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

invoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any]

使用单个输入和配置运行图。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入数据。它可以是字典或任何其他类型。

  • config (Optional[RunnableConfig], default: None ) –

    可选。图运行的配置。

  • stream_mode (StreamMode, default: 'values' ) –

    Optional[str]。图运行的流模式。默认为 "values"。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    可选。要从图运行中检索的输出键。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在图运行之前中断的节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在图运行之后中断的节点。

  • debug (Optional[bool], default: None ) –

    可选。启用图运行的调试模式。

  • **kwargs (Any, default: {} ) –

    要传递给图运行的其他关键字参数。

Returns

  • Union[dict[str, Any], Any]

    图运行的输出。如果 stream_mode 为 "values",则返回最新的输出。

  • Union[dict[str, Any], Any]

    如果 stream_mode 不是 "values",则返回输出块列表。

ainvoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any] async

在单个输入上异步调用图。

参数

  • input (Union[dict[str, Any], Any]) –

    计算的输入数据。它可以是字典或任何其他类型。

  • config (Optional[RunnableConfig], default: None ) –

    可选。计算的配置。

  • stream_mode (StreamMode, default: 'values' ) –

    可选。计算的流模式。默认为 "values"。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    可选。结果中要包含的输出键。默认为 None。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要中断之前的节点。默认为 None。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要中断之后的节点。默认为 None。

  • debug (Optional[bool], default: None ) –

    可选。是否启用调试模式。默认为 None。

  • **kwargs (Any, default: {} ) –

    其他关键字参数。

Returns

  • Union[dict[str, Any], Any]

    计算的结果。如果 stream_mode 为 "values",则返回最新值。

  • Union[dict[str, Any], Any]

    如果 stream_mode 为 "chunks",则返回块列表。

PregelNode

基类: Runnable

Pregel 图中的一个节点。这不会被图本身作为 runnable 调用,而是作为容纳为节点创建 PregelExecutableTask 所需组件的容器。

channels: Union[list[str], Mapping[str, str]] = channels instance-attribute

将作为输入传递给 bound 的通道。如果是列表,则将使用第一个非空的列表调用节点。如果是字典,则键是通道的名称,值是在 bound 的输入中使用的键。

triggers: list[str] = list(triggers) instance-attribute

如果写入这些通道中的任何一个,则将在下一步中触发此节点。

mapper: Optional[Callable[[Any], Any]] = mapper instance-attribute

在将其传递给 bound 之前转换输入的函数。

writers: list[Runnable] = writers or [] instance-attribute

将在 bound 之后执行的写入器列表,负责获取 bound 的输出并将其写入适当的通道。

bound: Runnable[Any, Any] = bound if bound is not None else DEFAULT_BOUND instance-attribute

节点的主要逻辑。这将使用来自 channels 的输入调用。

retry_policy: Optional[RetryPolicy] = retry_policy instance-attribute

调用节点时使用的重试策略。

tags: Optional[Sequence[str]] = tags instance-attribute

要附加到节点的标签,用于追踪。

metadata: Optional[Mapping[str, Any]] = metadata instance-attribute

附加到节点的元数据,用于追踪。

subgraphs: Sequence[PregelProtocol] instance-attribute

节点使用的子图。

flat_writers: list[Runnable] cached property

获取应用了优化的写入器。去除连续的 ChannelWrites。

node: Optional[Runnable[Any, Any]] cached property

获取一个组合了 boundwriters 的 runnable。

评论