如何使用 MongoDB 创建自定义检查点¶
在创建 LangGraph 代理时,您还可以对其进行设置,使其持久保存其状态。这使您可以执行诸如多次与代理交互并使其记住先前交互之类的操作。
此参考实现展示了如何使用 MongoDB 作为持久保存检查点状态的后端。请确保 MongoDB 正在端口 27017 上运行,以便完成本指南。
注意
这是一个**参考**实现。只要它符合 BaseCheckpointSaver 接口,您就可以使用不同的数据库实现自己的检查点,或修改此检查点。
出于演示目的,我们为预构建的 create_react_agent(ReAct 代理)添加了持久化。
一般来说,您可以将检查点保存器(checkpointer)添加到您构建的任何自定义图中,方法如下:
from langgraph.graph import StateGraph
builder = StateGraph(....)
# ... define the graph
checkpointer = # mongodb checkpointer (see examples below)
graph = builder.compile(checkpointer=checkpointer)
...
设置¶
首先,让我们安装所需的软件包并设置 API 密钥
在 [1]
已复制!
%%capture --no-stderr
%pip install -U pymongo motor langgraph
%%capture --no-stderr %pip install -U pymongo motor langgraph
在 [ ]
已复制!
import getpass
import os
def _set_env(var: str):
    """Make sure the environment variable ``var`` holds a non-empty value.

    If the variable is missing or empty, its value is read with
    ``getpass.getpass`` using the prompt ``"<var>: "`` and stored in
    ``os.environ``. An existing non-empty value is left untouched.

    Args:
        var: Name of the environment variable to populate.
    """
    # Guard clause: nothing to do when a value is already present.
    if os.environ.get(var):
        return
    os.environ[var] = getpass.getpass(f"{var}: ")
# Prompt for the OpenAI API key unless OPENAI_API_KEY is already set and non-empty.
_set_env("OPENAI_API_KEY")
import getpass import os def _set_env(var: str): if not os.environ.get(var): os.environ[var] = getpass.getpass(f"{var}: ") _set_env("OPENAI_API_KEY")
检查点实现¶
MongoDBSaver¶
以下是 MongoDBSaver 的实现(用于图的同步使用,即 .invoke()、.stream())。MongoDBSaver 实现了任何检查点保存器都需要的四个方法:
- .put - 使用其配置和元数据存储检查点。
- .put_writes - 存储与检查点关联的中间写入(即挂起的写入)。
- .get_tuple - 使用给定配置(thread_id 和 checkpoint_id)获取检查点元组。
- .list - 列出与给定配置和筛选条件匹配的检查点。
在 [3]
已复制!
from contextlib import asynccontextmanager, contextmanager
from typing import Any, AsyncIterator, Dict, Iterator, Optional, Sequence, Tuple
from langchain_core.runnables import RunnableConfig
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo import MongoClient, UpdateOne
from pymongo.database import Database as MongoDatabase
from langgraph.checkpoint.base import (
BaseCheckpointSaver,
ChannelVersions,
Checkpoint,
CheckpointMetadata,
CheckpointTuple,
get_checkpoint_id,
)
class MongoDBSaver(BaseCheckpointSaver):
    """A checkpoint saver that stores checkpoints in a MongoDB database.

    Checkpoints are written to the ``checkpoints`` collection and intermediate
    (pending) writes to the ``checkpoint_writes`` collection of the database
    selected at construction time. Documents in both collections are keyed by
    ``thread_id``, ``checkpoint_ns`` and ``checkpoint_id``.
    """

    # MongoDB client used for all operations.
    client: MongoClient
    # Database handle obtained from ``client`` by name.
    db: MongoDatabase

    def __init__(
        self,
        client: MongoClient,
        db_name: str,
    ) -> None:
        """Create a saver bound to database ``db_name`` of ``client``.

        Args:
            client (MongoClient): An already constructed MongoDB client.
            db_name (str): Name of the database holding the collections.
        """
        super().__init__()
        self.client = client
        self.db = self.client[db_name]

    @classmethod
    @contextmanager
    def from_conn_info(
        cls, *, host: str, port: int, db_name: str
    ) -> Iterator["MongoDBSaver"]:
        """Yield a saver connected to ``host:port`` and close the client on exit.

        Args:
            host (str): MongoDB host name.
            port (int): MongoDB port.
            db_name (str): Name of the database to use.

        Yields:
            MongoDBSaver: A saver whose client is closed when the context exits.
        """
        client = None
        try:
            client = MongoClient(host=host, port=port)
            # Instantiate ``cls`` so subclasses get an instance of their own type.
            yield cls(client, db_name)
        finally:
            # ``client`` stays None when the constructor itself raised.
            if client is not None:
                client.close()

    @staticmethod
    def _parent_config(
        thread_id: str, checkpoint_ns: str, parent_checkpoint_id: Optional[str]
    ) -> Optional[RunnableConfig]:
        """Build the parent checkpoint config, or None when there is no parent."""
        if not parent_checkpoint_id:
            return None
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": parent_checkpoint_id,
            }
        }

    def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        """Get a checkpoint tuple from the database.

        If the config carries a checkpoint ID, the checkpoint with the matching
        thread ID, namespace and checkpoint ID is retrieved. Otherwise the
        checkpoint with the greatest ``checkpoint_id`` for the thread and
        namespace (i.e. the latest one) is retrieved. Pending writes stored for
        the selected checkpoint are loaded alongside it.

        Args:
            config (RunnableConfig): The config to use for retrieving the checkpoint.
                ``checkpoint_ns`` defaults to ``""`` when absent.

        Returns:
            Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if
            no matching checkpoint was found.
        """
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        query = {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns}
        if checkpoint_id := get_checkpoint_id(config):
            query["checkpoint_id"] = checkpoint_id

        # Descending sort on checkpoint_id makes the first match the latest one.
        doc = self.db["checkpoints"].find_one(query, sort=[("checkpoint_id", -1)])
        if doc is None:
            return None

        config_values = {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
            "checkpoint_id": doc["checkpoint_id"],
        }
        checkpoint = self.serde.loads_typed((doc["type"], doc["checkpoint"]))
        # The same key triple identifies the writes attached to this checkpoint.
        pending_writes = [
            (
                write_doc["task_id"],
                write_doc["channel"],
                self.serde.loads_typed((write_doc["type"], write_doc["value"])),
            )
            for write_doc in self.db["checkpoint_writes"].find(config_values)
        ]
        return CheckpointTuple(
            {"configurable": config_values},
            checkpoint,
            self.serde.loads(doc["metadata"]),
            self._parent_config(
                thread_id, checkpoint_ns, doc.get("parent_checkpoint_id")
            ),
            pending_writes,
        )

    def list(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> Iterator[CheckpointTuple]:
        """List checkpoints from the database.

        Checkpoints are ordered by checkpoint ID in descending order (newest
        first). The yielded tuples do not include pending writes.

        Args:
            config (Optional[RunnableConfig]): Restricts the listing to this thread
                ID and namespace; None lists across all threads.
            filter (Optional[Dict[str, Any]]): Additional filtering criteria for
                metadata, applied as ``metadata.<key>`` conditions. Defaults to None.
            before (Optional[RunnableConfig]): If provided, only checkpoints with a
                checkpoint ID lower than this config's are returned. Defaults to None.
            limit (Optional[int]): The maximum number of checkpoints to return.
                Defaults to None.

        Yields:
            Iterator[CheckpointTuple]: An iterator of checkpoint tuples.
        """
        query: Dict[str, Any] = {}
        if config is not None:
            query = {
                "thread_id": config["configurable"]["thread_id"],
                "checkpoint_ns": config["configurable"].get("checkpoint_ns", ""),
            }
        if filter:
            # NOTE(review): ``put`` stores metadata as the output of
            # ``self.serde.dumps``; whether dotted ``metadata.<key>`` conditions
            # can match that stored form depends on the serializer - confirm.
            for key, value in filter.items():
                query[f"metadata.{key}"] = value
        if before is not None:
            query["checkpoint_id"] = {"$lt": before["configurable"]["checkpoint_id"]}

        result = self.db["checkpoints"].find(query).sort("checkpoint_id", -1)
        if limit is not None:
            result = result.limit(limit)

        for doc in result:
            checkpoint = self.serde.loads_typed((doc["type"], doc["checkpoint"]))
            yield CheckpointTuple(
                {
                    "configurable": {
                        "thread_id": doc["thread_id"],
                        "checkpoint_ns": doc["checkpoint_ns"],
                        "checkpoint_id": doc["checkpoint_id"],
                    }
                },
                checkpoint,
                self.serde.loads(doc["metadata"]),
                self._parent_config(
                    doc["thread_id"],
                    doc["checkpoint_ns"],
                    doc.get("parent_checkpoint_id"),
                ),
            )

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        """Save a checkpoint to the database.

        The checkpoint is upserted under (thread ID, namespace, checkpoint ID).
        The checkpoint ID found in ``config``, if any, is recorded as the parent.

        Args:
            config (RunnableConfig): The config to associate with the checkpoint.
                ``checkpoint_ns`` defaults to ``""`` when absent, matching
                ``get_tuple`` and ``list``.
            checkpoint (Checkpoint): The checkpoint to save.
            metadata (CheckpointMetadata): Additional metadata to save with the checkpoint.
            new_versions (ChannelVersions): New channel versions as of this write.
                Not used by this implementation.

        Returns:
            RunnableConfig: Updated configuration after storing the checkpoint.
        """
        configurable = config["configurable"]
        thread_id = configurable["thread_id"]
        checkpoint_ns = configurable.get("checkpoint_ns", "")
        checkpoint_id = checkpoint["id"]
        type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint)
        doc = {
            "parent_checkpoint_id": configurable.get("checkpoint_id"),
            "type": type_,
            "checkpoint": serialized_checkpoint,
            "metadata": self.serde.dumps(metadata),
        }
        upsert_query = {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
            "checkpoint_id": checkpoint_id,
        }
        # Upsert so that saving the same checkpoint twice overwrites in place.
        self.db["checkpoints"].update_one(upsert_query, {"$set": doc}, upsert=True)
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
            }
        }

    def put_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[Tuple[str, Any]],
        task_id: str,
    ) -> None:
        """Store intermediate writes linked to a checkpoint.

        Each write is upserted under (thread ID, namespace, checkpoint ID,
        task ID, position in ``writes``). An empty ``writes`` is a no-op.

        Args:
            config (RunnableConfig): Configuration of the related checkpoint.
                ``checkpoint_ns`` defaults to ``""`` when absent.
            writes (Sequence[Tuple[str, Any]]): List of writes to store, each as
                (channel, value) pair.
            task_id (str): Identifier for the task creating the writes.
        """
        configurable = config["configurable"]
        thread_id = configurable["thread_id"]
        checkpoint_ns = configurable.get("checkpoint_ns", "")
        checkpoint_id = configurable["checkpoint_id"]
        operations = []
        for idx, (channel, value) in enumerate(writes):
            upsert_query = {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
                "task_id": task_id,
                "idx": idx,
            }
            type_, serialized_value = self.serde.dumps_typed(value)
            operations.append(
                UpdateOne(
                    upsert_query,
                    {
                        "$set": {
                            "channel": channel,
                            "type": type_,
                            "value": serialized_value,
                        }
                    },
                    upsert=True,
                )
            )
        # bulk_write rejects an empty request list, so skip the round trip.
        if not operations:
            return
        self.db["checkpoint_writes"].bulk_write(operations)
from contextlib import asynccontextmanager, contextmanager from typing import Any, AsyncIterator, Dict, Iterator, Optional, Sequence, Tuple from langchain_core.runnables import RunnableConfig from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase from pymongo import MongoClient, UpdateOne from pymongo.database import Database as MongoDatabase from langgraph.checkpoint.base import ( BaseCheckpointSaver, ChannelVersions, Checkpoint, CheckpointMetadata, CheckpointTuple, get_checkpoint_id, ) class MongoDBSaver(BaseCheckpointSaver): """存储检查点到 MongoDB 数据库的检查点保存器。""" client: MongoClient db: MongoDatabase def __init__( self, client: MongoClient, db_name: str, ) -> None: super().__init__() self.client = client self.db = self.client[db_name] @classmethod @contextmanager def from_conn_info( cls, *, host: str, port: int, db_name: str ) -> Iterator["MongoDBSaver"]: client = None try: client = MongoClient(host=host, port=port) yield MongoDBSaver(client, db_name) finally: if client: client.close() def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: """从数据库获取检查点元组。此方法从 MongoDB 数据库检索检查点元组,该元组基于提供的配置。如果配置包含 "checkpoint_id" 密钥,则检索具有匹配线程 ID 和检查点 ID 的检查点。否则,将检索给定线程 ID 的最新检查点。 Args: config (RunnableConfig): 用于检索检查点的配置。 Returns: Optional[CheckpointTuple]: 检索到的检查点元组,如果未找到匹配的检查点,则为 None。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"].get("checkpoint_ns", "") if checkpoint_id := get_checkpoint_id(config): query = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } else: query = {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns} result = self.db["checkpoints"].find(query).sort("checkpoint_id", -1).limit(1) for doc in result: config_values = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": doc["checkpoint_id"], } checkpoint = self.serde.loads_typed((doc["type"], doc["checkpoint"])) serialized_writes = 
self.db["checkpoint_writes"].find(config_values) pending_writes = [ ( doc["task_id"], doc["channel"], self.serde.loads_typed((doc["type"], doc["value"])), ) for doc in serialized_writes ] return CheckpointTuple( {"configurable": config_values}, checkpoint, self.serde.loads(doc["metadata"]), ( { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": doc["parent_checkpoint_id"], } } if doc.get("parent_checkpoint_id") else None ), pending_writes, ) def list( self, config: Optional[RunnableConfig], *, filter: Optional[Dict[str, Any]] = None, before: Optional[RunnableConfig] = None, limit: Optional[int] = None, ) -> Iterator[CheckpointTuple]: """从数据库中列出检查点。此方法从 MongoDB 数据库检索检查点元组列表,该列表基于提供的配置。检查点按检查点 ID 降序排序(最新的在最前面)。 Args: config (RunnableConfig): 用于列出检查点的配置。 filter (Optional[Dict[str, Any]]): 元数据的附加筛选条件。默认为 None。 before (Optional[RunnableConfig]): 如果提供,则仅返回指定检查点 ID 之前的检查点。默认为 None。 limit (Optional[int]): 要返回的检查点的最大数量。默认为 None。 Yields: Iterator[CheckpointTuple]: 检查点元组的迭代器。 """ query = {} if config is not None: query = { "thread_id": config["configurable"]["thread_id"], "checkpoint_ns": config["configurable"].get("checkpoint_ns", ""), } if filter: for key, value in filter.items(): query[f"metadata.{key}"] = value if before is not None: query["checkpoint_id"] = {"$lt": before["configurable"]["checkpoint_id"]} result = self.db["checkpoints"].find(query).sort("checkpoint_id", -1) if limit is not None: result = result.limit(limit) for doc in result: checkpoint = self.serde.loads_typed((doc["type"], doc["checkpoint"])) yield CheckpointTuple( { "configurable": { "thread_id": doc["thread_id"], "checkpoint_ns": doc["checkpoint_ns"], "checkpoint_id": doc["checkpoint_id"], } }, checkpoint, self.serde.loads(doc["metadata"]), ( { "configurable": { "thread_id": doc["thread_id"], "checkpoint_ns": doc["checkpoint_ns"], "checkpoint_id": doc["parent_checkpoint_id"], } } if doc.get("parent_checkpoint_id") else None ), ) def put( self, config: 
RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions, ) -> RunnableConfig: """将检查点保存到数据库。此方法将检查点保存到 MongoDB 数据库。检查点与提供的配置及其父配置(如果有)相关联。 Args: config (RunnableConfig): 与检查点相关联的配置。 checkpoint (Checkpoint): 要保存的检查点。 metadata (CheckpointMetadata): 要与检查点一起保存的附加元数据。 new_versions (ChannelVersions): 这次写入的新的通道版本。 Returns: RunnableConfig: 存储检查点后更新的配置。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] checkpoint_id = checkpoint["id"] type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) doc = { "parent_checkpoint_id": config["configurable"].get("checkpoint_id"), "type": type_, "checkpoint": serialized_checkpoint, "metadata": self.serde.dumps(metadata), } upsert_query = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } # 执行您的操作 self.db["checkpoints"].update_one(upsert_query, {"$set": doc}, upsert=True) return { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } } def put_writes( self, config: RunnableConfig, writes: Sequence[Tuple[str, Any]], task_id: str, ) -> None: """存储与检查点关联的中间写入。此方法将与检查点关联的中间写入保存到 MongoDB 数据库。 Args: config (RunnableConfig): 相关检查点的配置。 writes (Sequence[Tuple[str, Any]]): 要存储的写入列表,每个写入都是 (channel, value) 对。 task_id (str): 创建写入的任务的标识符。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] checkpoint_id = config["configurable"]["checkpoint_id"] operations = [] for idx, (channel, value) in enumerate(writes): upsert_query = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, "task_id": task_id, "idx": idx, } type_, serialized_value = self.serde.dumps_typed(value) operations.append( UpdateOne( upsert_query, { "$set": { "channel": channel, "type": type_, "value": serialized_value, } }, upsert=True, ) ) self.db["checkpoint_writes"].bulk_write(operations)
AsyncMongoDBSaver¶
以下是 AsyncMongoDBSaver 的参考实现(用于图的异步使用,即 .ainvoke()、.astream())。AsyncMongoDBSaver 实现了任何异步检查点保存器都需要的四个方法:
- .aput - 使用其配置和元数据存储检查点。
- .aput_writes - 存储与检查点关联的中间写入(即挂起的写入)。
- .aget_tuple - 使用给定配置(thread_id 和 checkpoint_id)获取检查点元组。
- .alist - 列出与给定配置和筛选条件匹配的检查点。
在 [4]
已复制!
class AsyncMongoDBSaver(BaseCheckpointSaver):
    """A checkpoint saver that stores checkpoints in a MongoDB database asynchronously.

    Checkpoints are written to the ``checkpoints`` collection and intermediate
    (pending) writes to the ``checkpoint_writes`` collection of the database
    selected at construction time. Documents in both collections are keyed by
    ``thread_id``, ``checkpoint_ns`` and ``checkpoint_id``.
    """

    # Motor (asyncio) client used for all operations.
    client: AsyncIOMotorClient
    # Database handle obtained from ``client`` by name.
    db: AsyncIOMotorDatabase

    def __init__(
        self,
        client: AsyncIOMotorClient,
        db_name: str,
    ) -> None:
        """Create a saver bound to database ``db_name`` of ``client``.

        Args:
            client (AsyncIOMotorClient): An already constructed Motor client.
            db_name (str): Name of the database holding the collections.
        """
        super().__init__()
        self.client = client
        self.db = self.client[db_name]

    @classmethod
    @asynccontextmanager
    async def from_conn_info(
        cls, *, host: str, port: int, db_name: str
    ) -> AsyncIterator["AsyncMongoDBSaver"]:
        """Yield a saver connected to ``host:port`` and close the client on exit.

        Args:
            host (str): MongoDB host name.
            port (int): MongoDB port.
            db_name (str): Name of the database to use.

        Yields:
            AsyncMongoDBSaver: A saver whose client is closed when the context exits.
        """
        client = None
        try:
            client = AsyncIOMotorClient(host=host, port=port)
            # Instantiate ``cls`` so subclasses get an instance of their own type.
            yield cls(client, db_name)
        finally:
            # ``client`` stays None when the constructor itself raised.
            if client is not None:
                client.close()

    @staticmethod
    def _parent_config(
        thread_id: str, checkpoint_ns: str, parent_checkpoint_id: Optional[str]
    ) -> Optional[RunnableConfig]:
        """Build the parent checkpoint config, or None when there is no parent."""
        if not parent_checkpoint_id:
            return None
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": parent_checkpoint_id,
            }
        }

    async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        """Get a checkpoint tuple from the database asynchronously.

        If the config carries a checkpoint ID, the checkpoint with the matching
        thread ID, namespace and checkpoint ID is retrieved. Otherwise the
        checkpoint with the greatest ``checkpoint_id`` for the thread and
        namespace (i.e. the latest one) is retrieved. Pending writes stored for
        the selected checkpoint are loaded alongside it.

        Args:
            config (RunnableConfig): The config to use for retrieving the checkpoint.
                ``checkpoint_ns`` defaults to ``""`` when absent.

        Returns:
            Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if
            no matching checkpoint was found.
        """
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        query = {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns}
        if checkpoint_id := get_checkpoint_id(config):
            query["checkpoint_id"] = checkpoint_id

        # Descending sort on checkpoint_id makes the first match the latest one.
        doc = await self.db["checkpoints"].find_one(
            query, sort=[("checkpoint_id", -1)]
        )
        if doc is None:
            return None

        config_values = {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
            "checkpoint_id": doc["checkpoint_id"],
        }
        checkpoint = self.serde.loads_typed((doc["type"], doc["checkpoint"]))
        # The same key triple identifies the writes attached to this checkpoint.
        pending_writes = [
            (
                write_doc["task_id"],
                write_doc["channel"],
                self.serde.loads_typed((write_doc["type"], write_doc["value"])),
            )
            async for write_doc in self.db["checkpoint_writes"].find(config_values)
        ]
        return CheckpointTuple(
            {"configurable": config_values},
            checkpoint,
            self.serde.loads(doc["metadata"]),
            self._parent_config(
                thread_id, checkpoint_ns, doc.get("parent_checkpoint_id")
            ),
            pending_writes,
        )

    async def alist(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> AsyncIterator[CheckpointTuple]:
        """List checkpoints from the database asynchronously.

        Checkpoints are ordered by checkpoint ID in descending order (newest
        first). The yielded tuples do not include pending writes.

        Args:
            config (Optional[RunnableConfig]): Restricts the listing to this thread
                ID and namespace; None lists across all threads.
            filter (Optional[Dict[str, Any]]): Additional filtering criteria for
                metadata, applied as ``metadata.<key>`` conditions.
            before (Optional[RunnableConfig]): If provided, only checkpoints with a
                checkpoint ID lower than this config's are returned. Defaults to None.
            limit (Optional[int]): Maximum number of checkpoints to return.

        Yields:
            AsyncIterator[CheckpointTuple]: An asynchronous iterator of matching
            checkpoint tuples.
        """
        query: Dict[str, Any] = {}
        if config is not None:
            query = {
                "thread_id": config["configurable"]["thread_id"],
                "checkpoint_ns": config["configurable"].get("checkpoint_ns", ""),
            }
        if filter:
            # NOTE(review): ``aput`` stores metadata as the output of
            # ``self.serde.dumps``; whether dotted ``metadata.<key>`` conditions
            # can match that stored form depends on the serializer - confirm.
            for key, value in filter.items():
                query[f"metadata.{key}"] = value
        if before is not None:
            query["checkpoint_id"] = {"$lt": before["configurable"]["checkpoint_id"]}

        result = self.db["checkpoints"].find(query).sort("checkpoint_id", -1)
        if limit is not None:
            result = result.limit(limit)

        async for doc in result:
            checkpoint = self.serde.loads_typed((doc["type"], doc["checkpoint"]))
            yield CheckpointTuple(
                {
                    "configurable": {
                        "thread_id": doc["thread_id"],
                        "checkpoint_ns": doc["checkpoint_ns"],
                        "checkpoint_id": doc["checkpoint_id"],
                    }
                },
                checkpoint,
                self.serde.loads(doc["metadata"]),
                self._parent_config(
                    doc["thread_id"],
                    doc["checkpoint_ns"],
                    doc.get("parent_checkpoint_id"),
                ),
            )

    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        """Save a checkpoint to the database asynchronously.

        The checkpoint is upserted under (thread ID, namespace, checkpoint ID).
        The checkpoint ID found in ``config``, if any, is recorded as the parent.

        Args:
            config (RunnableConfig): The config to associate with the checkpoint.
                ``checkpoint_ns`` defaults to ``""`` when absent, matching
                ``aget_tuple`` and ``alist``.
            checkpoint (Checkpoint): The checkpoint to save.
            metadata (CheckpointMetadata): Additional metadata to save with the checkpoint.
            new_versions (ChannelVersions): New channel versions as of this write.
                Not used by this implementation.

        Returns:
            RunnableConfig: Updated configuration after storing the checkpoint.
        """
        configurable = config["configurable"]
        thread_id = configurable["thread_id"]
        checkpoint_ns = configurable.get("checkpoint_ns", "")
        checkpoint_id = checkpoint["id"]
        type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint)
        doc = {
            "parent_checkpoint_id": configurable.get("checkpoint_id"),
            "type": type_,
            "checkpoint": serialized_checkpoint,
            "metadata": self.serde.dumps(metadata),
        }
        upsert_query = {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
            "checkpoint_id": checkpoint_id,
        }
        # Upsert so that saving the same checkpoint twice overwrites in place.
        await self.db["checkpoints"].update_one(
            upsert_query, {"$set": doc}, upsert=True
        )
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
            }
        }

    async def aput_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[Tuple[str, Any]],
        task_id: str,
    ) -> None:
        """Store intermediate writes linked to a checkpoint asynchronously.

        Each write is upserted under (thread ID, namespace, checkpoint ID,
        task ID, position in ``writes``). An empty ``writes`` is a no-op.

        Args:
            config (RunnableConfig): Configuration of the related checkpoint.
                ``checkpoint_ns`` defaults to ``""`` when absent.
            writes (Sequence[Tuple[str, Any]]): List of writes to store, each as
                (channel, value) pair.
            task_id (str): Identifier for the task creating the writes.
        """
        configurable = config["configurable"]
        thread_id = configurable["thread_id"]
        checkpoint_ns = configurable.get("checkpoint_ns", "")
        checkpoint_id = configurable["checkpoint_id"]
        operations = []
        for idx, (channel, value) in enumerate(writes):
            upsert_query = {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
                "task_id": task_id,
                "idx": idx,
            }
            type_, serialized_value = self.serde.dumps_typed(value)
            operations.append(
                UpdateOne(
                    upsert_query,
                    {
                        "$set": {
                            "channel": channel,
                            "type": type_,
                            "value": serialized_value,
                        }
                    },
                    upsert=True,
                )
            )
        # bulk_write rejects an empty request list, so skip the round trip.
        if not operations:
            return
        await self.db["checkpoint_writes"].bulk_write(operations)
class AsyncMongoDBSaver(BaseCheckpointSaver): """一个异步将检查点存储到 MongoDB 数据库的检查点保存器。""" client: AsyncIOMotorClient db: AsyncIOMotorDatabase def __init__( self, client: AsyncIOMotorClient, db_name: str, ) -> None: super().__init__() self.client = client self.db = self.client[db_name] @classmethod @asynccontextmanager async def from_conn_info( cls, *, host: str, port: int, db_name: str ) -> AsyncIterator["AsyncMongoDBSaver"]: client = None try: client = AsyncIOMotorClient(host=host, port=port) yield AsyncMongoDBSaver(client, db_name) finally: if client: client.close() async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: """异步从数据库获取检查点元组。 此方法从 MongoDB 数据库中根据提供的配置检索检查点元组。 如果配置包含 "checkpoint_id" 键,则检索与匹配的线程 ID 和检查点 ID 相对应的检查点。 否则,检索给定线程 ID 的最新检查点。 Args: config (RunnableConfig): 用于检索检查点的配置。 Returns: Optional[CheckpointTuple]: 检索到的检查点元组,如果未找到匹配的检查点,则为 None。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"].get("checkpoint_ns", "") if checkpoint_id := get_checkpoint_id(config): query = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } else: query = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, } result = self.db["checkpoints"].find(query).sort("checkpoint_id", -1).limit(1) async for doc in result: config_values = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": doc["checkpoint_id"], } checkpoint = self.serde.loads_typed((doc["type"], doc["checkpoint"])) serialized_writes = self.db["checkpoint_writes"].find(config_values) pending_writes = [ ( doc["task_id"], doc["channel"], self.serde.loads_typed((doc["type"], doc["value"])), ) async for doc in serialized_writes ] return CheckpointTuple( {"configurable": config_values}, checkpoint, self.serde.loads(doc["metadata"]), ( { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": doc["parent_checkpoint_id"], } } if doc.get("parent_checkpoint_id") else 
None ), pending_writes, ) async def alist( self, config: Optional[RunnableConfig], *, filter: Optional[Dict[str, Any]] = None, before: Optional[RunnableConfig] = None, limit: Optional[int] = None, ) -> AsyncIterator[CheckpointTuple]: """异步从数据库中列出检查点。 此方法根据提供的配置从 MongoDB 数据库中检索检查点元组列表。 检查点按检查点 ID 降序排序(最新的排在最前面)。 Args: config (Optional[RunnableConfig]): 用于筛选检查点的基本配置。 filter (Optional[Dict[str, Any]]): 元数据的额外筛选条件。 before (Optional[RunnableConfig]): 如果提供,则仅返回指定检查点 ID 之前的检查点。 默认为 None。 limit (Optional[int]): 要返回的最大检查点数量。 Yields: AsyncIterator[CheckpointTuple]: 匹配检查点元组的异步迭代器。 """ query = {} if config is not None: query = { "thread_id": config["configurable"]["thread_id"], "checkpoint_ns": config["configurable"].get("checkpoint_ns", ""), } if filter: for key, value in filter.items(): query[f"metadata.{key}"] = value if before is not None: query["checkpoint_id"] = {"$lt": before["configurable"]["checkpoint_id"]} result = self.db["checkpoints"].find(query).sort("checkpoint_id", -1) if limit is not None: result = result.limit(limit) async for doc in result: checkpoint = self.serde.loads_typed((doc["type"], doc["checkpoint"])) yield CheckpointTuple( { "configurable": { "thread_id": doc["thread_id"], "checkpoint_ns": doc["checkpoint_ns"], "checkpoint_id": doc["checkpoint_id"], } }, checkpoint, self.serde.loads(doc["metadata"]), ( { "configurable": { "thread_id": doc["thread_id"], "checkpoint_ns": doc["checkpoint_ns"], "checkpoint_id": doc["parent_checkpoint_id"], } } if doc.get("parent_checkpoint_id") else None ), ) async def aput( self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions, ) -> RunnableConfig: """异步将检查点保存到数据库。 此方法将检查点保存到 MongoDB 数据库。 检查点与提供的配置及其父配置(如果有)相关联。 Args: config (RunnableConfig): 与检查点关联的配置。 checkpoint (Checkpoint): 要保存的检查点。 metadata (CheckpointMetadata): 与检查点一起保存的额外元数据。 new_versions (ChannelVersions): 这次写入的新的通道版本。 Returns: RunnableConfig: 存储检查点后更新的配置。 """ thread_id = 
config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] checkpoint_id = checkpoint["id"] type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) doc = { "parent_checkpoint_id": config["configurable"].get("checkpoint_id"), "type": type_, "checkpoint": serialized_checkpoint, "metadata": self.serde.dumps(metadata), } upsert_query = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } # 在这里执行您的操作 await self.db["checkpoints"].update_one( upsert_query, {"$set": doc}, upsert=True ) return { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } } async def aput_writes( self, config: RunnableConfig, writes: Sequence[Tuple[str, Any]], task_id: str, ) -> None: """异步存储与检查点相关的中间写入。 此方法将与检查点关联的中间写入保存到数据库。 Args: config (RunnableConfig): 相关检查点的配置。 writes (Sequence[Tuple[str, Any]]): 要存储的写入列表,每个写入都以 (通道,值) 对的形式。 task_id (str): 创建写入的任务标识符。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] checkpoint_id = config["configurable"]["checkpoint_id"] operations = [] for idx, (channel, value) in enumerate(writes): upsert_query = { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, "task_id": task_id, "idx": idx, } type_, serialized_value = self.serde.dumps_typed(value) operations.append( UpdateOne( upsert_query, { "$set": { "channel": channel, "type": type_, "value": serialized_value, } }, upsert=True, ) ) await self.db["checkpoint_writes"].bulk_write(operations)
为图设置模型和工具¶
In [5]
已复制!
from typing import Literal
from langchain_core.runnables import ConfigurableField
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
@tool
def get_weather(city: Literal["nyc", "sf"]):
    """Use this to get weather information."""
    # The docstring above is kept verbatim: the @tool decorator may use it as
    # the tool description, so it is treated as runtime text.
    # Returns a canned forecast string for the two supported cities and raises
    # AssertionError for any other value.
    if city == "nyc":
        return "It might be cloudy in nyc"
    if city == "sf":
        return "It's always sunny in sf"
    raise AssertionError("Unknown city")
# Tools handed to the agent; only the get_weather tool is registered.
tools = [get_weather]
# Chat model used by the agent: gpt-4o-mini with temperature 0.
model = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
from typing import Literal from langchain_core.runnables import ConfigurableField from langchain_core.tools import tool from langchain_openai import ChatOpenAI from langgraph.prebuilt import create_react_agent @tool def get_weather(city: Literal["nyc", "sf"]): """使用此工具获取天气信息。""" if city == "nyc": return "纽约的天气可能多云" elif city == "sf": return "旧金山永远阳光明媚" else: raise AssertionError("未知城市") tools = [get_weather] model = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
使用同步连接¶
In [6]
已复制!
# Run the agent once with a MongoDB-backed checkpointer, then read the stored
# checkpoints back. Everything happens inside the ``with`` block because the
# client is closed when the context exits.
with MongoDBSaver.from_conn_info(
    host="localhost",
    port=27017,
    db_name="checkpoints",
) as checkpointer:
    # All checkpoints of this run are stored under thread "1".
    config = {"configurable": {"thread_id": "1"}}
    graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)

    inputs = {"messages": [("human", "what's the weather in sf")]}
    res = graph.invoke(inputs, config)

    # Three views of the persisted state: the latest checkpoint, the latest
    # checkpoint tuple, and every checkpoint tuple listed for the thread.
    latest_checkpoint = checkpointer.get(config)
    latest_checkpoint_tuple = checkpointer.get_tuple(config)
    checkpoint_tuples = [*checkpointer.list(config)]
with MongoDBSaver.from_conn_info( host="localhost", port=27017, db_name="checkpoints" ) as checkpointer: graph = create_react_agent(model, tools=tools, checkpointer=checkpointer) config = {"configurable": {"thread_id": "1"}} res = graph.invoke({"messages": [("human", "旧金山的天气怎么样")]}, config) latest_checkpoint = checkpointer.get(config) latest_checkpoint_tuple = checkpointer.get_tuple(config) checkpoint_tuples = list(checkpointer.list(config))
In [7]
已复制!
latest_checkpoint
latest_checkpoint
Out[7]
{'v': 1, 'ts': '2024-08-09T16:19:39.102711+00:00', 'id': '1ef566b2-d2a8-6cdc-8003-cc4d1980d188', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f4227353-e0e5-43a9-984a-e4b9e2d8e7b8'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-cd1d3187-470f-4ebd-938f-527a61824045-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='2d124101-696d-450f-bc9f-d8fdcc564101', tool_call_id='call_Y7PzHb7LrIdiTnO5UiSfelt3'), AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-87c76dd2-33f4-433e-986a-9405cfe88c88-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': {'start:agent': 2, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': [], 'current_tasks': {}}
In [8]
已复制!
latest_checkpoint_tuple
latest_checkpoint_tuple
Out[8]
CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-d2a8-6cdc-8003-cc4d1980d188'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:39.102711+00:00', 'id': '1ef566b2-d2a8-6cdc-8003-cc4d1980d188', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f4227353-e0e5-43a9-984a-e4b9e2d8e7b8'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-cd1d3187-470f-4ebd-938f-527a61824045-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='2d124101-696d-450f-bc9f-d8fdcc564101', tool_call_id='call_Y7PzHb7LrIdiTnO5UiSfelt3'), AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-87c76dd2-33f4-433e-986a-9405cfe88c88-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': {'start:agent': 2, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': 
{'agent': {'messages': [AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-87c76dd2-33f4-433e-986a-9405cfe88c88-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})]}}, 'step': 3}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-cdf7-6b98-8002-997748cc5052'}}, pending_writes=[])
In [9]
已复制!
checkpoint_tuples
checkpoint_tuples
Out[9]
[CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-d2a8-6cdc-8003-cc4d1980d188'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:39.102711+00:00', 'id': '1ef566b2-d2a8-6cdc-8003-cc4d1980d188', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f4227353-e0e5-43a9-984a-e4b9e2d8e7b8'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-cd1d3187-470f-4ebd-938f-527a61824045-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='2d124101-696d-450f-bc9f-d8fdcc564101', tool_call_id='call_Y7PzHb7LrIdiTnO5UiSfelt3'), AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-87c76dd2-33f4-433e-986a-9405cfe88c88-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': {'start:agent': 2, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': 
{'agent': {'messages': [AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-87c76dd2-33f4-433e-986a-9405cfe88c88-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})]}}, 'step': 3}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-cdf7-6b98-8002-997748cc5052'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-cdf7-6b98-8002-997748cc5052'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:38.610752+00:00', 'id': '1ef566b2-cdf7-6b98-8002-997748cc5052', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f4227353-e0e5-43a9-984a-e4b9e2d8e7b8'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-cd1d3187-470f-4ebd-938f-527a61824045-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='2d124101-696d-450f-bc9f-d8fdcc564101', tool_call_id='call_Y7PzHb7LrIdiTnO5UiSfelt3')], 'tools': 'tools'}, 'channel_versions': {'__start__': 2, 'messages': 4, 'start:agent': 3, 'agent': 4, 'branch:agent:should_continue:tools': 4, 'tools': 4}, 'versions_seen': {'__input__': {}, '__start__': 
{'__start__': 1}, 'agent': {'start:agent': 2}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'tools': {'messages': [ToolMessage(content="It's always sunny in sf", name='get_weather', id='2d124101-696d-450f-bc9f-d8fdcc564101', tool_call_id='call_Y7PzHb7LrIdiTnO5UiSfelt3')]}}, 'step': 2}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-cde3-6c60-8001-28d4cc36978d'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-cde3-6c60-8001-28d4cc36978d'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:38.602590+00:00', 'id': '1ef566b2-cde3-6c60-8001-28d4cc36978d', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f4227353-e0e5-43a9-984a-e4b9e2d8e7b8'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-cd1d3187-470f-4ebd-938f-527a61824045-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71})], 'agent': 'agent', 'branch:agent:should_continue:tools': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 3, 'start:agent': 3, 'agent': 3, 'branch:agent:should_continue:tools': 3}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': {'start:agent': 2}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': {'messages': [AIMessage(content='', 
additional_kwargs={'tool_calls': [{'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-cd1d3187-470f-4ebd-938f-527a61824045-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_Y7PzHb7LrIdiTnO5UiSfelt3', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71})]}}, 'step': 1}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-c72c-6fca-8000-aac6e4f4b809'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-c72c-6fca-8000-aac6e4f4b809'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:37.898584+00:00', 'id': '1ef566b2-c72c-6fca-8000-aac6e4f4b809', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f4227353-e0e5-43a9-984a-e4b9e2d8e7b8')], 'start:agent': '__start__'}, 'channel_versions': {'__start__': 2, 'messages': 2, 'start:agent': 2}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': None, 'step': 0}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-c72a-6af4-bfff-919b9dc6abfe'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b2-c72a-6af4-bfff-919b9dc6abfe'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:37.897642+00:00', 'id': '1ef566b2-c72a-6af4-bfff-919b9dc6abfe', 'channel_values': {'messages': [], '__start__': {'messages': [['human', "what's the weather in sf"]]}}, 'channel_versions': {'__start__': 1}, 'versions_seen': 
{'__input__': {}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'input', 'writes': {'messages': [['human', "what's the weather in sf"]]}, 'step': -1}, parent_config=None, pending_writes=None)]
使用异步连接¶
In [10]
已复制!
async with AsyncMongoDBSaver.from_conn_info(
host="localhost", port=27017, db_name="checkpoints"
) as checkpointer:
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "2"}}
res = await graph.ainvoke(
{"messages": [("human", "what's the weather in nyc")]}, config
)
latest_checkpoint = await checkpointer.aget(config)
latest_checkpoint_tuple = await checkpointer.aget_tuple(config)
checkpoint_tuples = [c async for c in checkpointer.alist(config)]
async with AsyncMongoDBSaver.from_conn_info( host="localhost", port=27017, db_name="checkpoints" ) as checkpointer: graph = create_react_agent(model, tools=tools, checkpointer=checkpointer) config = {"configurable": {"thread_id": "2"}} res = await graph.ainvoke( {"messages": [("human", "what's the weather in nyc")]}, config ) latest_checkpoint = await checkpointer.aget(config) latest_checkpoint_tuple = await checkpointer.aget_tuple(config) checkpoint_tuples = [c async for c in checkpointer.alist(config)]
In [11]
已复制!
latest_checkpoint
latest_checkpoint
Out[11]
{'v': 1, 'ts': '2024-08-09T16:19:48.212051+00:00', 'id': '1ef566b3-2988-664c-8003-5974c59c6bda', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='1ae4b12f-b1cb-4d55-a754-42cf1c2fbcd5'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-b5da58b5-8f75-485d-af29-bfdeb09b0d94-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='56d4e46b-6cb3-4efe-b369-27b666e62348', tool_call_id='call_IJvXEELx7Ir3kASCqr9dbvhU'), AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-dcacbc70-b213-4ddc-ac08-c0d17b2766d8-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': {'start:agent': 2, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': [], 'current_tasks': {}}
In [12]
已复制!
latest_checkpoint_tuple
latest_checkpoint_tuple
Out[12]
CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-2988-664c-8003-5974c59c6bda'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:48.212051+00:00', 'id': '1ef566b3-2988-664c-8003-5974c59c6bda', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='1ae4b12f-b1cb-4d55-a754-42cf1c2fbcd5'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-b5da58b5-8f75-485d-af29-bfdeb09b0d94-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='56d4e46b-6cb3-4efe-b369-27b666e62348', tool_call_id='call_IJvXEELx7Ir3kASCqr9dbvhU'), AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-dcacbc70-b213-4ddc-ac08-c0d17b2766d8-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': {'start:agent': 2, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': 
{'messages': [AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-dcacbc70-b213-4ddc-ac08-c0d17b2766d8-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})]}}, 'step': 3}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-23c9-64ea-8002-036c32979035'}}, pending_writes=[])
In [13]
已复制!
checkpoint_tuples
checkpoint_tuples
Out[13]
[CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-2988-664c-8003-5974c59c6bda'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:48.212051+00:00', 'id': '1ef566b3-2988-664c-8003-5974c59c6bda', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='1ae4b12f-b1cb-4d55-a754-42cf1c2fbcd5'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-b5da58b5-8f75-485d-af29-bfdeb09b0d94-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='56d4e46b-6cb3-4efe-b369-27b666e62348', tool_call_id='call_IJvXEELx7Ir3kASCqr9dbvhU'), AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-dcacbc70-b213-4ddc-ac08-c0d17b2766d8-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': {'start:agent': 2, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': 
{'messages': [AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-dcacbc70-b213-4ddc-ac08-c0d17b2766d8-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})]}}, 'step': 3}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-23c9-64ea-8002-036c32979035'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-23c9-64ea-8002-036c32979035'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:47.609498+00:00', 'id': '1ef566b3-23c9-64ea-8002-036c32979035', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='1ae4b12f-b1cb-4d55-a754-42cf1c2fbcd5'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-b5da58b5-8f75-485d-af29-bfdeb09b0d94-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='56d4e46b-6cb3-4efe-b369-27b666e62348', tool_call_id='call_IJvXEELx7Ir3kASCqr9dbvhU')], 'tools': 'tools'}, 'channel_versions': {'__start__': 2, 'messages': 4, 'start:agent': 3, 'agent': 4, 'branch:agent:should_continue:tools': 4, 'tools': 4}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': 
{'start:agent': 2}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'tools': {'messages': [ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='56d4e46b-6cb3-4efe-b369-27b666e62348', tool_call_id='call_IJvXEELx7Ir3kASCqr9dbvhU')]}}, 'step': 2}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-23b5-6de6-8001-a39c8ce6fd93'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-23b5-6de6-8001-a39c8ce6fd93'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:47.601527+00:00', 'id': '1ef566b3-23b5-6de6-8001-a39c8ce6fd93', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='1ae4b12f-b1cb-4d55-a754-42cf1c2fbcd5'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-b5da58b5-8f75-485d-af29-bfdeb09b0d94-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73})], 'agent': 'agent', 'branch:agent:should_continue:tools': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 3, 'start:agent': 3, 'agent': 3, 'branch:agent:should_continue:tools': 3}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}, 'agent': {'start:agent': 2}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 
'call_IJvXEELx7Ir3kASCqr9dbvhU', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-b5da58b5-8f75-485d-af29-bfdeb09b0d94-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_IJvXEELx7Ir3kASCqr9dbvhU', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73})]}}, 'step': 1}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-1d6c-6a02-8000-158f156ffce3'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-1d6c-6a02-8000-158f156ffce3'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:46.942389+00:00', 'id': '1ef566b3-1d6c-6a02-8000-158f156ffce3', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='1ae4b12f-b1cb-4d55-a754-42cf1c2fbcd5')], 'start:agent': '__start__'}, 'channel_versions': {'__start__': 2, 'messages': 2, 'start:agent': 2}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': 1}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': None, 'step': 0}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-1d67-61e2-bfff-d91abbcc3a09'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef566b3-1d67-61e2-bfff-d91abbcc3a09'}}, checkpoint={'v': 1, 'ts': '2024-08-09T16:19:46.940133+00:00', 'id': '1ef566b3-1d67-61e2-bfff-d91abbcc3a09', 'channel_values': {'messages': [], '__start__': {'messages': [['human', "what's the weather in nyc"]]}}, 'channel_versions': {'__start__': 1}, 'versions_seen': {'__input__': {}}, 'pending_sends': 
[], 'current_tasks': {}}, metadata={'source': 'input', 'writes': {'messages': [['human', "what's the weather in nyc"]]}, 'step': -1}, parent_config=None, pending_writes=None)]