跳到内容

如何添加线程级持久性(函数式API)

先决条件

本指南假定您熟悉以下内容

许多AI应用程序需要内存来在同一个线程上的多次交互中共享上下文(例如,对话的多个轮次)。在LangGraph函数式API中,可以使用线程级持久性将这种内存添加到任何entrypoint()工作流中。

创建LangGraph工作流时,您可以使用检查点保存器(checkpointer)来使其结果持久化

  1. 创建一个检查点保存器实例

    import { MemorySaver } from "@langchain/langgraph";
    
    const checkpointer = new MemorySaver();
    
  2. checkpointer实例传递给entrypoint()包装函数

    import { entrypoint } from "@langchain/langgraph";
    const workflow = entrypoint({
      name: "workflow",
      checkpointer,
    }, async (inputs) => {
      ...
    });
    
  3. 从工作流中之前的执行中检索previous状态

    import { entrypoint, getPreviousState } from "@langchain/langgraph";
    
    const workflow = entrypoint({
      name: "workflow",
      checkpointer,
    }, async (inputs) => {
      const previous = getPreviousState();
      const result = doSomething(previous, inputs);
      ...
    });
    
  4. 可选地选择工作流将返回哪些值,以及检查点保存器将哪些值保存为previous

    import { entrypoint, getPreviousState } from "@langchain/langgraph";
    
    const workflow = entrypoint({
      name: "workflow",
      checkpointer,
    }, async (inputs) => {
      const previous = getPreviousState();
      const result = doSomething(previous, inputs);
      ...
      return entrypoint.final({
        value: result,
        save: combineState(inputs, result),
      });
    });
    

本指南将展示如何向您的工作流添加线程级持久性。

注意

如果您需要跨多个对话或用户共享的内存(跨线程持久性),请查阅此操作指南

注意

如果您需要向StateGraph添加线程级持久性,请查阅此操作指南

设置

注意

本指南需要 @langchain/langgraph>=0.2.42

首先,安装本示例所需的依赖项

npm install @langchain/langgraph @langchain/anthropic @langchain/core zod

接下来,我们需要设置 Anthropic(我们将使用的 LLM)的 API 密钥

process.env.ANTHROPIC_API_KEY = "YOUR_API_KEY";

为 LangGraph 开发设置 LangSmith

注册LangSmith以快速发现问题并提高LangGraph项目的性能。LangSmith允许您使用跟踪数据来调试、测试和监控您使用LangGraph构建的LLM应用程序——在此了解更多如何开始

示例:带短期记忆的简单聊天机器人

我们将使用一个包含单个任务的工作流,该任务调用一个聊天模型

首先,让我们定义将要使用的模型

import { ChatAnthropic } from "@langchain/anthropic";

const model = new ChatAnthropic({
  model: "claude-3-5-sonnet-latest",
});

现在我们可以定义任务和工作流。要添加持久性,我们需要将一个Checkpointer(检查点保存器)传递给entrypoint()包装函数。

import type { BaseMessage, BaseMessageLike } from "@langchain/core/messages";
import {
  addMessages,
  entrypoint,
  task,
  getPreviousState,
  MemorySaver,
} from "@langchain/langgraph";

const callModel = task("callModel", async (messages: BaseMessageLike[]) => {
  const response = model.invoke(messages);
  return response;
});

const checkpointer = new MemorySaver();

const workflow = entrypoint({
  name: "workflow",
  checkpointer,
}, async (inputs: BaseMessageLike[]) => {
  const previous = getPreviousState<BaseMessage>() ?? [];
  const messages = addMessages(previous, inputs);
  const response = await callModel(messages);
  return entrypoint.final({
    value: response,
    save: addMessages(messages, response),
  });
});

如果我们尝试使用此工作流,对话的上下文将在多次交互中持久化。

注意

如果您正在使用LangGraph Cloud或LangGraph Studio,则无需将检查点保存器(checkpointer)传递给entrypoint包装器,因为它是自动完成的。

以下是其在实践中的工作方式

const config = {
  configurable: { thread_id: "1" },
  streamMode: "values" as const,
};
const inputMessage = { role: "user", content: "hi! I'm bob" };

const stream = await workflow.stream(
  [inputMessage],
  config,
);

for await (const chunk of stream) {
  console.log("=".repeat(30), `${chunk.getType()} message`, "=".repeat(30));
  console.log(chunk.content);
}
============================== ai message ==============================
Hi Bob! I'm Claude. Nice to meet you! How can I help you today?
您可以随时恢复先前的线程

const followupStream = await workflow.stream(
  [{ role: "user", content: "what's my name?" }], 
  config,
);

for await (const chunk of followupStream) {
  console.log("=".repeat(30), `${chunk.getType()} message`, "=".repeat(30));
  console.log(chunk.content);
}
============================== ai message ==============================
Your name is Bob - you just told me that in your first message.
如果我们想开始一个新的对话,可以传入一个不同的thread_id。噗!所有记忆都消失了!

const newStream = await workflow.stream(
  [{ role: "user", content: "what's my name?" }],
  {
    configurable: {
      thread_id: "2",
    },
    streamMode: "values",
  },
);

for await (const chunk of newStream) {
  console.log("=".repeat(30), `${chunk.getType()} message`, "=".repeat(30));
  console.log(chunk.content);
}
============================== ai message ==============================
I don't know your name as we just started chatting. Would you like to introduce yourself?

流式传输 token

如果您希望从聊天机器人流式传输LLM令牌,可以使用streamMode: "messages"。查阅此操作指南了解更多。