跳到内容

函数式 API

名称 描述
entrypoint

使用 entrypoint 装饰器定义一个 LangGraph 工作流。

函数

名称 描述
task

使用 task 装饰器定义一个 LangGraph 任务。

entrypoint

基类:Generic[ContextT]

使用 entrypoint 装饰器定义一个 LangGraph 工作流。

函数签名

被装饰的函数必须接受一个**单一参数**,作为函数的输入。这个输入参数可以是任何类型。使用字典来向函数传递**多个参数**。

可注入参数

被装饰的函数可以请求访问将在运行时自动注入的额外参数。这些参数包括:

参数 描述
config 一个配置对象(即可运行配置 RunnableConfig),持有运行时的配置值。
previous 给定线程的上一个返回值(仅在提供了检查点时可用)。
runtime 一个 Runtime 对象,包含当前运行的信息,包括 context、store、writer。

entrypoint 装饰器可以应用于同步函数或异步函数。

状态管理

previous 参数可用于访问在同一线程 ID 上对 entrypoint 的上一次调用的返回值。此值仅在提供了检查点时可用。

如果您希望 previous 与返回值不同,您可以使用 entrypoint.final 对象来返回一个值,同时将另一个不同的值保存到检查点。

参数

名称 类型 描述 默认值
checkpointer BaseCheckpointSaver | None

指定一个检查点来创建一个可以在多次运行之间持久化其状态的工作流。

None
store BaseStore | None

一个通用的键值存储。一些实现可能通过可选的 index 配置支持语义搜索功能。

None
cache BaseCache | None

用于缓存工作流结果的缓存。

None
context_schema type[ContextT] | None

指定将传递给工作流的上下文对象的模式。

None
cache_policy CachePolicy | None

用于缓存工作流结果的缓存策略。

None
retry_policy RetryPolicy | Sequence[RetryPolicy] | None

在工作流失败时使用的重试策略(或策略列表)。

None

config_schema 已弃用

config_schema 参数在 v0.6.0 版本中已弃用,并将在 v2.0.0 版本中移除支持。请改用 context_schema 来指定运行范围内的上下文模式。

使用 entrypoint 和 tasks
import time

from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

@task
def compose_essay(topic: str) -> str:
    time.sleep(1.0)  # Simulate slow operation
    return f"An essay about {topic}"

@entrypoint(checkpointer=InMemorySaver())
def review_workflow(topic: str) -> dict:
    """Manages the workflow for generating and reviewing an essay.

    The workflow includes:
    1. Generating an essay about the given topic.
    2. Interrupting the workflow for human review of the generated essay.

    Upon resuming the workflow, compose_essay task will not be re-executed
    as its result is cached by the checkpointer.

    Args:
        topic: The subject of the essay.

    Returns:
        dict: A dictionary containing the generated essay and the human review.
    """
    essay_future = compose_essay(topic)
    essay = essay_future.result()
    human_review = interrupt({
        "question": "Please provide a review",
        "essay": essay
    })
    return {
        "essay": essay,
        "review": human_review,
    }

# Example configuration for the workflow
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Topic for the essay
topic = "cats"

# Stream the workflow to generate the essay and await human review
for result in review_workflow.stream(topic, config):
    print(result)

# Example human review provided after the interrupt
human_review = "This essay is great."

# Resume the workflow with the provided human review
for result in review_workflow.stream(Command(resume=human_review), config):
    print(result)
访问上一个返回值

当启用检查点时,函数可以访问在同一线程 ID 上对上一次调用的返回值。

from typing import Optional

from langgraph.checkpoint.memory import MemorySaver

from langgraph.func import entrypoint

@entrypoint(checkpointer=InMemorySaver())
def my_workflow(input_data: str, previous: Optional[str] = None) -> str:
    return "world"

config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}
my_workflow.invoke("hello", config)
使用 entrypoint.final 保存一个值

entrypoint.final 对象允许您返回一个值,同时将另一个不同的值保存到检查点。只要使用相同的线程 ID,这个值就可以在下一次调用 entrypoint 时通过 previous 参数访问。

from typing import Any

from langgraph.checkpoint.memory import MemorySaver

from langgraph.func import entrypoint

@entrypoint(checkpointer=InMemorySaver())
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    previous = previous or 0
    # This will return the previous value to the caller, saving
    # 2 * number to the checkpoint, which will be used in the next invocation
    # for the `previous` parameter.
    return entrypoint.final(value=previous, save=2 * number)

config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

my_workflow.invoke(3, config)  # 0 (previous was None)
my_workflow.invoke(1, config)  # 6 (previous was 3 * 2 from the previous invocation)

名称 描述
final

一个可以从 entrypoint 返回的原始类型。

方法

名称 描述
__init__

初始化 entrypoint 装饰器。

__call__

将一个函数转换为 Pregel 图。

final dataclass

基类:Generic[R, S]

一个可以从 entrypoint 返回的原始类型。

这个原始类型允许保存一个与 entrypoint 返回值不同的值到检查点。

解耦返回值和保存值
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint

@entrypoint(checkpointer=InMemorySaver())
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    previous = previous or 0
    # This will return the previous value to the caller, saving
    # 2 * number to the checkpoint, which will be used in the next invocation
    # for the `previous` parameter.
    return entrypoint.final(value=previous, save=2 * number)

config = {
    "configurable": {
        "thread_id": "1"
    }
}

my_workflow.invoke(3, config)  # 0 (previous was None)
my_workflow.invoke(1, config)  # 6 (previous was 3 * 2 from the previous invocation)

属性

名称 类型 描述
R

要返回的值。即使值为 None,也总会返回一个值。

save S

下一个检查点的状态值。

value instance-attribute

value: R

要返回的值。即使值为 None,也总会返回一个值。

save instance-attribute

save: S

下一个检查点的状态值。

即使值为 None,也总会保存一个值。

__init__

__init__(
    checkpointer: BaseCheckpointSaver | None = None,
    store: BaseStore | None = None,
    cache: BaseCache | None = None,
    context_schema: type[ContextT] | None = None,
    cache_policy: CachePolicy | None = None,
    retry_policy: (
        RetryPolicy | Sequence[RetryPolicy] | None
    ) = None,
    **kwargs: Unpack[DeprecatedKwargs]
) -> None

初始化 entrypoint 装饰器。

__call__

__call__(func: Callable[..., Any]) -> Pregel

将一个函数转换为 Pregel 图。

参数

名称 类型 描述 默认值
func Callable[..., Any]

要转换的函数。支持同步和异步函数。

必填

返回

类型 描述
Pregel

一个 Pregel 图。

task

task(
    __func_or_none__: (
        Callable[P, Awaitable[T]]
        | Callable[P, T]
        | None
    ) = None,
    *,
    name: str | None = None,
    retry_policy: (
        RetryPolicy | Sequence[RetryPolicy] | None
    ) = None,
    cache_policy: (
        CachePolicy[Callable[P, str | bytes]] | None
    ) = None,
    **kwargs: Unpack[DeprecatedKwargs]
) -> (
    Callable[
        [Callable[P, Awaitable[T]] | Callable[P, T]],
        _TaskFunction[P, T],
    ]
    | _TaskFunction[P, T]
)

使用 task 装饰器定义一个 LangGraph 任务。

对于异步函数,需要 Python 3.11 或更高版本

task 装饰器支持同步和异步函数。要使用异步函数,请确保您使用的是 Python 3.11 或更高版本。

任务只能在 entrypoint 或 StateGraph 内部调用。一个任务可以像普通函数一样调用,但有以下区别:

  • 当启用检查点时,函数的输入和输出必须是可序列化的。
  • 被装饰的函数只能在 entrypoint 或 StateGraph 内部调用。
  • 调用该函数会产生一个 future。这使得并行化任务变得容易。

参数

名称 类型 描述 默认值
name str | None

任务的可选名称。如果未提供,将使用函数名。

None
retry_policy RetryPolicy | Sequence[RetryPolicy] | None

在任务失败时使用的可选重试策略(或策略列表)。

None
cache_policy CachePolicy[Callable[P, str | bytes]] | None

用于任务的可选缓存策略。这允许缓存任务结果。

None

返回

类型 描述
Callable[[Callable[P, Awaitable[T]] | Callable[P, T]], _TaskFunction[P, T]] | _TaskFunction[P, T]

当用作装饰器时是一个可调用函数。

同步任务
from langgraph.func import entrypoint, task

@task
def add_one(a: int) -> int:
    return a + 1

@entrypoint()
def add_one(numbers: list[int]) -> list[int]:
    futures = [add_one(n) for n in numbers]
    results = [f.result() for f in futures]
    return results

# Call the entrypoint
add_one.invoke([1, 2, 3])  # Returns [2, 3, 4]
异步任务
import asyncio
from langgraph.func import entrypoint, task

@task
async def add_one(a: int) -> int:
    return a + 1

@entrypoint()
async def add_one(numbers: list[int]) -> list[int]:
    futures = [add_one(n) for n in numbers]
    return asyncio.gather(*futures)

# Call the entrypoint
await add_one.ainvoke([1, 2, 3])  # Returns [2, 3, 4]