跳到内容

函数式 API

注意

函数式 API 要求 @langchain/langgraph>=0.2.42

概述

通过**函数式 API**,您只需对现有代码进行最小更改,即可将 LangGraph 的核心功能——持久化内存人机协作流式传输——添加到您的应用程序中。

它旨在将这些功能集成到可能使用标准语言原语(如 if 语句、for 循环和函数调用)进行分支和控制流的现有代码中。与许多要求将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许您在不强制执行严格执行模型的情况下整合这些能力。

**函数式 API** 使用两个关键构建块

  • **entrypoint** – **入口点 (entrypoint)** 是一个包装器,它将函数作为工作流的起点。它封装了工作流逻辑并管理执行流,包括处理*长时间运行的任务*和中断
  • **task** – 代表一个离散的工作单元,例如 API 调用或数据处理步骤,可以在入口点内异步执行。任务返回一个类似 Future 的对象,可以被等待或同步解析。

这为构建具有状态管理和流式传输功能的工作流提供了最小的抽象。

提示

对于更喜欢声明式方法的开发者,LangGraph 的图式 API (Graph API) 允许您使用图范式定义工作流。两种 API 共享相同的底层运行时,因此您可以在同一个应用程序中同时使用它们。有关这两种范式的比较,请参阅函数式 API 对比 图式 API 部分。

示例

下面我们演示一个简单的应用程序,它撰写一篇论文并中断以请求人工审查。

import { task, entrypoint, interrupt, MemorySaver } from "@langchain/langgraph";

const writeEssay = task("write_essay", (topic: string): string => {
  // A placeholder for a long-running task.
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // Any json-serializable payload provided to interrupt as argument.
      // It will be surfaced on the client side as an Interrupt when streaming data
      // from the workflow.
      essay, // The essay we want reviewed.
      // We can add any additional information that we need.
      // For example, introduce a key called "action" with some instructions.
      action: "Please approve/reject the essay",
    });

    return {
      essay, // The essay that was generated
      isApproved, // Response from HIL
    };
  }
);
详细说明

此工作流将撰写一篇关于主题“猫”的论文,然后暂停以获取人工审查。工作流可以无限期中断,直到提供审查为止。

当工作流恢复时,它会从头开始执行,但由于 writeEssay 任务的结果已经保存,任务结果将从检查点加载,而不是重新计算。

import { task, entrypoint, interrupt, MemorySaver, Command } from "@langchain/langgraph";

const writeEssay = task("write_essay", (topic: string): string => {
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      essay, // The essay we want reviewed.
      action: "Please approve/reject the essay",
    });

    return {
      essay,
      isApproved,
    };
  }
);

const threadId = crypto.randomUUID();

const config = {
  configurable: {
    thread_id: threadId,
  },
};

for await (const item of await workflow.stream("cat", config)) {
  console.log(item);
}
{ write_essay: 'An essay about topic: cat' }
{ __interrupt__: [{
  value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
  resumable: true,
  ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
  when: 'during'
}] }

论文已撰写完成,等待审查。一旦提供审查,我们就可以恢复工作流

// Get review from a user (e.g., via a UI)
// In this case, we're using a bool, but this can be any json-serializable value.
const humanReview = true;

for await (const item of await workflow.stream(new Command({ resume: humanReview }), config)) {
  console.log(item);
}
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }

工作流已完成,审查已添加到论文中。

入口点

entrypoint 函数可用于从函数创建工作流。它封装了工作流逻辑并管理执行流,包括处理*长时间运行的任务*和中断

定义

通过将函数传递给 entrypoint 函数来定义一个**入口点 (entrypoint)**。

该函数**必须接受一个位置参数**,作为工作流的输入。如果您需要传递多条数据,请使用对象作为第一个参数的输入类型。

您通常会希望将**检查点器 (checkpointer)** 传递给 entrypoint 函数,以启用持久化并使用**人机协作 (human-in-the-loop)** 等功能。

import { entrypoint, MemorySaver } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (someInput: Record<string, any>): Promise<number> => {
    // some logic that may involve long-running tasks like API calls,
    // and may be interrupted for human-in-the-loop.
    return result;
  }
);

序列化

入口点的**输入**和**输出**必须是 JSON 可序列化的,以支持检查点。请参阅序列化部分了解更多详情。

可注入参数

声明 entrypoint 时,您可以使用 getPreviousState 函数和其他实用程序来访问在运行时自动注入的额外参数。这些参数包括

参数 描述
config 用于访问运行时配置。作为 entrypoint 函数的第二个参数自动填充(但不适用于 task,因为任务可以有可变数量的参数)。请参阅 RunnableConfig 了解信息。
config.store BaseStore 的一个实例。适用于长期记忆
config.writer 用于流式传输自定义数据的 writer。请参阅流式传输自定义数据指南
getPreviousState() 使用 getPreviousState 访问与给定线程的先前 checkpoint 相关联的状态。请参阅状态管理
请求可注入参数
import {
  entrypoint,
  getPreviousState,
  BaseStore,
  InMemoryStore,
} from "@langchain/langgraph";
import { RunnableConfig } from "@langchain/core/runnables";

const inMemoryStore = new InMemoryStore(...);  // An instance of InMemoryStore for long-term memory

const myWorkflow = entrypoint(
  {
    checkpointer,  // Specify the checkpointer
    store: inMemoryStore,  // Specify the store
    name: "myWorkflow",
  },
  async (someInput: Record<string, any>) => {
    const previous = getPreviousState<any>(); // For short-term memory
    // Rest of workflow logic...
  }
);

执行

使用 entrypoint 函数将返回一个对象,该对象可以使用 invokestream 方法执行。

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};
await myWorkflow.invoke(someInput, config);  // Wait for the result
const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

for await (const chunk of await myWorkflow.stream(someInput, config)) {
  console.log(chunk);
}

恢复

中断后恢复执行可以通过将**恢复**值传递给 Command 原语来完成。

import { Command } from "@langchain/langgraph";

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(new Command({ resume: someResumeValue }), config);
import { Command } from "@langchain/langgraph";

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

const stream = await myWorkflow.stream(
  new Command({ resume: someResumeValue }),
  config,
);

for await (const chunk of stream) {
  console.log(chunk);
}

暂时性错误后恢复

要在暂时性错误(例如模型提供商中断)后恢复,请使用 null 和相同的**线程 ID** (config) 运行 entrypoint

这假定底层**错误**已解决,并且执行可以成功进行。

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(null, config);
const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

for await (const chunk of await myWorkflow.stream(null, config)) {
  console.log(chunk);
}

状态管理

entrypointcheckpointer 一起定义时,它会在同一**线程 ID** 的连续调用之间将信息存储在检查点中。

这允许使用 getPreviousState 函数访问上一次调用的状态。

默认情况下,先前状态是上一次调用的返回值。

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (number: number) => {
    const previous = getPreviousState<number>();
    return number + (previous ?? 0);
  }
);

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(1, config); // 1 (previous was undefined)
await myWorkflow.invoke(2, config); // 3 (previous was 1 from the previous invocation)

entrypoint.final

entrypoint.final 是一个特殊的原语,可以从入口点返回,并允许**解耦****保存在检查点中的值**与**入口点的返回值**。

第一个值是入口点的返回值,第二个值是将保存在检查点中的值。

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (number: number) => {
    const previous = getPreviousState<number>();
    // This will return the previous value to the caller, saving
    // 2 * number to the checkpoint, which will be used in the next invocation
    // for the previous state
    return entrypoint.final({
      value: previous ?? 0,
      save: 2 * number,
    });
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await myWorkflow.invoke(3, config); // 0 (previous was undefined)
await myWorkflow.invoke(1, config); // 6 (previous was 3 * 2 from the previous invocation)

任务

**任务 (task)** 代表一个离散的工作单元,例如 API 调用或数据处理步骤。它具有三个关键特征

  • **异步执行**:任务被设计为异步执行,允许多个操作并发运行而不阻塞。
  • **检查点**:任务结果保存到检查点,使工作流能够从最后保存的状态恢复。(详见持久化)。
  • **重试**:任务可以配置重试策略以处理暂时性错误。

定义

任务是使用 task 函数定义的,该函数包装了一个普通函数。

import { task } from "@langchain/langgraph";

const slowComputation = task({"slowComputation", async (inputValue: any) => {
  // Simulate a long-running operation
  ...
  return result;
});

序列化

任务的**输出****必须**是 JSON 可序列化的,以支持检查点。

执行

**任务**只能从**入口点 (entrypoint)**、另一个**任务**或状态图节点内部调用。

任务*不能*直接从主应用程序代码调用。

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (someInput: number) => {
    return await slowComputation(someInput);
  }
);

重试策略

您可以通过向 task 函数传递 retry 参数来为**任务**指定重试策略

const slowComputation = task(
  {
    name: "slowComputation",
    // only attempt to run this task once before giving up
    retry: { maxAttempts: 1 },
  },
  async (inputValue: any) => {
    // A long-running operation that may fail
    return result;
  }
);

何时使用任务

**任务**在以下场景中非常有用

  • **检查点**:当您需要将长时间运行操作的结果保存到检查点时,这样在恢复工作流时就不需要重新计算它。
  • **人机协作**:如果您正在构建需要人工干预的工作流,您**必须**使用**任务**来封装任何随机性(例如,API 调用),以确保工作流能够正确恢复。详见确定性部分。
  • **并行执行**:对于 I/O 密集型任务,**任务**支持并行执行,允许多个操作并发运行而不阻塞(例如,调用多个 API)。
  • **可观测性**:将操作封装在**任务**中提供了一种使用 LangSmith 跟踪工作流进度和监控单个操作执行的方法。
  • **可重试工作**:当工作需要重试以处理故障或不一致时,**任务**提供了一种封装和管理重试逻辑的方法。

序列化

LangGraph 中序列化有两个关键方面

  1. entrypoint 的输入和输出必须是 JSON 可序列化的。
  2. task 的输出必须是 JSON 可序列化的。

这些要求对于启用检查点和工作流恢复是必要的。请使用 JavaScript 原生类型,如对象、数组、字符串、数字和布尔值,以确保您的输入和输出是可序列化的。

序列化确保工作流状态(例如任务结果和中间值)可以可靠地保存和恢复。这对于启用人机协作、容错和并行执行至关重要。

如果工作流配置了检查点器,提供不可序列化的输入或输出将导致运行时错误。

确定性

要利用**人机协作**等功能,任何随机性都应封装在**任务**内部。这保证了当执行暂停(例如,为了人机协作)然后恢复时,即使**任务**结果是非确定性的,它也将遵循相同的*步骤序列*。

LangGraph 通过在执行**任务**和**子图**时将其结果持久化来实现此行为。设计良好的工作流可确保恢复执行遵循*相同的步骤序列*,从而能够正确检索之前计算的结果而无需重新执行它们。这对于长时间运行的**任务**或具有非确定性结果的**任务**特别有用,因为它避免重复之前完成的工作,并允许从本质上相同的点恢复。

虽然工作流的不同运行可能会产生不同的结果,但恢复**特定**运行应始终遵循相同的记录步骤序列。这使得 LangGraph 能够高效地查找在图中断之前执行的**任务**和**子图**结果,并避免重新计算它们。

幂等性

幂等性确保多次运行同一操作会产生相同的结果。这有助于防止因故障而重新运行某个步骤时出现重复的 API 调用和冗余处理。始终将 API 调用放置在**任务**函数内部以进行检查点,并设计它们在重新执行时具有幂等性。如果**任务**启动但未成功完成,则可能会发生重新执行。然后,如果工作流恢复,**任务**将再次运行。使用幂等性键或验证现有结果以避免重复。

函数式 API 对比 图式 API

**函数式 API** 和图式 API (StateGraph) 在 LangGraph 中提供了两种不同的创建范式。以下是一些主要区别

  • **控制流**:函数式 API 不需要考虑图结构。您可以使用标准编程语言构造来定义工作流。这通常会减少您需要编写的代码量。
  • **状态管理**:**图式 API** 需要声明一个**状态 (State)**,并且可能需要定义**Reducer**来管理图状态的更新。@entrypoint@tasks 不需要显式状态管理,因为它们的状态作用域限定在函数内部,并且不在函数之间共享。
  • **检查点**:两种 API 都生成和使用检查点。在**图式 API** 中,每个超步之后都会生成一个新的检查点。在**函数式 API** 中,当任务执行时,它们的結果将保存到与给定入口点关联的现有检查点中,而不是创建新的检查点。
  • **可视化**:图式 API 使得将工作流可视化为图表变得容易,这对于调试、理解工作流以及与他人共享非常有用。函数式 API 不支持可视化,因为图表是在运行时动态生成的。

常见陷阱

处理副作用

将副作用(例如,写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时它们不会被多次执行。

在此示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时它将被第二次执行。

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: Record<string, any>) => {
    // This code will be executed a second time when resuming the workflow.
    // Which is likely not what you want.
    await fs.writeFile("output.txt", "Side effect executed");
    const value = interrupt("question");
    return value;
  }
);

在此示例中,副作用被封装在一个任务中,确保在恢复时一致的执行。

import { task } from "@langchain/langgraph";

const writeToFile = task("writeToFile", async () => {
  await fs.writeFile("output.txt", "Side effect executed");
});

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: Record<string, any>) => {
    // The side effect is now encapsulated in a task.
    await writeToFile();
    const value = interrupt("question");
    return value;
  }
);

非确定性控制流

每次可能产生不同结果的操作(如获取当前时间或随机数)应封装在任务中,以确保在恢复时返回相同的结果。

  • 在任务中:获取随机数 (5) → 中断 → 恢复 → (再次返回 5) → ...
  • 不在任务中:获取随机数 (5) → 中断 → 恢复 → 获取新随机数 (7) → ...

这在使用涉及多次中断调用的**人机协作**工作流时尤为重要。LangGraph 为每个任务/入口点保留一个恢复值列表。当中断发生时,它会与相应的恢复值匹配。这种匹配严格基于**索引**,因此恢复值的顺序应与中断的顺序匹配。

如果在恢复时未保持执行顺序,一个 interrupt 调用可能与错误的 resume 值匹配,从而导致不正确的结果。

请阅读确定性部分了解更多详情。

在此示例中,工作流使用当前时间来确定执行哪个任务。这是非确定性的,因为工作流的结果取决于其执行的时间。

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: { t0: number }) => {
    const t1 = Date.now();

    const deltaT = t1 - inputs.t0;

    if (deltaT > 1000) {
      const result = await slowTask(1);
      const value = interrupt("question");
      return { result, value };
    } else {
      const result = await slowTask(2);
      const value = interrupt("question");
      return { result, value };
    }
  }
);

在此示例中,工作流使用输入 t0 来确定执行哪个任务。这是确定性的,因为工作流的结果仅取决于输入。

import { task } from "@langchain/langgraph";

const getTime = task("getTime", () => Date.now());

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: { t0: number }) => {
    const t1 = await getTime();

    const deltaT = t1 - inputs.t0;

    if (deltaT > 1000) {
      const result = await slowTask(1);
      const value = interrupt("question");
      return { result, value };
    } else {
      const result = await slowTask(2);
      const value = interrupt("question");
      return { result, value };
    }
  }
);

模式

下面是一些简单模式,展示了**如何**使用**函数式 API** 的示例。

定义 entrypoint 时,输入仅限于函数的第一个参数。要传递多个输入,可以使用对象。

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: { value: number; anotherValue: number }) => {
    const value = inputs.value;
    const anotherValue = inputs.anotherValue;
    ...
  }
);

await myWorkflow.invoke([{ value: 1, anotherValue: 2 }]);

并行执行

任务可以通过并发调用并等待结果来并行执行。这对于提高 I/O 密集型任务(例如,调用 LLM 的 API)的性能非常有用。

const addOne = task("addOne", (number: number) => number + 1);

const graph = entrypoint(
  { checkpointer, name: "graph" },
  async (numbers: number[]) => {
    return await Promise.all(numbers.map(addOne));
  }
);

调用子图

**函数式 API** 和**图式 API** 可以在同一个应用程序中一起使用,因为它们共享相同的底层运行时。

import { entrypoint, StateGraph } from "@langchain/langgraph";

const builder = new StateGraph();
...
const someGraph = builder.compile();

const someWorkflow = entrypoint(
  { name: "someWorkflow" },
  async (someInput: Record<string, any>) => {
    // Call a graph defined using the graph API
    const result1 = await someGraph.invoke(...);
    // Call another graph defined using the graph API
    const result2 = await anotherGraph.invoke(...);
    return {
      result1,
      result2,
    };
  }
);

调用其他入口点

您可以从**入口点 (entrypoint)** 或**任务**内部调用其他**入口点 (entrypoint)**。

const someOtherWorkflow = entrypoint(
  { name: "someOtherWorkflow" }, // Will automatically use the checkpointer from the parent entrypoint
  async (inputs: { value: number }) => {
    return inputs.value;
  }
);

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: Record<string, any>) => {
    const value = await someOtherWorkflow.invoke([{ value: 1 }]);
    return value;
  }
);

流式传输自定义数据

您可以通过在 config 上使用 write 方法从**入口点 (entrypoint)** 流式传输自定义数据。这允许您将自定义数据写入 custom 流。

import {
  entrypoint,
  task,
  MemorySaver,
  LangGraphRunnableConfig,
} from "@langchain/langgraph";

const addOne = task("addOne", (x: number) => x + 1);

const addTwo = task("addTwo", (x: number) => x + 2);

const checkpointer = new MemorySaver();

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: { number: number }, config: LangGraphRunnableConfig) => {
    config.writer?.("hello"); // Write some data to the `custom` stream
    await addOne(inputs.number); // Will write data to the `updates` stream
    config.writer?.("world"); // Write some more data to the `custom` stream
    await addTwo(inputs.number); // Will write data to the `updates` stream
    return 5;
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

const stream = await main.stream(
  { number: 1 },
  { streamMode: ["custom", "updates"], ...config }
);

for await (const chunk of stream) {
  console.log(chunk);
}
["updates", { addOne: 2 }][("updates", { addTwo: 3 })][("custom", "hello")][
  ("custom", "world")
][("updates", { main: 5 })];

错误后恢复

import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// Global variable to track the number of attempts
let attempts = 0;

const getInfo = task("getInfo", () => {
  /*
   * Simulates a task that fails once before succeeding.
   * Throws an error on the first attempt, then returns "OK" on subsequent tries.
   */
  attempts += 1;

  if (attempts < 2) {
    throw new Error("Failure"); // Simulate a failure on the first attempt
  }
  return "OK";
});

// Initialize an in-memory checkpointer for persistence
const checkpointer = new MemorySaver();

const slowTask = task("slowTask", async () => {
  /*
   * Simulates a slow-running task by introducing a 1-second delay.
   */
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return "Ran slow task.";
});

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: Record<string, any>) => {
    /*
     * Main workflow function that runs the slowTask and getInfo tasks sequentially.
     *
     * Parameters:
     * - inputs: Record<string, any> containing workflow input values.
     *
     * The workflow first executes `slowTask` and then attempts to execute `getInfo`,
     * which will fail on the first invocation.
     */
    const slowTaskResult = await slowTask(); // Blocking call to slowTask
    await getInfo(); // Error will be thrown here on the first attempt
    return slowTaskResult;
  }
);

// Workflow execution configuration with a unique thread identifier
const config = {
  configurable: {
    thread_id: "1", // Unique identifier to track workflow execution
  },
};

// This invocation will take ~1 second due to the slowTask execution
try {
  // First invocation will throw an error due to the `getInfo` task failing
  await main.invoke({ anyInput: "foobar" }, config);
} catch (err) {
  // Handle the failure gracefully
}

当我们恢复执行时,我们不需要重新运行 slowTask,因为其结果已保存在检查点中。

await main.invoke(null, config);
"Ran slow task.";

人机协作 (Human-in-the-loop)

函数式 API 支持使用 interrupt 函数和 Command 原语的人机协作工作流。

请参阅以下示例了解更多详情

短期记忆

使用 getPreviousState 函数进行状态管理,并可选择使用 entrypoint.final 原语来实现短期记忆

请参阅以下操作指南了解更多详情

长期记忆

长期记忆允许跨不同的**线程 ID** 存储信息。这对于在一次对话中学习给定用户的信息并在另一次对话中使用它非常有用。

请参阅以下操作指南了解更多详情

工作流

  • 有关如何使用函数式 API 构建工作流的更多示例,请参阅工作流和代理指南。

代理