跳到内容

如何添加和使用子图

先决条件

本指南假定您熟悉以下内容

子图允许您构建具有多个组件的复杂系统,这些组件本身就是图。使用子图的一个常见用例是构建多代理系统

添加子图时的主要问题是父图和子图如何通信,即它们如何在图执行期间相互传递状态。有两种情况

下面我们展示了如何为每种场景添加子图。

Screenshot 2024-07-11 at 1.01.28 PM.png

设置

首先,让我们安装所需的软件包

npm install @langchain/langgraph @langchain/core

设置 LangSmith 以进行 LangGraph 开发

注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 使您可以使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用程序 — 阅读更多关于如何开始使用此处的信息。


添加带有编译后的子图的节点

一个常见的情况是父图和子图通过共享状态键(通道)进行通信。例如,在多代理系统中,代理通常通过共享的消息键进行通信。

如果您的子图与父图共享状态键,您可以按照以下步骤将其添加到您的图中

  1. 定义子图工作流程(下面示例中的 subgraphBuilder)并编译它
  2. 在定义父图工作流程时,将编译后的子图传递给 .addNode 方法

让我们看一个例子。

import { StateGraph, Annotation } from "@langchain/langgraph";

const SubgraphStateAnnotation = Annotation.Root({
  foo: Annotation<string>, // note that this key is shared with the parent graph state
  bar: Annotation<string>,
});

const subgraphNode1 = async (state: typeof SubgraphStateAnnotation.State) => {
  return { bar: "bar" };
};

const subgraphNode2 = async (state: typeof SubgraphStateAnnotation.State) => {
  // note that this node is using a state key ('bar') that is only available in the subgraph
  // and is sending update on the shared state key ('foo')
  return { foo: state.foo + state.bar };
};

const subgraphBuilder = new StateGraph(SubgraphStateAnnotation)
  .addNode("subgraphNode1", subgraphNode1)
  .addNode("subgraphNode2", subgraphNode2)
  .addEdge("__start__", "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2")

const subgraph = subgraphBuilder.compile();

// Define parent graph
const ParentStateAnnotation = Annotation.Root({
  foo: Annotation<string>,
});

const node1 = async (state: typeof ParentStateAnnotation.State) => {
  return {
    foo: "hi! " + state.foo,
  };
}

const builder = new StateGraph(ParentStateAnnotation)
  .addNode("node1", node1)
  // note that we're adding the compiled subgraph as a node to the parent graph
  .addNode("node2", subgraph)
  .addEdge("__start__", "node1")
  .addEdge("node1", "node2")

const graph = builder.compile();

const stream = await graph.stream({ foo: "foo" });

for await (const chunk of stream) {
  console.log(chunk);
}
{ node1: { foo: 'hi! foo' } }
{ node2: { foo: 'hi! foobar' } }
您可以看到,来自父图的最终输出包括子图调用的结果(字符串 "bar")。

如果您想查看来自子图的流式输出,您可以在流式处理时指定 subgraphs: True。有关从子图流式处理的更多信息,请参阅此操作指南

const streamWithSubgraphs = await graph.stream({ foo: "foo" }, { subgraphs: true });

for await (const chunk of streamWithSubgraphs) {
  console.log(chunk);
}
[ [], { node1: { foo: 'hi! foo' } } ]
[
  [ 'node2:22f27b01-fa9f-5f46-9b5b-166a80d96791' ],
  { subgraphNode1: { bar: 'bar' } }
]
[
  [ 'node2:22f27b01-fa9f-5f46-9b5b-166a80d96791' ],
  { subgraphNode2: { foo: 'hi! foobar' } }
]
[ [], { node2: { foo: 'hi! foobar' } } ]
您会注意到,块输出格式已更改为包含有关其来源子图的一些附加信息。

添加调用子图的节点函数

对于更复杂的系统,您可能希望定义与父图具有完全不同模式(无共享键)的子图。例如,在多代理 RAG 系统中,搜索代理可能只需要跟踪查询和检索到的文档。

如果您的应用程序属于这种情况,您需要定义一个调用子图的节点函数。此函数需要在调用子图之前将输入(父)状态转换为子图状态,并在返回节点的状态更新之前将结果转换回父状态。

下面我们展示了如何修改我们的原始示例以从节点内部调用子图。

注意

如果您为子图启用了检查点,则不能在同一节点内调用多个子图。有关更多信息,请参阅此页面

import { StateGraph, Annotation } from "@langchain/langgraph";

const SubgraphAnnotation = Annotation.Root({
  bar: Annotation<string>, // note that this key is shared with the parent graph state
  baz: Annotation<string>,
});

const subgraphNodeOne = async (state: typeof SubgraphAnnotation.State) => {
  return { baz: "baz" };
};

const subgraphNodeTwo = async (state: typeof SubgraphAnnotation.State) => {
  return { bar: state.bar + state.baz }
};

const subgraphCalledInFunction = new StateGraph(SubgraphAnnotation)
  .addNode("subgraphNode1", subgraphNodeOne)
  .addNode("subgraphNode2", subgraphNodeTwo)
  .addEdge("__start__", "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2")
  .compile();

// Define parent graph
const ParentAnnotation = Annotation.Root({
  foo: Annotation<string>,
});

const nodeOne = async (state: typeof ParentAnnotation.State) => {
  return {
    foo: "hi! " + state.foo,
  };
}

const nodeTwo = async (state: typeof ParentAnnotation.State) => {
  const response = await subgraphCalledInFunction.invoke({
    bar: state.foo,
  });
  return { foo: response.bar }
}

const graphWithFunction = new StateGraph(ParentStateAnnotation)
  .addNode("node1", nodeOne)
  // note that we're adding the compiled subgraph as a node to the parent graph
  .addNode("node2", nodeTwo)
  .addEdge("__start__", "node1")
  .addEdge("node1", "node2")
  .compile();

const graphWithFunctionStream = await graphWithFunction.stream({ foo: "foo" }, { subgraphs: true });
for await (const chunk of graphWithFunctionStream) {
  console.log(chunk);
}
[ [], { node1: { foo: 'hi! foo' } } ]
[
  [ 'node2:1d2bb11a-3ed1-5c58-9b6f-c7af36a1eeb7' ],
  { subgraphNode1: { baz: 'baz' } }
]
[
  [ 'node2:1d2bb11a-3ed1-5c58-9b6f-c7af36a1eeb7' ],
  { subgraphNode2: { bar: 'hi! foobaz' } }
]
[ [], { node2: { foo: 'hi! foobaz' } } ]