跳到内容

检查点保存器

名称 描述
CheckpointMetadata

与检查点关联的元数据。

检查点

在给定时间点的状态快照。

BaseCheckpointSaver

创建图检查点保存器的基类。

函数

名称 描述
create_checkpoint

为给定的通道创建检查点。

CheckpointMetadata

基础类: TypedDict

与检查点关联的元数据。

属性

名称 类型 描述
source Literal['input', 'loop', 'update', 'fork']

检查点的来源。

step 整数

检查点的步骤编号。

parents dict[str, str]

父检查点的 ID。

source instance-attribute

source: Literal['input', 'loop', 'update', 'fork']

检查点的来源。

  • "input": 检查点是根据 invoke/stream/batch 的输入创建的。
  • "loop": 检查点是在 pregel 循环内部创建的。
  • "update": 检查点是通过手动更新状态创建的。
  • "fork": 检查点是作为另一个检查点的副本创建的。

step instance-attribute

step: int

检查点的步骤编号。

第一个 "input" 检查点为 -1。第一个 "loop" 检查点为 0。之后的第 n 个检查点为 ...。

parents instance-attribute

parents: dict[str, str]

父检查点的 ID。

从检查点命名空间到检查点 ID 的映射。

Checkpoint

基础类: TypedDict

在给定时间点的状态快照。

属性

名称 类型 描述
v 整数

检查点格式的版本。当前为 1。

id str

检查点的 ID。此 ID 既唯一又单调递增。

ts str

检查点的时间戳,格式为 ISO 8601。

channel_values dict[str, Any]

检查点创建时各通道的值。

channel_versions ChannelVersions

检查点创建时各通道的版本。

versions_seen dict[str, ChannelVersions]

从节点 ID 到通道名称再到所见版本的映射。

v instance-attribute

v: int

检查点格式的版本。当前为 1。

id instance-attribute

id: str

检查点的 ID。此 ID 既唯一又单调递增,因此可用于对检查点从头到尾进行排序。

ts instance-attribute

ts: str

检查点的时间戳,格式为 ISO 8601。

channel_values instance-attribute

channel_values: dict[str, Any]

检查点创建时各通道的值。从通道名称到反序列化的通道快照值的映射。

channel_versions instance-attribute

channel_versions: ChannelVersions

检查点创建时各通道的版本。键是通道名称,值是每个通道的单调递增版本字符串。

versions_seen instance-attribute

versions_seen: dict[str, ChannelVersions]

从节点 ID 到通道名称再到所见版本的映射。这用于跟踪每个节点已见过的通道版本,以确定接下来要执行哪些节点。

BaseCheckpointSaver

Bases: Generic[V]

创建图检查点保存器的基类。

检查点保存器允许 LangGraph 代理在多次交互内部和之间持久化其状态。

属性

名称 类型 描述
serde SerializerProtocol

用于编码/解码检查点的序列化器。

注意

创建自定义检查点保存器时,请考虑实现异步版本以避免阻塞主线程。

方法

名称 描述
获取

使用给定的配置获取检查点。

get_tuple

使用给定的配置获取检查点元组。

list

列出符合给定条件的检查点。

放置

存储检查点及其配置和元数据。

put_writes

存储链接到检查点的中间写入。

delete_thread

删除与特定线程 ID 关联的所有检查点和写入操作。

异步获取

使用给定的配置异步获取检查点。

aget_tuple

使用给定的配置异步获取检查点元组。

alist

异步列出符合给定条件的检查点。

aput

异步存储检查点及其配置和元数据。

aput_writes

异步存储链接到检查点的中间写入操作。

adelete_thread

删除与特定线程 ID 关联的所有检查点和写入操作。

get_next_version

为通道生成下一个版本 ID。

config_specs property

config_specs: list

定义检查点保存器的配置选项。

返回

名称 类型 描述
list list

配置字段规范列表。

get

get(config: RunnableConfig) -> Checkpoint | None

使用给定的配置获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

get_tuple

get_tuple(config: RunnableConfig) -> CheckpointTuple | None

使用给定的配置获取检查点元组。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:请求的检查点元组,如果未找到则为 None。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

list

list(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[CheckpointTuple]

列出符合给定条件的检查点。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于筛选检查点的基本配置。

必填
过滤器 dict[str, Any] | None

额外的筛选条件。

None
before RunnableConfig | None

列出在此配置之前创建的检查点。

None
限制 int | None

要返回的最大检查点数量。

None

返回

类型 描述
Iterator[CheckpointTuple]

Iterator[CheckpointTuple]:匹配的检查点元组的迭代器。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

put

put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

存储检查点及其配置和元数据。

参数

名称 类型 描述 默认值
config RunnableConfig

检查点的配置。

必填
checkpoint 检查点

要存储的检查点。

必填
metadata CheckpointMetadata

检查点的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

put_writes

put_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

存储链接到检查点的中间写入。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

delete_thread

delete_thread(thread_id: str) -> None

删除与特定线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

应删除其检查点的线程 ID。

必填

aget async

aget(config: RunnableConfig) -> Checkpoint | None

使用给定的配置异步获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

aget_tuple async

aget_tuple(
    config: RunnableConfig,
) -> CheckpointTuple | None

使用给定的配置异步获取检查点元组。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:请求的检查点元组,如果未找到则为 None。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

alist async

alist(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[CheckpointTuple]

异步列出符合给定条件的检查点。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于筛选检查点的基本配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。

None
before RunnableConfig | None

列出在此配置之前创建的检查点。

None
限制 int | None

要返回的最大检查点数量。

None

返回

类型 描述
AsyncIterator[CheckpointTuple]

AsyncIterator[CheckpointTuple]:匹配的检查点元组的异步迭代器。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

aput async

aput(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

异步存储检查点及其配置和元数据。

参数

名称 类型 描述 默认值
config RunnableConfig

检查点的配置。

必填
checkpoint 检查点

要存储的检查点。

必填
metadata CheckpointMetadata

检查点的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

aput_writes async

aput_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

异步存储链接到检查点的中间写入操作。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

adelete_thread async

adelete_thread(thread_id: str) -> None

删除与特定线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

应删除其检查点的线程 ID。

必填

get_next_version

get_next_version(current: V | None, channel: None) -> V

为通道生成下一个版本 ID。

默认为使用整数版本,每次递增 1。如果你重写此方法,可以使用字符串/整数/浮点数版本,只要它们是单调递增的即可。

参数

名称 类型 描述 默认值
current V | None

当前版本标识符(int、float 或 str)。

必填
channel None

已弃用的参数,为向后兼容而保留。

必填

返回

名称 类型 描述
V V

下一个版本标识符,必须是递增的。

create_checkpoint

create_checkpoint(
    checkpoint: Checkpoint,
    channels: Mapping[str, ChannelProtocol] | None,
    step: int,
    *,
    id: str | None = None
) -> Checkpoint

为给定的通道创建检查点。

名称 描述
SerializerProtocol

用于对象序列化和反序列化的协议。

CipherProtocol

用于数据加密和解密的协议。

SerializerProtocol

Bases: UntypedSerializerProtocol, Protocol

用于对象序列化和反序列化的协议。

  • dumps:将对象序列化为字节。
  • dumps_typed:将对象序列化为元组(类型,字节)。
  • loads:从字节反序列化对象。
  • loads_typed:从元组(类型,字节)反序列化对象。

有效的实现包括 `pickle`、`json` 和 `orjson` 模块。

CipherProtocol

Bases: Protocol

用于数据加密和解密的协议。- `encrypt`:加密明文。- `decrypt`:解密密文。

方法

名称 描述
encrypt

加密明文。返回一个元组(密码名称,密文)。

decrypt

解密密文。返回明文。

encrypt

encrypt(plaintext: bytes) -> tuple[str, bytes]

加密明文。返回一个元组(密码名称,密文)。

decrypt

decrypt(ciphername: str, ciphertext: bytes) -> bytes

解密密文。返回明文。

名称 描述
JsonPlusSerializer

使用 ormsgpack 的序列化器,并备有扩展 JSON 序列化器作为后备。

JsonPlusSerializer

Bases: SerializerProtocol

使用 ormsgpack 的序列化器,并备有扩展 JSON 序列化器作为后备。

名称 描述
EncryptedSerializer

使用加密协议对数据进行加密和解密的序列化器。

EncryptedSerializer

Bases: SerializerProtocol

使用加密协议对数据进行加密和解密的序列化器。

方法

名称 描述
dumps_typed

将对象序列化为元组(类型,字节)并加密字节。

from_pycryptodome_aes

使用 AES 加密创建一个 EncryptedSerializer。

dumps_typed

dumps_typed(obj: Any) -> tuple[str, bytes]

将对象序列化为元组(类型,字节)并加密字节。

from_pycryptodome_aes classmethod

from_pycryptodome_aes(
    serde: SerializerProtocol = JsonPlusSerializer(),
    **kwargs: Any
) -> EncryptedSerializer

使用 AES 加密创建一个 EncryptedSerializer。

名称 描述
InMemorySaver

一个内存中的检查点保存器。

PersistentDict

具有与 shelve 和 anydbm 兼容的 API 的持久化字典。

InMemorySaver

Bases: BaseCheckpointSaver[str], AbstractContextManager, AbstractAsyncContextManager

一个内存中的检查点保存器。

此检查点保存器使用 defaultdict 将检查点存储在内存中。

注意

仅在调试或测试目的时使用 `InMemorySaver`。对于生产用例,我们建议安装 langgraph-checkpoint-postgres 并使用 `PostgresSaver` / `AsyncPostgresSaver`。

如果您正在使用 LangGraph 平台,则无需指定检查点保存器。正确的托管检查点保存器将自动使用。

参数

名称 类型 描述 默认值
serde SerializerProtocol | None

用于序列化和反序列化检查点的序列化器。默认为 None。

None

示例

    import asyncio

    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.graph import StateGraph

    builder = StateGraph(int)
    builder.add_node("add_one", lambda x: x + 1)
    builder.set_entry_point("add_one")
    builder.set_finish_point("add_one")

    memory = InMemorySaver()
    graph = builder.compile(checkpointer=memory)
    coro = graph.ainvoke(1, {"configurable": {"thread_id": "thread-1"}})
    asyncio.run(coro)  # Output: 2

方法

名称 描述
get_tuple

从内存存储中获取检查点元组。

list

从内存存储中列出检查点。

放置

将检查点保存到内存存储中。

put_writes

将写入操作列表保存到内存存储中。

delete_thread

删除与线程 ID 关联的所有检查点和写入操作。

aget_tuple

get_tuple 的异步版本。

alist

list 的异步版本。

aput

put 的异步版本。

aput_writes

put_writes 的异步版本。

adelete_thread

删除与线程 ID 关联的所有检查点和写入操作。

获取

使用给定的配置获取检查点。

异步获取

使用给定的配置异步获取检查点。

属性

名称 类型 描述
config_specs list

定义检查点保存器的配置选项。

config_specs property

config_specs: list

定义检查点保存器的配置选项。

返回

名称 类型 描述
list list

配置字段规范列表。

get_tuple

get_tuple(config: RunnableConfig) -> CheckpointTuple | None

从内存存储中获取检查点元组。

此方法根据提供的配置从内存存储中检索检查点元组。如果配置包含 "checkpoint_id" 键,则检索具有匹配线程 ID 和时间戳的检查点。否则,检索给定线程 ID 的最新检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

用于检索检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:检索到的检查点元组,如果未找到匹配的检查点,则为 None。

list

list(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[CheckpointTuple]

从内存存储中列出检查点。

此方法根据提供的条件从内存存储中检索检查点元组列表。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于筛选检查点的基本配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。

None
before RunnableConfig | None

列出在此配置之前创建的检查点。

None
限制 int | None

要返回的最大检查点数量。

None

返回

类型 描述
CheckpointTuple

Iterator[CheckpointTuple]:匹配的检查点元组的迭代器。

put

put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

将检查点保存到内存存储中。

此方法将检查点保存到内存存储中。检查点与提供的配置关联。

参数

名称 类型 描述 默认值
config RunnableConfig

要与检查点关联的配置。

必填
checkpoint 检查点

要保存的检查点。

必填
metadata CheckpointMetadata

要与检查点一起保存的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新版本

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

包含已保存检查点时间戳的更新配置。

put_writes

put_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

将写入操作列表保存到内存存储中。

此方法将写入操作列表保存到内存存储中。写入操作与提供的配置关联。

参数

名称 类型 描述 默认值
config RunnableConfig

要与写入操作关联的配置。

必填
writes Sequence[tuple[str, Any]]

要保存的写入操作。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

返回

名称 类型 描述
RunnableConfig None

包含已保存写入操作时间戳的更新配置。

delete_thread

delete_thread(thread_id: str) -> None

删除与线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

要删除的线程 ID。

必填

返回

类型 描述
None

None

aget_tuple async

aget_tuple(
    config: RunnableConfig,
) -> CheckpointTuple | None

get_tuple 的异步版本。

此方法是 get_tuple 的异步包装器,它使用 asyncio 在单独的线程中运行同步方法。

参数

名称 类型 描述 默认值
config RunnableConfig

用于检索检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:检索到的检查点元组,如果未找到匹配的检查点,则为 None。

alist async

alist(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[CheckpointTuple]

list 的异步版本。

此方法是 list 的异步包装器,它使用 asyncio 在单独的线程中运行同步方法。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于列出检查点的配置。

必填

返回

类型 描述
AsyncIterator[CheckpointTuple]

AsyncIterator[CheckpointTuple]:检查点元组的异步迭代器。

aput async

aput(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

put 的异步版本。

参数

名称 类型 描述 默认值
config RunnableConfig

要与检查点关联的配置。

必填
checkpoint 检查点

要保存的检查点。

必填
metadata CheckpointMetadata

要与检查点一起保存的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新版本

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

包含已保存检查点时间戳的更新配置。

aput_writes async

aput_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

put_writes 的异步版本。

此方法是 put_writes 的异步包装器,它使用 asyncio 在单独的线程中运行同步方法。

参数

名称 类型 描述 默认值
config RunnableConfig

要与写入操作关联的配置。

必填
writes Sequence[tuple[str, Any]]

要保存的写入操作,每个都为 (channel, value) 对。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

返回

类型 描述
None

None

adelete_thread async

adelete_thread(thread_id: str) -> None

删除与线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

要删除的线程 ID。

必填

返回

类型 描述
None

None

get

get(config: RunnableConfig) -> Checkpoint | None

使用给定的配置获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

aget async

aget(config: RunnableConfig) -> Checkpoint | None

使用给定的配置异步获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

PersistentDict

Bases: defaultdict

具有与 shelve 和 anydbm 兼容的 API 的持久化字典。

字典保存在内存中,因此字典操作的运行速度与常规字典一样快。

写入磁盘的操作会延迟到 close 或 sync 时(类似于 gdbm 的快速模式)。

输入文件格式会自动发现。输出文件格式可在 pickle、json 和 csv 之间选择。所有三种序列化格式都由快速的 C 语言实现支持。

改编自 https://code.activestate.com/recipes/576642-persistent-dict-with-multiple-standard-file-format/

方法

名称 描述
sync

将字典写入磁盘

sync

sync() -> None

将字典写入磁盘

模块

名称 描述
aio
utils

名称 描述
SqliteSaver

一个将检查点存储在 SQLite 数据库中的检查点保存器。

SqliteSaver

Bases: BaseCheckpointSaver[str]

一个将检查点存储在 SQLite 数据库中的检查点保存器。

注意

此类适用于轻量级的同步用例(演示和小项目),并且不能扩展到多线程。对于支持 `async` 的类似 sqlite 保存器,请考虑使用 AsyncSqliteSaver

参数

名称 类型 描述 默认值
conn Connection

SQLite 数据库连接。

必填
serde Optional[SerializerProtocol]

用于序列化和反序列化检查点的序列化器。默认为 JsonPlusSerializerCompat。

None

示例

>>> import sqlite3
>>> from langgraph.checkpoint.sqlite import SqliteSaver
>>> from langgraph.graph import StateGraph
>>>
>>> builder = StateGraph(int)
>>> builder.add_node("add_one", lambda x: x + 1)
>>> builder.set_entry_point("add_one")
>>> builder.set_finish_point("add_one")
>>> # Create a new SqliteSaver instance
>>> # Note: check_same_thread=False is OK as the implementation uses a lock
>>> # to ensure thread safety.
>>> conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
>>> memory = SqliteSaver(conn)
>>> graph = builder.compile(checkpointer=memory)
>>> config = {"configurable": {"thread_id": "1"}}
>>> graph.get_state(config)
>>> result = graph.invoke(3, config)
>>> graph.get_state(config)
StateSnapshot(values=4, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '0c62ca34-ac19-445d-bbb0-5b4984975b2a'}}, parent_config=None)

方法

名称 描述
from_conn_string

从连接字符串创建一个新的 SqliteSaver 实例。

setup

设置检查点数据库。

cursor

获取 SQLite 数据库的游标。

get_tuple

从数据库中获取检查点元组。

list

从数据库中列出检查点。

放置

将检查点保存到数据库。

put_writes

存储链接到检查点的中间写入。

delete_thread

删除与线程 ID 关联的所有检查点和写入操作。

aget_tuple

异步地从数据库中获取检查点元组。

alist

异步地从数据库中列出检查点。

aput

异步地将检查点保存到数据库。

get_next_version

为通道生成下一个版本 ID。

获取

使用给定的配置获取检查点。

异步获取

使用给定的配置异步获取检查点。

aput_writes

异步存储链接到检查点的中间写入操作。

adelete_thread

删除与特定线程 ID 关联的所有检查点和写入操作。

属性

名称 类型 描述
config_specs list

定义检查点保存器的配置选项。

config_specs property

config_specs: list

定义检查点保存器的配置选项。

返回

名称 类型 描述
list list

配置字段规范列表。

from_conn_string classmethod

from_conn_string(conn_string: str) -> Iterator[SqliteSaver]

从连接字符串创建一个新的 SqliteSaver 实例。

参数

名称 类型 描述 默认值
conn_string str

SQLite 连接字符串。

必填

返回

名称 类型 描述
SqliteSaver SqliteSaver

一个新的 SqliteSaver 实例。

示例

In memory:

    with SqliteSaver.from_conn_string(":memory:") as memory:
        ...

To disk:

    with SqliteSaver.from_conn_string("checkpoints.sqlite") as memory:
        ...

setup

setup() -> None

设置检查点数据库。

此方法会在 SQLite 数据库中创建必要的表(如果它们尚不存在)。它在需要时会自动调用,用户不应直接调用。

cursor

cursor(transaction: bool = True) -> Iterator[Cursor]

获取 SQLite 数据库的游标。

此方法返回 SQLite 数据库的游标。它由 SqliteSaver 内部使用,用户不应直接调用。

参数

名称 类型 描述 默认值
transaction bool

游标关闭时是否提交事务。默认为 True。

True

返回

类型 描述
Cursor

sqlite3.Cursor: SQLite 数据库的游标。

get_tuple

get_tuple(config: RunnableConfig) -> CheckpointTuple | None

从数据库中获取检查点元组。

此方法根据提供的配置从 SQLite 数据库中检索检查点元组。如果配置包含 "checkpoint_id" 键,则检索具有匹配线程 ID 和检查点 ID 的检查点。否则,检索给定线程 ID 的最新检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

用于检索检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:检索到的检查点元组,如果未找到匹配的检查点,则为 None。

示例

Basic:
>>> config = {"configurable": {"thread_id": "1"}}
>>> checkpoint_tuple = memory.get_tuple(config)
>>> print(checkpoint_tuple)
CheckpointTuple(...)

With checkpoint ID:

>>> config = {
...    "configurable": {
...        "thread_id": "1",
...        "checkpoint_ns": "",
...        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
...    }
... }
>>> checkpoint_tuple = memory.get_tuple(config)
>>> print(checkpoint_tuple)
CheckpointTuple(...)

list

list(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[CheckpointTuple]

从数据库中列出检查点。

此方法根据提供的配置从 SQLite 数据库中检索检查点元组列表。检查点按检查点 ID 降序排列(最新优先)。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于列出检查点的配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。默认为 None。

None
before RunnableConfig | None

如果提供,则仅返回指定检查点 ID 之前的检查点。默认为 None。

None
限制 int | None

要返回的最大检查点数量。默认为 None。

None

返回

类型 描述
CheckpointTuple

Iterator[CheckpointTuple]:检查点元组的迭代器。

示例

>>> from langgraph.checkpoint.sqlite import SqliteSaver
>>> with SqliteSaver.from_conn_string(":memory:") as memory:
... # Run a graph, then list the checkpoints
>>>     config = {"configurable": {"thread_id": "1"}}
>>>     checkpoints = list(memory.list(config, limit=2))
>>> print(checkpoints)
[CheckpointTuple(...), CheckpointTuple(...)]
>>> config = {"configurable": {"thread_id": "1"}}
>>> before = {"configurable": {"checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875"}}
>>> with SqliteSaver.from_conn_string(":memory:") as memory:
... # Run a graph, then list the checkpoints
>>>     checkpoints = list(memory.list(config, before=before))
>>> print(checkpoints)
[CheckpointTuple(...), ...]

put

put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

将检查点保存到数据库。

此方法将检查点保存到 SQLite 数据库中。检查点与提供的配置及其父配置(如有)相关联。

参数

名称 类型 描述 默认值
config RunnableConfig

要与检查点关联的配置。

必填
checkpoint 检查点

要保存的检查点。

必填
metadata CheckpointMetadata

要与检查点一起保存的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

示例

>>> from langgraph.checkpoint.sqlite import SqliteSaver
>>> with SqliteSaver.from_conn_string(":memory:") as memory:
>>>     config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
>>>     checkpoint = {"ts": "2024-05-04T06:32:42.235444+00:00", "id": "1ef4f797-8335-6428-8001-8a1503f9b875", "channel_values": {"key": "value"}}
>>>     saved_config = memory.put(config, checkpoint, {"source": "input", "step": 1, "writes": {"key": "value"}}, {})
>>> print(saved_config)
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef4f797-8335-6428-8001-8a1503f9b875'}}

put_writes

put_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

存储链接到检查点的中间写入。

此方法将与检查点关联的中间写入操作保存到 SQLite 数据库。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表,每个都为 (channel, value) 对。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

delete_thread

delete_thread(thread_id: str) -> None

删除与线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

要删除的线程 ID。

必填

返回

类型 描述
None

None

aget_tuple async

aget_tuple(
    config: RunnableConfig,
) -> CheckpointTuple | None

异步地从数据库中获取检查点元组。

注意

SqliteSaver 类不支持此异步方法。请改用 get_tuple(),或考虑使用 AsyncSqliteSaver

alist async

alist(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[CheckpointTuple]

异步地从数据库中列出检查点。

注意

SqliteSaver 类不支持此异步方法。请改用 list(),或考虑使用 AsyncSqliteSaver

aput async

aput(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

异步地将检查点保存到数据库。

注意

SqliteSaver 类不支持此异步方法。请改用 put(),或考虑使用 AsyncSqliteSaver

get_next_version

get_next_version(current: str | None, channel: None) -> str

为通道生成下一个版本 ID。

此方法根据通道的当前版本为其创建新的版本标识符。

参数

名称 类型 描述 默认值
current Optional[str]

通道的当前版本标识符。

必填

返回

名称 类型 描述
str str

下一个版本标识符,保证单调递增。

get

get(config: RunnableConfig) -> Checkpoint | None

使用给定的配置获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

aget async

aget(config: RunnableConfig) -> Checkpoint | None

使用给定的配置异步获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

aput_writes async

aput_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

异步存储链接到检查点的中间写入操作。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

adelete_thread async

adelete_thread(thread_id: str) -> None

删除与特定线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

应删除其检查点的线程 ID。

必填

名称 描述
AsyncSqliteSaver

一个将检查点存储在 SQLite 数据库中的异步检查点保存器。

AsyncSqliteSaver

Bases: BaseCheckpointSaver[str]

一个将检查点存储在 SQLite 数据库中的异步检查点保存器。

此类提供了一个使用 SQLite 数据库保存和检索检查点的异步接口。它专为异步环境设计,与同步替代方案相比,在 I/O 密集型操作中性能更佳。

属性

名称 类型 描述
conn Connection

异步 SQLite 数据库连接。

serde SerializerProtocol

用于编码/解码检查点的序列化器。

提示

需要 aiosqlite 包。使用 `pip install aiosqlite` 安装。

警告

虽然此类支持异步检查点,但不建议用于生产工作负载,因为 SQLite 的写入性能有限。对于生产用途,请考虑使用更强大的数据库,如 PostgreSQL。

提示

请记住在执行代码后**关闭数据库连接**,否则,您可能会看到图在执行后“挂起”(因为程序在连接关闭前不会退出)。

最简单的方法是如示例所示使用 `async with` 语句。

async with AsyncSqliteSaver.from_conn_string("checkpoints.sqlite") as saver:
    # Your code here
    graph = builder.compile(checkpointer=saver)
    config = {"configurable": {"thread_id": "thread-1"}}
    async for event in graph.astream_events(..., config, version="v1"):
        print(event)

示例

在 StateGraph 中使用

>>> import asyncio
>>>
>>> from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
>>> from langgraph.graph import StateGraph
>>>
>>> async def main():
>>>     builder = StateGraph(int)
>>>     builder.add_node("add_one", lambda x: x + 1)
>>>     builder.set_entry_point("add_one")
>>>     builder.set_finish_point("add_one")
>>>     async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as memory:
>>>         graph = builder.compile(checkpointer=memory)
>>>         coro = graph.ainvoke(1, {"configurable": {"thread_id": "thread-1"}})
>>>         print(await asyncio.gather(coro))
>>>
>>> asyncio.run(main())
Output: [2]
原始用法

>>> import asyncio
>>> import aiosqlite
>>> from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
>>>
>>> async def main():
>>>     async with aiosqlite.connect("checkpoints.db") as conn:
...         saver = AsyncSqliteSaver(conn)
...         config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
...         checkpoint = {"ts": "2023-05-03T10:00:00Z", "data": {"key": "value"}, "id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}
...         saved_config = await saver.aput(config, checkpoint, {}, {})
...         print(saved_config)
>>> asyncio.run(main())
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '0c62ca34-ac19-445d-bbb0-5b4984975b2a'}}

方法

名称 描述
from_conn_string

从连接字符串创建一个新的 AsyncSqliteSaver 实例。

get_tuple

从数据库中获取检查点元组。

list

异步地从数据库中列出检查点。

放置

将检查点保存到数据库。

delete_thread

删除与线程 ID 关联的所有检查点和写入操作。

setup

异步设置检查点数据库。

aget_tuple

异步地从数据库中获取检查点元组。

alist

异步地从数据库中列出检查点。

aput

异步地将检查点保存到数据库。

aput_writes

异步存储链接到检查点的中间写入操作。

adelete_thread

删除与线程 ID 关联的所有检查点和写入操作。

get_next_version

为通道生成下一个版本 ID。

获取

使用给定的配置获取检查点。

异步获取

使用给定的配置异步获取检查点。

config_specs property

config_specs: list

定义检查点保存器的配置选项。

返回

名称 类型 描述
list list

配置字段规范列表。

from_conn_string async classmethod

from_conn_string(
    conn_string: str,
) -> AsyncIterator[AsyncSqliteSaver]

从连接字符串创建一个新的 AsyncSqliteSaver 实例。

参数

名称 类型 描述 默认值
conn_string str

SQLite 连接字符串。

必填

返回

名称 类型 描述
AsyncSqliteSaver AsyncIterator[AsyncSqliteSaver]

一个新的 AsyncSqliteSaver 实例。

get_tuple

get_tuple(config: RunnableConfig) -> CheckpointTuple | None

从数据库中获取检查点元组。

此方法根据提供的配置从 SQLite 数据库中检索检查点元组。如果配置包含 "checkpoint_id" 键,则检索具有匹配线程 ID 和检查点 ID 的检查点。否则,检索给定线程 ID 的最新检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

用于检索检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:检索到的检查点元组,如果未找到匹配的检查点,则为 None。

list

list(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[CheckpointTuple]

异步地从数据库中列出检查点。

此方法根据提供的配置从 SQLite 数据库中检索检查点元组列表。检查点按检查点 ID 降序排列(最新优先)。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于筛选检查点的基本配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。

None
before RunnableConfig | None

如果提供,则仅返回指定检查点 ID 之前的检查点。默认为 None。

None
限制 int | None

要返回的最大检查点数量。

None

返回

类型 描述
CheckpointTuple

Iterator[CheckpointTuple]:匹配的检查点元组的迭代器。

put

put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

将检查点保存到数据库。

此方法将检查点保存到 SQLite 数据库中。检查点与提供的配置及其父配置(如有)相关联。

参数

名称 类型 描述 默认值
config RunnableConfig

要与检查点关联的配置。

必填
checkpoint 检查点

要保存的检查点。

必填
metadata CheckpointMetadata

要与检查点一起保存的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

delete_thread

delete_thread(thread_id: str) -> None

删除与线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

要删除的线程 ID。

必填

返回

类型 描述
None

None

setup async

setup() -> None

异步设置检查点数据库。

此方法会在 SQLite 数据库中创建必要的表(如果它们尚不存在)。它在需要时会自动调用,用户不应直接调用。

aget_tuple async

aget_tuple(
    config: RunnableConfig,
) -> CheckpointTuple | None

异步地从数据库中获取检查点元组。

此方法根据提供的配置从 SQLite 数据库中检索检查点元组。如果配置包含 "checkpoint_id" 键,则检索具有匹配线程 ID 和检查点 ID 的检查点。否则,检索给定线程 ID 的最新检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

用于检索检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:检索到的检查点元组,如果未找到匹配的检查点,则为 None。

alist async

alist(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[CheckpointTuple]

异步地从数据库中列出检查点。

此方法根据提供的配置从 SQLite 数据库中检索检查点元组列表。检查点按检查点 ID 降序排列(最新优先)。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于筛选检查点的基本配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。

None
before RunnableConfig | None

如果提供,则仅返回指定检查点 ID 之前的检查点。默认为 None。

None
限制 int | None

要返回的最大检查点数量。

None

返回

类型 描述
AsyncIterator[CheckpointTuple]

AsyncIterator[CheckpointTuple]:匹配检查点元组的异步迭代器。

aput async

aput(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

异步地将检查点保存到数据库。

此方法将检查点保存到 SQLite 数据库中。检查点与提供的配置及其父配置(如有)相关联。

参数

名称 类型 描述 默认值
config RunnableConfig

要与检查点关联的配置。

必填
checkpoint 检查点

要保存的检查点。

必填
metadata CheckpointMetadata

要与检查点一起保存的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

aput_writes async

aput_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

异步存储链接到检查点的中间写入操作。

此方法将与检查点关联的中间写入操作保存到数据库。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表,每个都为 (channel, value) 对。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

adelete_thread async

adelete_thread(thread_id: str) -> None

删除与线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

要删除的线程 ID。

必填

返回

类型 描述
None

None

get_next_version

get_next_version(current: str | None, channel: None) -> str

为通道生成下一个版本 ID。

此方法根据通道的当前版本为其创建新的版本标识符。

参数

名称 类型 描述 默认值
current Optional[str]

通道的当前版本标识符。

必填

返回

名称 类型 描述
str str

下一个版本标识符,保证单调递增。

get

get(config: RunnableConfig) -> Checkpoint | None

使用给定的配置获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

aget async

aget(config: RunnableConfig) -> Checkpoint | None

使用给定的配置异步获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

名称 描述
PostgresSaver

将检查点存储在 Postgres 数据库中的检查点保存器。

PostgresSaver

Bases: BasePostgresSaver

将检查点存储在 Postgres 数据库中的检查点保存器。

方法

名称 描述
from_conn_string

从连接字符串创建一个新的 PostgresSaver 实例。

setup

异步设置检查点数据库。

list

从数据库中列出检查点。

get_tuple

从数据库中获取检查点元组。

放置

将检查点保存到数据库。

put_writes

存储链接到检查点的中间写入。

delete_thread

删除与线程 ID 关联的所有检查点和写入操作。

获取

使用给定的配置获取检查点。

异步获取

使用给定的配置异步获取检查点。

aget_tuple

使用给定的配置异步获取检查点元组。

alist

异步列出符合给定条件的检查点。

aput

异步存储检查点及其配置和元数据。

aput_writes

异步存储链接到检查点的中间写入操作。

adelete_thread

删除与特定线程 ID 关联的所有检查点和写入操作。

属性

名称 类型 描述
config_specs list

定义检查点保存器的配置选项。

config_specs property

config_specs: list

定义检查点保存器的配置选项。

返回

名称 类型 描述
list list

配置字段规范列表。

from_conn_string classmethod

from_conn_string(
    conn_string: str, *, pipeline: bool = False
) -> Iterator[PostgresSaver]

从连接字符串创建一个新的 PostgresSaver 实例。

参数

名称 类型 描述 默认值
conn_string str

Postgres 连接信息字符串。

必填
pipeline bool

是否使用 Pipeline

False

返回

名称 类型 描述
PostgresSaver Iterator[PostgresSaver]

一个新的 PostgresSaver 实例。

setup

setup() -> None

异步设置检查点数据库。

此方法会在 Postgres 数据库中创建必要的表(如果它们尚不存在)并运行数据库迁移。用户在首次使用检查点保存器时必须直接调用此方法。

list

list(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[CheckpointTuple]

从数据库中列出检查点。

此方法根据提供的配置从 Postgres 数据库中检索检查点元组列表。检查点按检查点 ID 降序排列(最新优先)。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于列出检查点的配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。默认为 None。

None
before RunnableConfig | None

如果提供,则仅返回指定检查点 ID 之前的检查点。默认为 None。

None
限制 int | None

要返回的最大检查点数量。默认为 None。

None

返回

类型 描述
CheckpointTuple

Iterator[CheckpointTuple]:检查点元组的迭代器。

示例

>>> from langgraph.checkpoint.postgres import PostgresSaver
>>> DB_URI = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"
>>> with PostgresSaver.from_conn_string(DB_URI) as memory:
... # Run a graph, then list the checkpoints
>>>     config = {"configurable": {"thread_id": "1"}}
>>>     checkpoints = list(memory.list(config, limit=2))
>>> print(checkpoints)
[CheckpointTuple(...), CheckpointTuple(...)]
>>> config = {"configurable": {"thread_id": "1"}}
>>> before = {"configurable": {"checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875"}}
>>> with PostgresSaver.from_conn_string(DB_URI) as memory:
... # Run a graph, then list the checkpoints
>>>     checkpoints = list(memory.list(config, before=before))
>>> print(checkpoints)
[CheckpointTuple(...), ...]

get_tuple

get_tuple(config: RunnableConfig) -> CheckpointTuple | None

从数据库中获取检查点元组。

此方法根据提供的配置从 Postgres 数据库中检索检查点元组。如果配置包含 "checkpoint_id" 键,则检索具有匹配线程 ID 和时间戳的检查点。否则,检索给定线程 ID 的最新检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

用于检索检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:检索到的检查点元组,如果未找到匹配的检查点,则为 None。

示例

Basic:
>>> config = {"configurable": {"thread_id": "1"}}
>>> checkpoint_tuple = memory.get_tuple(config)
>>> print(checkpoint_tuple)
CheckpointTuple(...)

With timestamp:

>>> config = {
...    "configurable": {
...        "thread_id": "1",
...        "checkpoint_ns": "",
...        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
...    }
... }
>>> checkpoint_tuple = memory.get_tuple(config)
>>> print(checkpoint_tuple)
CheckpointTuple(...)

put

put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

将检查点保存到数据库。

此方法将检查点保存到 Postgres 数据库中。检查点与提供的配置及其父配置(如有)相关联。

参数

名称 类型 描述 默认值
config RunnableConfig

要与检查点关联的配置。

必填
checkpoint 检查点

要保存的检查点。

必填
metadata CheckpointMetadata

要与检查点一起保存的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

示例

>>> from langgraph.checkpoint.postgres import PostgresSaver
>>> DB_URI = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"
>>> with PostgresSaver.from_conn_string(DB_URI) as memory:
>>>     config = {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}
>>>     checkpoint = {"ts": "2024-05-04T06:32:42.235444+00:00", "id": "1ef4f797-8335-6428-8001-8a1503f9b875", "channel_values": {"key": "value"}}
>>>     saved_config = memory.put(config, checkpoint, {"source": "input", "step": 1, "writes": {"key": "value"}}, {})
>>> print(saved_config)
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef4f797-8335-6428-8001-8a1503f9b875'}}

put_writes

put_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

存储链接到检查点的中间写入。

此方法将与检查点关联的中间写入操作保存到 Postgres 数据库。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表。

必填
task_id str

创建写入操作的任务的标识符。

必填

delete_thread

delete_thread(thread_id: str) -> None

删除与线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

要删除的线程 ID。

必填

返回

类型 描述
None

None

get

get(config: RunnableConfig) -> Checkpoint | None

使用给定的配置获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

aget async

aget(config: RunnableConfig) -> Checkpoint | None

使用给定的配置异步获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

aget_tuple async

aget_tuple(
    config: RunnableConfig,
) -> CheckpointTuple | None

使用给定的配置异步获取检查点元组。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:请求的检查点元组,如果未找到则为 None。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

alist async

alist(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[CheckpointTuple]

异步列出符合给定条件的检查点。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于筛选检查点的基本配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。

None
before RunnableConfig | None

列出在此配置之前创建的检查点。

None
限制 int | None

要返回的最大检查点数量。

None

返回

类型 描述
AsyncIterator[CheckpointTuple]

AsyncIterator[CheckpointTuple]:匹配的检查点元组的异步迭代器。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

aput async

aput(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

异步存储检查点及其配置和元数据。

参数

名称 类型 描述 默认值
config RunnableConfig

检查点的配置。

必填
checkpoint 检查点

要存储的检查点。

必填
metadata CheckpointMetadata

检查点的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

aput_writes async

aput_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

异步存储链接到检查点的中间写入操作。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

抛出

类型 描述
NotImplementedError

在您的自定义检查点保存器中实现此方法。

adelete_thread async

adelete_thread(thread_id: str) -> None

删除与特定线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

应删除其检查点的线程 ID。

必填

名称 描述
AsyncPostgresSaver

将检查点存储在 Postgres 数据库中的异步检查点保存器。

AsyncPostgresSaver

Bases: BasePostgresSaver

将检查点存储在 Postgres 数据库中的异步检查点保存器。

方法

名称 描述
from_conn_string

从连接字符串创建一个新的 AsyncPostgresSaver 实例。

setup

异步设置检查点数据库。

alist

异步地从数据库中列出检查点。

aget_tuple

异步地从数据库中获取检查点元组。

aput

异步地将检查点保存到数据库。

aput_writes

异步存储链接到检查点的中间写入操作。

adelete_thread

删除与线程 ID 关联的所有检查点和写入操作。

list

从数据库中列出检查点。

get_tuple

从数据库中获取检查点元组。

放置

将检查点保存到数据库。

put_writes

存储链接到检查点的中间写入。

delete_thread

删除与线程 ID 关联的所有检查点和写入操作。

获取

使用给定的配置获取检查点。

异步获取

使用给定的配置异步获取检查点。

属性

名称 类型 描述
config_specs list

定义检查点保存器的配置选项。

config_specs property

config_specs: list

定义检查点保存器的配置选项。

返回

名称 类型 描述
list list

配置字段规范列表。

from_conn_string async classmethod

from_conn_string(
    conn_string: str,
    *,
    pipeline: bool = False,
    serde: SerializerProtocol | None = None
) -> AsyncIterator[AsyncPostgresSaver]

从连接字符串创建一个新的 AsyncPostgresSaver 实例。

参数

名称 类型 描述 默认值
conn_string str

Postgres 连接信息字符串。

必填
pipeline bool

是否使用 AsyncPipeline

False

返回

名称 类型 描述
AsyncPostgresSaver AsyncIterator[AsyncPostgresSaver]

一个新的 AsyncPostgresSaver 实例。

setup async

setup() -> None

异步设置检查点数据库。

此方法会在 Postgres 数据库中创建必要的表(如果它们尚不存在)并运行数据库迁移。用户在首次使用检查点保存器时必须直接调用此方法。

alist async

alist(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[CheckpointTuple]

异步地从数据库中列出检查点。

此方法根据提供的配置从 Postgres 数据库中检索检查点元组列表。检查点按检查点 ID 降序排列(最新优先)。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于筛选检查点的基本配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。

None
before RunnableConfig | None

如果提供,则仅返回指定检查点 ID 之前的检查点。默认为 None。

None
限制 int | None

要返回的最大检查点数量。

None

返回

类型 描述
AsyncIterator[CheckpointTuple]

AsyncIterator[CheckpointTuple]:匹配检查点元组的异步迭代器。

aget_tuple async

aget_tuple(
    config: RunnableConfig,
) -> CheckpointTuple | None

异步地从数据库中获取检查点元组。

此方法根据提供的配置从 Postgres 数据库中检索检查点元组。如果配置包含 "checkpoint_id" 键,则检索具有匹配线程 ID 和 "checkpoint_id" 的检查点。否则,检索给定线程 ID 的最新检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

用于检索检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:检索到的检查点元组,如果未找到匹配的检查点,则为 None。

aput async

aput(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

异步地将检查点保存到数据库。

此方法将检查点保存到 Postgres 数据库中。检查点与提供的配置及其父配置(如有)相关联。

参数

名称 类型 描述 默认值
config RunnableConfig

要与检查点关联的配置。

必填
checkpoint 检查点

要保存的检查点。

必填
metadata CheckpointMetadata

要与检查点一起保存的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

aput_writes async

aput_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

异步存储链接到检查点的中间写入操作。

此方法将与检查点关联的中间写入操作保存到数据库。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表,每个都为 (channel, value) 对。

必填
task_id str

创建写入操作的任务的标识符。

必填

adelete_thread async

adelete_thread(thread_id: str) -> None

删除与线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

要删除的线程 ID。

必填

返回

类型 描述
None

None

list

list(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[CheckpointTuple]

从数据库中列出检查点。

此方法根据提供的配置从 Postgres 数据库中检索检查点元组列表。检查点按检查点 ID 降序排列(最新优先)。

参数

名称 类型 描述 默认值
config RunnableConfig | None

用于筛选检查点的基本配置。

必填
过滤器 dict[str, Any] | None

元数据的额外筛选条件。

None
before RunnableConfig | None

如果提供,则仅返回指定检查点 ID 之前的检查点。默认为 None。

None
限制 int | None

要返回的最大检查点数量。

None

返回

类型 描述
CheckpointTuple

Iterator[CheckpointTuple]:匹配的检查点元组的迭代器。

get_tuple

get_tuple(config: RunnableConfig) -> CheckpointTuple | None

从数据库中获取检查点元组。

此方法根据提供的配置从 Postgres 数据库中检索检查点元组。如果配置包含 "checkpoint_id" 键,则检索具有匹配线程 ID 和 "checkpoint_id" 的检查点。否则,检索给定线程 ID 的最新检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

用于检索检查点的配置。

必填

返回

类型 描述
CheckpointTuple | None

Optional[CheckpointTuple]:检索到的检查点元组,如果未找到匹配的检查点,则为 None。

put

put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

将检查点保存到数据库。

此方法将检查点保存到 Postgres 数据库中。检查点与提供的配置及其父配置(如有)相关联。

参数

名称 类型 描述 默认值
config RunnableConfig

要与检查点关联的配置。

必填
checkpoint 检查点

要保存的检查点。

必填
metadata CheckpointMetadata

要与检查点一起保存的额外元数据。

必填
new_versions ChannelVersions

此次写入时的新通道版本。

必填

返回

名称 类型 描述
RunnableConfig RunnableConfig

存储检查点后的更新配置。

put_writes

put_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

存储链接到检查点的中间写入。

此方法将与检查点关联的中间写入操作保存到数据库。

参数

名称 类型 描述 默认值
config RunnableConfig

相关检查点的配置。

必填
writes Sequence[tuple[str, Any]]

要存储的写入操作列表,每个都为 (channel, value) 对。

必填
task_id str

创建写入操作的任务的标识符。

必填
task_path str

创建写入操作的任务的路径。

''

delete_thread

delete_thread(thread_id: str) -> None

删除与线程 ID 关联的所有检查点和写入操作。

参数

名称 类型 描述 默认值
thread_id str

要删除的线程 ID。

必填

返回

类型 描述
None

None

get

get(config: RunnableConfig) -> Checkpoint | None

使用给定的配置获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。

aget async

aget(config: RunnableConfig) -> Checkpoint | None

使用给定的配置异步获取检查点。

参数

名称 类型 描述 默认值
config RunnableConfig

指定要检索哪个检查点的配置。

必填

返回

类型 描述
Checkpoint | None

Optional[Checkpoint]:请求的检查点,如果未找到则为 None。