跳到内容

延迟后台记忆处理

当对话活跃时,智能体可能会在短时间内接收到多条消息。与其立即处理每条消息以进行长期记忆管理,不如等到对话活动平息后再处理。本指南将展示如何使用 ReflectionExecutor 对记忆处理进行防抖(debounce)操作。

问题

对每条消息都进行记忆处理存在以下缺点: - 当消息快速连续到达时,会产生冗余工作 - 在对话中途进行处理时,上下文不完整 - 不必要的 token 消耗

ReflectionExecutor 可以延迟记忆处理并取消冗余工作。

from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint
from langgraph.store.memory import InMemoryStore
from langmem import ReflectionExecutor, create_memory_store_manager

# Create memory manager to extract memories from conversations (1)
memory_manager = create_memory_store_manager(
    "anthropic:claude-3-5-sonnet-latest",
    namespace=("memories",),
)
# Wrap memory_manager to handle deferred background processing (2)
executor = ReflectionExecutor(memory_manager)
store = InMemoryStore(
    index={
        "dims": 1536,
        "embed": "openai:text-embedding-3-small",
    }
)

@entrypoint(store=store)
def chat(message: str):
    response = llm.invoke(message)
    # Format conversation for memory processing
    # Must follow OpenAI's message format
    to_process = {"messages": [{"role": "user", "content": message}] + [response]}

    # Wait 30 minutes before processing
    # If new messages arrive before then:
    # 1. Cancel pending processing task
    # 2. Reschedule with new messages included
    delay = 0.5 # In practice would choose longer (30-60 min)
    # depending on app context.
    executor.submit(to_process, after_seconds=delay)
    return response.content
  1. create_memory_store_manager 创建一个可运行(Runnable)的实例,用于从对话中提取记忆。它处理 OpenAI 格式的消息。

    {"messages": [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]}
    

  2. ReflectionExecutor 负责后台处理记忆。对于每个对话线程:

    • 维护一个待处理记忆任务的队列
    • 当新消息到达时,取消旧任务
    • 仅在指定的延迟后进行处理

    这种防抖(debouncing)机制确保您处理的是完整的对话上下文,而不是零散的片段。

    无服务器部署

    在无服务器(serverless)函数的两次调用之间,本地线程会终止。请改用 LangGraph 平台的远程执行器。

    ReflectionExecutor(
        "my_memory_manager", 
        ("memories",), 
        url="https://:2024",
    )
    

评论