持久性¶
LangGraph 有一个内置的持久化层,通过检查点(checkpointer)实现。当您使用检查点编译一个图时,检查点会在每个超级步骤(super-step)保存图状态的checkpoint
。这些检查点被保存到一个thread
(线程)中,可以在图执行后访问。因为threads
允许在执行后访问图的状态,所以可以实现诸如人在环路、记忆、时间旅行和容错等多种强大功能。下面,我们将更详细地讨论这些概念。
LangGraph API 自动处理检查点
使用 LangGraph API 时,您无需手动实现或配置检查点。API 在后台为您处理所有持久化基础架构。
线程¶
线程(thread)是由检查点保存的每个检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。当一次运行被执行时,助手的底层图的状态将被持久化到该线程中。
当使用检查点调用一个图时,您必须在配置的configurable
部分指定一个thread_id
。
可以检索一个线程的当前和历史状态。为了持久化状态,必须在执行一次运行之前创建一个线程。LangGraph 平台 API 提供了几个用于创建和管理线程及线程状态的端点。更多详情请参见API 参考。
检查点¶
一个线程在特定时间点的状态称为检查点(checkpoint)。检查点是在每个超级步骤保存的图状态的快照,由一个StateSnapshot
对象表示,具有以下关键属性:
config
: 与此检查点关联的配置。metadata
: 与此检查点关联的元数据。values
: 在此时间点的状态通道的值。next
: 一个元组,包含图中接下来要执行的节点名称。tasks
: 一个PregelTask
对象的元组,包含有关接下来要执行的任务的信息。如果该步骤之前尝试过,它将包含错误信息。如果图在节点内部被动态中断,任务将包含与中断相关的额外数据。
检查点是持久化的,可以用来在稍后恢复线程的状态。
让我们看看当一个简单的图被如下调用时,会保存哪些检查点:
API 参考:StateGraph | START | END | InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: str
bar: Annotated[list[str], add]
def node_a(state: State):
return {"foo": "a", "bar": ["a"]}
def node_b(state: State):
return {"foo": "b", "bar": ["b"]}
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
在我们运行图之后,我们期望看到正好 4 个检查点:
- 一个空的检查点,以
START
作为下一个要执行的节点。 - 一个包含用户输入
{'foo': '', 'bar': []}
的检查点,以node_a
作为下一个要执行的节点。 - 一个包含
node_a
输出{'foo': 'a', 'bar': ['a']}
的检查点,以node_b
作为下一个要执行的节点。 - 一个包含
node_b
输出{'foo': 'b', 'bar': ['a', 'b']}
的检查点,没有下一个要执行的节点。
请注意,bar
通道的值包含了两个节点的输出,因为我们为bar
通道定义了一个归约器(reducer)。
获取状态¶
当与保存的图状态进行交互时,您必须指定一个线程标识符。您可以通过调用graph.get_state(config)
来查看图的最新状态。这将返回一个StateSnapshot
对象,它对应于配置中提供的线程 ID 的最新检查点,或者(如果提供了)对应于该线程的某个检查点 ID 的检查点。
# get the latest state snapshot
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)
# get a state snapshot for a specific checkpoint_id
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
在我们的例子中,get_state
的输出将如下所示:
StateSnapshot(
values={'foo': 'b', 'bar': ['a', 'b']},
next=(),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
created_at='2024-08-29T19:19:38.821749+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)
获取状态历史¶
您可以通过调用graph.get_state_history(config)
来获取给定线程的完整图执行历史。这将返回一个与配置中提供的线程 ID 相关联的StateSnapshot
对象列表。重要的是,检查点将按时间顺序排列,最新的检查点 / StateSnapshot
位于列表的第一个。
在我们的例子中,get_state_history
的输出将如下所示:
[
StateSnapshot(
values={'foo': 'b', 'bar': ['a', 'b']},
next=(),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
created_at='2024-08-29T19:19:38.821749+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
tasks=(),
),
StateSnapshot(
values={'foo': 'a', 'bar': ['a']}, next=('node_b',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
created_at='2024-08-29T19:19:38.819946+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
),
StateSnapshot(
values={'foo': '', 'bar': []},
next=('node_a',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
metadata={'source': 'loop', 'writes': None, 'step': 0},
created_at='2024-08-29T19:19:38.817813+00:00',
parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
),
StateSnapshot(
values={'bar': []},
next=('__start__',),
config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
created_at='2024-08-29T19:19:38.816205+00:00',
parent_config=None,
tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
)
]
重放¶
也可以回放之前的图执行。如果我们使用一个thread_id
和一个checkpoint_id
来invoke
一个图,那么我们将重放与checkpoint_id
对应的检查点之前已执行的步骤,而只执行该检查点之后的步骤。
thread_id
是线程的 ID。checkpoint_id
是引用线程内特定检查点的标识符。
您必须在调用图时将这些作为配置的configurable
部分传递。
config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)
重要的是,LangGraph 知道某个特定步骤之前是否已执行过。如果是,LangGraph 只是重放图中该特定步骤,而不会重新执行它,但这仅限于所提供的checkpoint_id
之前的步骤。所有在checkpoint_id
之后的步骤都将被执行(即一个新的分支),即使它们之前已经执行过。请参阅关于时间旅行的这份操作指南,以了解更多关于重放的信息。
更新状态¶
除了从特定的checkpoints
重放图之外,我们还可以编辑图状态。我们使用graph.update_state()
来做到这一点。该方法接受三个不同的参数:
config
¶
配置应包含thread_id
,指定要更新哪个线程。当只传递thread_id
时,我们更新(或分叉)当前状态。或者,如果我们包含checkpoint_id
字段,那么我们将分叉该选定的检查点。
values
¶
这些是用于更新状态的值。请注意,此更新的处理方式与来自节点的任何更新完全相同。这意味着这些值将被传递给归约器函数(如果为图状态中的某些通道定义了归约器)。这意味着update_state
不会自动覆盖每个通道的值,而只会覆盖没有归约器的通道。让我们看一个例子。
假设您使用以下模式定义了图的状态(参见上面的完整示例):
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: int
bar: Annotated[list[str], add]
现在假设图的当前状态是:
如果您如下更新状态:
那么图的新状态将是:
foo
键(通道)被完全更改(因为没有为该通道指定归约器,所以update_state
会覆盖它)。然而,为bar
键指定了一个归约器,所以它将"b"
附加到bar
的状态中。
as_node
¶
在调用update_state
时,您可以选择性地指定最后一个参数是as_node
。如果您提供了它,更新将像来自节点as_node
一样被应用。如果未提供as_node
,它将被设置为最后更新状态的节点(如果不模糊)。这之所以重要,是因为下一步要执行的步骤取决于最后给出更新的节点,因此这可以用来控制接下来执行哪个节点。请参阅关于分叉状态的这份操作指南,以了解更多信息。
记忆存储¶
一个状态模式指定了一组在图执行过程中被填充的键。如上所述,状态可以由检查点在每个图步骤写入线程,从而实现状态持久化。
但是,如果我们想在多个线程之间保留一些信息怎么办?考虑一个聊天机器人的案例,我们希望在与该用户的所有聊天对话(即线程)中保留关于该用户的特定信息!
仅使用检查点,我们无法在线程之间共享信息。这就需要Store
接口。作为示例,我们可以定义一个InMemoryStore
来跨线程存储关于用户的信息。我们像以前一样用检查点编译我们的图,并使用我们新的in_memory_store
变量。
LangGraph API 自动处理存储
当使用 LangGraph API 时,您无需手动实现或配置存储。API 在后台为您处理所有存储基础架构。
基本用法¶
首先,让我们在不使用 LangGraph 的情况下单独展示这一点。
记忆由一个tuple
进行命名空间划分,在这个特定示例中是(<user_id>, "memories")
。命名空间可以是任何长度,可以代表任何东西,不一定是特定于用户的。
我们使用store.put
方法将记忆保存到存储中的命名空间。当我们这样做时,我们指定如上定义的命名空间,以及记忆的键值对:键是记忆的唯一标识符(memory_id
),值(一个字典)是记忆本身。
memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)
我们可以使用store.search
方法读出我们命名空间中的记忆,它将返回给定用户的所有记忆作为一个列表。最新的记忆是列表中的最后一个。
memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
'namespace': ['1', 'memories'],
'created_at': '2024-10-02T17:22:31.590602+00:00',
'updated_at': '2024-10-02T17:22:31.590605+00:00'}
每种记忆类型都是一个 Python 类 (Item
),具有某些属性。我们可以通过.dict
转换将其作为字典访问,如上所示。
它具有的属性是:
value
: 该记忆的值(本身是一个字典)key
: 该记忆在此命名空间中的唯一键namespace
: 一个字符串列表,该记忆类型的命名空间created_at
: 该记忆被创建时的时间戳updated_at
: 该记忆被更新时的时间戳
语义搜索¶
除了简单的检索,存储还支持语义搜索,允许您根据含义而不是精确匹配来查找记忆。要启用此功能,请使用嵌入模型配置存储:
API 参考:init_embeddings
from langchain.embeddings import init_embeddings
store = InMemoryStore(
index={
"embed": init_embeddings("openai:text-embedding-3-small"), # Embedding provider
"dims": 1536, # Embedding dimensions
"fields": ["food_preference", "$"] # Fields to embed
}
)
现在搜索时,您可以使用自然语言查询来查找相关的记忆:
# Find memories about food preferences
# (This can be done after putting memories into the store)
memories = store.search(
namespace_for_memory,
query="What does the user like to eat?",
limit=3 # Return top 3 matches
)
您可以通过配置fields
参数或在存储记忆时指定index
参数来控制记忆的哪些部分被嵌入。
# Store with specific fields to embed
store.put(
namespace_for_memory,
str(uuid.uuid4()),
{
"food_preference": "I love Italian cuisine",
"context": "Discussing dinner plans"
},
index=["food_preference"] # Only embed "food_preferences" field
)
# Store without embedding (still retrievable, but not searchable)
store.put(
namespace_for_memory,
str(uuid.uuid4()),
{"system_info": "Last updated: 2024-01-01"},
index=False
)
在 LangGraph 中使用¶
有了这些,我们就可以在 LangGraph 中使用in_memory_store
了。in_memory_store
与检查点协同工作:检查点将状态保存到线程中(如上所述),而in_memory_store
允许我们存储任意信息以便在多个线程之间访问。我们用检查点和in_memory_store
两者来编译图,如下所示。
API 参考:InMemorySaver
from langgraph.checkpoint.memory import InMemorySaver
# We need this because we want to enable threads (conversations)
checkpointer = InMemorySaver()
# ... Define the graph ...
# Compile the graph with the checkpointer and store
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)
我们像以前一样用thread_id
调用图,同时也用user_id
,我们将用它来将我们的记忆命名空间限定到这个特定用户,如上所示。
# Invoke the graph
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}
# First let's just say hi to the AI
for update in graph.stream(
{"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
print(update)
我们可以通过将store: BaseStore
和config: RunnableConfig
作为节点参数,在任何节点中访问in_memory_store
和user_id
。以下是我们如何在节点中使用语义搜索来查找相关记忆:
def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
# Get the user id from the config
user_id = config["configurable"]["user_id"]
# Namespace the memory
namespace = (user_id, "memories")
# ... Analyze conversation and create a new memory
# Create a new memory ID
memory_id = str(uuid.uuid4())
# We create a new memory
store.put(namespace, memory_id, {"memory": memory})
如上所示,我们也可以在任何节点中访问存储并使用store.search
方法来获取记忆。回想一下,记忆是作为可以转换为字典的对象列表返回的。
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
'namespace': ['1', 'memories'],
'created_at': '2024-10-02T17:22:31.590602+00:00',
'updated_at': '2024-10-02T17:22:31.590605+00:00'}
我们可以访问记忆并在我们的模型调用中使用它们。
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
# Get the user id from the config
user_id = config["configurable"]["user_id"]
# Namespace the memory
namespace = (user_id, "memories")
# Search based on the most recent message
memories = store.search(
namespace,
query=state["messages"][-1].content,
limit=3
)
info = "\n".join([d.value["memory"] for d in memories])
# ... Use memories in the model call
如果我们创建一个新线程,只要user_id
相同,我们仍然可以访问相同的记忆。
# Invoke the graph
config = {"configurable": {"thread_id": "2", "user_id": "1"}}
# Let's say hi again
for update in graph.stream(
{"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
print(update)
当我们使用 LangGraph 平台时,无论是在本地(例如,在 LangGraph Studio 中)还是在 LangGraph 平台上,基础存储默认可用,无需在图编译期间指定。然而,要启用语义搜索,您确实需要在您的langgraph.json
文件中配置索引设置。例如:
{
...
"store": {
"index": {
"embed": "openai:text-embeddings-3-small",
"dims": 1536,
"fields": ["$"]
}
}
}
更多详情和配置选项,请参见部署指南。
检查点库¶
在底层,检查点功能由符合BaseCheckpointSaver接口的检查点对象提供支持。LangGraph 提供了几种检查点实现,都是通过独立、可安装的库实现的:
langgraph-checkpoint
: 检查点保存器(BaseCheckpointSaver)和序列化/反序列化接口(SerializerProtocol)的基础接口。包括用于实验的内存检查点实现(InMemorySaver)。LangGraph 自带langgraph-checkpoint
。langgraph-checkpoint-sqlite
: 使用 SQLite 数据库的 LangGraph 检查点实现(SqliteSaver / AsyncSqliteSaver)。非常适合实验和本地工作流。需要单独安装。langgraph-checkpoint-postgres
: 使用 Postgres 数据库的高级检查点(PostgresSaver / AsyncPostgresSaver),在 LangGraph 平台中使用。非常适合在生产环境中使用。需要单独安装。
检查点接口¶
每个检查点都符合BaseCheckpointSaver接口,并实现以下方法:
.put
- 存储一个检查点及其配置和元数据。.put_writes
- 存储与一个检查点相关联的中间写入(即待定写入)。.get_tuple
- 使用给定的配置(thread_id
和checkpoint_id
)获取一个检查点元组。这用于在graph.get_state()
中填充StateSnapshot
。.list
- 列出匹配给定配置和筛选条件的检查点。这用于在graph.get_state_history()
中填充状态历史。
如果检查点用于异步图执行(即通过.ainvoke
、.astream
、.abatch
执行图),则将使用上述方法的异步版本(.aput
、.aput_writes
、.aget_tuple
、.alist
)。
注意
为了异步运行您的图,您可以使用InMemorySaver
,或 Sqlite/Postgres 检查点的异步版本——AsyncSqliteSaver
/ AsyncPostgresSaver
检查点。
序列化器¶
当检查点保存图状态时,它们需要序列化状态中的通道值。这是使用序列化器对象完成的。
langgraph_checkpoint
定义了实现序列化器的协议,并提供了一个默认实现(JsonPlusSerializer),该实现处理多种类型,包括 LangChain 和 LangGraph 的原生类型、日期时间、枚举等。
使用pickle
进行序列化¶
默认的序列化器JsonPlusSerializer
在底层使用 ormsgpack 和 JSON,这不适用于所有类型的对象。
如果您想对我们 msgpack 编码器目前不支持的对象(如 Pandas 数据帧)回退到使用 pickle,您可以使用JsonPlusSerializer
的pickle_fallback
参数。
API 参考:InMemorySaver | JsonPlusSerializer
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
# ... Define the graph ...
graph.compile(
checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)
加密¶
检查点可以有选择地加密所有持久化的状态。要启用此功能,请将EncryptedSerializer
的实例传递给任何BaseCheckpointSaver
实现的serde
参数。创建加密序列化器的最简单方法是通过from_pycryptodome_aes
,它从LANGGRAPH_AES_KEY
环境变量中读取 AES 密钥(或接受一个key
参数)。
API 参考:SqliteSaver
import sqlite3
from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver
serde = EncryptedSerializer.from_pycryptodome_aes() # reads LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)
API 参考:PostgresSaver
from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver
serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()
在 LangGraph 平台上运行时,只要存在LANGGRAPH_AES_KEY
,加密就会自动启用,因此您只需提供环境变量即可。可以通过实现CipherProtocol
并将其提供给EncryptedSerializer
来使用其他加密方案。
功能¶
人机协作 (Human-in-the-loop)¶
首先,检查点通过允许人类检查、中断和批准图步骤,促进了人在环路的工作流。这些工作流需要检查点,因为人类必须能够随时查看图的状态,并且图必须能够在人类对状态进行任何更新后恢复执行。请参阅操作指南中的示例。
内存¶
其次,检查点允许在交互之间建立“记忆”。在重复的人类交互(如对话)的情况下,任何后续消息都可以发送到该线程,该线程将保留其先前交互的记忆。有关如何使用检查点添加和管理对话记忆的信息,请参见添加记忆。
时光穿梭¶
第三,检查点允许“时间旅行”,允许用户重放先前的图执行,以审查和/或调试特定的图步骤。此外,检查点还可以在任意检查点分叉图状态,以探索不同的轨迹。
容错¶
最后,检查点还提供了容错和错误恢复功能:如果一个或多个节点在给定的超级步骤中失败,您可以从最后一个成功的步骤重新启动您的图。此外,当一个图节点在给定的超级步骤中执行失败时,LangGraph 会存储在该超级步骤中成功完成的任何其他节点的待定检查点写入,这样每当我们从该超级步骤恢复图执行时,我们就不必重新运行成功的节点。
待定写入¶
此外,当一个图节点在给定的超级步骤中执行失败时,LangGraph 会存储在该超级步骤中成功完成的任何其他节点的待定检查点写入,这样每当我们从该超级步骤恢复图执行时,我们就不必重新运行成功的节点。