跳到内容

LangGraph 术语表

LangGraph 的核心是将代理工作流建模为图。您可以使用三个关键组件来定义代理的行为

  1. State: 一个共享数据结构,表示应用程序的当前快照。它由一个 Annotation 对象表示。

  2. Nodes: JavaScript/TypeScript 函数,用于编码代理的逻辑。它们接收当前 State 作为输入,执行一些计算或副作用,并返回更新后的 State

  3. Edges: JavaScript/TypeScript 函数,根据当前的 State 决定下一个要执行的 Node。它们可以是条件分支或固定转换。

通过组合 NodesEdges,您可以创建复杂的、循环的工作流,随时间演进 State。然而,真正的强大之处在于 LangGraph 如何管理 State。需要强调的是:NodesEdges 仅仅是 JavaScript/TypeScript 函数——它们可以包含一个 LLM,也可以只是普通的 JavaScript/TypeScript 代码。

简而言之:节点负责工作。边决定下一步做什么

LangGraph 的底层图算法使用消息传递来定义一个通用程序。当一个节点完成其操作时,它会沿着一条或多条边向其他节点发送消息。这些接收节点随后执行其函数,将结果消息传递给下一组节点,并持续这个过程。受 Google 的 Pregel 系统启发,该程序以离散的“超级步骤”进行。

一个超级步骤可以被认为是图节点的一次迭代。并行运行的节点属于同一个超级步骤,而顺序运行的节点则属于不同的超级步骤。在图执行开始时,所有节点都处于 inactive 状态。当一个节点在其任何传入边(或“通道”)上接收到新消息(状态)时,它会变为 active。活跃节点随后运行其函数并返回更新。在每个超级步骤结束时,没有传入消息的节点会通过将自己标记为 inactive 来投票 halt(停止)。当所有节点都处于 inactive 状态且没有消息在传输中时,图的执行终止。

StateGraph

StateGraph 类是主要的图类。它通过用户定义的 State 对象(使用 Annotation 对象定义并作为第一个参数传递)进行参数化。

MessageGraph (传统)

MessageGraph 类是一种特殊类型的图。MessageGraphState 仅是一个消息数组。除了聊天机器人之外,此类别很少使用,因为大多数应用程序要求 State 比消息数组更复杂。

编译您的图

要构建您的图,您首先定义状态,然后添加节点,最后进行编译。究竟什么是编译您的图?为什么需要它?

编译是一个非常简单的步骤。它对您的图结构进行一些基本检查(例如没有孤立节点)。您还可以在此处指定运行时参数,例如检查点器和断点。您只需调用 .compile 方法即可编译图。

const graph = graphBuilder.compile(...);

在使用图之前,您必须编译它。

状态

定义图时,您做的第一件事就是定义图的 StateState 包含有关图结构的信息,以及指定如何将更新应用到状态的reducer 函数State 的 schema 将作为图中所有 NodesEdges 的输入 schema,并且应使用 Annotation 对象进行定义。所有 Nodes 都将向 State 发出更新,然后使用指定的 reducer 函数应用这些更新。

Annotation

指定图的 schema 的方法是定义一个根 Annotation 对象,其中每个键都是状态中的一个项。

多个 Schema

通常,所有图节点都与单个状态注解进行通信。这意味着它们将读取和写入相同的状态通道。但是,在某些情况下,我们希望对此进行更多控制。

  • 内部节点可以传递图中不需要的输入/输出信息。
  • 我们可能还希望为图使用不同的输入/输出 Schema。例如,输出可能只包含一个相关的输出键。

节点可以在图内部写入私有状态通道,用于内部节点通信。我们可以简单地定义一个私有注解 PrivateState。有关更多详细信息,请参见此笔记本

还可以为图定义显式输入和输出 Schema。在这些情况下,我们定义一个包含所有与图操作相关的键的“内部”Schema。但是,我们还定义了作为“内部”Schema 子集的 inputoutput Schema,以限制图的输入和输出。有关更多详细信息,请参见此指南

我们来看一个例子

import {
  Annotation,
  START,
  StateGraph,
  StateType,
  UpdateType,
} from "@langchain/langgraph";

const InputStateAnnotation = Annotation.Root({
  user_input: Annotation<string>,
});

const OutputStateAnnotation = Annotation.Root({
  graph_output: Annotation<string>,
});

const OverallStateAnnotation = Annotation.Root({
  foo: Annotation<string>,
  bar: Annotation<string>,
  user_input: Annotation<string>,
  graph_output: Annotation<string>,
});

const node1 = async (state: typeof InputStateAnnotation.State) => {
  // Write to OverallStateAnnotation
  return { foo: state.user_input + " name" };
};

const node2 = async (state: typeof OverallStateAnnotation.State) => {
  // Read from OverallStateAnnotation, write to OverallStateAnnotation
  return { bar: state.foo + " is" };
};

const node3 = async (state: typeof OverallStateAnnotation.State) => {
  // Read from OverallStateAnnotation, write to OutputStateAnnotation
  return { graph_output: state.bar + " Lance" };
};

// Most of the time the StateGraph type parameters are inferred by TypeScript,
// but this is a special case where they must be specified explicitly in order
// to avoid a type error.
const graph = new StateGraph<
  (typeof OverallStateAnnotation)["spec"],
  StateType<(typeof OverallStateAnnotation)["spec"]>,
  UpdateType<(typeof OutputStateAnnotation)["spec"]>,
  typeof START,
  (typeof InputStateAnnotation)["spec"],
  (typeof OutputStateAnnotation)["spec"]
>({
  input: InputStateAnnotation,
  output: OutputStateAnnotation,
  stateSchema: OverallStateAnnotation,
})
  .addNode("node1", node1)
  .addNode("node2", node2)
  .addNode("node3", node3)
  .addEdge("__start__", "node1")
  .addEdge("node1", "node2")
  .addEdge("node2", "node3")
  .compile();

await graph.invoke({ user_input: "My" });
{ graph_output: "My name is Lance" }

请注意,我们将 state: typeof InputStateAnnotation.State 作为输入 Schema 传递给 node1。但是,我们写入到 foo,这是 OverallStateAnnotation 中的一个通道。我们如何写入不包含在输入 Schema 中的状态通道?这是因为节点可以写入图状态中的任何状态通道。图状态是初始化时定义的状态通道的并集,其中包括 OverallStateAnnotation 以及过滤器 InputStateAnnotationOutputStateAnnotation

归约函数

归约函数是理解节点更新如何应用于 State 的关键。State 中的每个键都有其自己的独立归约函数。如果未明确指定归约函数,则假定对该键的所有更新都应覆盖它。让我们看几个例子来更好地理解它们。

示例 A

import { StateGraph, Annotation } from "@langchain/langgraph";

const State = Annotation.Root({
  foo: Annotation<number>,
  bar: Annotation<string[]>,
});

const graphBuilder = new StateGraph(State);

在此示例中,未为任何键指定归约函数。假设图的输入是 { foo: 1, bar: ["hi"] }。然后假设第一个 Node 返回 { foo: 2 }。这被视为对状态的更新。请注意,Node 不需要返回整个 State schema - 只需要一个更新。应用此更新后,State 将变为 { foo: 2, bar: ["hi"] }。如果第二个节点返回 { bar: ["bye"] },则 State 将变为 { foo: 2, bar: ["bye"] }

示例 B

import { StateGraph, Annotation } from "@langchain/langgraph";

const State = Annotation.Root({
  foo: Annotation<number>,
  bar: Annotation<string[]>({
    reducer: (state: string[], update: string[]) => state.concat(update),
    default: () => [],
  }),
});

const graphBuilder = new StateGraph(State);

在此示例中,我们已将 bar 字段更新为包含 reducer 函数的对象。此函数将始终接受两个位置参数:stateupdate,其中 state 表示当前状态值,update 表示从 Node 返回的更新。请注意,第一个键保持不变。假设图的输入是 { foo: 1, bar: ["hi"] }。然后假设第一个 Node 返回 { foo: 2 }。这被视为对状态的更新。请注意,Node 不需要返回整个 State schema - 只需要一个更新。应用此更新后,State 将变为 { foo: 2, bar: ["hi"] }。如果第二个节点返回 { bar: ["bye"] },则 State 将变为 { foo: 2, bar: ["hi", "bye"] }。请注意,此处 bar 键通过连接两个数组而更新。

在图状态中处理消息

为什么使用消息?

大多数现代 LLM 提供商都提供一个聊天模型接口,该接口接受一个消息列表作为输入。LangChain 的 ChatModel 特别接受一个 Message 对象列表作为输入。这些消息有多种形式,例如 HumanMessage(用户输入)或 AIMessage(LLM 响应)。要了解更多关于消息对象的信息,请参阅概念指南。

在您的图中使用消息

在许多情况下,将先前的对话历史记录作为消息列表存储在图状态中会很有帮助。为此,我们可以在图状态中添加一个键(通道),用于存储 Message 对象列表,并使用归约函数对其进行注解(参见下面示例中的 messages 键)。归约函数对于告诉图如何用每个状态更新(例如,当节点发送更新时)更新状态中的 Message 对象列表至关重要。如果您不指定归约函数,则每个状态更新都将用最近提供的值覆盖消息列表。

然而,您可能还希望手动更新图状态中的消息(例如,人在回路)。如果您使用类似 (a, b) => a.concat(b) 作为归约函数,您发送到图的手动状态更新将被附加到现有消息列表中,而不是更新现有消息。为了避免这种情况,您需要一个能够跟踪消息 ID 并(如果更新)覆盖现有消息的归约函数。为了实现这一点,您可以使用预构建的 messagesStateReducer 函数。对于全新的消息,它将简单地附加到现有列表,但它也将正确处理现有消息的更新。

序列化

除了跟踪消息 ID 外,messagesStateReducer 函数还会在 messages 通道上收到状态更新时尝试将消息反序列化为 LangChain Message 对象。这允许以以下格式发送图输入/状态更新

// this is supported
{
  messages: [new HumanMessage({ content: "message" })];
}

// and this is also supported
{
  messages: [{ role: "user", content: "message" }];
}

下面是使用 messagesStateReducer 作为其归约函数的图状态注解示例。

import type { BaseMessage } from "@langchain/core/messages";
import { Annotation, type Messages } from "@langchain/langgraph";

const StateAnnotation = Annotation.Root({
  messages: Annotation<BaseMessage[], Messages>({
    reducer: messagesStateReducer,
  }),
});

MessagesAnnotation

由于在状态中拥有消息列表非常常见,因此存在一个预构建的注解称为 MessagesAnnotation,它使得使用消息作为图状态变得容易。MessagesAnnotation 定义了一个名为 messages 的单一键,它是一个 BaseMessage 对象列表,并使用 messagesStateReducer 归约函数。

import { MessagesAnnotation, StateGraph } from "@langchain/langgraph";

const graph = new StateGraph(MessagesAnnotation)
  .addNode(...)
  ...

等同于像这样手动初始化您的状态

import { BaseMessage } from "@langchain/core/messages";
import { Annotation, StateGraph, messagesStateReducer } from "@langchain/langgraph";

export const StateAnnotation = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: messagesStateReducer,
    default: () => [],
  }),
});

const graph = new StateGraph(StateAnnotation)
  .addNode(...)
  ...

MessagesAnnotation 的状态有一个名为 messages 的单一键。这是一个 BaseMessage 数组,并以 messagesStateReducer 作为归约函数。messagesStateReducer 基本上将消息添加到现有列表中(它还做了一些额外的好事,例如将 OpenAI 消息格式转换为标准 LangChain 消息格式,根据消息 ID 处理更新等)。

我们经常看到消息数组是状态的一个关键组成部分,因此这种预构建的状态旨在方便使用消息。通常,需要跟踪的状态不仅仅是消息,因此我们看到人们扩展此状态并添加更多字段,例如

import { Annotation, MessagesAnnotation } from "@langchain/langgraph";

const StateWithDocuments = Annotation.Root({
  ...MessagesAnnotation.spec, // Spread in the messages state
  documents: Annotation<string[]>,
});

MessagesZodState

MessagesAnnotation 类似,存在一个预构建的 Zod schema 称为 MessagesZodState,它提供相同的功能,但使用 Zod 而不是 Annotation API 来定义状态。

import { MessagesZodState, StateGraph } from "@langchain/langgraph";

import { z } from "zod";

const graph = new StateGraph(MessagesZodState)
  .addNode(...)
  ...

有关使用 Zod 定义图状态的更多信息,请参阅定义图状态操作指南

节点

在 LangGraph 中,节点通常是 JavaScript/TypeScript 函数(同步或 async),其中第一个位置参数是状态,(可选地)第二个位置参数是“config”,包含可选的可配置参数(例如 thread_id)。

类似于 NetworkX,您可以使用 addNode 方法将这些节点添加到图中。

import { RunnableConfig } from "@langchain/core/runnables";
import { StateGraph, Annotation } from "@langchain/langgraph";

const GraphAnnotation = Annotation.Root({
  input: Annotation<string>,
  results: Annotation<string>,
});

// The state type can be extracted using `typeof <annotation variable name>.State`
const myNode = (state: typeof GraphAnnotation.State, config?: RunnableConfig) => {
  console.log("In node: ", config.configurable?.user_id);
  return {
    results: `Hello, ${state.input}!`
  };
};

// The second argument is optional
const myOtherNode = (state: typeof GraphAnnotation.State) => {
  return state;
};

const builder = new StateGraph(GraphAnnotation)
  .addNode("myNode", myNode)
  .addNode("myOtherNode", myOtherNode)
  ...

在幕后,函数被转换为 RunnableLambda,这为您的函数添加了批处理和流式支持,以及原生的跟踪和调试功能。

START 节点

START 节点是一个特殊节点,表示向图发送用户输入的节点。引用此节点的主要目的是确定应首先调用哪些节点。

import { START } from "@langchain/langgraph";

graph.addEdge(START, "nodeA");

END 节点

END 节点是一个特殊节点,表示一个终止节点。当您想表示哪些边在完成后没有后续操作时,会引用此节点。

import { END } from "@langchain/langgraph";

graph.addEdge("nodeA", END);

边定义了逻辑如何路由以及图如何决定停止。这是代理工作以及不同节点之间如何通信的重要组成部分。有几种关键类型的边:

  • 普通边:直接从一个节点到下一个节点。
  • 条件边:调用一个函数来决定接下来要到哪个(或哪些)节点。
  • 入口点:当用户输入到达时,首先运行哪个节点。
  • 条件入口点:调用一个函数来决定当用户输入到达时,首先运行哪个(或哪些)节点。

一个节点可以有多个出站边。如果一个节点有多个出站边,那么所有这些目标节点都将在下一个超级步骤中并行执行。

普通边

如果您总是想从节点 A 到节点 B,您可以直接使用 addEdge 方法。

graph.addEdge("nodeA", "nodeB");

条件边

如果您想有选择地路由到一个或多个边(或有选择地终止),您可以使用 addConditionalEdges 方法。此方法接受一个节点名称和一个在该节点执行后调用的“路由函数”。

graph.addConditionalEdges("nodeA", routingFunction);

与节点类似,routingFunction 接受图的当前 state 并返回一个值。

默认情况下,routingFunction 的返回值用作下一个发送状态的节点名称(或节点数组)。所有这些节点都将在下一个超级步骤中并行运行。

您可以选择提供一个对象,将 routingFunction 的输出映射到下一个节点的名称。

graph.addConditionalEdges("nodeA", routingFunction, {
  true: "nodeB",
  false: "nodeC",
});

提示

如果您想在一个函数中结合状态更新和路由,请使用Command 而不是条件边。

入口点

入口点是图启动时运行的第一个节点。您可以使用虚拟 START 节点到要执行的第一个节点的 addEdge 方法来指定进入图的位置。

import { START } from "@langchain/langgraph";

const graph = new StateGraph(...)
  .addEdge(START, "nodeA")
  .compile();

条件入口点

条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用虚拟 START 节点的 addConditionalEdges 来实现此目的。

import { START } from "@langchain/langgraph";

const graph = new StateGraph(...)
  .addConditionalEdges(START, routingFunction)
  .compile();

您可以选择提供一个对象,将 routingFunction 的输出映射到下一个节点的名称。

const graph = new StateGraph(...)
  .addConditionalEdges(START, routingFunction, {
    true: "nodeB",
    false: "nodeC",
  })
  .compile();

Send

默认情况下,NodesEdges 是预先定义的,并操作相同的共享状态。然而,在某些情况下,确切的边可能无法预先知道,和/或您可能希望同时存在不同版本的 State。一个常见的例子是 map-reduce 设计模式。在这种设计模式中,第一个节点可能生成一个对象数组,您可能希望将某些其他节点应用于所有这些对象。对象的数量可能无法预先知道(意味着边的数量可能无法知道),并且下游 Node 的输入 State 应该不同(每个生成对象一个)。

为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。Send 接受两个参数:第一个是节点名称,第二个是要传递给该节点的状态。

const continueToJokes = (state: { subjects: string[] }) => {
  return state.subjects.map(
    (subject) => new Send("generate_joke", { subject })
  );
};

const graph = new StateGraph(...)
  .addConditionalEdges("nodeA", continueToJokes)
  .compile();

Command

提示

此功能需要 @langchain/langgraph>=0.2.31

将控制流(边)和状态更新(节点)结合起来可能很方便。例如,您可能希望在同一个节点中既执行状态更新又决定下一个要去的节点,而不是使用条件边。LangGraph 提供了一种通过从节点函数返回 Command 对象来实现此目的的方法。

import { StateGraph, Annotation, Command } from "@langchain/langgraph";

const StateAnnotation = Annotation.Root({
  foo: Annotation<string>,
});

const myNode = (state: typeof StateAnnotation.State) => {
  return new Command({
    // state update
    update: {
      foo: "bar",
    },
    // control flow
    goto: "myOtherNode",
  });
};

使用 Command,您还可以实现动态控制流行为(与条件边相同)。

const myNode = async (state: typeof StateAnnotation.State) => {
  if (state.foo === "bar") {
    return new Command({
      update: {
        foo: "baz",
      },
      goto: "myOtherNode",
    });
  }
  // ...
};

重要

在您的节点函数中返回 Command 时,您还必须添加一个 ends 参数,其中包含节点要路由到的节点名称列表,例如 .addNode("myNode", myNode, { ends: ["myOtherNode"] })。这对于图编译和验证是必要的,并且表明 myNode 可以导航到 myOtherNode

请查看此操作指南,获取如何使用 Command 的端到端示例。

什么时候应该使用 Command 而不是条件边?

当您需要同时更新图状态路由到不同节点时,请使用 Command。例如,在实现多智能体交接时,路由到不同智能体并向该智能体传递一些信息很重要。

使用条件边可以在不更新状态的情况下有条件地在节点之间路由。

如果您正在使用子图,您可能希望从子图中的一个节点导航到不同的子图(即父图中的另一个节点)。为此,您可以在 Command 中指定 graph: Command.PARENT

const myNode = (state: typeof StateAnnotation.State) => {
  return new Command({
    update: { foo: "bar" },
    goto: "other_subgraph", // where `other_subgraph` is a node in the parent graph
    graph: Command.PARENT,
  });
};

注意

graph 设置为 Command.PARENT 将导航到最近的父图。

这在实现多智能体交接时特别有用。

在工具中使用

一个常见的用例是从工具内部更新图状态。例如,在客户支持应用程序中,您可能希望在对话开始时根据客户的帐号或 ID 查找客户信息。要从工具更新图状态,您可以从工具中返回 Command({ update: { my_custom_key: "foo", messages: [...] } })

import { tool } from "@langchain/core/tools";

const lookupUserInfo = tool(async (input, config) => {
  const userInfo = getUserInfo(config);
  return new Command({
    // update state keys
    update: {
      user_info: userInfo,
      messages: [
        new ToolMessage({
          content: "Successfully looked up user information",
          tool_call_id: config.toolCall.id,
        }),
      ],
    },
  });
}, {
  name: "lookup_user_info",
  description: "Use this to look up user information to better assist them with their questions.",
  schema: z.object(...)
});

重要

当从工具返回 Command 时,您必须Command.update 中包含 messages(或任何用于消息历史记录的状态键),并且 messages 中的消息列表必须包含 ToolMessage。这对于使结果消息历史记录有效是必要的(LLM 供应商要求带有工具调用的 AI 消息之后必须跟着工具结果消息)。

如果您使用通过 Command 更新状态的工具,我们建议使用预构建的 ToolNode,它会自动处理工具返回的 Command 对象并将其传播到图状态。如果您正在编写调用工具的自定义节点,则需要手动将工具返回的 Command 对象作为节点更新进行传播。

人机协作 (Human-in-the-loop)

Command 是人在回路工作流的重要组成部分:当使用 interrupt() 收集用户输入时,Command 随后用于提供输入并通过 new Command({ resume: "User input" }) 恢复执行。有关更多信息,请参阅此概念指南

持久性

LangGraph 通过 检查点器 为您的代理状态提供内置持久化。检查点器在每个超级步骤保存图状态的快照,允许随时恢复。这支持了人在回路交互、内存管理和容错等功能。您甚至可以使用适当的 getupdate 方法在执行后直接操作图的状态。有关更多详细信息,请参阅概念指南

线程

LangGraph 中的线程表示您的图与用户之间的单个会话或对话。当使用检查点功能时,单个对话中的轮次(甚至单个图执行中的步骤)都由唯一的线程 ID 组织。

存储

LangGraph 通过 BaseStore 接口提供内置文档存储。与按线程 ID 保存状态的检查点器不同,存储使用自定义命名空间来组织数据。这实现了跨线程持久化,允许代理维护长期记忆、从过去的交互中学习并随时间积累知识。常见的用例包括存储用户配置文件、构建知识库和管理所有线程的全局偏好设置。

图迁移

LangGraph 可以轻松处理图定义(节点、边和状态)的迁移,即使在使用检查点器跟踪状态时也是如此。

  • 对于图末尾(即未中断)的线程,您可以更改图的整个拓扑(即所有节点和边,删除、添加、重命名等)。
  • 对于当前中断的线程,我们支持除重命名/删除节点之外的所有拓扑更改(因为该线程现在可能即将进入不再存在的节点)——如果这是一个障碍,请联系我们,我们可以优先解决。.
  • 对于修改状态,我们对添加和删除键具有完全的向后和向前兼容性
  • 重命名的状态键会丢失其在现有线程中保存的状态
  • 类型以不兼容方式更改的状态键目前可能会在更改前具有状态的线程中导致问题——如果这是一个障碍,请联系我们,我们可以优先解决。

配置

创建图时,您还可以标记图的某些部分是可配置的。这通常是为了方便在模型或系统提示之间进行切换。这允许您创建一个单一的“认知架构”(即图),但拥有它的多个不同实例。

然后,您可以使用 configurable 配置字段将此配置传递到图中。

const config = { configurable: { llm: "anthropic" } };

await graph.invoke(inputs, config);

然后,您可以在节点内部访问和使用此配置。

const nodeA = (state, config) => {
  const llmType = config?.configurable?.llm;
  let llm: BaseChatModel;
  if (llmType) {
    const llm = getLlm(llmType);
  }
  ...
};

有关配置的完整细分,请参阅此指南

递归限制

递归限制设置了图在单次执行期间可以执行的超级步骤的最大数量。一旦达到限制,LangGraph 将引发 GraphRecursionError。默认情况下,此值设置为 25 步。递归限制可以在运行时设置在任何图上,并通过配置字典传递给 .invoke/.stream。重要的是,recursionLimit 是一个独立的 config 键,不应像所有其他用户定义的配置一样传递到 configurable 键中。请参阅下面的示例。

await graph.invoke(inputs, { recursionLimit: 50 });

阅读此操作指南以了解有关递归限制如何工作的更多信息。

interrupt

使用 interrupt 函数在特定点暂停图以收集用户输入。interrupt 函数将中断信息呈现给客户端,允许开发人员在恢复执行之前收集用户输入、验证图状态或做出决策。

import { interrupt } from "@langchain/langgraph";

const humanApprovalNode = (state: typeof StateAnnotation.State) => {
  ...
  const answer = interrupt(
      // This value will be sent to the client.
      // It can be any JSON serializable value.
      { question: "is it ok to continue?"},
  );
  ...

通过将 Command 对象传递给图,并将 resume 键设置为 interrupt 函数返回的值,即可恢复图。

人在回路概念指南中阅读更多关于 interrupt 如何用于人在回路工作流的信息。

注意: interrupt 函数目前在Web 环境中不可用。

断点

断点在特定点暂停图执行,并支持逐步执行。断点由 LangGraph 的持久化层提供支持,该层在每个图步骤后保存状态。断点还可用于启用人在回路工作流,尽管我们建议为此目的使用interrupt 函数

断点概念指南中阅读更多关于断点的信息。

子图

子图是作为另一个图中的节点使用的。这不过是应用于 LangGraph 的古老的封装概念。使用子图的一些原因包括:

  • 构建多代理系统
  • 当您希望在多个图中重用一组节点时,这些节点可能共享某些状态,您可以在子图中定义它们一次,然后在多个父图中使用它们。
  • 当您希望不同的团队独立处理图的不同部分时,您可以将每个部分定义为子图,只要遵守子图接口(输入和输出 schema),父图就可以在不了解子图任何细节的情况下构建。

有两种方法可以将子图添加到父图:

  • 添加一个带已编译子图的节点:当父图和子图共享状态键且不需要在输入或输出时转换状态时,这很有用。
.addNode("subgraph", subgraphBuilder.compile());
  • 添加一个带有调用子图的函数的节点:当父图和子图具有不同的状态 schema 并且需要在调用子图之前或之后转换状态时,这很有用。
const subgraph = subgraphBuilder.compile();

const callSubgraph = async (state: typeof StateAnnotation.State) => {
  return subgraph.invoke({ subgraph_key: state.parent_key });
};

const builder = new StateGraph(...)
  .addNode("subgraph", callSubgraph)
  .compile();

我们来看每个示例。

作为编译后的图

创建子图节点最简单的方法是直接使用编译后的子图。这样做时,重要的是父图和子图的状态 schema 至少共享一个可用于通信的键。如果您的图和子图不共享任何键,则应改用编写一个调用子图的函数

注意

如果您将额外的键传递给子图节点(即,除了共享键之外),它们将被子图节点忽略。同样,如果您从子图返回额外的键,它们将被父图忽略。

import { StateGraph, Annotation } from "@langchain/langgraph";

const StateAnnotation = Annotation.Root({
  foo: Annotation<string>,
});

const SubgraphStateAnnotation = Annotation.Root({
  foo: Annotation<string>, // note that this key is shared with the parent graph state
  bar: Annotation<string>,
});

// Define subgraph
const subgraphNode = async (state: typeof SubgraphStateAnnotation.State) => {
  // note that this subgraph node can communicate with
  // the parent graph via the shared "foo" key
  return { foo: state.foo + "bar" };
};

const subgraph = new StateGraph(SubgraphStateAnnotation)
  .addNode("subgraph", subgraphNode);
  ...
  .compile();

// Define parent graph
const parentGraph = new StateGraph(StateAnnotation)
  .addNode("subgraph", subgraph)
  ...
  .compile();

作为函数

您可能希望定义一个具有完全不同 schema 的子图。在这种情况下,您可以创建一个调用子图的节点函数。此函数需要将输入(父)状态转换为子图状态,然后在调用子图之前调用子图,并将结果转换回父状态,然后从节点返回状态更新。

import { StateGraph, Annotation } from "@langchain/langgraph";

const StateAnnotation = Annotation.Root({
  foo: Annotation<string>,
});

const SubgraphStateAnnotation = Annotation.Root({
  // note that none of these keys are shared with the parent graph state
  bar: Annotation<string>,
  baz: Annotation<string>,
});

// Define subgraph
const subgraphNode = async (state: typeof SubgraphStateAnnotation.State) => {
  return { bar: state.bar + "baz" };
};

const subgraph = new StateGraph(SubgraphStateAnnotation)
  .addNode("subgraph", subgraphNode);
  ...
  .compile();

// Define parent graph
const subgraphWrapperNode = async (state: typeof StateAnnotation.State) => {
  // transform the state to the subgraph state
  const response = await subgraph.invoke({
    bar: state.foo,
  });
  // transform response back to the parent state
  return {
    foo: response.bar,
  };
}

const parentGraph = new StateGraph(StateAnnotation)
  .addNode("subgraph", subgraphWrapperNode)
  ...
  .compile();

可视化

能够可视化图通常是很好的,尤其当它们变得更复杂时。LangGraph 带有一个很好的内置方法,可以将图渲染为 Mermaid 图。您可以像这样使用 getGraph() 方法。

const representation = graph.getGraph();
const image = await representation.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();
const buffer = new Uint8Array(arrayBuffer);

您还可以查看LangGraph Studio,它是一个定制的 IDE,包含强大的可视化和调试功能。

流式传输

LangGraph 具有一流的流式传输支持。LangGraph 支持多种不同的流式传输模式:

  • "values": 这会流式传输图每一步之后状态的完整值。
  • "updates: 这会流式传输图每一步之后的状态更新。如果在同一步骤中进行多次更新(例如,运行多个节点),则这些更新会单独流式传输。

此外,您可以使用 streamEvents 方法来流式传输节点内部发生的事件。这对于流式传输 LLM 调用的 Token 很有用。

LangGraph 具有一流的流式传输支持,包括在执行期间从图节点流式传输更新、从 LLM 调用流式传输 Token 等。有关更多信息,请参阅此概念指南