类 Pregel<Nodes, Channels, ContextType, InputType, OutputType, StreamUpdatesType, StreamValuesType>

Pregel 类是 LangGraph 的核心运行时引擎,它实现了一种受谷歌 Pregel 系统启发的、基于消息传递的图计算模型。它为构建可靠、可控、能够随时间演化状态的 Agent 工作流提供了基础。

主要特性

  • 节点之间在离散的“超步”(supersteps)中进行消息传递
  • 通过检查点(checkpointers)实现内置的持久化层
  • 对值、更新和事件提供一流的流式支持
  • 通过中断(interrupts)实现人机回圈(human-in-the-loop)功能
  • 支持在超步内并行执行节点

Pregel 类不建议由消费者直接实例化。请使用以下更高级别的 API

  • StateGraph:用于构建 Agent 工作流的主要图类
  • 函数式 API:一种使用任务(tasks)和入口点(entrypoints)的声明式方法

示例

// Using StateGraph API
const graph = new StateGraph(annotation)
.addNode("nodeA", myNodeFunction)
.addEdge("nodeA", "nodeB")
.compile();

// The compiled graph is a Pregel instance
const result = await graph.invoke(input);

示例

// Using Functional API
import { task, entrypoint } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint";

// Define tasks that can be composed
const addOne = task("add", async (x: number) => x + 1);

// Create a workflow using the entrypoint function
const workflow = entrypoint({
name: "workflow",
checkpointer: new MemorySaver()
}, async (numbers: number[]) => {
// Tasks can be run in parallel
const results = await Promise.all(numbers.map(n => addOne(n)));
return results;
});

// The workflow is a Pregel instance
const result = await workflow.invoke([1, 2, 3]); // Returns [2, 3, 4]

类型参数

  • Nodes extends StrRecord<string, PregelNode>

    节点名称到其 PregelNode 实现的映射

  • Channels extends StrRecord<string, BaseChannel>

    通道名称到其 BaseChannel 或 ManagedValueSpec 实现的映射

  • ContextType extends Record<string, any> = StrRecord<string, any>

    可传递给图的上下文类型

  • InputType = PregelInputType

    图接受的输入值类型

  • OutputType = PregelOutputType

    图产生的输出值类型

  • StreamUpdatesType = InputType
  • StreamValuesType = OutputType

层级结构 (查看全部)

实现 (Implements)

属性

自动验证: boolean

编译时是否自动验证图结构。默认为 true。

cache?: BaseCache<unknown>

图的可选缓存,对缓存任务很有用。

channels: Channels

图中的通道,将通道名称映射到其 BaseChannel 或 ManagedValueSpec 实例。

checkpointer?: boolean | BaseCheckpointSaver<number>

用于持久化图状态的可选检查点。如果提供,则在每个超级步保存图状态的检查点。如果为 false 或未定义,则禁用检查点,图将无法保存或恢复状态。

config?: LangGraphRunnableConfig<Record<string, any>>

图执行的默认配置,可以按每次调用进行覆盖。

调试: boolean

是否启用调试日志。默认为 false。

inputChannels: keyof Channels | (keyof Channels)[]

图的输入通道。当图被调用时,这些通道接收初始输入。可以是单个通道键或通道键数组。

interruptAfter?: "*" | (keyof Nodes)[]

可选的节点名称数组或“all”,用于在执行这些节点后中断。用于实现人机协作工作流。

interruptBefore?: "*" | (keyof Nodes)[]

可选的节点名称数组或“all”,用于在执行这些节点之前中断。用于实现人机协作工作流。

lc_kwargs: SerializedFields
lc_runnable: boolean
lc_serializable: boolean
name?: string
nodes: Nodes

图中的节点,将节点名称映射到其 PregelNode 实例。

outputChannels: keyof Channels | (keyof Channels)[]

图的输出通道。当图完成时,这些通道包含最终输出。可以是单个通道键或通道键数组。

retryPolicy?: RetryPolicy

用于处理节点执行失败的可选重试策略。

步骤超时?: number

每个超级步执行的可选超时时间(毫秒)。

存储?: BaseStore

图的可选长期内存存储,允许跨线程持久化和检索数据。

streamChannels?: keyof Channels | (keyof Channels)[]

可选的流式通道。如果未指定,将流式传输所有通道。可以是单个通道键或通道键数组。

流模式: StreamMode[]

此图启用的流模式。默认为 ["values"]。支持的模式有:

  • "values":在每个步骤后流式传输完整状态
  • "updates":在每个步骤后流式传输状态更新
  • "messages":流式传输节点内部的消息
  • "custom":流式传输节点内部的自定义事件
  • "debug":流式传输与图执行相关的事件 - 有助于跟踪和调试图执行
triggerToNodes: Record<string, string[]>

访问器

  • get lc_aliases(): undefined | {
        [key: string]: string;
    }
  • 构造函数参数的别名映射。键是属性名,例如 "foo"。值是序列化时将替换该键的别名。这用于例如使参数名与 Python 匹配。

    返回 undefined | {
        [key: string]: string;
    }

  • get lc_attributes(): undefined | SerializedFields
  • 与构造函数参数合并的附加属性映射。键是属性名,例如 "foo"。值是属性值,它们将被序列化。这些属性需要被构造函数接受为参数。

    返回 undefined | SerializedFields

  • get lc_id(): string[]
  • 模块的最终序列化标识符。

    返回 string[]

  • get lc_secrets(): undefined | {
        [key: string]: string;
    }
  • 秘密映射,将从序列化中省略。键是构造函数参数中秘密的路径,例如 "foo.bar.baz"。值是秘密 ID,在反序列化时将使用。

    返回 undefined | {
        [key: string]: string;
    }

  • get lc_serializable_keys(): undefined | string[]
  • 应该序列化的键的手动列表。如果未被覆盖,所有传递给构造函数的字段都将被序列化。

    返回 undefined | string[]

  • get streamChannelsAsIs(): keyof Channels | (keyof Channels)[]
  • 获取以原始格式流式传输的通道。如果指定了 streamChannels,则按原样返回(单个键或数组)。否则,将图中所有通道作为数组返回。

    返回 keyof Channels | (keyof Channels)[]

    要流式传输的通道键,可以是单个键或数组。

  • get streamChannelsList(): (keyof Channels)[]
  • 获取所有应流式传输的通道列表。如果指定了 streamChannels,则返回这些通道。否则,返回图中所有通道。

    返回 (keyof Channels)[]

    要流式传输的通道键数组。

方法

  • 处理可运行对象的批量处理和配置的内部方法。它接受一个函数、输入值和可选配置,并返回一个解析为输出值的 Promise。

    类型参数

    • T

    参数

    返回 Promise<(Error | OutputType)[]>

    一个解析为输出值的 Promise。

  • 类型参数

    参数

    • options: Partial<O> | Partial<O>[]
    • 可选 length: number

    返回 Partial<O>[]

  • 参数

    返回 AsyncGenerator<RunLogPatch, any, any>

  • 辅助方法,用于将输入值迭代器转换为输出值迭代器,并带有回调。在 Runnable 子类中,使用此方法实现 stream()transform()

    类型参数

    • I
    • O

    参数

    返回 AsyncGenerator<O, any, any>

  • 将可运行对象转换为工具。返回 RunnableToolLike 的新实例,其中包含可运行对象、名称、描述和模式。

    类型参数

    参数

    • fields: {
          description?: string;
          name?: string;
          schema: InteropZodType<T>;
      }
      • 可选 description?: string

        工具的描述。如果未提供,则回退到 Zod 模式上的描述;如果两者都未提供,则为 undefined。

      • 可选 name?: string

        工具的名称。如果未提供,则默认为可运行对象的名称。

      • schema: InteropZodType<T>

        工具输入的 Zod 模式。从可运行对象的输入类型推断 Zod 类型。

    返回 RunnableToolLike<InteropZodType<ToolCall | T>, OutputType>

    RunnableToolLike 的一个实例,它是一个可用作工具的可运行对象。

  • 将新字段分配给此可运行对象的字典输出。返回一个新的可运行对象。

    参数

    • mapping: RunnableMapLike<Record<string, unknown>, Record<string, unknown>>

    返回 Runnable<any, any, RunnableConfig<Record<string, any>>>

  • 返回 Promise<void>

  • 参数

    • 可选 _: RunnableConfig<Record<string, any>>

    返回 Graph

  • 获取图结构的可绘制表示。这是 getGraph() 的异步版本,是首选使用的方法。

    参数

    • config: RunnableConfig<Record<string, any>>

      用于生成图可视化的配置

    返回 Promise<Graph>

    可以可视化的图的表示

  • 参数

    • 可选 suffix: string

    返回 string

  • 获取图的当前状态。需要配置检查点。

    参数

    • config: RunnableConfig<Record<string, any>>

      用于检索状态的配置

    • 可选 options: GetStateOptions

      其他选项

    返回 Promise<StateSnapshot>

    当前图状态的快照

    抛出

    如果未配置检查点

  • 获取图状态的历史记录。需要配置检查点。可用于

    • 调试执行历史
    • 实现时间旅行
    • 分析图行为

    参数

    • config: RunnableConfig<Record<string, any>>

      用于检索历史记录的配置

    • 可选 options: CheckpointListOptions

      用于过滤历史记录的选项

    返回 AsyncIterableIterator<StateSnapshot, any, any>

    状态快照的异步迭代器

    抛出

    如果未配置检查点

  • 获取此图中的所有子图。子图是嵌套在此图节点中的 Pregel 实例。

    参数

    • 可选 namespace: string

      用于过滤子图的可选命名空间

    • 可选 recurse: boolean

      是否递归获取子图的子图

    返回 Generator<[string, Pregel<any, any, StrRecord<string, any>, any, any, any, any>], any, any>

    生成器,生成 [名称, 子图] 的元组

    已弃用

    请改用 getSubgraphsAsync。异步方法将在下一个小版本中成为默认。

  • 异步获取此图中的所有子图。子图是嵌套在此图节点中的 Pregel 实例。

    参数

    • 可选 namespace: string

      用于过滤子图的可选命名空间

    • 可选 recurse: boolean

      是否递归获取子图的子图

    返回 AsyncGenerator<[string, Pregel<any, any, StrRecord<string, any>, any, any, any, any>], any, any>

    生成 [名称, 子图] 元组的异步生成器

  • 从此可运行对象的字典输出中选取键。返回一个新的可运行对象。

    参数

    • keys: string | string[]

    返回 Runnable<any, any, RunnableConfig<Record<string, any>>>

  • 创建一个新的可运行序列,该序列按顺序运行每个独立的 runnable,将一个 runnable 的输出导入到另一个 runnable 或类似 runnable 的对象中。

    类型参数

    • NewRunOutput

    参数

    • coerceable: RunnableLike<OutputType, NewRunOutput, RunnableConfig<Record<string, any>>>

      一个 runnable、函数或其值为函数或 runnable 的对象。

    返回 Runnable<null | CommandInstance | InputType, Exclude<NewRunOutput, Error>, RunnableConfig<Record<string, any>>>

    一个新的可运行序列。

  • 参数

    返回 IterableReadableStream<StreamEvent>

    继承文档

  • 参数

    返回 IterableReadableStream<Uint8Array<ArrayBufferLike>>

  • 以流的形式输出 runnable 的所有输出,正如向回调系统报告的那样。这包括 LLM、检索器、工具等的所有内部运行。输出以 Log 对象的形式流式传输,其中包括一个 jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。可以按顺序应用 jsonpatch 操作来构建状态。

    参数

    返回 AsyncGenerator<RunLogPatch, any, any>

  • 返回 Serialized

  • 返回 SerializedNotImplemented

  • transform 的默认实现,它会缓冲输入,然后调用 stream。如果子类可以在输入仍在生成时开始产生输出,则应覆盖此方法。

    参数

    返回 AsyncGenerator<OutputType, any, any>

  • 使用新值更新图的状态。需要配置检查点器。

    此方法可用于

    • 实现人工干预工作流
    • 在断点期间修改图状态
    • 将外部输入集成到图中

    参数

    • inputConfig: LangGraphRunnableConfig<Record<string, any>>

      更新配置

    • values: unknown

      用于更新状态的值

    • 可选 asNode: string | keyof Nodes

      可选的节点名称,用于归因更新

    返回 Promise<RunnableConfig<Record<string, any>>>

    已更新的配置

    抛出

    如果未配置检查点

    抛出

    如果更新无法归因于某个节点

  • 验证图结构以确保其格式正确。检查以下项:

    • 无孤立节点
    • 有效的输入/输出通道配置
    • 有效的ად中断配置

    返回 this

    this - 用于方法链的 Pregel 实例

    抛出

    如果图结构无效

  • 创建具有更新配置的 Pregel 图的新实例。此方法遵循不可变模式 - 它不修改当前实例,而是返回一个包含合并配置的新实例。

    参数

    • config: RunnableConfig<Record<string, any>>

      要与当前配置合并的配置

    返回 Pregel<Nodes, Channels, ContextType, InputType, OutputType, StreamUpdatesType, StreamValuesType>

    包含合并配置的新 Pregel 实例

    示例

    // Create a new instance with debug enabled
    const debugGraph = graph.withConfig({ debug: true });

    // Create a new instance with a specific thread ID
    const threadGraph = graph.withConfig({
    configurable: { thread_id: "123" }
    });
  • 从当前 runnable 创建一个新 runnable,如果初始调用失败,它将尝试调用其他传入的备用 runnable。

    参数

    • fields: {
          fallbacks: Runnable<null | CommandInstance | InputType, OutputType, RunnableConfig<Record<string, any>>>[];
      } | Runnable<null | CommandInstance | InputType, OutputType, RunnableConfig<Record<string, any>>>[]

    返回 RunnableWithFallbacks<null | CommandInstance | InputType, OutputType>

    一个新的 RunnableWithFallbacks。

  • 将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。Run 对象包含关于运行的信息,包括其 id、类型、输入、输出、错误、启动时间、结束时间以及添加到运行中的任何标签或元数据。

    参数

    • params: {
          onEnd?: ((run, config?) => void | Promise<void>);
          onError?: ((run, config?) => void | Promise<void>);
          onStart?: ((run, config?) => void | Promise<void>);
      }

      包含回调函数的对象。

      • 可选 onEnd?: ((run, config?) => void | Promise<void>)

        在 runnable 运行结束后调用,传入 Run 对象。

          • (run, config?): void | Promise<void>
          • 参数

            • run: Run
            • 可选 config: RunnableConfig<Record<string, any>>

            返回 void | Promise<void>

      • 可选 onError?: ((run, config?) => void | Promise<void>)

        如果 runnable 抛出错误,则调用此函数,传入 Run 对象。

          • (run, config?): void | Promise<void>
          • 参数

            • run: Run
            • 可选 config: RunnableConfig<Record<string, any>>

            返回 void | Promise<void>

      • 可选 onStart?: ((run, config?) => void | Promise<void>)

        在 runnable 开始运行前调用,传入 Run 对象。

          • (run, config?): void | Promise<void>
          • 参数

            • run: Run
            • 可选 config: RunnableConfig<Record<string, any>>

            返回 void | Promise<void>

    返回 Runnable<null | CommandInstance | InputType, OutputType, PregelOptions<Nodes, Channels, ContextType, undefined | StreamMode | StreamMode[], boolean>>

  • 为现有 runnable 添加重试逻辑。

    参数

    • 可选 fields: {
          onFailedAttempt?: RunnableRetryFailedAttemptHandler;
          stopAfterAttempt?: number;
      }
      • 可选 onFailedAttempt?: RunnableRetryFailedAttemptHandler

        重试失败时调用的函数。

      • 可选 stopAfterAttempt?: number

        重试的尝试次数。

    返回 RunnableRetry<null | CommandInstance | InputType, OutputType, PregelOptions<Nodes, Channels, ContextType, undefined | StreamMode | StreamMode[], boolean>>

    一个新的 RunnableRetry,当被调用时将根据参数进行重试。

  • 参数

    • thing: any

    返回 thing is Runnable<any, any, RunnableConfig<Record<string, any>>>