跳至内容

通道

BaseChannel

基类: Generic[Value, Update, C], ABC

源代码位于 libs/langgraph/langgraph/channels/base.py
class BaseChannel(Generic[Value, Update, C], ABC):
    """Abstract interface implemented by every channel.

    The three type parameters are, in order: the type returned by ``get``,
    the type of the items accepted by ``update``, and the type produced by
    ``checkpoint``.
    """

    __slots__ = ("key", "typ")

    def __init__(self, typ: Type[Any], key: str = "") -> None:
        self.key, self.typ = key, typ

    @property
    @abstractmethod
    def ValueType(self) -> Any:
        """Type of the value held by the channel."""

    @property
    @abstractmethod
    def UpdateType(self) -> Any:
        """Type of the updates the channel accepts."""

    # --- checkpointing -----------------------------------------------------

    def checkpoint(self) -> Optional[C]:
        """Produce a serializable snapshot of the channel's current state.

        The default implementation simply returns ``self.get()``.

        Raises:
            EmptyChannelError: If the channel is empty (never updated yet)
                or does not support checkpoints.
        """
        snapshot = self.get()
        return snapshot

    @abstractmethod
    def from_checkpoint(self, checkpoint: Optional[C]) -> Self:
        """Build a new, identical channel, optionally seeded from a checkpoint.

        Implementations should copy any complex data structures contained in
        the checkpoint.
        """

    # --- state -------------------------------------------------------------

    @abstractmethod
    def update(self, values: Sequence[Update]) -> bool:
        """Apply a sequence of updates to the channel's value.

        The updates arrive in arbitrary order. Pregel calls this method for
        all channels at the end of each step; when there are no updates it
        passes an empty sequence.

        Returns:
            True if the channel was updated, False otherwise.

        Raises:
            InvalidUpdateError: If the sequence of updates is invalid.
        """

    @abstractmethod
    def get(self) -> Value:
        """Return the channel's current value.

        Raises:
            EmptyChannelError: If the channel is empty (never updated yet).
        """

    def consume(self) -> bool:
        """Flag the channel's current value as consumed; a no-op by default.

        Pregel calls this before the start of the next step, for all channels
        that triggered a node.

        Returns:
            True if the channel was updated. This default implementation
            always returns False.
        """
        return False

ValueType: Any abstractmethod property

通道中存储的值的类型。

UpdateType: Any abstractmethod property

通道接收的更新的类型。

checkpoint() -> Optional[C]

返回通道当前状态的可序列化表示形式。如果通道为空(尚未更新)或不支持检查点,则引发 EmptyChannelError。

源代码位于 libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    Returns:
        Whatever ``self.get()`` returns.

    Raises:
        EmptyChannelError: If the channel is empty (never updated yet) or
            does not support checkpoints.
    """
    snapshot = self.get()
    return snapshot

from_checkpoint(checkpoint: Optional[C]) -> Self abstractmethod

返回一个新的相同通道,可以选择从检查点初始化。如果检查点包含复杂的数据结构,则应复制它们。

源代码位于 libs/langgraph/langgraph/channels/base.py
@abstractmethod
def from_checkpoint(self, checkpoint: Optional[C]) -> Self:
    """Return a new identical channel, optionally initialized from a checkpoint.

    Args:
        checkpoint: The checkpoint to initialize the new channel from;
            optional.

    Returns:
        A new channel identical to this one.

    Note:
        If the checkpoint contains complex data structures, they should be
        copied.
    """

update(values: Sequence[Update]) -> bool abstractmethod

使用给定的更新序列更新通道的值。序列中更新的顺序是任意的。Pregel 在每个步骤结束时为所有通道调用此方法。如果没有更新,则使用空序列调用它。如果更新序列无效,则引发 InvalidUpdateError。如果通道已更新,则返回 True,否则返回 False。

源代码位于 libs/langgraph/langgraph/channels/base.py
@abstractmethod
def update(self, values: Sequence[Update]) -> bool:
    """Update the channel's value with the given sequence of updates.

    This method is called by Pregel for all channels at the end of each
    step. If there are no updates, it is called with an empty sequence.

    Args:
        values: The updates to apply. Their order in the sequence is
            arbitrary.

    Returns:
        True if the channel was updated, False otherwise.

    Raises:
        InvalidUpdateError: If the sequence of updates is invalid.
    """

get() -> Value abstractmethod

返回通道的当前值。

如果通道为空(尚未更新),则引发 EmptyChannelError。

源代码位于 libs/langgraph/langgraph/channels/base.py
@abstractmethod
def get(self) -> Value:
    """Return the current value of the channel.

    Returns:
        The current value of the channel.

    Raises:
        EmptyChannelError: If the channel is empty (never updated yet).
    """

consume() -> bool

将通道的当前值标记为已使用。默认情况下,无操作。Pregel 在下一步骤开始之前,为所有触发节点的通道调用此方法。如果通道已更新,则返回 True。

源代码位于 libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed; a no-op by default.

    Pregel calls this before the start of the next step, for all channels
    that triggered a node.

    Returns:
        True if the channel was updated. This default implementation always
        returns False.
    """
    return False

Topic

基类: Generic[Value], BaseChannel[Sequence[Value], Union[Value, list[Value]], tuple[set[Value], list[Value]]]

一个可配置的发布/订阅主题。

参数

  • typ (Type[Value]) –

    通道中存储的值的类型。

  • accumulate (bool, 默认值: False ) –

    是否跨步骤累积值。如果为 False,则通道将在每个步骤后清空。

源代码位于 libs/langgraph/langgraph/channels/topic.py
class Topic(
    Generic[Value],
    BaseChannel[Sequence[Value], Union[Value, list[Value]], list[Value]],
):
    """A configurable PubSub Topic.

    Args:
        typ: The type of the value stored in the channel.
        accumulate: Whether to accumulate values across steps. If False, the channel will be emptied after each step.
    """

    __slots__ = ("values", "accumulate")

    def __init__(self, typ: Type[Value], accumulate: bool = False) -> None:
        super().__init__(typ)
        # attrs
        self.accumulate = accumulate
        # state
        self.values = list[Value]()

    def __eq__(self, value: object) -> bool:
        """Two topics are equal when they share the same ``accumulate`` flag."""
        return isinstance(value, Topic) and value.accumulate == self.accumulate

    @property
    def ValueType(self) -> Any:
        """The type of the value stored in the channel."""
        return Sequence[self.typ]  # type: ignore[name-defined]

    @property
    def UpdateType(self) -> Any:
        """The type of the update received by the channel."""
        return Union[self.typ, list[self.typ]]  # type: ignore[name-defined]

    def checkpoint(self) -> list[Value]:
        """Return a snapshot of the values currently held by the topic.

        A copy is returned (as ``get`` already does) so that a later
        in-place ``extend`` on an accumulating topic cannot alter a
        snapshot that was taken earlier. Unlike the base implementation,
        an empty topic yields an empty list rather than raising.
        """
        return list(self.values)

    def from_checkpoint(
        self,
        checkpoint: Optional[Union[list[Value], tuple[set[Value], list[Value]]]],
    ) -> Self:
        """Return a new topic with the same configuration, seeded from a checkpoint.

        Args:
            checkpoint: A list of values, a tuple whose second item is the
                list of values (presumably an older checkpoint format), or
                None to start with no values.

        Returns:
            A new topic with the same ``typ``, ``accumulate`` and ``key``.
        """
        restored = self.__class__(self.typ, self.accumulate)
        restored.key = self.key
        if checkpoint is not None:
            stored = checkpoint[1] if isinstance(checkpoint, tuple) else checkpoint
            # Copy so the new channel never shares (and mutates) the list
            # owned by the checkpoint it was restored from.
            restored.values = list(stored)
        return restored

    def update(self, values: Sequence[Union[Value, list[Value]]]) -> bool:
        """Add the given updates to the topic.

        When ``accumulate`` is False the previously held values are dropped
        before the new ones are added.

        Returns:
            True if the held values differ from what they were before the
            call, False otherwise.
        """
        previous = list(self.values)
        if not self.accumulate:
            self.values = list[Value]()
        if flat_values := flatten(values):
            self.values.extend(flat_values)
        return self.values != previous

    def get(self) -> Sequence[Value]:
        """Return a copy of the held values.

        Raises:
            EmptyChannelError: If the topic currently holds no values.
        """
        if self.values:
            return list(self.values)
        else:
            raise EmptyChannelError

ValueType: Any property

通道中存储的值的类型。

UpdateType: Any property

通道接收的更新的类型。

consume() -> bool

将通道的当前值标记为已使用。默认情况下,无操作。Pregel 在下一步骤开始之前,为所有触发节点的通道调用此方法。如果通道已更新,则返回 True。

源代码位于 libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed; a no-op by default.

    Pregel calls this before the start of the next step, for all channels
    that triggered a node.

    Returns:
        True if the channel was updated. This default implementation always
        returns False.
    """
    return False

LastValue

基类: Generic[Value], BaseChannel[Value, Value, Value]

存储接收到的最后一个值,每个步骤最多可以接收一个值。

源代码位于 libs/langgraph/langgraph/channels/last_value.py
class LastValue(Generic[Value], BaseChannel[Value, Value, Value]):
    """Keeps the most recently received value; accepts at most one value per step."""

    __slots__ = ("value",)

    def __eq__(self, value: object) -> bool:
        """Any two ``LastValue`` channels compare equal."""
        return isinstance(value, LastValue)

    @property
    def ValueType(self) -> Type[Value]:
        """Type of the value held by the channel."""
        return self.typ

    @property
    def UpdateType(self) -> Type[Value]:
        """Type of the updates the channel accepts."""
        return self.typ

    def from_checkpoint(self, checkpoint: Optional[Value]) -> Self:
        """Build a new channel with the same ``typ`` and ``key``.

        The new channel holds ``checkpoint`` as its value unless
        ``checkpoint`` is None, in which case it is left empty.
        """
        restored = self.__class__(self.typ)
        restored.key = self.key
        if checkpoint is None:
            return restored
        restored.value = checkpoint
        return restored

    def update(self, values: Sequence[Value]) -> bool:
        """Store the single value received this step.

        Returns:
            False when ``values`` is empty, True once the value is stored.

        Raises:
            InvalidUpdateError: If more than one value is received.
        """
        received = len(values)
        if received == 0:
            return False
        if received > 1:
            raise InvalidUpdateError(
                f"At key '{self.key}': Can receive only one value per step. Use an Annotated key to handle multiple values."
            )

        self.value = values[-1]
        return True

    def get(self) -> Value:
        """Return the stored value.

        Raises:
            EmptyChannelError: If no value has been stored yet.
        """
        try:
            current = self.value
        except AttributeError:
            # The slot was never assigned, i.e. the channel is empty.
            raise EmptyChannelError()
        return current

ValueType: Type[Value] property

通道中存储的值的类型。

UpdateType: Type[Value] property

通道接收的更新的类型。

checkpoint() -> Optional[C]

返回通道当前状态的可序列化表示形式。如果通道为空(尚未更新)或不支持检查点,则引发 EmptyChannelError。

源代码位于 libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    Returns:
        Whatever ``self.get()`` returns.

    Raises:
        EmptyChannelError: If the channel is empty (never updated yet) or
            does not support checkpoints.
    """
    snapshot = self.get()
    return snapshot

consume() -> bool

将通道的当前值标记为已使用。默认情况下,无操作。Pregel 在下一步骤开始之前,为所有触发节点的通道调用此方法。如果通道已更新,则返回 True。

源代码位于 libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed; a no-op by default.

    Pregel calls this before the start of the next step, for all channels
    that triggered a node.

    Returns:
        True if the channel was updated. This default implementation always
        returns False.
    """
    return False

EphemeralValue

基类: Generic[Value], BaseChannel[Value, Value, Value]

存储在紧接其前的步骤中接收到的值,之后清除。

源代码位于 libs/langgraph/langgraph/channels/ephemeral_value.py
class EphemeralValue(Generic[Value], BaseChannel[Value, Value, Value]):
    """Holds the value received in the immediately preceding step, then clears it."""

    __slots__ = ("value", "guard")

    def __init__(self, typ: Any, guard: bool = True) -> None:
        super().__init__(typ)
        self.guard = guard

    def __eq__(self, value: object) -> bool:
        """Equal to another ``EphemeralValue`` with the same ``guard`` setting."""
        if not isinstance(value, EphemeralValue):
            return False
        return value.guard == self.guard

    @property
    def ValueType(self) -> Type[Value]:
        """Type of the value held by the channel."""
        return self.typ

    @property
    def UpdateType(self) -> Type[Value]:
        """Type of the updates the channel accepts."""
        return self.typ

    def from_checkpoint(self, checkpoint: Optional[Value]) -> Self:
        """Build a new channel with the same ``typ``, ``guard`` and ``key``.

        The new channel holds ``checkpoint`` as its value unless
        ``checkpoint`` is None, in which case it is left empty.
        """
        restored = self.__class__(self.typ, self.guard)
        restored.key = self.key
        if checkpoint is None:
            return restored
        restored.value = checkpoint
        return restored

    def update(self, values: Sequence[Value]) -> bool:
        """Store this step's value, or clear the stored one if nothing arrived.

        Returns:
            True if a value was stored or cleared; False if ``values`` is
            empty and there was nothing to clear.

        Raises:
            InvalidUpdateError: If ``guard`` is set and more than one value
                is received.
        """
        if len(values) == 0:
            # Nothing arrived this step: drop whatever was held before.
            try:
                del self.value
            except AttributeError:
                return False
            else:
                return True
        if self.guard and len(values) != 1:
            raise InvalidUpdateError(
                f"At key '{self.key}': EphemeralValue(guard=True) can receive only one value per step. Use guard=False if you want to store any one of multiple values."
            )

        self.value = values[-1]
        return True

    def get(self) -> Value:
        """Return the stored value.

        Raises:
            EmptyChannelError: If no value is currently stored.
        """
        try:
            current = self.value
        except AttributeError:
            # The slot is unassigned, i.e. the channel is empty.
            raise EmptyChannelError()
        return current

ValueType: Type[Value] property

通道中存储的值的类型。

UpdateType: Type[Value] property

通道接收的更新的类型。

checkpoint() -> Optional[C]

返回通道当前状态的可序列化表示形式。如果通道为空(尚未更新)或不支持检查点,则引发 EmptyChannelError。

源代码位于 libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    Returns:
        Whatever ``self.get()`` returns.

    Raises:
        EmptyChannelError: If the channel is empty (never updated yet) or
            does not support checkpoints.
    """
    snapshot = self.get()
    return snapshot

consume() -> bool

将通道的当前值标记为已使用。默认情况下,无操作。Pregel 在下一步骤开始之前,为所有触发节点的通道调用此方法。如果通道已更新,则返回 True。

源代码位于 libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed; a no-op by default.

    Pregel calls this before the start of the next step, for all channels
    that triggered a node.

    Returns:
        True if the channel was updated. This default implementation always
        returns False.
    """
    return False

BinaryOperatorAggregate

基类: Generic[Value], BaseChannel[Value, Value, Value]

存储将二元运算符应用于当前值和每个新值的计算结果。

# Usage example: create an aggregate channel typed as int that combines
# values with operator.add.
import operator

# NOTE(review): `Channels` is not defined in this snippet — confirm the
# intended import before running it.
total = Channels.BinaryOperatorAggregate(int, operator.add)
源代码位于 libs/langgraph/langgraph/channels/binop.py
class BinaryOperatorAggregate(Generic[Value], BaseChannel[Value, Value, Value]):
    """Stores the result of applying a binary operator to the current value and each new value.

    ```python
    import operator

    total = Channels.BinaryOperatorAggregate(int, operator.add)
    ```
    """

    __slots__ = ("value", "operator")

    def __init__(
        self, typ: Type[Value], operator: Callable[[Value, Value], Value]
    ) -> None:
        """Create the channel and, when possible, seed it with ``typ()``.

        Args:
            typ: The type of the value stored in the channel.
            operator: Binary callable used to combine the current value
                with each new value.
        """
        super().__init__(typ)
        self.operator = operator
        # special forms from typing or collections.abc are not instantiable
        # so we need to replace them with their concrete counterparts
        typ = _strip_extras(typ)
        if typ in (collections.abc.Sequence, collections.abc.MutableSequence):
            typ = list
        if typ in (collections.abc.Set, collections.abc.MutableSet):
            typ = set
        if typ in (collections.abc.Mapping, collections.abc.MutableMapping):
            typ = dict
        try:
            self.value = typ()
        except Exception:
            # Deliberate best effort: if the type cannot be instantiated
            # without arguments the channel simply starts out empty and the
            # first update provides the initial value.
            pass

    def __eq__(self, value: object) -> bool:
        """Compare with another channel by operator identity.

        If either operator is a lambda the channels are considered equal,
        since lambdas cannot be meaningfully compared by identity.
        """
        if not isinstance(value, BinaryOperatorAggregate):
            return False
        # Not every callable has a __name__ (e.g. functools.partial objects),
        # so look it up defensively instead of raising AttributeError.
        names = (
            getattr(value.operator, "__name__", None),
            getattr(self.operator, "__name__", None),
        )
        if "<lambda>" in names:
            return True
        return value.operator is self.operator

    @property
    def ValueType(self) -> Type[Value]:
        """The type of the value stored in the channel."""
        return self.typ

    @property
    def UpdateType(self) -> Type[Value]:
        """The type of the update received by the channel."""
        return self.typ

    def from_checkpoint(self, checkpoint: Optional[Value]) -> Self:
        """Return a new channel with the same type, operator and key.

        The new channel holds ``checkpoint`` as its value unless
        ``checkpoint`` is None, in which case it keeps whatever initial
        value the constructor produced.
        """
        empty = self.__class__(self.typ, self.operator)
        empty.key = self.key
        if checkpoint is not None:
            empty.value = checkpoint
        return empty

    def update(self, values: Sequence[Value]) -> bool:
        """Fold the given values into the stored value using the operator.

        Returns:
            False when ``values`` is empty, True otherwise.
        """
        if not values:
            return False
        if not hasattr(self, "value"):
            # No initial value could be built: start from the first update.
            self.value = values[0]
            values = values[1:]
        for value in values:
            self.value = self.operator(self.value, value)
        return True

    def get(self) -> Value:
        """Return the aggregated value.

        Raises:
            EmptyChannelError: If the channel holds no value.
        """
        try:
            return self.value
        except AttributeError:
            raise EmptyChannelError()

ValueType: Type[Value] property

通道中存储的值的类型。

UpdateType: Type[Value] property

通道接收的更新的类型。

checkpoint() -> Optional[C]

返回通道当前状态的可序列化表示形式。如果通道为空(尚未更新)或不支持检查点,则引发 EmptyChannelError。

源代码位于 libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    Returns:
        Whatever ``self.get()`` returns.

    Raises:
        EmptyChannelError: If the channel is empty (never updated yet) or
            does not support checkpoints.
    """
    snapshot = self.get()
    return snapshot

consume() -> bool

将通道的当前值标记为已使用。默认情况下,无操作。Pregel 在下一步骤开始之前,为所有触发节点的通道调用此方法。如果通道已更新,则返回 True。

源代码位于 libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed; a no-op by default.

    Pregel calls this before the start of the next step, for all channels
    that triggered a node.

    Returns:
        True if the channel was updated. This default implementation always
        returns False.
    """
    return False

AnyValue

基类: Generic[Value], BaseChannel[Value, Value, Value]

存储接收到的最后一个值,假设如果接收了多个值,则它们都相等。

源代码位于 libs/langgraph/langgraph/channels/any_value.py
class AnyValue(Generic[Value], BaseChannel[Value, Value, Value]):
    """Stores the last value received, assumes that if multiple values are
    received, they are all equal."""

    # "typ" is already a slot on BaseChannel; declaring it again here would
    # shadow the base slot and waste space in every instance.
    __slots__ = ("value",)

    def __eq__(self, value: object) -> bool:
        """Any two ``AnyValue`` channels compare equal."""
        return isinstance(value, AnyValue)

    @property
    def ValueType(self) -> Type[Value]:
        """The type of the value stored in the channel."""
        return self.typ

    @property
    def UpdateType(self) -> Type[Value]:
        """The type of the update received by the channel."""
        return self.typ

    def from_checkpoint(self, checkpoint: Optional[Value]) -> Self:
        """Return a new channel with the same type and key.

        The new channel holds ``checkpoint`` as its value unless
        ``checkpoint`` is None, in which case it is left empty.
        """
        empty = self.__class__(self.typ)
        empty.key = self.key
        if checkpoint is not None:
            empty.value = checkpoint
        return empty

    def update(self, values: Sequence[Value]) -> bool:
        """Store the last of the received values, or clear the stored one.

        Returns:
            True if a value was stored or cleared; False if ``values`` is
            empty and there was nothing to clear.
        """
        if len(values) == 0:
            # Nothing arrived this step: drop whatever was held before.
            try:
                del self.value
                return True
            except AttributeError:
                return False

        self.value = values[-1]
        return True

    def get(self) -> Value:
        """Return the stored value.

        Raises:
            EmptyChannelError: If no value is currently stored.
        """
        try:
            return self.value
        except AttributeError:
            raise EmptyChannelError()

ValueType: Type[Value] property

通道中存储的值的类型。

UpdateType: Type[Value] property

通道接收的更新的类型。

checkpoint() -> Optional[C]

返回通道当前状态的可序列化表示形式。如果通道为空(尚未更新)或不支持检查点,则引发 EmptyChannelError。

源代码位于 libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    Returns:
        Whatever ``self.get()`` returns.

    Raises:
        EmptyChannelError: If the channel is empty (never updated yet) or
            does not support checkpoints.
    """
    snapshot = self.get()
    return snapshot

consume() -> bool

将通道的当前值标记为已使用。默认情况下,无操作。Pregel 在下一步骤开始之前,为所有触发节点的通道调用此方法。如果通道已更新,则返回 True。

源代码位于 libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed; a no-op by default.

    Pregel calls this before the start of the next step, for all channels
    that triggered a node.

    Returns:
        True if the channel was updated. This default implementation always
        returns False.
    """
    return False

注释