如何使用 Redis 创建自定义检查点¶
创建 LangGraph 代理时,您也可以将其设置以持久保存其状态。这使您可以执行以下操作,例如多次与代理交互并让它记住之前的交互。
此参考实现展示了如何使用 Redis 作为持久保存检查点状态的后端。确保 Redis 在端口 6379
上运行才能完成本指南。
注意
这是一个 **参考** 实现。只要符合 BaseCheckpointSaver 接口,您就可以使用不同的数据库实现自己的检查点或修改此检查点。
为了演示目的,我们将持久性添加到 预建的 create react 代理 中。
一般来说,您可以像这样将检查点添加到您构建的任何自定义图中
from langgraph.graph import StateGraph
builder = StateGraph(....)
# ... define the graph
checkpointer = # redis checkpointer (see examples below)
graph = builder.compile(checkpointer=checkpointer)
...
设置¶
首先,让我们安装所需的软件包并设置 API 密钥
在 [1] 中
已复制!
%%capture --no-stderr
%pip install -U redis langgraph langchain_openai
%%capture --no-stderr %pip install -U redis langgraph langchain_openai
在 [ ] 中
已复制!
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
import getpass import os def _set_env(var: str): if not os.environ.get(var): os.environ[var] = getpass.getpass(f"{var}: ") _set_env("OPENAI_API_KEY")
检查点实现¶
定义导入和辅助函数¶
首先,让我们为 RedisSaver
和 AsyncRedisSaver
定义一些导入和共享实用程序
在 [3] 中
已复制!
"""Implementation of a langgraph checkpoint saver using Redis."""
from contextlib import asynccontextmanager, contextmanager
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Iterator,
List,
Optional,
Tuple,
)
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
BaseCheckpointSaver,
ChannelVersions,
Checkpoint,
CheckpointMetadata,
CheckpointTuple,
PendingWrite,
get_checkpoint_id,
)
from langgraph.checkpoint.serde.base import SerializerProtocol
from redis import Redis
from redis.asyncio import Redis as AsyncRedis
REDIS_KEY_SEPARATOR = ":"
# Utilities shared by both RedisSaver and AsyncRedisSaver
def _make_redis_checkpoint_key(
thread_id: str, checkpoint_ns: str, checkpoint_id: str
) -> str:
return REDIS_KEY_SEPARATOR.join(
["checkpoint", thread_id, checkpoint_ns, checkpoint_id]
)
def _make_redis_checkpoint_writes_key(
thread_id: str,
checkpoint_ns: str,
checkpoint_id: str,
task_id: str,
idx: Optional[int],
) -> str:
if idx is None:
return REDIS_KEY_SEPARATOR.join(
["writes", thread_id, checkpoint_ns, checkpoint_id, task_id]
)
return REDIS_KEY_SEPARATOR.join(
["writes", thread_id, checkpoint_ns, checkpoint_id, task_id, str(idx)]
)
def _parse_redis_checkpoint_key(redis_key: str) -> dict:
namespace, thread_id, checkpoint_ns, checkpoint_id = redis_key.split(
REDIS_KEY_SEPARATOR
)
if namespace != "checkpoint":
raise ValueError("Expected checkpoint key to start with 'checkpoint'")
return {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
}
def _parse_redis_checkpoint_writes_key(redis_key: str) -> dict:
namespace, thread_id, checkpoint_ns, checkpoint_id, task_id, idx = redis_key.split(
REDIS_KEY_SEPARATOR
)
if namespace != "writes":
raise ValueError("Expected checkpoint key to start with 'checkpoint'")
return {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
"task_id": task_id,
"idx": idx,
}
def _filter_keys(
keys: List[str], before: Optional[RunnableConfig], limit: Optional[int]
) -> list:
"""Filter and sort Redis keys based on optional criteria."""
if before:
keys = [
k
for k in keys
if _parse_redis_checkpoint_key(k.decode())["checkpoint_id"]
< before["configurable"]["checkpoint_id"]
]
keys = sorted(
keys,
key=lambda k: _parse_redis_checkpoint_key(k.decode())["checkpoint_id"],
reverse=True,
)
if limit:
keys = keys[:limit]
return keys
def _dump_writes(serde: SerializerProtocol, writes: tuple[str, Any]) -> list[dict]:
"""Serialize pending writes."""
serialized_writes = []
for channel, value in writes:
type_, serialized_value = serde.dumps_typed(value)
serialized_writes.append(
{"channel": channel, "type": type_, "value": serialized_value}
)
return serialized_writes
def _load_writes(
serde: SerializerProtocol, task_id_to_data: dict[tuple[str, str], dict]
) -> list[PendingWrite]:
"""Deserialize pending writes."""
writes = [
(
task_id,
data[b"channel"].decode(),
serde.loads_typed((data[b"type"].decode(), data[b"value"])),
)
for (task_id, _), data in task_id_to_data.items()
]
return writes
def _parse_redis_checkpoint_data(
serde: SerializerProtocol,
key: str,
data: dict,
pending_writes: Optional[List[PendingWrite]] = None,
) -> Optional[CheckpointTuple]:
"""Parse checkpoint data retrieved from Redis."""
if not data:
return None
parsed_key = _parse_redis_checkpoint_key(key)
thread_id = parsed_key["thread_id"]
checkpoint_ns = parsed_key["checkpoint_ns"]
checkpoint_id = parsed_key["checkpoint_id"]
config = {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
}
}
checkpoint = serde.loads_typed((data[b"type"].decode(), data[b"checkpoint"]))
metadata = serde.loads(data[b"metadata"].decode())
parent_checkpoint_id = data.get(b"parent_checkpoint_id", b"").decode()
parent_config = (
{
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": parent_checkpoint_id,
}
}
if parent_checkpoint_id
else None
)
return CheckpointTuple(
config=config,
checkpoint=checkpoint,
metadata=metadata,
parent_config=parent_config,
pending_writes=pending_writes,
)
"""使用 Redis 实现 langgraph 检查点保存器。""" from contextlib import asynccontextmanager, contextmanager from typing import ( Any, AsyncGenerator, AsyncIterator, Iterator, List, Optional, Tuple, ) from langchain_core.runnables import RunnableConfig from langgraph.checkpoint.base import ( BaseCheckpointSaver, ChannelVersions, Checkpoint, CheckpointMetadata, CheckpointTuple, PendingWrite, get_checkpoint_id, ) from langgraph.checkpoint.serde.base import SerializerProtocol from redis import Redis from redis.asyncio import Redis as AsyncRedis REDIS_KEY_SEPARATOR = ":" # RedisSaver 和 AsyncRedisSaver 共享的实用程序 def _make_redis_checkpoint_key( thread_id: str, checkpoint_ns: str, checkpoint_id: str ) -> str: return REDIS_KEY_SEPARATOR.join( ["checkpoint", thread_id, checkpoint_ns, checkpoint_id] ) def _make_redis_checkpoint_writes_key( thread_id: str, checkpoint_ns: str, checkpoint_id: str, task_id: str, idx: Optional[int], ) -> str: if idx is None: return REDIS_KEY_SEPARATOR.join( ["writes", thread_id, checkpoint_ns, checkpoint_id, task_id] ) return REDIS_KEY_SEPARATOR.join( ["writes", thread_id, checkpoint_ns, checkpoint_id, task_id, str(idx)] ) def _parse_redis_checkpoint_key(redis_key: str) -> dict: namespace, thread_id, checkpoint_ns, checkpoint_id = redis_key.split( REDIS_KEY_SEPARATOR ) if namespace != "checkpoint": raise ValueError("Expected checkpoint key to start with 'checkpoint'") return { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } def _parse_redis_checkpoint_writes_key(redis_key: str) -> dict: namespace, thread_id, checkpoint_ns, checkpoint_id, task_id, idx = redis_key.split( REDIS_KEY_SEPARATOR ) if namespace != "writes": raise ValueError("Expected checkpoint key to start with 'checkpoint'") return { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, "task_id": task_id, "idx": idx, } def _filter_keys( keys: List[str], before: Optional[RunnableConfig], limit: Optional[int] ) -> list: """根据可选条件过滤和排序 Redis 密钥。""" if before: keys = [ k for k in keys if _parse_redis_checkpoint_key(k.decode())["checkpoint_id"] < before["configurable"]["checkpoint_id"] ] keys = sorted( keys, key=lambda k: _parse_redis_checkpoint_key(k.decode())["checkpoint_id"], reverse=True, ) if limit: keys = keys[:limit] return keys def _dump_writes(serde: SerializerProtocol, writes: tuple[str, Any]) -> list[dict]: """序列化挂起的写入。""" serialized_writes = [] for channel, value in writes: type_, serialized_value = serde.dumps_typed(value) serialized_writes.append( {"channel": channel, "type": type_, "value": serialized_value} ) return serialized_writes def _load_writes( serde: SerializerProtocol, task_id_to_data: dict[tuple[str, str], dict] ) -> list[PendingWrite]: """反序列化挂起的写入。""" writes = [ ( task_id, data[b"channel"].decode(), serde.loads_typed((data[b"type"].decode(), data[b"value"])), ) for (task_id, _), data in task_id_to_data.items() ] return writes def _parse_redis_checkpoint_data( serde: SerializerProtocol, key: str, data: dict, pending_writes: Optional[List[PendingWrite]] = None, ) -> Optional[CheckpointTuple]: """解析从 Redis 检索到的检查点数据。""" if not data: return None parsed_key = _parse_redis_checkpoint_key(key) thread_id = parsed_key["thread_id"] checkpoint_ns = parsed_key["checkpoint_ns"] checkpoint_id = parsed_key["checkpoint_id"] config = { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } } checkpoint = serde.loads_typed((data[b"type"].decode(), data[b"checkpoint"])) metadata = serde.loads(data[b"metadata"].decode()) parent_checkpoint_id = data.get(b"parent_checkpoint_id", b"").decode() parent_config = ( { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": parent_checkpoint_id, } } if parent_checkpoint_id else None ) return CheckpointTuple( config=config, checkpoint=checkpoint, metadata=metadata, parent_config=parent_config, pending_writes=pending_writes, )
RedisSaver¶
以下是 RedisSaver 的实现(用于图的同步使用,即 .invoke()
、.stream()
)。RedisSaver 实现任何检查点所需的四种方法
.put
- 使用其配置和元数据存储检查点。.put_writes
- 存储与检查点关联的中间写入(即挂起的写入)。.get_tuple
- 使用给定配置(thread_id
和checkpoint_id
)获取检查点元组。.list
- 列出与给定配置和过滤器条件匹配的检查点。
在 [4] 中
已复制!
class RedisSaver(BaseCheckpointSaver):
"""Redis-based checkpoint saver implementation."""
conn: Redis
def __init__(self, conn: Redis):
super().__init__()
self.conn = conn
@classmethod
@contextmanager
def from_conn_info(cls, *, host: str, port: int, db: int) -> Iterator["RedisSaver"]:
conn = None
try:
conn = Redis(host=host, port=port, db=db)
yield RedisSaver(conn)
finally:
if conn:
conn.close()
def put(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions,
) -> RunnableConfig:
"""Save a checkpoint to Redis.
Args:
config (RunnableConfig): The config to associate with the checkpoint.
checkpoint (Checkpoint): The checkpoint to save.
metadata (CheckpointMetadata): Additional metadata to save with the checkpoint.
new_versions (ChannelVersions): New channel versions as of this write.
Returns:
RunnableConfig: Updated configuration after storing the checkpoint.
"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"]["checkpoint_ns"]
checkpoint_id = checkpoint["id"]
parent_checkpoint_id = config["configurable"].get("checkpoint_id")
key = _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id)
type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint)
serialized_metadata = self.serde.dumps(metadata)
data = {
"checkpoint": serialized_checkpoint,
"type": type_,
"metadata": serialized_metadata,
"parent_checkpoint_id": parent_checkpoint_id
if parent_checkpoint_id
else "",
}
self.conn.hset(key, mapping=data)
return {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
}
}
def put_writes(
self,
config: RunnableConfig,
writes: List[Tuple[str, Any]],
task_id: str,
) -> RunnableConfig:
"""Store intermediate writes linked to a checkpoint.
Args:
config (RunnableConfig): Configuration of the related checkpoint.
writes (Sequence[Tuple[str, Any]]): List of writes to store, each as (channel, value) pair.
task_id (str): Identifier for the task creating the writes.
"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"]["checkpoint_ns"]
checkpoint_id = config["configurable"]["checkpoint_id"]
for idx, data in enumerate(_dump_writes(self.serde, writes)):
key = _make_redis_checkpoint_writes_key(
thread_id, checkpoint_ns, checkpoint_id, task_id, idx
)
self.conn.hset(key, mapping=data)
return config
def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
"""Get a checkpoint tuple from Redis.
This method retrieves a checkpoint tuple from Redis based on the
provided config. If the config contains a "checkpoint_id" key, the checkpoint with
the matching thread ID and checkpoint ID is retrieved. Otherwise, the latest checkpoint
for the given thread ID is retrieved.
Args:
config (RunnableConfig): The config to use for retrieving the checkpoint.
Returns:
Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found.
"""
thread_id = config["configurable"]["thread_id"]
checkpoint_id = get_checkpoint_id(config)
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
checkpoint_key = self._get_checkpoint_key(
self.conn, thread_id, checkpoint_ns, checkpoint_id
)
if not checkpoint_key:
return None
checkpoint_data = self.conn.hgetall(checkpoint_key)
# load pending writes
checkpoint_id = (
checkpoint_id
or _parse_redis_checkpoint_key(checkpoint_key)["checkpoint_id"]
)
writes_key = _make_redis_checkpoint_writes_key(
thread_id, checkpoint_ns, checkpoint_id, "*", None
)
matching_keys = self.conn.keys(pattern=writes_key)
parsed_keys = [
_parse_redis_checkpoint_writes_key(key.decode()) for key in matching_keys
]
pending_writes = _load_writes(
self.serde,
{
(parsed_key["task_id"], parsed_key["idx"]): self.conn.hgetall(key)
for key, parsed_key in sorted(
zip(matching_keys, parsed_keys), key=lambda x: x[1]["idx"]
)
},
)
return _parse_redis_checkpoint_data(
self.serde, checkpoint_key, checkpoint_data, pending_writes=pending_writes
)
def list(
self,
config: Optional[RunnableConfig],
*,
# TODO: implement filtering
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
limit: Optional[int] = None,
) -> Iterator[CheckpointTuple]:
"""List checkpoints from the database.
This method retrieves a list of checkpoint tuples from Redis based
on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).
Args:
config (RunnableConfig): The config to use for listing the checkpoints.
filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata. Defaults to None.
before (Optional[RunnableConfig]): If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.
limit (Optional[int]): The maximum number of checkpoints to return. Defaults to None.
Yields:
Iterator[CheckpointTuple]: An iterator of checkpoint tuples.
"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
pattern = _make_redis_checkpoint_key(thread_id, checkpoint_ns, "*")
keys = _filter_keys(self.conn.keys(pattern), before, limit)
for key in keys:
data = self.conn.hgetall(key)
if data and b"checkpoint" in data and b"metadata" in data:
yield _parse_redis_checkpoint_data(self.serde, key.decode(), data)
def _get_checkpoint_key(
self, conn, thread_id: str, checkpoint_ns: str, checkpoint_id: Optional[str]
) -> Optional[str]:
"""Determine the Redis key for a checkpoint."""
if checkpoint_id:
return _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id)
all_keys = conn.keys(_make_redis_checkpoint_key(thread_id, checkpoint_ns, "*"))
if not all_keys:
return None
latest_key = max(
all_keys,
key=lambda k: _parse_redis_checkpoint_key(k.decode())["checkpoint_id"],
)
return latest_key.decode()
class RedisSaver(BaseCheckpointSaver): """基于 Redis 的检查点保存器实现。""" conn: Redis def __init__(self, conn: Redis): super().__init__() self.conn = conn @classmethod @contextmanager def from_conn_info(cls, *, host: str, port: int, db: int) -> Iterator["RedisSaver"]: conn = None try: conn = Redis(host=host, port=port, db=db) yield RedisSaver(conn) finally: if conn: conn.close() def put( self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions, ) -> RunnableConfig: """将检查点保存到 Redis。 Args: config (RunnableConfig): 与检查点关联的配置。 checkpoint (Checkpoint): 要保存的检查点。 metadata (CheckpointMetadata): 与检查点一起保存的附加元数据。 new_versions (ChannelVersions): 此写入时的新的通道版本。 Returns: RunnableConfig: 存储检查点后的更新配置。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] checkpoint_id = checkpoint["id"] parent_checkpoint_id = config["configurable"].get("checkpoint_id") key = _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id) type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) serialized_metadata = self.serde.dumps(metadata) data = { "checkpoint": serialized_checkpoint, "type": type_, "metadata": serialized_metadata, "parent_checkpoint_id": parent_checkpoint_id if parent_checkpoint_id else "", } self.conn.hset(key, mapping=data) return { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } } def put_writes( self, config: RunnableConfig, writes: List[Tuple[str, Any]], task_id: str, ) -> RunnableConfig: """存储与检查点链接的中间写入。 Args: config (RunnableConfig): 相关检查点的配置。 writes (Sequence[Tuple[str, Any]]): 要存储的写入列表,每个写入都是 (channel, value) 对。 task_id (str): 创建写入的任务的标识符。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] checkpoint_id = config["configurable"]["checkpoint_id"] for idx, data in enumerate(_dump_writes(self.serde, writes)): key = _make_redis_checkpoint_writes_key( thread_id, checkpoint_ns, checkpoint_id, task_id, idx ) self.conn.hset(key, mapping=data) return config def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: """从 Redis 获取检查点元组。 此方法根据提供的配置从 Redis 获取检查点元组。 如果配置包含“checkpoint_id”键,则检索匹配线程 ID 和检查点 ID 的检查点。 否则,将检索给定线程 ID 的最新检查点。 Args: config (RunnableConfig): 用于检索检查点的配置。 Returns: Optional[CheckpointTuple]: 检索到的检查点元组,如果未找到匹配的检查点,则为 None。 """ thread_id = config["configurable"]["thread_id"] checkpoint_id = get_checkpoint_id(config) checkpoint_ns = config["configurable"].get("checkpoint_ns", "") checkpoint_key = self._get_checkpoint_key( self.conn, thread_id, checkpoint_ns, checkpoint_id ) if not checkpoint_key: return None checkpoint_data = self.conn.hgetall(checkpoint_key) # 加载待处理写入 checkpoint_id = ( checkpoint_id or _parse_redis_checkpoint_key(checkpoint_key)["checkpoint_id"] ) writes_key = _make_redis_checkpoint_writes_key( thread_id, checkpoint_ns, checkpoint_id, "*", None ) matching_keys = self.conn.keys(pattern=writes_key) parsed_keys = [ _parse_redis_checkpoint_writes_key(key.decode()) for key in matching_keys ] pending_writes = _load_writes( self.serde, { (parsed_key["task_id"], parsed_key["idx"]): self.conn.hgetall(key) for key, parsed_key in sorted( zip(matching_keys, parsed_keys), key=lambda x: x[1]["idx"] ) }, ) return _parse_redis_checkpoint_data( self.serde, checkpoint_key, checkpoint_data, pending_writes=pending_writes ) def list( self, config: Optional[RunnableConfig], *, # TODO: 实现过滤 filter: Optional[dict[str, Any]] = None, before: Optional[RunnableConfig] = None, limit: Optional[int] = None, ) -> Iterator[CheckpointTuple]: """从数据库中列出检查点。 此方法根据提供的配置从 Redis 检索检查点元组列表。 检查点按检查点 ID 降序排列(最新优先)。 Args: config (RunnableConfig): 用于列出检查点的配置。 filter (Optional[Dict[str, Any]]): 元数据的附加过滤条件。 默认值为 None。 before (Optional[RunnableConfig]): 如果提供,则仅返回指定检查点 ID 之前的检查点。 默认值为 None。 limit (Optional[int]): 要返回的最大检查点数。 默认值为 None。 Yields: Iterator[CheckpointTuple]: 检查点元组的迭代器。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"].get("checkpoint_ns", "") pattern = _make_redis_checkpoint_key(thread_id, checkpoint_ns, "*") keys = _filter_keys(self.conn.keys(pattern), before, limit) for key in keys: data = self.conn.hgetall(key) if data and b"checkpoint" in data and b"metadata" in data: yield _parse_redis_checkpoint_data(self.serde, key.decode(), data) def _get_checkpoint_key( self, conn, thread_id: str, checkpoint_ns: str, checkpoint_id: Optional[str] ) -> Optional[str]: """确定检查点的 Redis 键。""" if checkpoint_id: return _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id) all_keys = conn.keys(_make_redis_checkpoint_key(thread_id, checkpoint_ns, "*")) if not all_keys: return None latest_key = max( all_keys, key=lambda k: _parse_redis_checkpoint_key(k.decode())["checkpoint_id"], ) return latest_key.decode()
AsyncRedis¶
以下是 AsyncRedisSaver 的参考实现(用于图的异步使用,即 .ainvoke()
、.astream()
)。AsyncRedisSaver 实施了任何异步检查点保存器所需的四种方法
.aput
- 存储检查点及其配置和元数据。.aput_writes
- 存储与检查点链接的中间写入(即待处理写入)。.aget_tuple
- 使用给定配置(thread_id
和checkpoint_id
)获取检查点元组。.alist
- 列出与给定配置和过滤条件匹配的检查点。
在 [5]
已复制!
class AsyncRedisSaver(BaseCheckpointSaver):
"""Async redis-based checkpoint saver implementation."""
conn: AsyncRedis
def __init__(self, conn: AsyncRedis):
super().__init__()
self.conn = conn
@classmethod
@asynccontextmanager
async def from_conn_info(
cls, *, host: str, port: int, db: int
) -> AsyncIterator["AsyncRedisSaver"]:
conn = None
try:
conn = AsyncRedis(host=host, port=port, db=db)
yield AsyncRedisSaver(conn)
finally:
if conn:
await conn.aclose()
async def aput(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions,
) -> RunnableConfig:
"""Save a checkpoint to the database asynchronously.
This method saves a checkpoint to Redis. The checkpoint is associated
with the provided config and its parent config (if any).
Args:
config (RunnableConfig): The config to associate with the checkpoint.
checkpoint (Checkpoint): The checkpoint to save.
metadata (CheckpointMetadata): Additional metadata to save with the checkpoint.
new_versions (ChannelVersions): New channel versions as of this write.
Returns:
RunnableConfig: Updated configuration after storing the checkpoint.
"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"]["checkpoint_ns"]
checkpoint_id = checkpoint["id"]
parent_checkpoint_id = config["configurable"].get("checkpoint_id")
key = _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id)
type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint)
serialized_metadata = self.serde.dumps(metadata)
data = {
"checkpoint": serialized_checkpoint,
"type": type_,
"checkpoint_id": checkpoint_id,
"metadata": serialized_metadata,
"parent_checkpoint_id": parent_checkpoint_id
if parent_checkpoint_id
else "",
}
await self.conn.hset(key, mapping=data)
return {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
}
}
async def aput_writes(
self,
config: RunnableConfig,
writes: List[Tuple[str, Any]],
task_id: str,
) -> RunnableConfig:
"""Store intermediate writes linked to a checkpoint asynchronously.
This method saves intermediate writes associated with a checkpoint to the database.
Args:
config (RunnableConfig): Configuration of the related checkpoint.
writes (Sequence[Tuple[str, Any]]): List of writes to store, each as (channel, value) pair.
task_id (str): Identifier for the task creating the writes.
"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"]["checkpoint_ns"]
checkpoint_id = config["configurable"]["checkpoint_id"]
for idx, data in enumerate(_dump_writes(self.serde, writes)):
key = _make_redis_checkpoint_writes_key(
thread_id, checkpoint_ns, checkpoint_id, task_id, idx
)
await self.conn.hset(key, mapping=data)
return config
async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
"""Get a checkpoint tuple from Redis asynchronously.
This method retrieves a checkpoint tuple from Redis based on the
provided config. If the config contains a "checkpoint_id" key, the checkpoint with
the matching thread ID and checkpoint ID is retrieved. Otherwise, the latest checkpoint
for the given thread ID is retrieved.
Args:
config (RunnableConfig): The config to use for retrieving the checkpoint.
Returns:
Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found.
"""
thread_id = config["configurable"]["thread_id"]
checkpoint_id = get_checkpoint_id(config)
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
checkpoint_key = await self._aget_checkpoint_key(
self.conn, thread_id, checkpoint_ns, checkpoint_id
)
if not checkpoint_key:
return None
checkpoint_data = await self.conn.hgetall(checkpoint_key)
# load pending writes
checkpoint_id = (
checkpoint_id
or _parse_redis_checkpoint_key(checkpoint_key)["checkpoint_id"]
)
writes_key = _make_redis_checkpoint_writes_key(
thread_id, checkpoint_ns, checkpoint_id, "*", None
)
matching_keys = await self.conn.keys(pattern=writes_key)
parsed_keys = [
_parse_redis_checkpoint_writes_key(key.decode()) for key in matching_keys
]
pending_writes = _load_writes(
self.serde,
{
(parsed_key["task_id"], parsed_key["idx"]): await self.conn.hgetall(key)
for key, parsed_key in sorted(
zip(matching_keys, parsed_keys), key=lambda x: x[1]["idx"]
)
},
)
return _parse_redis_checkpoint_data(
self.serde, checkpoint_key, checkpoint_data, pending_writes=pending_writes
)
async def alist(
self,
config: Optional[RunnableConfig],
*,
# TODO: implement filtering
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
limit: Optional[int] = None,
) -> AsyncGenerator[CheckpointTuple, None]:
"""List checkpoints from Redis asynchronously.
This method retrieves a list of checkpoint tuples from Redis based
on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first).
Args:
config (Optional[RunnableConfig]): Base configuration for filtering checkpoints.
filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata.
before (Optional[RunnableConfig]): If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None.
limit (Optional[int]): Maximum number of checkpoints to return.
Yields:
AsyncIterator[CheckpointTuple]: An asynchronous iterator of matching checkpoint tuples.
"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
pattern = _make_redis_checkpoint_key(thread_id, checkpoint_ns, "*")
keys = _filter_keys(await self.conn.keys(pattern), before, limit)
for key in keys:
data = await self.conn.hgetall(key)
if data and b"checkpoint" in data and b"metadata" in data:
yield _parse_redis_checkpoint_data(self.serde, key.decode(), data)
async def _aget_checkpoint_key(
self, conn, thread_id: str, checkpoint_ns: str, checkpoint_id: Optional[str]
) -> Optional[str]:
"""Asynchronously determine the Redis key for a checkpoint."""
if checkpoint_id:
return _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id)
all_keys = await conn.keys(
_make_redis_checkpoint_key(thread_id, checkpoint_ns, "*")
)
if not all_keys:
return None
latest_key = max(
all_keys,
key=lambda k: _parse_redis_checkpoint_key(k.decode())["checkpoint_id"],
)
return latest_key.decode()
class AsyncRedisSaver(BaseCheckpointSaver): """基于异步 Redis 的检查点保存器实现。""" conn: AsyncRedis def __init__(self, conn: AsyncRedis): super().__init__() self.conn = conn @classmethod @asynccontextmanager async def from_conn_info( cls, *, host: str, port: int, db: int ) -> AsyncIterator["AsyncRedisSaver"]: conn = None try: conn = AsyncRedis(host=host, port=port, db=db) yield AsyncRedisSaver(conn) finally: if conn: await conn.aclose() async def aput( self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata, new_versions: ChannelVersions, ) -> RunnableConfig: """异步将检查点保存到数据库。 此方法将检查点保存到 Redis。 检查点与提供的配置及其父配置(如果有)相关联。 Args: config (RunnableConfig): 与检查点关联的配置。 checkpoint (Checkpoint): 要保存的检查点。 metadata (CheckpointMetadata): 与检查点一起保存的附加元数据。 new_versions (ChannelVersions): 此写入时的新的通道版本。 Returns: RunnableConfig: 存储检查点后的更新配置。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] checkpoint_id = checkpoint["id"] parent_checkpoint_id = config["configurable"].get("checkpoint_id") key = _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id) type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) serialized_metadata = self.serde.dumps(metadata) data = { "checkpoint": serialized_checkpoint, "type": type_, "checkpoint_id": checkpoint_id, "metadata": serialized_metadata, "parent_checkpoint_id": parent_checkpoint_id if parent_checkpoint_id else "", } await self.conn.hset(key, mapping=data) return { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } } async def aput_writes( self, config: RunnableConfig, writes: List[Tuple[str, Any]], task_id: str, ) -> RunnableConfig: """异步存储与检查点链接的中间写入。 此方法将与检查点关联的中间写入保存到数据库。 Args: config (RunnableConfig): 相关检查点的配置。 writes (Sequence[Tuple[str, Any]]): 要存储的写入列表,每个写入都是 (channel, value) 对。 task_id (str): 创建写入的任务的标识符。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] checkpoint_id = config["configurable"]["checkpoint_id"] for idx, data in enumerate(_dump_writes(self.serde, writes)): key = _make_redis_checkpoint_writes_key( thread_id, checkpoint_ns, checkpoint_id, task_id, idx ) await self.conn.hset(key, mapping=data) return config async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: """异步从 Redis 获取检查点元组。 此方法根据提供的配置从 Redis 检索检查点元组。 如果配置包含“checkpoint_id”键,则检索匹配线程 ID 和检查点 ID 的检查点。 否则,将检索给定线程 ID 的最新检查点。 Args: config (RunnableConfig): 用于检索检查点的配置。 Returns: Optional[CheckpointTuple]: 检索到的检查点元组,如果未找到匹配的检查点,则为 None。 """ thread_id = config["configurable"]["thread_id"] checkpoint_id = get_checkpoint_id(config) checkpoint_ns = config["configurable"].get("checkpoint_ns", "") checkpoint_key = await self._aget_checkpoint_key( self.conn, thread_id, checkpoint_ns, checkpoint_id ) if not checkpoint_key: return None checkpoint_data = await self.conn.hgetall(checkpoint_key) # 加载待处理写入 checkpoint_id = ( checkpoint_id or _parse_redis_checkpoint_key(checkpoint_key)["checkpoint_id"] ) writes_key = _make_redis_checkpoint_writes_key( thread_id, checkpoint_ns, checkpoint_id, "*", None ) matching_keys = await self.conn.keys(pattern=writes_key) parsed_keys = [ _parse_redis_checkpoint_writes_key(key.decode()) for key in matching_keys ] pending_writes = _load_writes( self.serde, { (parsed_key["task_id"], parsed_key["idx"]): await self.conn.hgetall(key) for key, parsed_key in sorted( zip(matching_keys, parsed_keys), key=lambda x: x[1]["idx"] ) }, ) return _parse_redis_checkpoint_data( self.serde, checkpoint_key, checkpoint_data, pending_writes=pending_writes ) async def alist( self, config: Optional[RunnableConfig], *, # TODO: 实现过滤 filter: Optional[dict[str, Any]] = None, before: Optional[RunnableConfig] = None, limit: Optional[int] = None, ) -> AsyncGenerator[CheckpointTuple, None]: """异步从 Redis 中列出检查点。 此方法根据提供的配置从 Redis 检索检查点元组列表。 检查点按检查点 ID 降序排列(最新优先)。 Args: config (Optional[RunnableConfig]): 用于过滤检查点的基本配置。 filter (Optional[Dict[str, Any]]): 元数据的附加过滤条件。 before (Optional[RunnableConfig]): 如果提供,则仅返回指定检查点 ID 之前的检查点。 默认值为 None。 limit (Optional[int]): 要返回的最大检查点数。 Yields: AsyncIterator[CheckpointTuple]: 匹配检查点元组的异步迭代器。 """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"].get("checkpoint_ns", "") pattern = _make_redis_checkpoint_key(thread_id, checkpoint_ns, "*") keys = _filter_keys(await self.conn.keys(pattern), before, limit) for key in keys: data = await self.conn.hgetall(key) if data and b"checkpoint" in data and b"metadata" in data: yield _parse_redis_checkpoint_data(self.serde, key.decode(), data) async def _aget_checkpoint_key( self, conn, thread_id: str, checkpoint_ns: str, checkpoint_id: Optional[str] ) -> Optional[str]: """异步确定检查点的 Redis 键。""" if checkpoint_id: return _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id) all_keys = await conn.keys( _make_redis_checkpoint_key(thread_id, checkpoint_ns, "*") ) if not all_keys: return None latest_key = max( all_keys, key=lambda k: _parse_redis_checkpoint_key(k.decode())["checkpoint_id"], ) return latest_key.decode()
设置图的模型和工具¶
在 [6]
已复制!
from typing import Literal
from langchain_core.runnables import ConfigurableField
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
@tool
def get_weather(city: Literal["nyc", "sf"]):
"""Use this to get weather information."""
if city == "nyc":
return "It might be cloudy in nyc"
elif city == "sf":
return "It's always sunny in sf"
else:
raise AssertionError("Unknown city")
tools = [get_weather]
model = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
from typing import Literal from langchain_core.runnables import ConfigurableField from langchain_core.tools import tool from langchain_openai import ChatOpenAI from langgraph.prebuilt import create_react_agent @tool def get_weather(city: Literal["nyc", "sf"]): """使用此工具获取天气信息。""" if city == "nyc": return "纽约可能会多云" elif city == "sf": return "旧金山总是阳光明媚" else: raise AssertionError("未知城市") tools = [get_weather] model = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
使用同步连接¶
在 [7]
已复制!
with RedisSaver.from_conn_info(host="localhost", port=6379, db=0) as checkpointer:
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
res = graph.invoke({"messages": [("human", "what's the weather in sf")]}, config)
latest_checkpoint = checkpointer.get(config)
latest_checkpoint_tuple = checkpointer.get_tuple(config)
checkpoint_tuples = list(checkpointer.list(config))
with RedisSaver.from_conn_info(host="localhost", port=6379, db=0) as checkpointer: graph = create_react_agent(model, tools=tools, checkpointer=checkpointer) config = {"configurable": {"thread_id": "1"}} res = graph.invoke({"messages": [("human", "旧金山的天气怎么样")]}, config) latest_checkpoint = checkpointer.get(config) latest_checkpoint_tuple = checkpointer.get_tuple(config) checkpoint_tuples = list(checkpointer.list(config))
在 [8]
已复制!
latest_checkpoint
latest_checkpoint
Out[8]
{'v': 1, 'ts': '2024-08-09T01:56:48.328315+00:00', 'id': '1ef55f2a-3614-69b4-8003-2181cff935cc', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f911e000-75a1-41f6-8e38-77bb086c2ecf'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-4f1531f1-067c-4e16-8b62-7a6b663e93bd-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='e27bb3a1-1798-494a-b4ad-2deadda8b2bf', tool_call_id='call_l5e5YcTJDJYOdvi4scBy9n2I'), AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-ad546b5a-70ce-404e-9656-dcc6ecd482d3-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})], 'agent': 'agent'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000005.16e98d6f7ece7598829eddf1b33a33c4', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000005.065d90dd7f7cd091f0233855210bb2af', 'branch:agent:should_continue:tools': '00000000000000000000000000000004.', 'tools': '00000000000000000000000000000005.'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.ab89befb52cc0e91e106ef7f500ea033'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc', 'tools': '00000000000000000000000000000004.022986cd20ae85c77ea298a383f69ba8'}, 'tools': {'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}}, 'pending_sends': [], 'current_tasks': {}}
在 [9]
已复制!
latest_checkpoint_tuple
latest_checkpoint_tuple
Out[9]
CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-3614-69b4-8003-2181cff935cc'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:48.328315+00:00', 'id': '1ef55f2a-3614-69b4-8003-2181cff935cc', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f911e000-75a1-41f6-8e38-77bb086c2ecf'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-4f1531f1-067c-4e16-8b62-7a6b663e93bd-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='e27bb3a1-1798-494a-b4ad-2deadda8b2bf', tool_call_id='call_l5e5YcTJDJYOdvi4scBy9n2I'), AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-ad546b5a-70ce-404e-9656-dcc6ecd482d3-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})], 'agent': 'agent'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000005.16e98d6f7ece7598829eddf1b33a33c4', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000005.065d90dd7f7cd091f0233855210bb2af', 'branch:agent:should_continue:tools': '00000000000000000000000000000004.', 'tools': '00000000000000000000000000000005.'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.ab89befb52cc0e91e106ef7f500ea033'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc', 'tools': '00000000000000000000000000000004.022986cd20ae85c77ea298a383f69ba8'}, 'tools': {'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': {'messages': [AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-ad546b5a-70ce-404e-9656-dcc6ecd482d3-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})]}}, 'step': 3}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-306f-6252-8002-47c2374ec1f2'}}, pending_writes=[])
在 [10]
已复制!
checkpoint_tuples
checkpoint_tuples
Out[10]
[CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-3614-69b4-8003-2181cff935cc'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:48.328315+00:00', 'id': '1ef55f2a-3614-69b4-8003-2181cff935cc', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f911e000-75a1-41f6-8e38-77bb086c2ecf'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-4f1531f1-067c-4e16-8b62-7a6b663e93bd-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='e27bb3a1-1798-494a-b4ad-2deadda8b2bf', tool_call_id='call_l5e5YcTJDJYOdvi4scBy9n2I'), AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-ad546b5a-70ce-404e-9656-dcc6ecd482d3-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})], 'agent': 'agent'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000005.16e98d6f7ece7598829eddf1b33a33c4', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000005.065d90dd7f7cd091f0233855210bb2af', 'branch:agent:should_continue:tools': '00000000000000000000000000000004.', 'tools': '00000000000000000000000000000005.'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.ab89befb52cc0e91e106ef7f500ea033'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc', 'tools': '00000000000000000000000000000004.022986cd20ae85c77ea298a383f69ba8'}, 'tools': {'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': {'messages': [AIMessage(content='The weather in San Francisco is always sunny!', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-ad546b5a-70ce-404e-9656-dcc6ecd482d3-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})]}}, 'step': 3}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-306f-6252-8002-47c2374ec1f2'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-306f-6252-8002-47c2374ec1f2'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:47.736251+00:00', 'id': '1ef55f2a-306f-6252-8002-47c2374ec1f2', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f911e000-75a1-41f6-8e38-77bb086c2ecf'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-4f1531f1-067c-4e16-8b62-7a6b663e93bd-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='e27bb3a1-1798-494a-b4ad-2deadda8b2bf', tool_call_id='call_l5e5YcTJDJYOdvi4scBy9n2I')], 'tools': 'tools'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000004.b16eb718f179ac1dcde54c5652768cf5', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000004.', 'branch:agent:should_continue:tools': '00000000000000000000000000000004.', 'tools': '00000000000000000000000000000004.022986cd20ae85c77ea298a383f69ba8'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.ab89befb52cc0e91e106ef7f500ea033'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc'}, 'tools': {'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'tools': {'messages': [ToolMessage(content="It's always sunny in sf", name='get_weather', id='e27bb3a1-1798-494a-b4ad-2deadda8b2bf', tool_call_id='call_l5e5YcTJDJYOdvi4scBy9n2I')]}}, 'step': 2}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-305f-61cc-8001-efac33022ef7'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-305f-61cc-8001-efac33022ef7'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:47.729689+00:00', 'id': '1ef55f2a-305f-61cc-8001-efac33022ef7', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f911e000-75a1-41f6-8e38-77bb086c2ecf'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-4f1531f1-067c-4e16-8b62-7a6b663e93bd-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71})], 'agent': 'agent', 'branch:agent:should_continue:tools': 'agent'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000003.4dd312547dcca1cf91a19adb620a18d6', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af', 'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.ab89befb52cc0e91e106ef7f500ea033'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-4f1531f1-067c-4e16-8b62-7a6b663e93bd-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_l5e5YcTJDJYOdvi4scBy9n2I', 'type': 'tool_call'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71})]}}, 'step': 1}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-2a52-6a7c-8000-27624d954d15'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-2a52-6a7c-8000-27624d954d15'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:47.095456+00:00', 'id': '1ef55f2a-2a52-6a7c-8000-27624d954d15', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='f911e000-75a1-41f6-8e38-77bb086c2ecf')], 'start:agent': '__start__'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000002.52e8b0c387f50c28345585c088150464', 'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.ab89befb52cc0e91e106ef7f500ea033'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': None, 'step': 0}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-2a50-6812-bfff-34e3be35d6f2'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-2a50-6812-bfff-34e3be35d6f2'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:47.094575+00:00', 'id': '1ef55f2a-2a50-6812-bfff-34e3be35d6f2', 'channel_values': {'messages': [], '__start__': {'messages': [['human', "what's the weather in sf"]]}}, 'channel_versions': {'__start__': '00000000000000000000000000000001.ab89befb52cc0e91e106ef7f500ea033'}, 'versions_seen': {'__input__': {}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'input', 'writes': {'messages': [['human', "what's the weather in sf"]]}, 'step': -1}, parent_config=None, pending_writes=None)]
使用异步连接¶
在 [11]
已复制!
async with AsyncRedisSaver.from_conn_info(
host="localhost", port=6379, db=0
) as checkpointer:
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "2"}}
res = await graph.ainvoke(
{"messages": [("human", "what's the weather in nyc")]}, config
)
latest_checkpoint = await checkpointer.aget(config)
latest_checkpoint_tuple = await checkpointer.aget_tuple(config)
checkpoint_tuples = [c async for c in checkpointer.alist(config)]
使用 AsyncRedisSaver.from_conn_info() 方法创建 Redis 检查点,并将其用于创建 React Agent。
In [12]
已复制!
latest_checkpoint
latest_checkpoint
Out[12]
{'v': 1, 'ts': '2024-08-09T01:56:49.503241+00:00', 'id': '1ef55f2a-4149-61ea-8003-dc5506862287', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='5a106e79-a617-4707-839f-134d4e4b762a'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-0d6fa3b4-cace-41a8-b025-d01d16f6bbe9-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='922124bd-d3b0-4929-a996-a75d842b8b44', tool_call_id='call_TvPLLyhuQQN99EcZc8SzL8x9'), AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-69a10e66-d61f-475e-b7de-a1ecd08a6c3a-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})], 'agent': 'agent'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000005.2cb29d082da6435a7528b4c917fd0c28', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000005.065d90dd7f7cd091f0233855210bb2af', 'branch:agent:should_continue:tools': '00000000000000000000000000000004.', 'tools': '00000000000000000000000000000005.'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0e148ae3debe753278387e84f786e863'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc', 'tools': '00000000000000000000000000000004.022986cd20ae85c77ea298a383f69ba8'}, 'tools': {'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}}, 'pending_sends': [], 'current_tasks': {}}
In [13]
已复制!
latest_checkpoint_tuple
latest_checkpoint_tuple
Out[13]
CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-4149-61ea-8003-dc5506862287'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:49.503241+00:00', 'id': '1ef55f2a-4149-61ea-8003-dc5506862287', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='5a106e79-a617-4707-839f-134d4e4b762a'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-0d6fa3b4-cace-41a8-b025-d01d16f6bbe9-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='922124bd-d3b0-4929-a996-a75d842b8b44', tool_call_id='call_TvPLLyhuQQN99EcZc8SzL8x9'), AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-69a10e66-d61f-475e-b7de-a1ecd08a6c3a-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})], 'agent': 'agent'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000005.2cb29d082da6435a7528b4c917fd0c28', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000005.065d90dd7f7cd091f0233855210bb2af', 'branch:agent:should_continue:tools': '00000000000000000000000000000004.', 'tools': '00000000000000000000000000000005.'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0e148ae3debe753278387e84f786e863'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc', 'tools': '00000000000000000000000000000004.022986cd20ae85c77ea298a383f69ba8'}, 'tools': {'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': {'messages': [AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-69a10e66-d61f-475e-b7de-a1ecd08a6c3a-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})]}}, 'step': 3}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-3d07-647e-8002-b5e4d28c00c9'}}, pending_writes=[])
In [14]
已复制!
checkpoint_tuples
checkpoint_tuples
Out[14]
[CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-4149-61ea-8003-dc5506862287'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:49.503241+00:00', 'id': '1ef55f2a-4149-61ea-8003-dc5506862287', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='5a106e79-a617-4707-839f-134d4e4b762a'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-0d6fa3b4-cace-41a8-b025-d01d16f6bbe9-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='922124bd-d3b0-4929-a996-a75d842b8b44', tool_call_id='call_TvPLLyhuQQN99EcZc8SzL8x9'), AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-69a10e66-d61f-475e-b7de-a1ecd08a6c3a-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})], 'agent': 'agent'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000005.2cb29d082da6435a7528b4c917fd0c28', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000005.065d90dd7f7cd091f0233855210bb2af', 'branch:agent:should_continue:tools': '00000000000000000000000000000004.', 'tools': '00000000000000000000000000000005.'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0e148ae3debe753278387e84f786e863'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc', 'tools': '00000000000000000000000000000004.022986cd20ae85c77ea298a383f69ba8'}, 'tools': {'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': {'messages': [AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'stop', 'logprobs': None}, id='run-69a10e66-d61f-475e-b7de-a1ecd08a6c3a-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})]}}, 'step': 3}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-3d07-647e-8002-b5e4d28c00c9'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-3d07-647e-8002-b5e4d28c00c9'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:49.056860+00:00', 'id': '1ef55f2a-3d07-647e-8002-b5e4d28c00c9', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='5a106e79-a617-4707-839f-134d4e4b762a'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-0d6fa3b4-cace-41a8-b025-d01d16f6bbe9-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='922124bd-d3b0-4929-a996-a75d842b8b44', tool_call_id='call_TvPLLyhuQQN99EcZc8SzL8x9')], 'tools': 'tools'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000004.07964a3a545f9ff95545db45a9753d11', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000004.', 'branch:agent:should_continue:tools': '00000000000000000000000000000004.', 'tools': '00000000000000000000000000000004.022986cd20ae85c77ea298a383f69ba8'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0e148ae3debe753278387e84f786e863'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc'}, 'tools': {'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'tools': {'messages': [ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='922124bd-d3b0-4929-a996-a75d842b8b44', tool_call_id='call_TvPLLyhuQQN99EcZc8SzL8x9')]}}, 'step': 2}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-3cf9-6996-8001-88dab066840d'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-3cf9-6996-8001-88dab066840d'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:49.051234+00:00', 'id': '1ef55f2a-3cf9-6996-8001-88dab066840d', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='5a106e79-a617-4707-839f-134d4e4b762a'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-0d6fa3b4-cace-41a8-b025-d01d16f6bbe9-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73})], 'agent': 'agent', 'branch:agent:should_continue:tools': 'agent'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000003.cc96d93b1afbd1b69d53851320670b97', 'start:agent': '00000000000000000000000000000003.', 'agent': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af', 'branch:agent:should_continue:tools': '00000000000000000000000000000003.065d90dd7f7cd091f0233855210bb2af'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0e148ae3debe753278387e84f786e863'}, 'agent': {'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': {'agent': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_48196bc67a', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-0d6fa3b4-cace-41a8-b025-d01d16f6bbe9-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TvPLLyhuQQN99EcZc8SzL8x9', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73})]}}, 'step': 1}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-36a6-6788-8000-9efe1769f8c1'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-36a6-6788-8000-9efe1769f8c1'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:48.388067+00:00', 'id': '1ef55f2a-36a6-6788-8000-9efe1769f8c1', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='5a106e79-a617-4707-839f-134d4e4b762a')], 'start:agent': '__start__'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.', 'messages': '00000000000000000000000000000002.a6994b785a651d88df51020401745af8', 'start:agent': '00000000000000000000000000000002.d6f25946c3108fc12f27abbcf9b4cedc'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0e148ae3debe753278387e84f786e863'}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'loop', 'writes': None, 'step': 0}, parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-36a3-6614-bfff-05dafa02b4d7'}}, pending_writes=None), CheckpointTuple(config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1ef55f2a-36a3-6614-bfff-05dafa02b4d7'}}, checkpoint={'v': 1, 'ts': '2024-08-09T01:56:48.386807+00:00', 'id': '1ef55f2a-36a3-6614-bfff-05dafa02b4d7', 'channel_values': {'messages': [], '__start__': {'messages': [['human', "what's the weather in nyc"]]}}, 'channel_versions': {'__start__': '00000000000000000000000000000001.0e148ae3debe753278387e84f786e863'}, 'versions_seen': {'__input__': {}}, 'pending_sends': [], 'current_tasks': {}}, metadata={'source': 'input', 'writes': {'messages': [['human', "what's the weather in nyc"]]}, 'step': -1}, parent_config=None, pending_writes=None)]