类 CompiledGraph<N, State, Update, ConfigurableFieldType, InputType, OutputType>

Pregel 类是 LangGraph 的核心运行时引擎,它实现了一种受 Google 的 Pregel 系统 启发的基于消息传递的图计算模型。它为构建可靠、可控且可以随时间演化状态的代理工作流提供了基础。

主要特性

  • 在离散的“超步”中节点之间传递消息
  • 通过检查点程序实现的内置持久层
  • 对值、更新和事件的一流流式处理支持
  • 通过中断实现的人工参与 (Human-in-the-loop) 能力
  • 支持在超步内并行执行节点

Pregel 类不应由消费者直接实例化。请改用以下更高级别的 API

  • StateGraph:用于构建代理工作流的主要图类
  • 函数式 API:一种使用任务和入口点的声明式方法

示例

// Using StateGraph API
const graph = new StateGraph(annotation)
.addNode("nodeA", myNodeFunction)
.addEdge("nodeA", "nodeB")
.compile();

// The compiled graph is a Pregel instance
const result = await graph.invoke(input);

示例

// Using Functional API
import { task, entrypoint } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";

// Define tasks that can be composed
const addOne = task("add", async (x: number) => x + 1);

// Create a workflow using the entrypoint function
const workflow = entrypoint({
name: "workflow",
checkpointer: new MemorySaver()
}, async (numbers: number[]) => {
// Tasks can be run in parallel
const results = await Promise.all(numbers.map(n => addOne(n)));
return results;
});

// The workflow is a Pregel instance
const result = await workflow.invoke([1, 2, 3]); // Returns [2, 3, 4]

类型参数

  • N extends string

    节点名称到其 PregelNode 实现的映射

  • State = any

    通道名称到其 BaseChannelManagedValueSpec 实现的映射

  • Update = any

    可以传递给图的可配置字段的类型

  • ConfigurableFieldType extends Record<string, any> = Record<string, any>

    图接受的输入值的类型

  • InputType = any

    图产生的输出值的类型

  • OutputType = any

继承关系 (查看完整)

构造函数

属性

NodeType: N
RunInput: State
RunOutput: Update
autoValidate: boolean

是否在编译时自动验证图结构。默认为 true。

builder: Graph<N, State, Update, NodeSpec<State, Update>, StateDefinition>
channels: Record<string | N, BaseChannel<unknown, unknown, unknown>>

图中的通道,将通道名称映射到其 BaseChannel 或 ManagedValueSpec 实例

checkpointer?: false | BaseCheckpointSaver<number>

用于持久化图状态的可选检查点程序。如果提供,则在每个超步保存图状态的检查点。当为 false 或未定义时,检查点功能将被禁用,并且图将无法保存或恢复状态。

config?: LangGraphRunnableConfig<Record<string, any>>

图执行的默认配置,可以在每次调用时被覆盖

debug: boolean

是否启用调试日志记录。默认为 false。

inputChannels: string | N | (string | N)[]

图的输入通道。这些通道在图被调用时接收初始输入。可以是单个通道键或通道键数组。

interruptAfter?: "*" | ("__start__" | N)[]

可选的节点名称数组或 “all”,用于在执行这些节点后中断。用于实现人工参与 (human-in-the-loop) 工作流。

interruptBefore?: "*" | ("__start__" | N)[]

可选的节点名称数组或 “all”,用于在执行这些节点之前中断。用于实现人工参与 (human-in-the-loop) 工作流。

lc_kwargs: SerializedFields
lc_runnable: boolean
lc_serializable: boolean
name?: string
nodes: Record<"__start__" | N, PregelNode<State, Update>>

图中的节点,将节点名称映射到其 PregelNode 实例

outputChannels: string | N | (string | N)[]

图的输出通道。这些通道包含图完成时的最终输出。可以是单个通道键或通道键数组。

retryPolicy?: RetryPolicy

用于处理节点执行失败的可选重试策略

stepTimeout?: number

每个超步执行的可选超时时间,单位为毫秒

store?: BaseStore

图的可选长期内存存储,允许跨线程持久化和检索数据

streamChannels?: string | N | (string | N)[]

可选的流通道。如果未指定,将流式传输所有通道。可以是单个通道键或通道键数组。

streamMode: StreamMode[]

为此图启用的流式传输模式。默认为 ["values"]。支持的模式包括:

  • "values": 流式传输每个步骤后的完整状态
  • "updates": 流式传输每个步骤后的状态更新
  • "messages": 流式传输来自节点内部的消息
  • "custom": 流式传输来自节点内部的自定义事件
  • "debug": 流式传输与图的执行相关的事件 - 对跟踪和调试图执行非常有用

访问器

  • get lc_aliases(): undefined | {
        [key: string]: string;
    }
  • 构造函数参数别名映射。键是属性名称,例如 "foo"。值是将替换序列化中键的别名。这用于例如使参数名称与 Python 匹配。

    返回 undefined | {
        [key: string]: string;
    }

  • get lc_attributes(): undefined | SerializedFields
  • 要与构造函数参数合并的其他属性的映射。键是属性名称,例如 "foo"。值是将被序列化的属性值。这些属性需要被构造函数接受为参数。

    返回 undefined | SerializedFields

  • get lc_id(): string[]
  • 模块的最终序列化标识符。

    返回 string[]

  • get lc_secrets(): undefined | {
        [key: string]: string;
    }
  • 密钥映射,将从序列化中省略。键是构造函数参数中密钥的路径,例如 "foo.bar.baz"。值是密钥 ID,将在反序列化时使用。

    返回 undefined | {
        [key: string]: string;
    }

  • get lc_serializable_keys(): undefined | string[]
  • 应序列化的键的手动列表。如果未重写,则将序列化传递给构造函数的所有字段。

    返回 undefined | string[]

  • get streamChannelsAsIs(): keyof Channels | (keyof Channels)[]
  • 以原始格式获取要流式传输的通道。如果指定了 streamChannels,则按原样返回(单个键或数组)。否则,将图中所有通道作为数组返回。

    返回 keyof Channels | (keyof Channels)[]

    要流式传输的通道键,可以是单个键或数组

  • get streamChannelsList(): (keyof Channels)[]
  • 获取应流式传输的所有通道的列表。如果指定了 streamChannels,则返回这些通道。否则,返回图中所有通道。

    返回 (keyof Channels)[]

    要流式传输的通道键数组

方法

  • 处理可运行对象的批处理和配置的内部方法。它接受一个函数、输入值和可选配置,并返回一个 Promise,该 Promise 解析为输出值。

    类型参数

    • T

    参数

    返回 Promise<(Error | OutputType)[]>

    解析为输出值的 Promise。

  • 参数

    返回值 AsyncGenerator<RunLogPatch, any, unknown>

  • 辅助方法,用于将输入值的迭代器转换为输出值的迭代器,并带有回调。在 Runnable 子类中使用此方法来实现 stream()transform()

    类型参数

    • I
    • O

    参数

    • inputGenerator: AsyncGenerator<I, any, unknown>
    • transformer: ((generator, runManager?, options?) => AsyncGenerator<O, any, unknown>)
        • (generator, runManager?, options?): AsyncGenerator<O, any, unknown>
        • 参数

          返回值 AsyncGenerator<O, any, unknown>

    • 可选 options: Partial<PregelOptions<Record<"__start__" | N, PregelNode<State, Update>>, Record<string | N, BaseChannel<unknown, unknown, unknown>>, ConfigurableFieldType & Record<string, any>>> & {
          runType?: string;
      }

    返回值 AsyncGenerator<O, any, unknown>

  • 将 runnable 转换为工具。返回 RunnableToolLike 的新实例,其中包含 runnable、名称、描述和模式。

    类型参数

    参数

    • fields: {
          description?: string;
          name?: string;
          schema: ZodType<T, ZodTypeDef, T>;
      }
      • Optional description?: string

        工具的描述。如果未提供,则回退到 Zod 模式上的描述;如果两者均未提供,则为未定义。

      • Optional name?: string

        工具的名称。如果未提供,则默认为 runnable 的名称。

      • schema: ZodType<T, ZodTypeDef, T>

        工具输入的 Zod 模式。从 runnable 的输入类型推断 Zod 类型。

    返回值 RunnableToolLike<ZodType<ToolCall | T, ZodTypeDef, ToolCall | T>, OutputType>

    RunnableToolLike 的一个实例,它是一个可以用作工具的 runnable。

  • 将新字段分配给此 runnable 的 dict 输出。返回一个新的 runnable。

    参数

    • mapping: RunnableMapLike<Record<string, unknown>, Record<string, unknown>>

    返回值 Runnable<any, any, RunnableConfig<Record<string, any>>>

  • 参数

    返回值 void

  • 参数

    • start: "__start__" | N
    • end: "__end__" | N

    返回值 void

  • 参数

    返回值 void

  • 返回计算图的可绘制表示形式。

    参数

    • Optional config: RunnableConfig<Record<string, any>> & {
          xray?: number | boolean;
      }

    返回值 Graph

    已弃用

    请改用 getGraphAsync。在下一个次要核心版本中,异步方法将成为默认方法。

  • 返回计算图的可绘制表示形式。

    参数

    • Optional config: RunnableConfig<Record<string, any>> & {
          xray?: number | boolean;
      }

    返回 Promise<Graph>

  • 参数

    • Optional suffix: string

    返回 string

  • 获取图的当前状态。需要配置检查点。

    参数

    • config: RunnableConfig<Record<string, any>>

      用于检索状态的配置

    • Optional options: GetStateOptions

      附加选项

    返回 Promise<StateSnapshot>

    当前图状态的快照

    抛出

    如果未配置检查点

  • 获取图状态的历史记录。需要配置检查点。用于

    • 调试执行历史记录
    • 实现时间旅行
    • 分析图行为

    参数

    • config: RunnableConfig<Record<string, any>>

      用于检索历史记录的配置

    • Optional options: CheckpointListOptions

      用于筛选历史记录的选项

    返回 AsyncIterableIterator<StateSnapshot>

    状态快照的异步迭代器

    抛出

    如果未配置检查点

  • 获取此图中的所有子图。子图是嵌套在此图节点内的 Pregel 实例。

    参数

    • Optional namespace: string

      用于筛选子图的可选命名空间

    • Optional recurse: boolean

      是否递归获取子图的子图

    返回 Generator<[string, Pregel<any, any, StrRecord<string, any>, any, any>], any, unknown>

    生成器,产生 [名称, 子图] 的元组

    已弃用

    请使用 getSubgraphsAsync 代替。异步方法将在下一个小版本中成为默认方法。

  • 异步获取此图中的所有子图。子图是嵌套在此图节点内的 Pregel 实例。

    参数

    • Optional namespace: string

      用于筛选子图的可选命名空间

    • Optional recurse: boolean

      是否递归获取子图的子图

    返回 AsyncGenerator<[string, Pregel<any, any, StrRecord<string, any>, any, any>], any, unknown>

    AsyncGenerator,产生 [名称, 子图] 的元组

  • 从这个 runnable 的字典输出中挑选键。返回一个新的 runnable。

    参数

    • keys: string | string[]

    返回值 Runnable<any, any, RunnableConfig<Record<string, any>>>

  • 创建一个新的 runnable 序列,该序列串行运行每个单独的 runnable,并将一个 runnable 的输出管道传输到另一个 runnable 或类似 runnable 的对象。

    类型参数

    • NewRunOutput

    参数

    • coerceable: RunnableLike<OutputType, NewRunOutput, RunnableConfig<Record<string, any>>>

      runnable、函数或对象,其值是函数或 runnables。

    返回 Runnable<null | Command<unknown> | InputType, Exclude<NewRunOutput, Error>, RunnableConfig<Record<string, any>>>

    一个新的 runnable 序列。

  • 流式传输图的执行过程,在状态更新发生时发出更新。这是实时观察图执行过程的主要方法。

    流模式

    • "values": 在每个步骤后发出完整状态
    • "updates": 在每个步骤后仅发出状态更改
    • "debug": 发出详细的调试信息
    • "messages": 发出来自节点内部的消息

    有关更多详细信息,请参阅 流式传输操作指南

    参数

    返回 Promise<IterableReadableStream<any>>

    图状态更新的异步可迭代流

  • 生成由 runnable 内部步骤发出的事件流。

    用于创建 StreamEvents 的迭代器,这些 StreamEvents 提供关于 runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

    StreamEvent 是一个具有以下模式的字典

    • event: string - 事件名称的格式为:on_[runnable_type]_(start|stream|end)。
    • name: string - 生成事件的 runnable 的名称。
    • run_id: string - 随机生成的 ID,与发出事件的 runnable 的给定执行相关联。作为父 runnable 执行的一部分而调用的子 runnable 将被分配其自己的唯一 ID。
    • tags: string[] - 生成事件的 runnable 的标签。
    • metadata: Record<string, any> - 生成事件的 runnable 的元数据。
    • data: Record<string, any>

    下表说明了各种链可能发出的一些事件。为了简洁起见,表中省略了元数据字段。链定义已包含在表后。

    注意 此参考表适用于 V2 版本的模式。

    +----------------------+-----------------------------+------------------------------------------+
    | event | input | output/chunk |
    +======================+=============================+==========================================+
    | on_chat_model_start | {"messages": BaseMessage[]} | |
    +----------------------+-----------------------------+------------------------------------------+
    | on_chat_model_stream | | AIMessageChunk("hello") |
    +----------------------+-----------------------------+------------------------------------------+
    | on_chat_model_end | {"messages": BaseMessage[]} | AIMessageChunk("hello world") |
    +----------------------+-----------------------------+------------------------------------------+
    | on_llm_start | {'input': 'hello'} | |
    +----------------------+-----------------------------+------------------------------------------+
    | on_llm_stream | | 'Hello' |
    +----------------------+-----------------------------+------------------------------------------+
    | on_llm_end | 'Hello human!' | |
    +----------------------+-----------------------------+------------------------------------------+
    | on_chain_start | | |
    +----------------------+-----------------------------+------------------------------------------+
    | on_chain_stream | | "hello world!" |
    +----------------------+-----------------------------+------------------------------------------+
    | on_chain_end | [Document(...)] | "hello world!, goodbye world!" |
    +----------------------+-----------------------------+------------------------------------------+
    | on_tool_start | {"x": 1, "y": "2"} | |
    +----------------------+-----------------------------+------------------------------------------+
    | on_tool_end | | {"x": 1, "y": "2"} |
    +----------------------+-----------------------------+------------------------------------------+
    | on_retriever_start | {"query": "hello"} | |
    +----------------------+-----------------------------+------------------------------------------+
    | on_retriever_end | {"query": "hello"} | [Document(...), ..] |
    +----------------------+-----------------------------+------------------------------------------+
    | on_prompt_start | {"question": "hello"} | |
    +----------------------+-----------------------------+------------------------------------------+
    | on_prompt_end | {"question": "hello"} | ChatPromptValue(messages: BaseMessage[]) |
    +----------------------+-----------------------------+------------------------------------------+

    "on_chain_*" 事件是默认事件,适用于不属于上述任何类别的 Runnables。

    除了上面的标准事件之外,用户还可以调度自定义事件。

    自定义事件将仅在 API 的 v2 版本中显示!

    自定义事件具有以下格式

    +-----------+------+------------------------------------------------------------+
    | Attribute | Type | Description |
    +===========+======+============================================================+
    | name | str | A user defined name for the event. |
    +-----------+------+------------------------------------------------------------+
    | data | Any | The data associated with the event. This can be anything. |
    +-----------+------+------------------------------------------------------------+

    这是一个例子

    import { RunnableLambda } from "@langchain/core/runnables";
    import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";
    // Use this import for web environments that don't support "async_hooks"
    // and manually pass config to child runs.
    // import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch/web";

    const slowThing = RunnableLambda.from(async (someInput: string) => {
    // Placeholder for some slow operation
    await new Promise((resolve) => setTimeout(resolve, 100));
    await dispatchCustomEvent("progress_event", {
    message: "Finished step 1 of 2",
    });
    await new Promise((resolve) => setTimeout(resolve, 100));
    return "Done";
    });

    const eventStream = await slowThing.streamEvents("hello world", {
    version: "v2",
    });

    for await (const event of eventStream) {
    if (event.event === "on_custom_event") {
    console.log(event);
    }
    }

    参数

    返回 IterableReadableStream<StreamEvent>

  • 参数

    返回 IterableReadableStream<Uint8Array>

  • 流式传输来自 runnable 的所有输出,如同报告给回调系统一样。这包括 LLM、检索器、工具等的所有内部运行。输出以 Log 对象的形式流式传输,其中包括 jsonpatch 操作列表,这些操作描述了运行状态在每个步骤中的变化以及运行的最终状态。可以按顺序应用 jsonpatch 操作来构建状态。

    参数

    返回值 AsyncGenerator<RunLogPatch, any, unknown>

  • 返回 Serialized

  • 返回 SerializedNotImplemented

  • transform 的默认实现,它会缓冲输入,然后调用 stream。子类应该重写此方法,如果它们可以在输入仍在生成时开始生成输出。

    参数

    返回 AsyncGenerator<OutputType, any, unknown>

  • 使用新值更新图的状态。需要配置检查点。

    此方法可用于

    • 实现人机回路工作流
    • 在断点期间修改图状态
    • 将外部输入集成到图中

    参数

    • inputConfig: LangGraphRunnableConfig<Record<string, any>>

      更新的配置

    • values: unknown

      用于更新状态的值

    • 可选 asNode: string | N

      可选的节点名称,用于将更新归于该节点

    返回 Promise<RunnableConfig<Record<string, any>>>

    更新后的配置

    抛出

    如果未配置检查点

    抛出

    如果更新无法归于节点

  • 验证图结构以确保其结构良好。检查以下内容

    • 没有孤立节点
    • 有效的输入/输出通道配置
    • 有效的中断配置

    返回 this

    this - 用于方法链的 Pregel 实例

    抛出

    如果图结构无效

  • 创建一个具有更新配置的 Pregel 图的新实例。此方法遵循不可变模式 - 它不是修改当前实例,而是返回一个具有合并配置的新实例。

    参数

    • config: RunnableConfig<Record<string, any>>

      要与当前配置合并的配置

    返回 CompiledGraph<N, State, Update, ConfigurableFieldType, InputType, OutputType>

    具有合并配置的新 Pregel 实例

    示例

    // Create a new instance with debug enabled
    const debugGraph = graph.withConfig({ debug: true });

    // Create a new instance with a specific thread ID
    const threadGraph = graph.withConfig({
    configurable: { thread_id: "123" }
    });
  • 从当前 runnable 创建一个新的 runnable,如果初始调用失败,它将尝试调用其他传递的 fallback runnable。

    参数

    返回 RunnableWithFallbacks<null | Command<unknown> | InputType, OutputType>

    一个新的 RunnableWithFallbacks。

  • 将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。“Run”对象包含有关运行的信息,包括其 id、类型、输入、输出、错误、startTime、endTime 以及添加到运行中的任何标签或元数据。

    参数

    • params: {
          onEnd?: ((run, config?) => void | Promise<void>);
          onError?: ((run, config?) => void | Promise<void>);
          onStart?: ((run, config?) => void | Promise<void>);
      }

      包含回调函数的对象。

      • 可选 onEnd?: ((run, config?) => void | Promise<void>)

        在 runnable 完成运行后调用,带有“Run”对象。

          • (run, config?): void | Promise<void>
          • 参数

            • run: Run
            • 可选 config: RunnableConfig<Record<string, any>>

            返回 void | Promise<void>

      • 可选 onError?: ((run, config?) => void | Promise<void>)

        如果 runnable 抛出错误时调用,带有“Run”对象。

          • (run, config?): void | Promise<void>
          • 参数

            • run: Run
            • 可选 config: RunnableConfig<Record<string, any>>

            返回 void | Promise<void>

      • 可选 onStart?: ((run, config?) => void | Promise<void>)

        在 runnable 开始运行之前调用,带有“Run”对象。

          • (run, config?): void | Promise<void>
          • 参数

            • run: Run
            • 可选 config: RunnableConfig<Record<string, any>>

            返回 void | Promise<void>

    返回值 Runnable<null | Command<unknown> | InputType, OutputType, PregelOptions<Record<"__start__" | N, PregelNode<State, Update>>, Record<string | N, BaseChannel<unknown, unknown, unknown>>, ConfigurableFieldType & Record<string, any>>>

  • 参数

    • thing: any

    返回 thing is Runnable<any, any, RunnableConfig<Record<string, any>>>