跳到内容

持久性

LangGraph 具有内置的持久性层,通过检查点程序实现。当您使用检查点程序编译图时,检查点程序会在每个超步保存图状态的checkpoint(检查点)。这些检查点保存到thread(线程)中,可以在图执行后访问。由于threads(线程)允许在执行后访问图的状态,因此可以实现包括人机环路、内存、时间旅行和容错在内的多种强大功能。有关如何向图添加和使用检查点程序的端到端示例,请参阅此操作指南。下面,我们将更详细地讨论这些概念。

Checkpoints

线程

线程是分配给检查点程序保存的每个检查点的唯一 ID 或 线程标识符。当使用检查点程序调用图时,您必须在配置的 configurable 部分中指定 thread_id

{"configurable": {"thread_id": "1"}}

检查点

检查点是在每个超步保存的图状态的快照,由具有以下关键属性的 StateSnapshot 对象表示

  • config:与此检查点关联的配置。
  • metadata:与此检查点关联的元数据。
  • values:此时刻状态通道的值。
  • next:图中接下来要执行的节点名称的元组。
  • tasks:包含有关要执行的下一个任务信息的 PregelTask 对象元组。如果之前尝试过该步骤,它将包含错误信息。如果图在节点内动态中断,则任务将包含与中断关联的其他数据。

让我们看看当一个简单的图按如下方式调用时,会保存哪些检查点

import { StateGraph, START, END, MemorySaver, Annotation } from "@langchain/langgraph";

const GraphAnnotation = Annotation.Root({
  foo: Annotation<string>
  bar: Annotation<string[]>({
    reducer: (a, b) => [...a, ...b],
    default: () => [],
  })
});

function nodeA(state: typeof GraphAnnotation.State) {
  return { foo: "a", bar: ["a"] };
}

function nodeB(state: typeof GraphAnnotation.State) {
  return { foo: "b", bar: ["b"] };
}

const workflow = new StateGraph(GraphAnnotation);
  .addNode("nodeA", nodeA)
  .addNode("nodeB", nodeB)
  .addEdge(START, "nodeA")
  .addEdge("nodeA", "nodeB")
  .addEdge("nodeB", END);

const checkpointer = new MemorySaver();
const graph = workflow.compile({ checkpointer });

const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "" }, config);

在运行图后,我们预计会看到正好 4 个检查点

  • 带有 START 作为要执行的下一个节点的空检查点
  • 带有用户输入 {foo: '', bar: []}nodeA 作为要执行的下一个节点的检查点
  • 带有 nodeA 的输出 {foo: 'a', bar: ['a']}nodeB 作为要执行的下一个节点的检查点
  • 带有 nodeB 的输出 {foo: 'b', bar: ['a', 'b']} 且没有要执行的下一个节点的检查点

请注意,我们的 bar 通道值包含来自两个节点的输出,因为我们为 bar 通道设置了 reducer。

获取状态

当与保存的图状态交互时,您必须指定一个 线程标识符。您可以通过调用 await graph.getState(config) 来查看图的最新状态。这将返回一个 StateSnapshot 对象,该对象对应于与配置中提供的线程 ID 关联的最新检查点,或者如果提供了检查点 ID,则对应于与线程的检查点 ID 关联的检查点。

// Get the latest state snapshot
const config = { configurable: { thread_id: "1" } };
const state = await graph.getState(config);

// Get a state snapshot for a specific checkpoint_id
const configWithCheckpoint = { configurable: { thread_id: "1", checkpoint_id: "1ef663ba-28fe-6528-8002-5a559208592c" } };
const stateWithCheckpoint = await graph.getState(configWithCheckpoint);

在我们的示例中,getState 的输出将如下所示

{
  values: { foo: 'b', bar: ['a', 'b'] },
  next: [],
  config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1ef663ba-28fe-6528-8002-5a559208592c' } },
  metadata: { source: 'loop', writes: { nodeB: { foo: 'b', bar: ['b'] } }, step: 2 },
  created_at: '2024-08-29T19:19:38.821749+00:00',
  parent_config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8' } },
  tasks: []
}

获取状态历史记录

您可以通过调用 await graph.getStateHistory(config) 来获取给定线程的图执行的完整历史记录。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最新的检查点 / StateSnapshot 是列表中的第一个。

const config = { configurable: { thread_id: "1" } };
const history = await graph.getStateHistory(config);

在我们的示例中,getStateHistory 的输出将如下所示

[
  {
    values: { foo: 'b', bar: ['a', 'b'] },
    next: [],
    config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1ef663ba-28fe-6528-8002-5a559208592c' } },
    metadata: { source: 'loop', writes: { nodeB: { foo: 'b', bar: ['b'] } }, step: 2 },
    created_at: '2024-08-29T19:19:38.821749+00:00',
    parent_config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8' } },
    tasks: [],
  },
  {
    values: { foo: 'a', bar: ['a'] },
    next: ['nodeB'],
    config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8' } },
    metadata: { source: 'loop', writes: { nodeA: { foo: 'a', bar: ['a'] } }, step: 1 },
    created_at: '2024-08-29T19:19:38.819946+00:00',
    parent_config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1ef663ba-28f4-6b4a-8000-ca575a13d36a' } },
    tasks: [{ id: '6fb7314f-f114-5413-a1f3-d37dfe98ff44', name: 'nodeB', error: null, interrupts: [] }],
  },
  // ... (other checkpoints)
]

State

重放

也可以回放之前的图执行。如果我们使用 thread_idcheckpoint_id invoking 图,那么我们将从与 checkpoint_id 对应的检查点重放图。

  • thread_id 只是线程的 ID。这是始终必需的。
  • checkpoint_id 此标识符指的是线程内的特定检查点。

您必须在调用图时将这些作为配置的 configurable 部分传递

// { configurable: { thread_id: "1" } }  // valid config
// { configurable: { thread_id: "1", checkpoint_id: "0c62ca34-ac19-445d-bbb0-5b4984975b2a" } }  // also valid config

const config = { configurable: { thread_id: "1" } };
await graph.invoke(inputs, config);

重要的是,LangGraph 知道特定的检查点是否之前已执行过。如果已执行过,LangGraph 只是重放图中的特定步骤,而不会重新执行该步骤。有关重放的更多信息,请参阅关于时间旅行的操作指南

Replay

更新状态

除了从特定 checkpoints(检查点)重放图之外,我们还可以编辑图状态。我们使用 graph.updateState() 来执行此操作。此方法有三个不同的参数

config

配置应包含 thread_id,用于指定要更新的线程。当仅传递 thread_id 时,我们更新(或派生)当前状态。或者,如果我们包含 checkpoint_id 字段,则我们派生所选的检查点。

values

这些是将用于更新状态的值。请注意,此更新的处理方式与来自节点的任何更新完全相同。这意味着这些值将传递给 reducer 函数(如果它们为图状态中的某些通道定义了)。这意味着 updateState 不会自动覆盖每个通道的通道值,而仅覆盖没有 reducer 的通道。让我们来看一个例子。

假设您已使用以下架构定义了图的状态(请参阅上面的完整示例)

import { Annotation } from "@langchain/langgraph";

const GraphAnnotation = Annotation.Root({
  foo: Annotation<string>
  bar: Annotation<string[]>({
    reducer: (a, b) => [...a, ...b],
    default: () => [],
  })
});

现在假设图的当前状态是

{ foo: "1", bar: ["a"] }

如果您按如下方式更新状态

await graph.updateState(config, { foo: "2", bar: ["b"] });

那么图的新状态将是

{ foo: "2", bar: ["a", "b"] }

foo 键(通道)已完全更改(因为没有为该通道指定 reducer,因此 updateState 会覆盖它)。但是,为 bar 键指定了 reducer,因此它会将 "b" 附加到 bar 的状态。

作为节点

当调用 updateState 时,您可以选择性地指定的最后一个参数是第三个位置参数 asNode。如果提供了它,则更新将像来自节点 asNode 一样应用。如果未提供 asNode,则它将设置为上次更新状态的节点(如果不是模棱两可的话)。这很重要,原因是接下来要执行的步骤取决于给出更新的最后一个节点,因此这可以用于控制接下来执行哪个节点。有关派生状态的更多信息,请参阅关于时间旅行的操作指南

Update

内存存储

Update

状态架构指定在执行图时填充的一组键。如上所述,状态可以由检查点程序在每个图步骤写入线程,从而实现状态持久性。

但是,如果我们想要跨线程保留某些信息怎么办?考虑一下聊天机器人的情况,我们希望在与该用户的所有聊天对话(例如,线程)中保留有关该用户的特定信息!

仅使用检查点程序,我们无法跨线程共享信息。这促使我们需要 Store 接口。为了说明这一点,我们可以定义一个 InMemoryStore 来存储有关用户的跨线程信息。首先,让我们在不使用 LangGraph 的情况下单独展示这一点。

import { InMemoryStore } from "@langchain/langgraph";

const inMemoryStore = new InMemoryStore();

内存通过 tuple(元组)命名空间,在本例中,它将是 [<user_id>, "memories"]。命名空间的长度可以是任意长度,并且可以表示任何内容,不一定必须是用户特定的。

const userId = "1";
const namespaceForMemory = [userId, "memories"];

我们使用 store.put 方法将内存保存到存储中的命名空间。当我们这样做时,我们指定命名空间(如上定义)以及内存的键值对:键只是内存的唯一标识符 (memoryId),值(对象)是内存本身。

import { v4 as uuid4 } from 'uuid';

const memoryId = uuid4();
const memory = { food_preference: "I like pizza" };
await inMemoryStore.put(namespaceForMemory, memoryId, memory);

我们可以使用 store.search 读取命名空间中的内存,这将返回给定用户的所有内存作为列表。最新的内存是列表中的最后一个。

const memories = await inMemoryStore.search(namespaceForMemory);
console.log(memories.at(-1));

/*
  {
    'value': {'food_preference': 'I like pizza'},
    'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
    'namespace': ['1', 'memories'],
    'created_at': '2024-10-02T17:22:31.590602+00:00',
    'updated_at': '2024-10-02T17:22:31.590605+00:00'
  }
*/

检索到的内存具有的属性是

  • value:此内存的值(本身是一个字典)
  • key:此命名空间中此内存的 UUID
  • namespace:字符串列表,此内存类型的命名空间
  • created_at:创建此内存的时间戳
  • updated_at:更新此内存的时间戳

有了这一切,我们在 LangGraph 中使用了 inMemoryStoreinMemoryStore 与检查点程序协同工作:检查点程序将状态保存到线程(如上所述),而 inMemoryStore 允许我们存储任意信息以供跨线程访问。我们使用检查点程序和 inMemoryStore 编译图,如下所示。

import { MemorySaver } from "@langchain/langgraph";

// We need this because we want to enable threads (conversations)
const checkpointer = new MemorySaver();

// ... Define the graph ...

// Compile the graph with the checkpointer and store
const graph = builder.compile({
  checkpointer,
  store: inMemoryStore
});

我们像以前一样使用 thread_id 调用图,并使用 user_id,我们将使用它来将内存命名空间到该特定用户,如上所示。

// Invoke the graph
const user_id = "1";
const config = { configurable: { thread_id: "1", user_id } };

// First let's just say hi to the AI
const stream = await graph.stream(
  { messages: [{ role: "user", content: "hi" }] },
  { ...config, streamMode: "updates" },
);

for await (const update of stream) {
  console.log(update);
}

我们可以通过传递 config: LangGraphRunnableConfig 作为节点参数来访问任何节点中的 inMemoryStoreuser_id。然后,正如我们上面看到的,只需使用 put 方法将内存保存到存储中。

import {
  type LangGraphRunnableConfig,
  MessagesAnnotation,
} from "@langchain/langgraph";

const updateMemory = async (
  state: typeof MessagesAnnotation.State,
  config: LangGraphRunnableConfig
) => {
  // Get the store instance from the config
  const store = config.store;

  // Get the user id from the config
  const userId = config.configurable.user_id;

  // Namespace the memory
  const namespace = [userId, "memories"];

  // ... Analyze conversation and create a new memory

  // Create a new memory ID
  const memoryId = uuid4();

  // We create a new memory
  await store.put(namespace, memoryId, { memory });
};

正如我们上面展示的,我们也可以访问任何节点中的存储并使用 search 来获取内存。回想一下,内存作为对象列表返回,可以转换为字典。

const memories = inMemoryStore.search(namespaceForMemory);
console.log(memories.at(-1));

/*
  {
    'value': {'food_preference': 'I like pizza'},
    'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
    'namespace': ['1', 'memories'],
    'created_at': '2024-10-02T17:22:31.590602+00:00',
    'updated_at': '2024-10-02T17:22:31.590605+00:00'
  }
*/

我们可以访问内存并在我们的模型调用中使用它们。

const callModel = async (
  state: typeof StateAnnotation.State,
  config: LangGraphRunnableConfig
) => {
  const store = config.store;

  // Get the user id from the config
  const userId = config.configurable.user_id;

  // Get the memories for the user from the store
  const memories = await store.search([userId, "memories"]);
  const info = memories.map((memory) => {
    return JSON.stringify(memory.value);
  }).join("\n");

  // ... Use memories in the model call
}

如果我们创建一个新线程,我们仍然可以访问相同的内存,只要 user_id 相同即可。

// Invoke the graph
const config = { configurable: { thread_id: "2", user_id: "1" } };

// Let's say hi again
const stream = await graph.stream(
  { messages: [{ role: "user", content: "hi, tell me about my memories" }] },
  { ...config, streamMode: "updates" },
);

for await (const update of stream) {
  console.log(update);
}

当我们使用 LangGraph API 时,无论是在本地(例如,在 LangGraph Studio 中)还是使用 LangGraph Cloud,默认情况下内存存储都可用,并且在图编译期间无需指定。

检查点库

在底层,检查点由符合 BaseCheckpointSaver 接口的检查点对象提供支持。 LangGraph 提供了多种检查点程序实现,所有实现都通过独立的、可安装的库实现

  • @langchain/langgraph-checkpoint:检查点程序保存器的基本接口 (BaseCheckpointSaver) 和序列化/反序列化接口 (SerializerProtocol)。包括用于实验的内存检查点程序实现 (MemorySaver)。 LangGraph 随附 @langchain/langgraph-checkpoint
  • @langchain/langgraph-checkpoint-sqlite:LangGraph 检查点程序的实现,它使用 SQLite 数据库 (SqliteSaver)。非常适合实验和本地工作流程。需要单独安装。
  • @langchain/langgraph-checkpoint-postgres:高级检查点程序,它使用 Postgres 数据库 (PostgresSaver),在 LangGraph Cloud 中使用。非常适合在生产中使用。需要单独安装。

检查点接口

每个检查点程序都符合 BaseCheckpointSaver 接口并实现以下方法

  • .put - 存储带有其配置和元数据的检查点。
  • .putWrites - 存储链接到检查点的中间写入(即 待处理写入)。
  • .getTuple - 使用给定的配置(thread_idcheckpoint_id)获取检查点元组。这用于在 graph.getState() 中填充 StateSnapshot
  • .list - 列出与给定配置和筛选条件匹配的检查点。这用于在 graph.getStateHistory() 中填充状态历史记录

序列化器

当检查点程序保存图状态时,它们需要序列化状态中的通道值。这是使用序列化器对象完成的。 @langchain/langgraph-checkpoint 定义了一个 协议,用于实现序列化器和一个默认实现,该默认实现处理各种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

功能

人机环路

首先,检查点程序通过允许人类检查、中断和批准图步骤来促进人机环路工作流程。这些工作流程需要检查点程序,因为人类必须能够查看图在任何时间点的状态,并且图必须能够在人类对状态进行任何更新后恢复执行。有关具体示例,请参阅这些操作指南

内存

其次,检查点程序允许交互之间存在“内存”。在重复的人工交互(如对话)的情况下,任何后续消息都可以发送到该线程,这将保留其先前消息的内存。有关如何使用检查点程序添加和管理对话内存的端到端示例,请参阅此操作指南

时间旅行

第三,检查点程序允许“时间旅行”,允许用户重放之前的图执行以查看和/或调试特定的图步骤。此外,检查点程序使在任意检查点派生图状态以探索替代轨迹成为可能。

容错

最后,检查点还提供容错和错误恢复:如果一个或多个节点在给定的超步中失败,您可以从上次成功的步骤重新启动图。此外,当图节点在给定的超步中执行期间失败时,LangGraph 会存储来自在该超步中成功完成的任何其他节点的待处理检查点写入,以便每当我们从该超步恢复图执行时,我们都不会重新运行成功的节点。

待处理写入

此外,当图节点在给定的超步中执行期间失败时,LangGraph 会存储来自在该超步中成功完成的任何其他节点的待处理检查点写入,以便每当我们从该超步恢复图执行时,我们都不会重新运行成功的节点。