跳到内容

如何为子图添加线程级持久性

先决条件

本指南假设您熟悉以下内容:

本指南展示了如何为使用子图的图添加线程级持久性。

设置

首先,让我们安装所需的包

$ npm install @langchain/langgraph @langchain/core

为 LangGraph 开发设置 LangSmith

注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控您使用 LangGraph 构建的 LLM 应用程序——在此处阅读更多关于如何入门的信息这里

定义带持久性的图

要为包含子图的图添加持久性,您只需在**编译父图时**传入一个检查点保存器(checkpointer)。LangGraph 将自动将检查点保存器传播到子图。

注意

编译子图时**不应提供**检查点保存器。相反,您必须定义**单个**检查点保存器并将其传递给parentGraph.compile(),LangGraph 会自动将检查点保存器传播到子图。如果您将检查点保存器传递给subgraph.compile(),它将被简单地忽略。当您添加一个显式调用子图的节点时,这也适用。

让我们定义一个带有一个子图节点的简单图,以展示如何实现这一点。

import { StateGraph, Annotation } from "@langchain/langgraph";

// subgraph

const SubgraphStateAnnotation = Annotation.Root({
  foo: Annotation<string>,
  bar: Annotation<string>,
});

const subgraphNode1 = async (state: typeof SubgraphStateAnnotation.State) => {
  return { bar: "bar" };
};

const subgraphNode2 = async (state: typeof SubgraphStateAnnotation.State) => {
  // note that this node is using a state key ('bar') that is only available in the subgraph
  // and is sending update on the shared state key ('foo')
  return { foo: state.foo + state.bar };
};

const subgraph = new StateGraph(SubgraphStateAnnotation)
  .addNode("subgraphNode1", subgraphNode1)
  .addNode("subgraphNode2", subgraphNode2)
  .addEdge("__start__", "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2")
  .compile();

// parent graph
const StateAnnotation = Annotation.Root({
  foo: Annotation<string>,
});

const node1 = async (state: typeof StateAnnotation.State) => {
  return {
    foo: "hi! " + state.foo,
  };
};

const builder = new StateGraph(StateAnnotation)
  .addNode("node1", node1)
  // note that we're adding the compiled subgraph as a node to the parent graph
  .addNode("node2", subgraph)
  .addEdge("__start__", "node1")
  .addEdge("node1", "node2");

现在我们可以使用内存检查点保存器(MemorySaver)编译图。

import { MemorySaver } from "@langchain/langgraph-checkpoint";

const checkpointer = new MemorySaver();

// You must only pass checkpointer when compiling the parent graph.
// LangGraph will automatically propagate the checkpointer to the child subgraphs.

const graph = builder.compile({
  checkpointer: checkpointer
});

验证持久性是否有效

现在,让我们运行该图并检查父图和子图的持久化状态,以验证持久性是否有效。我们应该会看到父图和子图的最终执行结果都位于 state.values 中。

const config = { configurable: { thread_id: "1" } };

const stream = await graph.stream({
  foo: "foo"
}, {
  ...config,
  subgraphs: true,
});

for await (const [_source, chunk] of stream) {
  console.log(chunk);
}
{ node1: { foo: 'hi! foo' } }
{ subgraphNode1: { bar: 'bar' } }
{ subgraphNode2: { foo: 'hi! foobar' } }
{ node2: { foo: 'hi! foobar' } }
现在,我们可以通过使用与调用图相同的配置来调用 graph.get_state(),从而查看父图状态。

(await graph.getState(config)).values;
{ foo: 'hi! foobar' }
要查看子图状态,我们需要做两件事

  1. 找到子图最新的配置值
  2. 使用 graph.getState() 检索最新子图配置的值。

为了找到正确的配置,我们可以检查父图的状态历史记录,并找到从 node2(包含子图的节点)返回结果之前的状态快照。

let stateWithSubgraph;

const graphHistories = await graph.getStateHistory(config);

for await (const state of graphHistories) {
  if (state.next[0] === "node2") {
    stateWithSubgraph = state;
    break;
  }
}

状态快照将包含接下来要执行的 tasks 列表。当使用子图时,tasks 将包含可用于检索子图状态的配置。

const subgraphConfig = stateWithSubgraph.tasks[0].state;

console.log(subgraphConfig);
{
  configurable: {
    thread_id: '1',
    checkpoint_ns: 'node2:25814e09-45f0-5b70-a5b4-23b869d582c2'
  }
}

(await graph.getState(subgraphConfig)).values
{ foo: 'hi! foobar', bar: 'bar' }
如果您想了解更多关于如何修改子图状态以用于人机循环工作流的信息,请查阅这篇操作指南