How to add cross-thread persistence (functional API)

Prerequisites

This guide assumes familiarity with the following:

LangGraph allows you to persist data across different threads. For example, you can store information about a user (their name or preferences) in shared (cross-thread) memory and reuse it in new threads (e.g., new conversations).

When using the functional API, you can set it up to store and retrieve memories by using the Store interface:

  1. Create an instance of a Store:

    import { InMemoryStore } from "@langchain/langgraph";
    
    const store = new InMemoryStore();
    
  2. Pass the store instance to the entrypoint() wrapper function. It will be passed to the workflow as config.store.

    import { entrypoint } from "@langchain/langgraph";
    
    const workflow = entrypoint({
      store,
      name: "myWorkflow",
    }, async (input, config) => {
      const foo = await myTask({input, store: config.store});
      // ...
    });
    

In this guide, we will show how to construct and use a workflow that has shared memory implemented using the Store interface.

Note

If you need to add cross-thread persistence to a StateGraph, check out this how-to guide.

Setup

Note

This guide requires @langchain/langgraph>=0.2.42.

First, install the dependencies required for this example:

npm install @langchain/langgraph @langchain/openai @langchain/anthropic @langchain/core uuid

Next, we need to set API keys for Anthropic and OpenAI (the LLM and embeddings we will use):

process.env.OPENAI_API_KEY = "YOUR_API_KEY";
process.env.ANTHROPIC_API_KEY = "YOUR_API_KEY";

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph. Read more about how to get started here.
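
Optionally, you can enable tracing by setting the LangSmith environment variables before running the examples below (a sketch; the API key value is a placeholder):

process.env.LANGCHAIN_TRACING_V2 = "true";
process.env.LANGCHAIN_API_KEY = "YOUR_LANGSMITH_API_KEY";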

Example: simple chatbot with long-term memory

Define store

In this example we will create a workflow that is able to retrieve information about a user's preferences. We will do so by defining an InMemoryStore, an object that can store data in memory and query that data.

When storing objects using the Store interface, you define two things:

  • the namespace for the object, a tuple (similar to a directory)
  • the object key (similar to a filename)

In our example, we will use ["memories", <user_id>] as the namespace and a random UUID as the key for each new memory (see the sketch after the store definition below).

Let's first define our store:

import { InMemoryStore } from "@langchain/langgraph";
import { OpenAIEmbeddings } from "@langchain/openai";

const inMemoryStore = new InMemoryStore({
  index: {
    embeddings: new OpenAIEmbeddings({
      model: "text-embedding-3-small",
    }),
    dims: 1536,
  },
});
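
Because the store is created with an index configuration, stored values are embedded and can later be retrieved with natural-language queries. As a quick illustration of the namespace/key model described above, a minimal sketch (the user ID and memory text are made up for illustration):

import { v4 } from "uuid";

// Namespace ["memories", <user_id>] groups memories per user; each memory gets a UUID key
const namespace = ["memories", "user-123"];
await inMemoryStore.put(namespace, v4(), { data: "Prefers concise answers" });

// With the embeddings index configured, search supports semantic queries
const items = await inMemoryStore.search(namespace, { query: "response style" });
console.log(items.map((item) => item.value));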

Create workflow

Now let's create our workflow:

import { v4 } from "uuid";
import { ChatAnthropic } from "@langchain/anthropic";
import {
  entrypoint,
  task,
  MemorySaver,
  addMessages,
  type BaseStore,
  getStore,
} from "@langchain/langgraph";
import type { BaseMessage, BaseMessageLike } from "@langchain/core/messages";

const model = new ChatAnthropic({
  model: "claude-3-5-sonnet-latest",
});

const callModel = task("callModel", async (
  messages: BaseMessage[],
  memoryStore: BaseStore,
  userId: string
) => {
  const namespace = ["memories", userId];
  const lastMessage = messages.at(-1);
  if (typeof lastMessage?.content !== "string") {
    throw new Error("Received non-string message content.");
  }
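  // Retrieve this user's memories that are semantically relevant to the last message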
  const memories = await memoryStore.search(namespace, {
    query: lastMessage.content,
  });
  const info = memories.map((memory) => memory.value.data).join("\n");
  const systemMessage = `You are a helpful assistant talking to the user. User info: ${info}`;

  // Store new memories if the user asks the model to remember
  if (lastMessage.content.toLowerCase().includes("remember")) {
    // Hard-coded for demo
    const memory = `Username is Bob`;
    await memoryStore.put(namespace, v4(), { data: memory });
  }
  const response = await model.invoke([
    {
      role: "system",
      content: systemMessage 
    },
    ...messages
  ]);
  return response;
});

// NOTE: we're passing the store object here when creating a workflow via entrypoint()
const workflow = entrypoint({
  checkpointer: new MemorySaver(),
  store: inMemoryStore,
  name: "workflow",
}, async (params: {
  messages: BaseMessageLike[];
  userId: string;
}, config) => {
  const messages = addMessages([], params.messages);
  const response = await callModel(messages, config.store, params.userId);
  return entrypoint.final({
    value: response,
    save: addMessages(messages, response),
  });
});

The current store is passed in as part of the entrypoint's second argument, as config.store.

Note

If you are using LangGraph Cloud or LangGraph Studio, you do not need to pass the store into the entrypoint, since it is done automatically.
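
Alternatively, the getStore imported earlier can be used to read the store from inside a task instead of threading it through parameters. A minimal sketch, assuming getStore() resolves to the store configured on the enclosing entrypoint (callModelAlt is a hypothetical variant, not part of the example above):

import { task, getStore } from "@langchain/langgraph";
import type { BaseMessage } from "@langchain/core/messages";

const callModelAlt = task("callModelAlt", async (
  messages: BaseMessage[],
  userId: string
) => {
  // Assumption: getStore() returns the store configured on the entrypoint
  const memoryStore = getStore();
  if (!memoryStore) {
    throw new Error("No store configured on the workflow.");
  }
  const memories = await memoryStore.search(["memories", userId], {
    query: "user preferences",
  });
  // ... build the prompt from `memories`, as in callModel above
});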

Run the workflow!

Now let's specify a thread ID in the config, pass a user ID as part of the input, and tell the model our name:

const config = {
  configurable: {
    thread_id: "1",
  },
  streamMode: "values" as const,
};

const inputMessage = {
  role: "user",
  content: "Hi! Remember: my name is Bob",
};

const stream = await workflow.stream({ messages: [inputMessage], userId: "1" }, config);

for await (const chunk of stream) {
  console.log(chunk);
}

AIMessage {
  "id": "msg_01U4xHvf4REPSCGWzpLeh1qJ",
  "content": "Hi Bob! Nice to meet you. I'll remember that your name is Bob. How can I help you today?",
  "additional_kwargs": {
    "id": "msg_01U4xHvf4REPSCGWzpLeh1qJ",
    "type": "message",
    "role": "assistant",
    "model": "claude-3-5-sonnet-20241022",
    "stop_reason": "end_turn",
    "stop_sequence": null,
    "usage": {
      "input_tokens": 28,
      "cache_creation_input_tokens": 0,
      "cache_read_input_tokens": 0,
      "output_tokens": 27
    }
  },
  "response_metadata": {
    "id": "msg_01U4xHvf4REPSCGWzpLeh1qJ",
    "model": "claude-3-5-sonnet-20241022",
    "stop_reason": "end_turn",
    "stop_sequence": null,
    "usage": {
      "input_tokens": 28,
      "cache_creation_input_tokens": 0,
      "cache_read_input_tokens": 0,
      "output_tokens": 27
    },
    "type": "message",
    "role": "assistant"
  },
  "tool_calls": [],
  "invalid_tool_calls": [],
  "usage_metadata": {
    "input_tokens": 28,
    "output_tokens": 27,
    "total_tokens": 55,
    "input_token_details": {
      "cache_creation": 0,
      "cache_read": 0
    }
  }
}

const config2 = {
  configurable: {
    thread_id: "2",
  },
  streamMode: "values" as const,
};

const followupStream = await workflow.stream({
  messages: [{
    role: "user",
    content: "what is my name?",
  }],
  userId: "1"
}, config2);

for await (const chunk of followupStream) {
  console.log(chunk);
}

AIMessage {
  "id": "msg_01LB4YapkFawBUbpiu3oeWbF",
  "content": "Your name is Bob.",
  "additional_kwargs": {
    "id": "msg_01LB4YapkFawBUbpiu3oeWbF",
    "type": "message",
    "role": "assistant",
    "model": "claude-3-5-sonnet-20241022",
    "stop_reason": "end_turn",
    "stop_sequence": null,
    "usage": {
      "input_tokens": 28,
      "cache_creation_input_tokens": 0,
      "cache_read_input_tokens": 0,
      "output_tokens": 8
    }
  },
  "response_metadata": {
    "id": "msg_01LB4YapkFawBUbpiu3oeWbF",
    "model": "claude-3-5-sonnet-20241022",
    "stop_reason": "end_turn",
    "stop_sequence": null,
    "usage": {
      "input_tokens": 28,
      "cache_creation_input_tokens": 0,
      "cache_read_input_tokens": 0,
      "output_tokens": 8
    },
    "type": "message",
    "role": "assistant"
  },
  "tool_calls": [],
  "invalid_tool_calls": [],
  "usage_metadata": {
    "input_tokens": 28,
    "output_tokens": 8,
    "total_tokens": 36,
    "input_token_details": {
      "cache_creation": 0,
      "cache_read": 0
    }
  }
}

Now we can inspect our in-memory store and verify that we have actually saved the memories for the user:

const memories = await inMemoryStore.search(["memories", "1"]);
for (const memory of memories) {
  console.log(memory.value);
}

{ data: 'Username is Bob' }

Now let's run the workflow for another user to verify that memories about the first user are self-contained:

const config3 = {
  configurable: {
    thread_id: "3",
  },
  streamMode: "values" as const,
};

const otherUserStream = await workflow.stream({
  messages: [{
    role: "user",
    content: "what is my name?",
  }],
  userId: "2"
}, config3);

for await (const chunk of otherUserStream) {
  console.log(chunk);
}

AIMessage {
  "id": "msg_01KK7CweVY4ZdHxU5bPa4skv",
  "content": "I don't have any information about your name. While I aim to be helpful, I can only know what you directly tell me during our conversation.",
  "additional_kwargs": {
    "id": "msg_01KK7CweVY4ZdHxU5bPa4skv",
    "type": "message",
    "role": "assistant",
    "model": "claude-3-5-sonnet-20241022",
    "stop_reason": "end_turn",
    "stop_sequence": null,
    "usage": {
      "input_tokens": 25,
      "cache_creation_input_tokens": 0,
      "cache_read_input_tokens": 0,
      "output_tokens": 33
    }
  },
  "response_metadata": {
    "id": "msg_01KK7CweVY4ZdHxU5bPa4skv",
    "model": "claude-3-5-sonnet-20241022",
    "stop_reason": "end_turn",
    "stop_sequence": null,
    "usage": {
      "input_tokens": 25,
      "cache_creation_input_tokens": 0,
      "cache_read_input_tokens": 0,
      "output_tokens": 33
    },
    "type": "message",
    "role": "assistant"
  },
  "tool_calls": [],
  "invalid_tool_calls": [],
  "usage_metadata": {
    "input_tokens": 25,
    "output_tokens": 33,
    "total_tokens": 58,
    "input_token_details": {
      "cache_creation": 0,
      "cache_read": 0
    }
  }
}
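
As one more check, searching the second user's namespace directly should come back empty, since only messages containing "remember" trigger a put in our demo and user "2" never sent one (a hedged sketch; the expected count is an assumption based on the logic above):

const otherUserMemories = await inMemoryStore.search(["memories", "2"]);
console.log(otherUserMemories.length); // expected: 0, nothing stored for user "2"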