跳至内容

类型

All = Literal['*'] 模块属性

表示图应在所有节点上中断的特殊值。

StreamMode = Literal['values', 'updates', 'debug', 'messages', 'custom'] 模块属性

流方法应如何发出输出。

  • 'values':为每个步骤发出状态的所有值。
  • 'updates':仅发出节点名称和**在**每个步骤**之后**由节点返回的更新。
  • 'debug':为每个步骤发出调试事件。
  • 'messages':逐个标记发出 LLM 消息。
  • 'custom':发出自定义输出,每个节点的 write: StreamWriter 关键字参数。

StreamWriter = Callable[[Any], None] 模块属性

可调用对象,接受单个参数并将其写入输出流。如果作为关键字参数请求,则始终注入到节点中,但在不使用 stream_mode="custom" 时,它是一个无操作。

RetryPolicy

基类:NamedTuple

重试节点的配置。

源代码位于 libs/langgraph/langgraph/types.py
class RetryPolicy(NamedTuple):
    """Configuration for retrying nodes."""

    initial_interval: float = 0.5
    """Amount of time that must elapse before the first retry occurs. In seconds."""
    backoff_factor: float = 2.0
    """Multiplier by which the interval increases after each retry."""
    max_interval: float = 128.0
    """Maximum amount of time that may elapse between retries. In seconds."""
    max_attempts: int = 3
    """Maximum number of attempts to make before giving up, including the first."""
    jitter: bool = True
    """Whether to add random jitter to the interval between retries."""
    retry_on: Union[
        Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool]
    ] = default_retry_on
    """List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry."""

initial_interval: float = 0.5 类属性 实例属性

第一次重试发生前必须经过的时间。以秒为单位。

backoff_factor: float = 2.0 类属性 实例属性

每次重试后间隔增加的倍数。

max_interval: float = 128.0 类属性 实例属性

重试之间可能经过的最大时间。以秒为单位。

max_attempts: int = 3 类属性 实例属性

在放弃之前尝试的最大次数,包括第一次尝试。

jitter: bool = True 类属性 实例属性

是否在重试之间的间隔中添加随机抖动。

retry_on: Union[Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool]] = default_retry_on 类属性 实例属性

应触发重试的异常类的列表,或一个对于应触发重试的异常返回 True 的可调用对象。

CachePolicy

基类:NamedTuple

缓存节点的配置。

源代码位于 libs/langgraph/langgraph/types.py
class CachePolicy(NamedTuple):
    """Configuration for caching nodes."""

    pass

Interrupt 数据类

源代码位于 libs/langgraph/langgraph/types.py
@dataclass
class Interrupt:
    value: Any
    when: Literal["during"] = "during"

PregelTask

基类:NamedTuple

源代码位于 libs/langgraph/langgraph/types.py
class PregelTask(NamedTuple):
    id: str
    name: str
    path: tuple[Union[str, int], ...]
    error: Optional[Exception] = None
    interrupts: tuple[Interrupt, ...] = ()
    state: Union[None, RunnableConfig, "StateSnapshot"] = None

PregelExecutableTask

基类:NamedTuple

源代码位于 libs/langgraph/langgraph/types.py
class PregelExecutableTask(NamedTuple):
    name: str
    input: Any
    proc: Runnable
    writes: deque[tuple[str, Any]]
    config: RunnableConfig
    triggers: list[str]
    retry_policy: Optional[RetryPolicy]
    cache_policy: Optional[CachePolicy]
    id: str
    path: tuple[Union[str, int], ...]
    scheduled: bool = False

StateSnapshot

基类:NamedTuple

图在步骤开始时的状态快照。

源代码位于 libs/langgraph/langgraph/types.py
class StateSnapshot(NamedTuple):
    """Snapshot of the state of the graph at the beginning of a step."""

    values: Union[dict[str, Any], Any]
    """Current values of channels"""
    next: tuple[str, ...]
    """The name of the node to execute in each task for this step."""
    config: RunnableConfig
    """Config used to fetch this snapshot"""
    metadata: Optional[CheckpointMetadata]
    """Metadata associated with this snapshot"""
    created_at: Optional[str]
    """Timestamp of snapshot creation"""
    parent_config: Optional[RunnableConfig]
    """Config used to fetch the parent snapshot, if any"""
    tasks: tuple[PregelTask, ...]
    """Tasks to execute in this step. If already attempted, may contain an error."""

values: Union[dict[str, Any], Any] 实例属性

通道的当前值

next: tuple[str, ...] 实例属性

在此步骤中每个任务要执行的节点的名称。

config: RunnableConfig 实例属性

用于获取此快照的配置

metadata: Optional[CheckpointMetadata] 实例属性

与此快照关联的元数据

created_at: Optional[str] 实例属性

快照创建时间戳

parent_config: Optional[RunnableConfig] 实例属性

用于获取父快照(如果有)的配置

tasks: tuple[PregelTask, ...] 实例属性

在此步骤中要执行的任务。如果已尝试,则可能包含错误。

Send

要发送到图中特定节点的消息或数据包。

Send 类用于 StateGraph 的条件边,以在下一步动态调用具有自定义状态的节点。

重要的是,发送的状态可能与核心图的状态不同,从而允许灵活和动态的工作流管理。

一个这样的例子是“map-reduce”工作流,其中您的图并行多次调用同一个节点,并使用不同的状态,然后将结果聚合回主图的状态。

属性

  • node (str) –

    要发送消息的目标节点的名称。

  • arg (Any) –

    要发送到目标节点的状态或消息。

示例

>>> from typing import Annotated
>>> import operator
>>> class OverallState(TypedDict):
...     subjects: list[str]
...     jokes: Annotated[list[str], operator.add]
...
>>> from langgraph.types import Send
>>> from langgraph.graph import END, START
>>> def continue_to_jokes(state: OverallState):
...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
...
>>> from langgraph.graph import StateGraph
>>> builder = StateGraph(OverallState)
>>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
>>> builder.add_conditional_edges(START, continue_to_jokes)
>>> builder.add_edge("generate_joke", END)
>>> graph = builder.compile()
>>>
>>> # Invoking with two subjects results in a generated joke for each
>>> graph.invoke({"subjects": ["cats", "dogs"]})
{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
源代码位于 libs/langgraph/langgraph/types.py
class Send:
    """A message or packet to send to a specific node in the graph.

    The `Send` class is used within a `StateGraph`'s conditional edges to
    dynamically invoke a node with a custom state at the next step.

    Importantly, the sent state can differ from the core graph's state,
    allowing for flexible and dynamic workflow management.

    One such example is a "map-reduce" workflow where your graph invokes
    the same node multiple times in parallel with different states,
    before aggregating the results back into the main graph's state.

    Attributes:
        node (str): The name of the target node to send the message to.
        arg (Any): The state or message to send to the target node.

    Examples:
        >>> from typing import Annotated
        >>> import operator
        >>> class OverallState(TypedDict):
        ...     subjects: list[str]
        ...     jokes: Annotated[list[str], operator.add]
        ...
        >>> from langgraph.types import Send
        >>> from langgraph.graph import END, START
        >>> def continue_to_jokes(state: OverallState):
        ...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
        ...
        >>> from langgraph.graph import StateGraph
        >>> builder = StateGraph(OverallState)
        >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
        >>> builder.add_conditional_edges(START, continue_to_jokes)
        >>> builder.add_edge("generate_joke", END)
        >>> graph = builder.compile()
        >>>
        >>> # Invoking with two subjects results in a generated joke for each
        >>> graph.invoke({"subjects": ["cats", "dogs"]})
        {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
    """

    __slots__ = ("node", "arg")

    node: str
    arg: Any

    def __init__(self, /, node: str, arg: Any) -> None:
        """
        Initialize a new instance of the Send class.

        Args:
            node (str): The name of the target node to send the message to.
            arg (Any): The state or message to send to the target node.
        """
        self.node = node
        self.arg = arg

    def __hash__(self) -> int:
        return hash((self.node, self.arg))

    def __repr__(self) -> str:
        return f"Send(node={self.node!r}, arg={self.arg!r})"

    def __eq__(self, value: object) -> bool:
        return (
            isinstance(value, Send)
            and self.node == value.node
            and self.arg == value.arg
        )

__init__(node: str, arg: Any) -> None

初始化 Send 类的新实例。

参数

  • node (str) –

    要发送消息的目标节点的名称。

  • arg (Any) –

    要发送到目标节点的状态或消息。

源代码位于 libs/langgraph/langgraph/types.py
def __init__(self, /, node: str, arg: Any) -> None:
    """
    Initialize a new instance of the Send class.

    Args:
        node (str): The name of the target node to send the message to.
        arg (Any): The state or message to send to the target node.
    """
    self.node = node
    self.arg = arg

注释