跳到内容

存储

持久键值存储的基类和类型。

存储提供跨线程和对话持久化的长期记忆。支持分层命名空间、键值存储和可选的向量搜索。

核心类型
  • BaseStore:具有同步/异步操作的存储接口
  • Item:带有元数据的存储键值对
  • Op:Get/Put/Search/List 操作

NamespacePath = tuple[Union[str, Literal['*']], ...] module-attribute

表示可以包含通配符的命名空间路径的元组。

示例
("users",)  # Exact users namespace
("documents", "*")  # Any sub-namespace under documents
("cache", "*", "v1")  # Any cache category with v1 version

NamespaceMatchType = Literal['prefix', 'suffix'] module-attribute

指定如何匹配命名空间路径。

“prefix”:从命名空间开头匹配 “suffix”:从命名空间末尾匹配

NotProvided

哨兵单例。

Item

表示带有元数据的存储项。

参数

  • value (dict[str, Any]) –

    存储的数据,以字典形式。键是可过滤的。

  • key (str) –

    命名空间内的唯一标识符。

  • namespace (tuple[str, ...]) –

    定义此文档所在集合的分层路径。表示为字符串元组,允许嵌套分类。例如:(“documents”,“user123”)

  • created_at (datetime) –

    项目创建的时间戳。

  • updated_at (datetime) –

    上次更新的时间戳。

SearchItem

基类:Item

表示从搜索操作返回的项,带有额外的元数据。

__init__(namespace: tuple[str, ...], key: str, value: dict[str, Any], created_at: datetime, updated_at: datetime, score: Optional[float] = None) -> None

初始化结果项。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

  • value (dict[str, Any]) –

    存储的值。

  • created_at (datetime) –

    项目首次创建的时间。

  • updated_at (datetime) –

    项目上次更新的时间。

  • score (Optional[float], default: None ) –

    如果是排序操作,则为相关性/相似度得分。

GetOp

基类:NamedTuple

通过命名空间和键检索特定项的操作。

此操作允许使用完整路径(命名空间)和唯一标识符(键)组合精确检索存储的项。

示例

基本项检索

GetOp(namespace=("users", "profiles"), key="user123")
GetOp(namespace=("cache", "embeddings"), key="doc456")

namespace: tuple[str, ...] instance-attribute

唯一标识项目位置的分层路径。

示例
("users",)  # Root level users namespace
("users", "profiles")  # Profiles within users namespace

key: str instance-attribute

项目在其特定命名空间内的唯一标识符。

示例
"user123"  # For a user profile
"doc456"  # For a document

refresh_ttl: bool = True class-attribute instance-attribute

是否刷新返回项的 TTL。

如果原始项未指定 TTL,或者您的适配器未启用 TTL 支持,则忽略此参数。

SearchOp

基类:NamedTuple

在指定的命名空间层次结构中搜索项的操作。

此操作支持在给定命名空间前缀内进行结构化过滤和自然语言搜索。它通过 limit 和 offset 参数提供分页。

注意

自然语言搜索支持取决于您的存储实现。

示例

使用过滤器和分页进行搜索

SearchOp(
    namespace_prefix=("documents",),
    filter={"type": "report", "status": "active"},
    limit=5,
    offset=10
)

自然语言搜索

SearchOp(
    namespace_prefix=("users", "content"),
    query="technical documentation about APIs",
    limit=20
)

namespace_prefix: tuple[str, ...] instance-attribute

定义搜索范围的分层路径前缀。

示例
()  # Search entire store
("documents",)  # Search all documents
("users", "content")  # Search within user content

filter: Optional[dict[str, Any]] = None class-attribute instance-attribute

用于基于完全匹配或比较运算符过滤结果的键值对。

过滤器同时支持完全匹配和基于运算符的比较。

支持的运算符
  • $eq:等于(与直接值比较相同)
  • $ne:不等于
  • $gt:大于
  • $gte:大于或等于
  • $lt:小于
  • $lte:小于或等于
示例

简单的完全匹配

{"status": "active"}

比较运算符

{"score": {"$gt": 4.99}}  # Score greater than 4.99

多重条件

{
    "score": {"$gte": 3.0},
    "color": "red"
}

limit: int = 10 class-attribute instance-attribute

搜索结果中返回的最大项数。

offset: int = 0 class-attribute instance-attribute

分页要跳过的匹配项数。

query: Optional[str] = None class-attribute instance-attribute

用于语义搜索功能的自然语言搜索查询。

示例
  • “关于 REST API 的技术文档”
  • “2023 年的机器学习论文”

refresh_ttl: bool = True class-attribute instance-attribute

是否刷新返回项的 TTL。

如果原始项未指定 TTL,或者您的适配器未启用 TTL 支持,则忽略此参数。

MatchCondition

基类:NamedTuple

表示用于匹配存储中命名空间的模式。

此类将匹配类型(前缀或后缀)与命名空间路径模式相结合,该模式可以包含通配符,以灵活匹配不同的命名空间层次结构。

示例

前缀匹配

MatchCondition(match_type="prefix", path=("users", "profiles"))

带有通配符的后缀匹配

MatchCondition(match_type="suffix", path=("cache", "*"))

简单后缀匹配

MatchCondition(match_type="suffix", path=("v1",))

match_type: NamespaceMatchType instance-attribute

要执行的命名空间匹配类型。

path: NamespacePath instance-attribute

可以包含通配符的命名空间路径模式。

ListNamespacesOp

基类:NamedTuple

列出和过滤存储中命名空间的操作。

此操作允许探索数据组织、查找特定集合以及导航命名空间层次结构。

示例

列出 “documents” 路径下的所有命名空间

ListNamespacesOp(
    match_conditions=(MatchCondition(match_type="prefix", path=("documents",)),),
    max_depth=2
)

列出所有以 “v1” 结尾的命名空间

ListNamespacesOp(
    match_conditions=(MatchCondition(match_type="suffix", path=("v1",)),),
    limit=50
)

match_conditions: Optional[tuple[MatchCondition, ...]] = None class-attribute instance-attribute

用于过滤命名空间的可选条件。

示例

所有用户命名空间

(MatchCondition(match_type="prefix", path=("users",)),)

所有以 “docs” 开头并以 “draft” 结尾的命名空间

(
    MatchCondition(match_type="prefix", path=("docs",)),
    MatchCondition(match_type="suffix", path=("draft",))
) 

max_depth: Optional[int] = None class-attribute instance-attribute

要返回的命名空间层次结构的最大深度。

注意

深度超过此级别的命名空间将被截断。

limit: int = 100 class-attribute instance-attribute

要返回的最大命名空间数。

offset: int = 0 class-attribute instance-attribute

分页要跳过的命名空间数。

PutOp

基类:NamedTuple

在存储中存储、更新或删除项的操作。

此类表示修改存储内容的单个操作,无论是添加新项、更新现有项还是删除它们。

namespace: tuple[str, ...] instance-attribute

标识项目位置的分层路径。

命名空间充当文件夹状结构来组织项。元组中的每个元素代表层次结构中的一个级别。

示例

根级别文档

("documents",)

用户特定文档

("documents", "user123")

嵌套缓存结构

("cache", "embeddings", "v1")

key: str instance-attribute

项目在其命名空间内的唯一标识符。

键在特定命名空间内必须唯一,以避免冲突。与命名空间一起,它构成了项目的完整路径。

示例

如果命名空间为(“documents”,“user123”),键为 “report1”,则完整路径实际上将为 “documents/user123/report1”

value: Optional[dict[str, Any]] instance-attribute

要存储的数据,或 None 以标记要删除的项目。

值必须是带有字符串键和 JSON 可序列化值的字典。将其设置为 None 表示应删除该项。

示例

{ “field1”: “字符串值”, “field2”: 123, “nested”: {“can”: “contain”, “any”: “可序列化数据”} }

index: Optional[Union[Literal[False], list[str]]] = None class-attribute instance-attribute

控制如何为搜索操作索引项目的字段。

无论是否索引,该项目仍然可以通过直接 get() 操作访问。索引后,可以使用自然语言查询通过向量相似性搜索(如果存储实现支持)搜索字段。

路径语法
  • 简单字段访问:“field”
  • 嵌套字段:“parent.child.grandchild”
  • 数组索引
  • 特定索引:“array[0]”
  • 最后一个元素:“array[-1]”
  • 所有元素(每个单独地):“array[*]”
示例
  • None - 使用存储默认值(整个项目)
  • list[str] - 要索引的字段列表
[
    "metadata.title",                    # Nested field access
    "context[*].content",                # Index content from all context as separate vectors
    "authors[0].name",                   # First author's name
    "revisions[-1].changes",             # Most recent revision's changes
    "sections[*].paragraphs[*].text",    # All text from all paragraphs in all sections
    "metadata.tags[*]",                  # All tags in metadata
]

ttl: Optional[float] = None class-attribute instance-attribute

控制项目中 TTL(生存时间),单位为分钟。

如果提供,并且如果您使用的存储支持此功能,则该项目将在上次访问后这么多分钟过期。过期计时器在读取操作(get/search)和写入操作(put/update)时都会刷新。当 TTL 过期时,该项目将被安排在尽力而为的基础上删除。默认为 None(永不过期)。

InvalidNamespaceError

基类:ValueError

提供的命名空间无效。

TTLConfig

基类:TypedDict

存储中 TTL(生存时间)行为的配置。

refresh_on_read: bool instance-attribute

读取操作(GET 和 SEARCH)时刷新 TTL 的默认行为。

如果为 True,则默认情况下将在读取操作(get/search)时刷新 TTL。可以通过显式设置 refresh_ttl 来按操作覆盖此设置。如果未配置,则默认为 True。

default_ttl: Optional[float] instance-attribute

新项目的默认 TTL(生存时间),单位为分钟。

如果提供,则新项目将在上次访问后这么多分钟过期。过期计时器在读取和写入操作时都会刷新。默认为 None(永不过期)。

sweep_interval_minutes: Optional[int] instance-attribute

TTL 清扫操作之间的间隔,单位为分钟。

如果提供,存储将定期删除基于 TTL 过期的项目。默认为 None(不进行清扫)。

IndexConfig

基类:TypedDict

用于在存储中为语义搜索索引文档的配置。

如果未提供给存储,则存储将不支持向量搜索。在这种情况下,put() 和 aput() 操作的所有 index 参数都将被忽略。

dims: int instance-attribute

嵌入向量中的维度数。

常见的嵌入模型具有以下维度
  • openai:text-embedding-3-large:3072
  • openai:text-embedding-3-small:1536
  • openai:text-embedding-ada-002:1536
  • cohere:embed-english-v3.0:1024
  • cohere:embed-english-light-v3.0:384
  • cohere:embed-multilingual-v3.0:1024
  • cohere:embed-multilingual-light-v3.0:384

embed: Union[Embeddings, EmbeddingsFunc, AEmbeddingsFunc, str] instance-attribute

用于从文本生成嵌入的可选函数。

可以通过三种方式指定
  1. LangChain Embeddings 实例
  2. 同步嵌入函数 (EmbeddingsFunc)
  3. 异步嵌入函数 (AEmbeddingsFunc)
  4. 提供程序字符串(例如,“openai:text-embedding-3-small”)
示例

使用 LangChain 的初始化与 InMemoryStore

from langchain.embeddings import init_embeddings
from langgraph.store.memory import InMemoryStore

store = InMemoryStore(
    index={
        "dims": 1536,
        "embed": init_embeddings("openai:text-embedding-3-small")
    }
)

使用自定义嵌入函数与 InMemoryStore

from openai import OpenAI
from langgraph.store.memory import InMemoryStore

client = OpenAI()

def embed_texts(texts: list[str]) -> list[list[float]]:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [e.embedding for e in response.data]

store = InMemoryStore(
    index={
        "dims": 1536,
        "embed": embed_texts
    }
)

使用异步嵌入函数与 InMemoryStore

from openai import AsyncOpenAI
from langgraph.store.memory import InMemoryStore

client = AsyncOpenAI()

async def aembed_texts(texts: list[str]) -> list[list[float]]:
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [e.embedding for e in response.data]

store = InMemoryStore(
    index={
        "dims": 1536,
        "embed": aembed_texts
    }
)

fields: Optional[list[str]] instance-attribute

要从中提取文本以生成嵌入的字段。

控制存储项的哪些部分被嵌入以进行语义搜索。遵循 JSON 路径语法

- ["$"]: Embeds the entire JSON object as one vector  (default)
- ["field1", "field2"]: Embeds specific top-level fields
- ["parent.child"]: Embeds nested fields using dot notation
- ["array[*].field"]: Embeds field from each array element separately
注意

您始终可以使用 putaput 操作中的 index 参数来覆盖此行为。

示例
# Embed entire document (default)
fields=["$"]

# Embed specific fields
fields=["text", "summary"]

# Embed nested fields
fields=["metadata.title", "content.body"]

# Embed from arrays
fields=["messages[*].content"]  # Each message content separately
fields=["context[0].text"]      # First context item's text
注意
  • 文档中缺少的字段将被跳过
  • 数组表示法为每个元素创建单独的嵌入
  • 支持复杂的嵌套路径(例如,“a.b[*].c.d”)

BaseStore

基类:ABC

持久性键值存储的抽象基类。

存储支持持久性和内存,这些持久性和内存可以在线程之间共享,并限定于用户 ID、助手 ID 或其他任意命名空间。某些实现可能通过可选的 index 配置来支持语义搜索功能。

注意

语义搜索功能因实现而异,通常默认禁用。支持此功能的存储可以通过在创建时提供 index 配置来进行配置。如果没有此配置,语义搜索将被禁用,并且对存储操作的任何 index 参数都将无效。

同样,TTL(生存时间)支持默认情况下也是禁用的。子类必须显式设置 supports_ttl = True 才能启用此功能。

batch(ops: Iterable[Op]) -> list[Result] abstractmethod

在单个批处理中同步执行多个操作。

参数

  • ops (Iterable[Op]) –

    要执行的操作的可迭代对象。

返回

  • list[Result]

    结果列表,其中每个结果对应于输入中的一个操作。

  • list[Result]

    结果的顺序与输入操作的顺序一致。

abatch(ops: Iterable[Op]) -> list[Result] abstractmethod async

在单个批处理中异步执行多个操作。

参数

  • ops (Iterable[Op]) –

    要执行的操作的可迭代对象。

返回

  • list[Result]

    结果列表,其中每个结果对应于输入中的一个操作。

  • list[Result]

    结果的顺序与输入操作的顺序一致。

get(namespace: tuple[str, ...], key: str, *, refresh_ttl: Optional[bool] = None) -> Optional[Item]

检索单个项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

  • refresh_ttl (Optional[bool], default: None ) –

    是否刷新返回项目的 TTL。如果为 None(默认值),则使用存储的默认 refresh_ttl 设置。如果未指定 TTL,则忽略此参数。

返回

  • Optional[Item]

    检索到的项目;如果未找到,则为 None。

search(namespace_prefix: tuple[str, ...], /, *, query: Optional[str] = None, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0, refresh_ttl: Optional[bool] = None) -> list[SearchItem]

在命名空间前缀内搜索项目。

参数

  • namespace_prefix (tuple[str, ...]) –

    要在其中搜索的分层路径前缀。

  • query (Optional[str], default: None ) –

    自然语言搜索的可选查询。

  • filter (Optional[dict[str, Any]], default: None ) –

    用于过滤结果的键值对。

  • limit (int, default: 10 ) –

    要返回的最大项目数。

  • offset (int, default: 0 ) –

    在返回结果之前要跳过的项目数。

  • refresh_ttl (Optional[bool], default: None ) –

    是否刷新返回项目的 TTL。如果未指定 TTL,则忽略此参数。

返回

  • list[SearchItem]

    与搜索条件匹配的项目列表。

示例

基本过滤

# Search for documents with specific metadata
results = store.search(
    ("docs",),
    filter={"type": "article", "status": "published"}
)

自然语言搜索(需要向量存储实现)

# Initialize store with embedding configuration
store = YourStore( # e.g., InMemoryStore, AsyncPostgresStore
    index={
        "dims": 1536,  # embedding dimensions
        "embed": your_embedding_function,  # function to create embeddings
        "fields": ["text"]  # fields to embed. Defaults to ["$"]
    }
)

# Search for semantically similar documents
results = store.search(
    ("docs",),
    query="machine learning applications in healthcare",
    filter={"type": "research_paper"},
    limit=5
)

注意:自然语言搜索支持取决于您的存储实现,并且需要正确的嵌入配置。

put(namespace: tuple[str, ...], key: str, value: dict[str, Any], index: Optional[Union[Literal[False], list[str]]] = None, *, ttl: Union[Optional[float], NotProvided] = NOT_PROVIDED) -> None

在存储中存储或更新项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径,表示为字符串元组。示例:(“documents”, “user123”)

  • key (str) –

    命名空间内的唯一标识符。与命名空间一起构成项目的完整路径。

  • value (dict[str, Any]) –

    包含项目数据的字典。必须包含字符串键和可 JSON 序列化的值。

  • index (Optional[Union[Literal[False], list[str]]], default: None ) –

    控制如何为搜索索引项目的字段

    • None(默认值):使用您在创建存储时配置的 fields(如果有)。如果您在初始化存储时未启用索引功能,则 index 参数将被忽略
    • False:禁用此项目的索引
    • list[str]:要索引的字段路径列表,支持
      • 嵌套字段:“metadata.title”
      • 数组访问:“chapters[*].content”(每个元素单独索引)
      • 特定索引:“authors[0].name”
  • ttl (Union[Optional[float], NotProvided], default: NOT_PROVIDED ) –

    生存时间(分钟)。对此参数的支持取决于您的存储适配器。如果指定,则项目将在上次访问后经过这么多分钟后过期。None 表示永不过期。过期的运行将被机会性地删除。默认情况下,每当项目包含在操作中时,过期计时器都会在读取操作(get/search)和写入操作(put/update)时刷新。

注意

索引支持取决于您的存储实现。如果您在初始化存储时未启用索引功能,则 index 参数将被忽略。

同样,TTL 支持取决于特定的存储实现。某些实现可能不支持项目过期。

示例

存储项目。索引取决于您如何配置存储。

store.put(("docs",), "report", {"memory": "Will likes ai"})

不要为语义搜索索引项目。仍然可以通过 get() 和 search() 操作访问,但不会有向量表示。

store.put(("docs",), "report", {"memory": "Will likes ai"}, index=False)

为搜索索引特定字段。

store.put(("docs",), "report", {"memory": "Will likes ai"}, index=["memory"])

delete(namespace: tuple[str, ...], key: str) -> None

删除项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

list_namespaces(*, prefix: Optional[NamespacePath] = None, suffix: Optional[NamespacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]]

列出和过滤存储中的命名空间。

用于探索数据的组织结构、查找特定集合或导航命名空间层次结构。

参数

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    过滤以该路径开头的命名空间。

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    过滤以该路径结尾的命名空间。

  • max_depth (Optional[int], default: None ) –

    返回层级结构中达到此深度的命名空间。深度超过此级别的命名空间将被截断。

  • limit (int, default: 100 ) –

    要返回的最大命名空间数(默认为 100)。

  • offset (int, default: 0 ) –

    用于分页的要跳过的命名空间数(默认为 0)。

返回

  • list[tuple[str, ...]]

    List[Tuple[str, ...]]:与条件匹配的命名空间元组列表。

  • list[tuple[str, ...]]

    每个元组表示达到 max_depth 的完整命名空间路径。

???+ example "示例":设置 max_depth=3。给定命名空间

# Example if you have the following namespaces:
# ("a", "b", "c")
# ("a", "b", "d", "e")
# ("a", "b", "d", "i")
# ("a", "b", "f")
# ("a", "c", "f")
store.list_namespaces(prefix=("a", "b"), max_depth=3)
# [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]

aget(namespace: tuple[str, ...], key: str, *, refresh_ttl: Optional[bool] = None) -> Optional[Item] async

异步检索单个项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

返回

  • Optional[Item]

    检索到的项目;如果未找到,则为 None。

asearch(namespace_prefix: tuple[str, ...], /, *, query: Optional[str] = None, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0, refresh_ttl: Optional[bool] = None) -> list[SearchItem] async

异步搜索命名空间前缀内的项目。

参数

  • namespace_prefix (tuple[str, ...]) –

    要在其中搜索的分层路径前缀。

  • query (Optional[str], default: None ) –

    自然语言搜索的可选查询。

  • filter (Optional[dict[str, Any]], default: None ) –

    用于过滤结果的键值对。

  • limit (int, default: 10 ) –

    要返回的最大项目数。

  • offset (int, default: 0 ) –

    在返回结果之前要跳过的项目数。

  • refresh_ttl (Optional[bool], default: None ) –

    是否刷新返回项目的 TTL。如果为 None(默认值),则使用存储的 TTLConfig.refresh_default 设置。如果未提供 TTLConfig 或未指定 TTL,则忽略此参数。

返回

  • list[SearchItem]

    与搜索条件匹配的项目列表。

示例

基本过滤

# Search for documents with specific metadata
results = await store.asearch(
    ("docs",),
    filter={"type": "article", "status": "published"}
)

自然语言搜索(需要向量存储实现)

# Initialize store with embedding configuration
store = YourStore( # e.g., InMemoryStore, AsyncPostgresStore
    index={
        "dims": 1536,  # embedding dimensions
        "embed": your_embedding_function,  # function to create embeddings
        "fields": ["text"]  # fields to embed
    }
)

# Search for semantically similar documents
results = await store.asearch(
    ("docs",),
    query="machine learning applications in healthcare",
    filter={"type": "research_paper"},
    limit=5
)

注意:自然语言搜索支持取决于您的存储实现,并且需要正确的嵌入配置。

aput(namespace: tuple[str, ...], key: str, value: dict[str, Any], index: Optional[Union[Literal[False], list[str]]] = None, *, ttl: Union[Optional[float], NotProvided] = NOT_PROVIDED) -> None async

异步存储或更新存储中的项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径,表示为字符串元组。示例:(“documents”, “user123”)

  • key (str) –

    命名空间内的唯一标识符。与命名空间一起构成项目的完整路径。

  • value (dict[str, Any]) –

    包含项目数据的字典。必须包含字符串键和可 JSON 序列化的值。

  • index (Optional[Union[Literal[False], list[str]]], default: None ) –

    控制如何为搜索索引项目的字段

    • None(默认值):使用您在创建存储时配置的 fields(如果有)。如果您在初始化存储时未启用索引功能,则 index 参数将被忽略
    • False:禁用此项目的索引
    • list[str]:要索引的字段路径列表,支持
      • 嵌套字段:“metadata.title”
      • 数组访问:“chapters[*].content”(每个元素单独索引)
      • 特定索引:“authors[0].name”
  • ttl (Union[Optional[float], NotProvided], default: NOT_PROVIDED ) –

    生存时间(分钟)。对此参数的支持取决于您的存储适配器。如果指定,则项目将在上次访问后经过这么多分钟后过期。None 表示永不过期。过期的运行将被机会性地删除。默认情况下,每当项目包含在操作中时,过期计时器都会在读取操作(get/search)和写入操作(put/update)时刷新。

注意

索引支持取决于您的存储实现。如果您在初始化存储时未启用索引功能,则 index 参数将被忽略。

同样,TTL 支持取决于特定的存储实现。某些实现可能不支持项目过期。

示例

存储项目。索引取决于您如何配置存储。

await store.aput(("docs",), "report", {"memory": "Will likes ai"})

不要为语义搜索索引项目。仍然可以通过 get() 和 search() 操作访问,但不会有向量表示。

await store.aput(("docs",), "report", {"memory": "Will likes ai"}, index=False)

为搜索索引特定字段(如果存储配置为索引项目)

await store.aput(
    ("docs",),
    "report",
    {
        "memory": "Will likes ai",
        "context": [{"content": "..."}, {"content": "..."}]
    },
    index=["memory", "context[*].content"]
)

adelete(namespace: tuple[str, ...], key: str) -> None async

异步删除项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

alist_namespaces(*, prefix: Optional[NamespacePath] = None, suffix: Optional[NamespacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]] async

异步列出和过滤存储中的命名空间。

用于探索数据的组织结构、查找特定集合或导航命名空间层次结构。

参数

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    过滤以该路径开头的命名空间。

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    过滤以该路径结尾的命名空间。

  • max_depth (Optional[int], default: None ) –

    返回层级结构中达到此深度的命名空间。深度超过此级别的命名空间将被截断到此深度。

  • limit (int, default: 100 ) –

    要返回的最大命名空间数(默认为 100)。

  • offset (int, default: 0 ) –

    用于分页的要跳过的命名空间数(默认为 0)。

返回

  • list[tuple[str, ...]]

    List[Tuple[str, ...]]:与条件匹配的命名空间元组列表。

  • list[tuple[str, ...]]

    每个元组表示达到 max_depth 的完整命名空间路径。

示例

使用现有命名空间设置 max_depth=3

# Given the following namespaces:
# ("a", "b", "c")
# ("a", "b", "d", "e")
# ("a", "b", "d", "i")
# ("a", "b", "f")
# ("a", "c", "f")

await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
# Returns: [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]

ensure_embeddings(embed: Union[Embeddings, EmbeddingsFunc, AEmbeddingsFunc, str, None]) -> Embeddings

确保嵌入函数符合 LangChain 的 Embeddings 接口。

此函数包装任意嵌入函数,使其与 LangChain 的 Embeddings 接口兼容。它处理同步和异步函数。

参数

  • embed (Union[Embeddings, EmbeddingsFunc, AEmbeddingsFunc, str, None]) –

    现有的 Embeddings 实例,或将文本转换为嵌入的函数。如果该函数是异步的,它将用于同步和异步操作。

返回

  • Embeddings

    一个 Embeddings 实例,它包装了提供的函数。

示例

包装同步嵌入函数

def my_embed_fn(texts):
    return [[0.1, 0.2] for _ in texts]

embeddings = ensure_embeddings(my_embed_fn)
result = embeddings.embed_query("hello")  # Returns [0.1, 0.2]

包装异步嵌入函数

async def my_async_fn(texts):
    return [[0.1, 0.2] for _ in texts]

embeddings = ensure_embeddings(my_async_fn)
result = await embeddings.aembed_query("hello")  # Returns [0.1, 0.2]

使用提供程序字符串初始化嵌入

# Requires langchain>=0.3.9 and langgraph-checkpoint>=2.0.11
embeddings = ensure_embeddings("openai:text-embedding-3-small")
result = embeddings.embed_query("hello")

get_text_at_path(obj: Any, path: Union[str, list[str]]) -> list[str]

使用路径表达式或预标记化路径从对象中提取文本。

参数

  • obj (Any) –

    要从中提取文本的对象

  • path (Union[str, list[str]]) –

    路径字符串或预标记化路径列表。

处理的路径类型

  • 简单路径:“field1.field2”
  • 数组索引:“[0]”、“[*]”、“[-1]”
  • 通配符:“*”
  • 多字段选择:“{field1,field2}”
  • 多字段中的嵌套路径:“{field1,nested.field2}”

tokenize_path(path: str) -> list[str]

将路径标记化为组件。

处理的类型

  • 简单路径:“field1.field2”
  • 数组索引:“[0]”、“[*]”、“[-1]”
  • 通配符:“*”
  • 多字段选择:“{field1,field2}”

AsyncPostgresStore

基类:AsyncBatchedBaseStoreBasePostgresStore[Conn]

异步 Postgres 支持的存储,具有使用 pgvector 的可选向量搜索。

示例

基本设置和用法

from langgraph.store.postgres import AsyncPostgresStore

conn_string = "postgresql://user:pass@localhost:5432/dbname"

async with AsyncPostgresStore.from_conn_string(conn_string) as store:
    await store.setup()  # Run migrations. Done once

    # Store and retrieve data
    await store.aput(("users", "123"), "prefs", {"theme": "dark"})
    item = await store.aget(("users", "123"), "prefs")

使用 LangChain 嵌入的向量搜索

from langchain.embeddings import init_embeddings
from langgraph.store.postgres import AsyncPostgresStore

conn_string = "postgresql://user:pass@localhost:5432/dbname"

async with AsyncPostgresStore.from_conn_string(
    conn_string,
    index={
        "dims": 1536,
        "embed": init_embeddings("openai:text-embedding-3-small"),
        "fields": ["text"]  # specify which fields to embed. Default is the whole serialized value
    }
) as store:
    await store.setup()  # Run migrations. Done once

    # Store documents
    await store.aput(("docs",), "doc1", {"text": "Python tutorial"})
    await store.aput(("docs",), "doc2", {"text": "TypeScript guide"})
    await store.aput(("docs",), "doc3", {"text": "Other guide"}, index=False)  # don't index

    # Search by similarity
    results = await store.asearch(("docs",), "programming guides", limit=2)

使用连接池以获得更好的性能

from langgraph.store.postgres import AsyncPostgresStore, PoolConfig

conn_string = "postgresql://user:pass@localhost:5432/dbname"

async with AsyncPostgresStore.from_conn_string(
    conn_string,
    pool_config=PoolConfig(
        min_size=5,
        max_size=20
    )
) as store:
    await store.setup()  # Run migrations. Done once
    # Use store with connection pooling...

警告

请确保:1. 首次使用前调用 setup() 以创建必要的表和索引 2. 具有可用的 pgvector 扩展以使用向量搜索 3. 使用 Python 3.10+ 以获得异步功能

注意

语义搜索默认禁用。您可以通过在创建存储时提供 index 配置来启用它。如果没有此配置,传递给 putaput 的所有 index 参数都将无效。

注意

如果您提供 TTL 配置,则必须显式调用 start_ttl_sweeper() 以启动删除过期项目的后台任务。在完成存储后,调用 stop_ttl_sweeper() 以正确清理资源。

from_conn_string(conn_string: str, *, pipeline: bool = False, pool_config: Optional[PoolConfig] = None, index: Optional[PostgresIndexConfig] = None, ttl: Optional[TTLConfig] = None) -> AsyncIterator[AsyncPostgresStore] async classmethod

从连接字符串创建新的 AsyncPostgresStore 实例。

参数

  • conn_string (str) –

    Postgres 连接信息字符串。

  • pipeline (bool, default: False ) –

    是否使用 AsyncPipeline(仅用于单连接)

  • pool_config (Optional[PoolConfig], default: None ) –

    连接池的配置。如果提供,将创建一个连接池并使用它代替单个连接。这会覆盖 pipeline 参数。

  • index (Optional[PostgresIndexConfig], default: None ) –

    嵌入配置。

返回

  • AsyncPostgresStore ( AsyncIterator[AsyncPostgresStore] ) –

    新的 AsyncPostgresStore 实例。

setup() -> None async

异步设置存储数据库。

此方法在 Postgres 数据库中创建必要的表(如果它们尚不存在)并运行数据库迁移。首次使用存储时,必须由用户直接调用它。

sweep_ttl() -> int async

根据 TTL 删除过期的存储项目。

返回

  • int ( int ) –

    已删除项目的数量。

start_ttl_sweeper(sweep_interval_minutes: Optional[int] = None) -> asyncio.Task[None] async

定期根据 TTL 删除过期的存储项目。

返回

  • Task[None]

    可以等待或取消的任务。

stop_ttl_sweeper(timeout: Optional[float] = None) -> bool async

如果 TTL 清理器任务正在运行,则停止它。

参数

  • timeout (Optional[float], default: None ) –

    等待任务停止的最长时间(秒)。如果为 None,则无限期等待。

返回

  • bool ( bool ) –

    如果任务已成功停止或未运行,则为 True;如果在任务停止之前达到超时,则为 False。

PostgresStore

基类:BaseStoreBasePostgresStore[Conn]

Postgres 支持的存储,具有使用 pgvector 的可选向量搜索。

示例

基本设置和用法

from langgraph.store.postgres import PostgresStore
from psycopg import Connection

conn_string = "postgresql://user:pass@localhost:5432/dbname"

# Using direct connection
with Connection.connect(conn_string) as conn:
    store = PostgresStore(conn)
    store.setup() # Run migrations. Done once

    # Store and retrieve data
    store.put(("users", "123"), "prefs", {"theme": "dark"})
    item = store.get(("users", "123"), "prefs")

或使用方便的 from_conn_string 助手

from langgraph.store.postgres import PostgresStore

conn_string = "postgresql://user:pass@localhost:5432/dbname"

with PostgresStore.from_conn_string(conn_string) as store:
    store.setup()

    # Store and retrieve data
    store.put(("users", "123"), "prefs", {"theme": "dark"})
    item = store.get(("users", "123"), "prefs")

使用 LangChain 嵌入的向量搜索

from langchain.embeddings import init_embeddings
from langgraph.store.postgres import PostgresStore

conn_string = "postgresql://user:pass@localhost:5432/dbname"

with PostgresStore.from_conn_string(
    conn_string,
    index={
        "dims": 1536,
        "embed": init_embeddings("openai:text-embedding-3-small"),
        "fields": ["text"]  # specify which fields to embed. Default is the whole serialized value
    }
) as store:
    store.setup() # Do this once to run migrations

    # Store documents
    store.put(("docs",), "doc1", {"text": "Python tutorial"})
    store.put(("docs",), "doc2", {"text": "TypeScript guide"})
    store.put(("docs",), "doc2", {"text": "Other guide"}, index=False) # don't index

    # Search by similarity
    results = store.search(("docs",), "programming guides", limit=2)

注意

语义搜索默认禁用。您可以通过在创建存储时提供 index 配置来启用它。如果没有此配置,传递给 putaput 的所有 index 参数都将无效。

警告

请确保在首次使用前调用 setup() 以创建必要的表和索引。pgvector 扩展必须可用才能使用向量搜索。

注意

如果您提供 TTL 配置,则必须显式调用 start_ttl_sweeper() 以启动删除过期项目的后台线程。在完成存储后,调用 stop_ttl_sweeper() 以正确清理资源。

get(namespace: tuple[str, ...], key: str, *, refresh_ttl: Optional[bool] = None) -> Optional[Item]

检索单个项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

  • refresh_ttl (Optional[bool], default: None ) –

    是否刷新返回项目的 TTL。如果为 None(默认值),则使用存储的默认 refresh_ttl 设置。如果未指定 TTL,则忽略此参数。

返回

  • Optional[Item]

    检索到的项目;如果未找到,则为 None。

search(namespace_prefix: tuple[str, ...], /, *, query: Optional[str] = None, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0, refresh_ttl: Optional[bool] = None) -> list[SearchItem]

在命名空间前缀内搜索项目。

参数

  • namespace_prefix (tuple[str, ...]) –

    要在其中搜索的分层路径前缀。

  • query (Optional[str], default: None ) –

    自然语言搜索的可选查询。

  • filter (Optional[dict[str, Any]], default: None ) –

    用于过滤结果的键值对。

  • limit (int, default: 10 ) –

    要返回的最大项目数。

  • offset (int, default: 0 ) –

    在返回结果之前要跳过的项目数。

  • refresh_ttl (Optional[bool], default: None ) –

    是否刷新返回项目的 TTL。如果未指定 TTL,则忽略此参数。

返回

  • list[SearchItem]

    与搜索条件匹配的项目列表。

示例

基本过滤

# Search for documents with specific metadata
results = store.search(
    ("docs",),
    filter={"type": "article", "status": "published"}
)

自然语言搜索(需要向量存储实现)

# Initialize store with embedding configuration
store = YourStore( # e.g., InMemoryStore, AsyncPostgresStore
    index={
        "dims": 1536,  # embedding dimensions
        "embed": your_embedding_function,  # function to create embeddings
        "fields": ["text"]  # fields to embed. Defaults to ["$"]
    }
)

# Search for semantically similar documents
results = store.search(
    ("docs",),
    query="machine learning applications in healthcare",
    filter={"type": "research_paper"},
    limit=5
)

注意:自然语言搜索支持取决于您的存储实现,并且需要正确的嵌入配置。

put(namespace: tuple[str, ...], key: str, value: dict[str, Any], index: Optional[Union[Literal[False], list[str]]] = None, *, ttl: Union[Optional[float], NotProvided] = NOT_PROVIDED) -> None

在存储中存储或更新项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径,表示为字符串元组。示例:(“documents”, “user123”)

  • key (str) –

    命名空间内的唯一标识符。与命名空间一起构成项目的完整路径。

  • value (dict[str, Any]) –

    包含项目数据的字典。必须包含字符串键和可 JSON 序列化的值。

  • index (Optional[Union[Literal[False], list[str]]], default: None ) –

    控制如何为搜索索引项目的字段

    • None(默认值):使用您在创建存储时配置的 fields(如果有)。如果您在初始化存储时未启用索引功能,则 index 参数将被忽略
    • False:禁用此项目的索引
    • list[str]:要索引的字段路径列表,支持
      • 嵌套字段:“metadata.title”
      • 数组访问:“chapters[*].content”(每个元素单独索引)
      • 特定索引:“authors[0].name”
  • ttl (Union[Optional[float], NotProvided], default: NOT_PROVIDED ) –

    生存时间(分钟)。对此参数的支持取决于您的存储适配器。如果指定,则项目将在上次访问后经过这么多分钟后过期。None 表示永不过期。过期的运行将被机会性地删除。默认情况下,每当项目包含在操作中时,过期计时器都会在读取操作(get/search)和写入操作(put/update)时刷新。

注意

索引支持取决于您的存储实现。如果您在初始化存储时未启用索引功能,则 index 参数将被忽略。

同样,TTL 支持取决于特定的存储实现。某些实现可能不支持项目过期。

示例

存储项目。索引取决于您如何配置存储。

store.put(("docs",), "report", {"memory": "Will likes ai"})

不要为语义搜索索引项目。仍然可以通过 get() 和 search() 操作访问,但不会有向量表示。

store.put(("docs",), "report", {"memory": "Will likes ai"}, index=False)

为搜索索引特定字段。

store.put(("docs",), "report", {"memory": "Will likes ai"}, index=["memory"])

delete(namespace: tuple[str, ...], key: str) -> None

删除项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

list_namespaces(*, prefix: Optional[NamespacePath] = None, suffix: Optional[NamespacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]]

列出和过滤存储中的命名空间。

用于探索数据的组织结构、查找特定集合或导航命名空间层次结构。

参数

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    过滤以该路径开头的命名空间。

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    过滤以该路径结尾的命名空间。

  • max_depth (Optional[int], default: None ) –

    返回层级结构中达到此深度的命名空间。深度超过此级别的命名空间将被截断。

  • limit (int, default: 100 ) –

    要返回的最大命名空间数(默认为 100)。

  • offset (int, default: 0 ) –

    用于分页的要跳过的命名空间数(默认为 0)。

返回

  • list[tuple[str, ...]]

    List[Tuple[str, ...]]:与条件匹配的命名空间元组列表。

  • list[tuple[str, ...]]

    每个元组表示达到 max_depth 的完整命名空间路径。

???+ example "示例":设置 max_depth=3。给定命名空间

# Example if you have the following namespaces:
# ("a", "b", "c")
# ("a", "b", "d", "e")
# ("a", "b", "d", "i")
# ("a", "b", "f")
# ("a", "c", "f")
store.list_namespaces(prefix=("a", "b"), max_depth=3)
# [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]

aget(namespace: tuple[str, ...], key: str, *, refresh_ttl: Optional[bool] = None) -> Optional[Item] 异步

异步检索单个项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

返回

  • Optional[Item]

    检索到的项目;如果未找到,则为 None。

asearch(namespace_prefix: tuple[str, ...], /, *, query: Optional[str] = None, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0, refresh_ttl: Optional[bool] = None) -> list[SearchItem] 异步

异步搜索命名空间前缀内的项目。

参数

  • namespace_prefix (tuple[str, ...]) –

    要在其中搜索的分层路径前缀。

  • query (Optional[str], default: None ) –

    自然语言搜索的可选查询。

  • filter (Optional[dict[str, Any]], default: None ) –

    用于过滤结果的键值对。

  • limit (int, default: 10 ) –

    要返回的最大项目数。

  • offset (int, default: 0 ) –

    在返回结果之前要跳过的项目数。

  • refresh_ttl (Optional[bool], default: None ) –

    是否刷新返回项目的 TTL。如果为 None(默认值),则使用存储的 TTLConfig.refresh_default 设置。如果未提供 TTLConfig 或未指定 TTL,则忽略此参数。

返回

  • list[SearchItem]

    与搜索条件匹配的项目列表。

示例

基本过滤

# Search for documents with specific metadata
results = await store.asearch(
    ("docs",),
    filter={"type": "article", "status": "published"}
)

自然语言搜索(需要向量存储实现)

# Initialize store with embedding configuration
store = YourStore( # e.g., InMemoryStore, AsyncPostgresStore
    index={
        "dims": 1536,  # embedding dimensions
        "embed": your_embedding_function,  # function to create embeddings
        "fields": ["text"]  # fields to embed
    }
)

# Search for semantically similar documents
results = await store.asearch(
    ("docs",),
    query="machine learning applications in healthcare",
    filter={"type": "research_paper"},
    limit=5
)

注意:自然语言搜索支持取决于您的存储实现,并且需要正确的嵌入配置。

aput(namespace: tuple[str, ...], key: str, value: dict[str, Any], index: Optional[Union[Literal[False], list[str]]] = None, *, ttl: Union[Optional[float], NotProvided] = NOT_PROVIDED) -> None 异步

异步存储或更新存储中的项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径,表示为字符串元组。示例:(“documents”, “user123”)

  • key (str) –

    命名空间内的唯一标识符。与命名空间一起构成项目的完整路径。

  • value (dict[str, Any]) –

    包含项目数据的字典。必须包含字符串键和可 JSON 序列化的值。

  • index (Optional[Union[Literal[False], list[str]]], default: None ) –

    控制如何为搜索索引项目的字段

    • None(默认值):使用您在创建存储时配置的 fields(如果有)。如果您在初始化存储时未启用索引功能,则 index 参数将被忽略
    • False:禁用此项目的索引
    • list[str]:要索引的字段路径列表,支持
      • 嵌套字段:“metadata.title”
      • 数组访问:“chapters[*].content”(每个元素单独索引)
      • 特定索引:“authors[0].name”
  • ttl (Union[Optional[float], NotProvided], default: NOT_PROVIDED ) –

    生存时间(分钟)。对此参数的支持取决于您的存储适配器。如果指定,则项目将在上次访问后经过这么多分钟后过期。None 表示永不过期。过期的运行将被机会性地删除。默认情况下,每当项目包含在操作中时,过期计时器都会在读取操作(get/search)和写入操作(put/update)时刷新。

注意

索引支持取决于您的存储实现。如果您在初始化存储时未启用索引功能,则 index 参数将被忽略。

同样,TTL 支持取决于特定的存储实现。某些实现可能不支持项目过期。

示例

存储项目。索引取决于您如何配置存储。

await store.aput(("docs",), "report", {"memory": "Will likes ai"})

不要为语义搜索索引项目。仍然可以通过 get() 和 search() 操作访问,但不会有向量表示。

await store.aput(("docs",), "report", {"memory": "Will likes ai"}, index=False)

为搜索索引特定字段(如果存储配置为索引项目)

await store.aput(
    ("docs",),
    "report",
    {
        "memory": "Will likes ai",
        "context": [{"content": "..."}, {"content": "..."}]
    },
    index=["memory", "context[*].content"]
)

adelete(namespace: tuple[str, ...], key: str) -> None 异步

异步删除项目。

参数

  • namespace (tuple[str, ...]) –

    项目的分层路径。

  • key (str) –

    命名空间内的唯一标识符。

alist_namespaces(*, prefix: Optional[NamespacePath] = None, suffix: Optional[NamespacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]] 异步

异步列出和过滤存储中的命名空间。

用于探索数据的组织结构、查找特定集合或导航命名空间层次结构。

参数

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    过滤以该路径开头的命名空间。

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    过滤以该路径结尾的命名空间。

  • max_depth (Optional[int], default: None ) –

    返回层级结构中达到此深度的命名空间。深度超过此级别的命名空间将被截断到此深度。

  • limit (int, default: 100 ) –

    要返回的最大命名空间数(默认为 100)。

  • offset (int, default: 0 ) –

    用于分页的要跳过的命名空间数(默认为 0)。

返回

  • list[tuple[str, ...]]

    List[Tuple[str, ...]]:与条件匹配的命名空间元组列表。

  • list[tuple[str, ...]]

    每个元组表示达到 max_depth 的完整命名空间路径。

示例

使用现有命名空间设置 max_depth=3

# Given the following namespaces:
# ("a", "b", "c")
# ("a", "b", "d", "e")
# ("a", "b", "d", "i")
# ("a", "b", "f")
# ("a", "c", "f")

await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
# Returns: [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]

from_conn_string(conn_string: str, *, pipeline: bool = False, pool_config: Optional[PoolConfig] = None, index: Optional[PostgresIndexConfig] = None, ttl: Optional[TTLConfig] = None) -> Iterator[PostgresStore] 类方法

从连接字符串创建一个新的 PostgresStore 实例。

参数

  • conn_string (str) –

    Postgres 连接信息字符串。

  • pipeline (bool, default: False ) –

    是否使用 Pipeline

  • pool_config (Optional[PoolArgs], 默认值: None ) –

    连接池的配置。如果提供,将创建一个连接池并使用它代替单个连接。这会覆盖 pipeline 参数。

  • index (Optional[PostgresIndexConfig], default: None ) –

    商店的索引配置。

返回

  • PostgresStore ( Iterator[PostgresStore] ) –

    一个新的 PostgresStore 实例。

sweep_ttl() -> int

根据 TTL 删除过期的存储项目。

返回

  • int ( int ) –

    已删除项目的数量。

start_ttl_sweeper(sweep_interval_minutes: Optional[int] = None) -> concurrent.futures.Future[None]

定期根据 TTL 删除过期的存储项目。

返回

  • Future[None]

    可以等待或取消的 Future。

stop_ttl_sweeper(timeout: Optional[float] = None) -> bool

如果 TTL 清理器线程正在运行,则停止它。

参数

  • timeout (Optional[float], default: None ) –

    等待线程停止的最长时间,以秒为单位。如果为 None,则无限期等待。

返回

  • bool ( bool ) –

    如果线程成功停止或未运行,则为 True;如果线程停止前达到超时时间,则为 False。

__del__() -> None

确保在对象被垃圾回收时停止 TTL 清理器线程。

setup() -> None

设置商店数据库。

此方法在 Postgres 数据库中创建必要的表(如果它们尚不存在)并运行数据库迁移。首次使用存储时,必须由用户直接调用它。

评论