跳到内容

如何添加跨线程持久化 (功能性 API)

先决条件

本指南假定您熟悉以下内容

LangGraph 允许您跨不同线程持久化数据。例如,您可以将有关用户的信息(他们的姓名或偏好)存储在共享(跨线程)内存中,并在新线程(例如,新的对话)中重复使用它们。

当使用 功能性 API 时,您可以设置它通过使用 Store 接口来存储和检索记忆。

  1. 创建 Store 的实例

    from langgraph.store.memory import InMemoryStore, BaseStore
    
    store = InMemoryStore()
    
  2. store 实例传递给 entrypoint() 装饰器,并在函数签名中公开 store 参数

    from langgraph.func import entrypoint
    
    @entrypoint(store=store)
    def workflow(inputs: dict, store: BaseStore):
        my_task(inputs).result()
        ...
    

在本指南中,我们将展示如何构建和使用一个工作流,该工作流具有使用 Store 接口实现的共享内存。

注意

本指南中使用的 Store API 的支持是在 LangGraph v0.2.32 中添加的。

本指南中使用的 Store API 的 indexquery 参数的支持是在 LangGraph v0.2.54 中添加的。

注意

如果您需要为 StateGraph 添加跨线程持久化,请查看此操作指南

设置

首先,让我们安装所需的软件包并设置我们的 API 密钥

%%capture --no-stderr
%pip install -U langchain_anthropic langchain_openai langgraph
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")

设置 LangSmith 以进行 LangGraph 开发

注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用程序——阅读更多关于如何开始使用 这里

示例:具有长期记忆的简单聊天机器人

定义存储

在本示例中,我们将创建一个工作流,该工作流能够检索有关用户偏好的信息。我们将通过定义 InMemoryStore 来实现这一点 - InMemoryStore 是一个可以将数据存储在内存中并查询该数据的对象。

当使用 Store 接口存储对象时,您需要定义两件事

  • 对象的命名空间,一个元组(类似于目录)
  • 对象键(类似于文件名)

在我们的示例中,我们将使用 ("memories", <user_id>) 作为命名空间,并使用随机 UUID 作为每个新记忆的键。

重要的是,为了确定用户,我们将通过节点函数的 config 关键字参数传递 user_id

让我们首先定义我们的存储!

from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings

in_memory_store = InMemoryStore(
    index={
        "embed": OpenAIEmbeddings(model="text-embedding-3-small"),
        "dims": 1536,
    }
)

API 参考:OpenAIEmbeddings

创建工作流

import uuid

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import BaseMessage
from langgraph.func import entrypoint, task
from langgraph.graph import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore


model = ChatAnthropic(model="claude-3-5-sonnet-latest")


@task
def call_model(messages: list[BaseMessage], memory_store: BaseStore, user_id: str):
    namespace = ("memories", user_id)
    last_message = messages[-1]
    memories = memory_store.search(namespace, query=str(last_message.content))
    info = "\n".join([d.value["data"] for d in memories])
    system_msg = f"You are a helpful assistant talking to the user. User info: {info}"

    # Store new memories if the user asks the model to remember
    if "remember" in last_message.content.lower():
        memory = "User name is Bob"
        memory_store.put(namespace, str(uuid.uuid4()), {"data": memory})

    response = model.invoke([{"role": "system", "content": system_msg}] + messages)
    return response


# NOTE: we're passing the store object here when creating a workflow via entrypoint()
@entrypoint(checkpointer=MemorySaver(), store=in_memory_store)
def workflow(
    inputs: list[BaseMessage],
    *,
    previous: list[BaseMessage],
    config: RunnableConfig,
    store: BaseStore,
):
    user_id = config["configurable"]["user_id"]
    previous = previous or []
    inputs = add_messages(previous, inputs)
    response = call_model(inputs, store, user_id).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

API 参考:ChatAnthropic | RunnableConfig | BaseMessage | entrypoint | task | add_messages | MemorySaver

注意

如果您正在使用 LangGraph Cloud 或 LangGraph Studio,则无需将 store 传递给 entrypoint 装饰器,因为它会自动完成。

运行工作流!

现在让我们在配置中指定用户 ID,并告诉模型我们的名字

config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_message = {"role": "user", "content": "Hi! Remember: my name is Bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Hello Bob! Nice to meet you. I'll remember that your name is Bob. How can I help you today?

config = {"configurable": {"thread_id": "2", "user_id": "1"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Your name is Bob.
我们现在可以检查我们的内存存储,并验证我们实际上已经保存了用户的记忆

for memory in in_memory_store.search(("memories", "1")):
    print(memory.value)
{'data': 'User name is Bob'}
现在让我们为另一个用户运行工作流,以验证关于第一个用户的记忆是自包含的

config = {"configurable": {"thread_id": "3", "user_id": "2"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

I don't have any information about your name. I can only see our current conversation without any prior context or personal details about you. If you'd like me to know your name, feel free to tell me!

评论