
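How to create branches for parallel node execution

LangGraph natively supports fan-out and fan-in using either regular edges or conditionalEdges.

This lets you run nodes in parallel to speed up overall graph execution.

Below are some examples showing how to add branching dataflows that work for you.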

Setup

First, install LangGraph.js:

yarn add @langchain/langgraph @langchain/core

This guide will use OpenAI's GPT-4o model. We will optionally set our API key for LangSmith tracing, which will give us best-in-class observability.

// process.env.OPENAI_API_KEY = "sk_...";

// Optional, add tracing in LangSmith
// process.env.LANGCHAIN_API_KEY = "ls__..."
process.env.LANGCHAIN_CALLBACKS_BACKGROUND = "true";
process.env.LANGCHAIN_TRACING_V2 = "true";
process.env.LANGCHAIN_PROJECT = "Branching: LangGraphJS";

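Fan out, fan in

First, we will create a simple graph that branches out and then back in. When the branches merge back, the state updates from all of them are applied by your reducer (the aggregate method below).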

import { END, START, StateGraph, Annotation } from "@langchain/langgraph";

const StateAnnotation = Annotation.Root({
  aggregate: Annotation<string[]>({
    reducer: (x, y) => x.concat(y),
  })
});

// Create the graph
const nodeA = (state: typeof StateAnnotation.State) => {
  console.log(`Adding I'm A to ${state.aggregate}`);
  return { aggregate: [`I'm A`] };
};
const nodeB = (state: typeof StateAnnotation.State) => {
  console.log(`Adding I'm B to ${state.aggregate}`);
  return { aggregate: [`I'm B`] };
};
const nodeC = (state: typeof StateAnnotation.State) => {
  console.log(`Adding I'm C to ${state.aggregate}`);
  return { aggregate: [`I'm C`] };
};
const nodeD = (state: typeof StateAnnotation.State) => {
  console.log(`Adding I'm D to ${state.aggregate}`);
  return { aggregate: [`I'm D`] };
};

const builder = new StateGraph(StateAnnotation)
  .addNode("a", nodeA)
  .addEdge(START, "a")
  .addNode("b", nodeB)
  .addNode("c", nodeC)
  .addNode("d", nodeD)
  .addEdge("a", "b")
  .addEdge("a", "c")
  .addEdge("b", "d")
  .addEdge("c", "d")
  .addEdge("d", END);

const graph = builder.compile();
import * as tslab from "tslab";

const representation = graph.getGraph();
const image = await representation.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();

await tslab.display.png(new Uint8Array(arrayBuffer));

// Invoke the graph
const baseResult = await graph.invoke({ aggregate: [] });
console.log("Base Result: ", baseResult);
Adding I'm A to 
Adding I'm B to I'm A
Adding I'm C to I'm A
Adding I'm D to I'm A,I'm B,I'm C
Base Result:  { aggregate: [ "I'm A", "I'm B", "I'm C", "I'm D" ] }
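
To make the reducer's role in the merge concrete, here is a minimal sketch in plain TypeScript. It does not use LangGraph and is not a description of its internals: `applyUpdates` is a hypothetical helper that folds each branch's partial update into the state with the same concat reducer used for `aggregate` above.

```typescript
// Plain TypeScript sketch; applyUpdates is a hypothetical helper, not a LangGraph API.
type State = { aggregate: string[] };
type Update = Partial<State>;

// Same reducer as the `aggregate` channel in the example above.
const concatReducer = (x: string[], y: string[]): string[] => x.concat(y);

// Fold a list of partial updates into the state, one update at a time.
function applyUpdates(state: State, updates: Update[]): State {
  return updates.reduce<State>(
    (acc, update) => ({
      aggregate: update.aggregate
        ? concatReducer(acc.aggregate, update.aggregate)
        : acc.aggregate,
    }),
    state,
  );
}

// After "a" has run, "b" and "c" each return their own update in the same step.
const afterA: State = { aggregate: ["I'm A"] };
const merged = applyUpdates(afterA, [
  { aggregate: ["I'm B"] },
  { aggregate: ["I'm C"] },
]);

console.log(merged.aggregate); // [ "I'm A", "I'm B", "I'm C" ]
```

Neither branch sees the other's update while it runs; both updates only become visible together, which matches the log lines above where "d" is the first node to see all three entries.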

Conditional branching

If your fan-out is not deterministic, you can use addConditionalEdges directly, like this:

const ConditionalBranchingAnnotation = Annotation.Root({
  aggregate: Annotation<string[]>({
    reducer: (x, y) => x.concat(y),
  }),
  which: Annotation<string>({
    reducer: (x: string, y: string) => (y ?? x),
  })
})

// Create the graph
const nodeA2 = (state: typeof ConditionalBranchingAnnotation.State) => {
  console.log(`Adding I'm A to ${state.aggregate}`);
  return { aggregate: [`I'm A`] };
};
const nodeB2 = (state: typeof ConditionalBranchingAnnotation.State) => {
  console.log(`Adding I'm B to ${state.aggregate}`);
  return { aggregate: [`I'm B`] };
};
const nodeC2 = (state: typeof ConditionalBranchingAnnotation.State) => {
  console.log(`Adding I'm C to ${state.aggregate}`);
  return { aggregate: [`I'm C`] };
};
const nodeD2 = (state: typeof ConditionalBranchingAnnotation.State) => {
  console.log(`Adding I'm D to ${state.aggregate}`);
  return { aggregate: [`I'm D`] };
};
const nodeE2 = (state: typeof ConditionalBranchingAnnotation.State) => {
  console.log(`Adding I'm E to ${state.aggregate}`);
  return { aggregate: [`I'm E`] };
};

// Define the route function
function routeCDorBC(state: typeof ConditionalBranchingAnnotation.State): string[] {
  if (state.which === "cd") {
    return ["c", "d"];
  }
  return ["b", "c"];
}

const builder2 = new StateGraph(ConditionalBranchingAnnotation)
  .addNode("a", nodeA2)
  .addEdge(START, "a")
  .addNode("b", nodeB2)
  .addNode("c", nodeC2)
  .addNode("d", nodeD2)
  .addNode("e", nodeE2)
  // Add conditional edges
  // Third parameter is to support visualizing the graph
  .addConditionalEdges("a", routeCDorBC, ["b", "c", "d"])
  .addEdge("b", "e")
  .addEdge("c", "e")
  .addEdge("d", "e")
  .addEdge("e", END);

const graph2 = builder2.compile();

const representation2 = graph2.getGraph();
const image2 = await representation2.drawMermaidPng();
const arrayBuffer2 = await image2.arrayBuffer();

await tslab.display.png(new Uint8Array(arrayBuffer2));

// Invoke the graph
let g2result = await graph2.invoke({ aggregate: [], which: "bc" });
console.log("Result 1: ", g2result);
Adding I'm A to 
Adding I'm B to I'm A
Adding I'm C to I'm A
Adding I'm E to I'm A,I'm B,I'm C
Result 1:  { aggregate: [ "I'm A", "I'm B", "I'm C", "I'm E" ], which: 'bc' }

g2result = await graph2.invoke({ aggregate: [], which: "cd" });
console.log("Result 2: ", g2result);
Adding I'm A to 
Adding I'm C to I'm A
Adding I'm D to I'm A
Adding I'm E to I'm A,I'm C,I'm D
Result 2:  { aggregate: [ "I'm A", "I'm C", "I'm D", "I'm E" ], which: 'cd' }
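
Because the route function is an ordinary function of the state, it can be checked on its own before it is wired into a graph. The sketch below restates the routing logic with a narrower return type so the compiler can catch a misspelled node name inside the function. `NodeName`, `RouteState`, and `routeSketch` are illustrative names that do not appear in the original example.

```typescript
// Illustrative types; they are assumptions for this sketch, not LangGraph APIs.
type NodeName = "b" | "c" | "d";
type RouteState = { aggregate: string[]; which: string };

// Returns every node that should run in the next step.
function routeSketch(state: RouteState): NodeName[] {
  if (state.which === "cd") {
    return ["c", "d"];
  }
  // Any other value falls back to the "b" and "c" branches.
  return ["b", "c"];
}

const cdTargets = routeSketch({ aggregate: [], which: "cd" });
const bcTargets = routeSketch({ aggregate: [], which: "bc" });
const fallbackTargets = routeSketch({ aggregate: [], which: "something else" });

console.log(cdTargets); // [ 'c', 'd' ]
console.log(bcTargets); // [ 'b', 'c' ]
console.log(fallbackTargets); // [ 'b', 'c' ]
```

Note that the fallback branch also handles unexpected values of `which`, so a typo in the input silently routes to "b" and "c" rather than failing.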

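Stable sorting

When fanned out, nodes run in parallel as a single "superstep". Once the superstep has completed, the updates from each node in it are applied to the state in sequence.

If you need a consistent, predetermined ordering of updates from a parallel superstep, you should write the outputs (along with an identifying key) to a separate field in your state, then combine them in a "sink" node by adding regular edges from each of the fan-out nodes to the rendezvous point.

For instance, suppose I want to order the outputs of the parallel step by "reliability".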

type ScoredValue = {
  value: string;
  score: number;
};

const reduceFanouts = (left?: ScoredValue[], right?: ScoredValue[]) => {
  if (!left) {
    left = [];
  }
  if (!right || right?.length === 0) {
    // Overwrite. Similar to redux.
    return [];
  }
  return left.concat(right);
};

const StableSortingAnnotation = Annotation.Root({
  aggregate: Annotation<string[]>({
    reducer: (x, y) => x.concat(y),
  }),
  which: Annotation<string>({
    reducer: (x: string, y: string) => (y ?? x),
  }),
  fanoutValues: Annotation<ScoredValue[]>({
    reducer: reduceFanouts,
  }),
})


class ParallelReturnNodeValue {
  private _value: string;
  private _score: number;

  constructor(nodeSecret: string, score: number) {
    this._value = nodeSecret;
    this._score = score;
  }

  public call(state: typeof StableSortingAnnotation.State) {
    console.log(`Adding ${this._value} to ${state.aggregate}`);
    return { fanoutValues: [{ value: this._value, score: this._score }] };
  }
}

// Create the graph

const nodeA3 = (state: typeof StableSortingAnnotation.State) => {
  console.log(`Adding I'm A to ${state.aggregate}`);
  return { aggregate: ["I'm A"] };
};

const nodeB3 = new ParallelReturnNodeValue("I'm B", 0.1);
const nodeC3 = new ParallelReturnNodeValue("I'm C", 0.9);
const nodeD3 = new ParallelReturnNodeValue("I'm D", 0.3);

const aggregateFanouts = (state: typeof StableSortingAnnotation.State) => {
  // Sort a copy by score, highest first, so the incoming state is not mutated
  const sorted = [...state.fanoutValues].sort((a, b) => b.score - a.score);
  return {
    aggregate: sorted.map((v) => v.value).concat(["I'm E"]),
    // An empty array tells reduceFanouts to clear the channel
    fanoutValues: [],
  };
};

// Define the route function
function routeBCOrCD(state: typeof StableSortingAnnotation.State): string[] {
  if (state.which === "cd") {
    return ["c", "d"];
  }
  return ["b", "c"];
}

const builder3 = new StateGraph(StableSortingAnnotation)
  .addNode("a", nodeA3)
  .addEdge(START, "a")
  .addNode("b", nodeB3.call.bind(nodeB3))
  .addNode("c", nodeC3.call.bind(nodeC3))
  .addNode("d", nodeD3.call.bind(nodeD3))
  .addNode("e", aggregateFanouts)
  .addConditionalEdges("a", routeBCOrCD, ["b", "c", "d"])
  .addEdge("b", "e")
  .addEdge("c", "e")
  .addEdge("d", "e")
  .addEdge("e", END);

const graph3 = builder3.compile();

// Invoke the graph
let g3result = await graph3.invoke({ aggregate: [], which: "bc" });
console.log("Result 1: ", g3result);
Adding I'm A to 
Adding I'm B to I'm A
Adding I'm C to I'm A
Result 1:  {
  aggregate: [ "I'm A", "I'm C", "I'm B", "I'm E" ],
  which: 'bc',
  fanoutValues: []
}
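In this case, our aggregateFanouts "sink" node took the mapped values and sorted them in a consistent way. Note that because it returns an empty array for fanoutValues, our reduceFanouts reducer function decided to overwrite the previous values in the state.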
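
The accumulate-then-reset behavior of the reducer can be seen by calling it directly. The sketch below restates the reducer's logic in plain TypeScript and feeds it updates by hand; the `channel` variable is only a stand-in for the fanoutValues field and is not how LangGraph stores state.

```typescript
type ScoredValue = { value: string; score: number };

// Same logic as reduceFanouts above: an empty or missing update resets the channel.
const reduceFanouts = (left?: ScoredValue[], right?: ScoredValue[]): ScoredValue[] => {
  if (!right || right.length === 0) {
    return [];
  }
  return (left ?? []).concat(right);
};

// Two parallel branches append their values...
let channel = reduceFanouts(undefined, [{ value: "I'm B", score: 0.1 }]);
channel = reduceFanouts(channel, [{ value: "I'm C", score: 0.9 }]);
const sizeAfterFanout = channel.length;
console.log(sizeAfterFanout); // 2

// ...and the sink node's empty array clears them.
channel = reduceFanouts(channel, []);
const sizeAfterSink = channel.length;
console.log(sizeAfterSink); // 0
```

One consequence of this design is that a fan-out node cannot use an empty array to mean "nothing to add", since the reducer treats that as a reset.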

let g3result2 = await graph3.invoke({ aggregate: [], which: "cd" });
console.log("Result 2: ", g3result2);
Adding I'm A to 
Adding I'm C to I'm A
Adding I'm D to I'm A
Result 2:  {
  aggregate: [ "I'm A", "I'm C", "I'm D", "I'm E" ],
  which: 'cd',
  fanoutValues: []
}
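
If two branches can report the same score, sorting by score alone leaves their relative order dependent on the order in which the updates arrived. One option, sketched here as an assumption rather than something the example above does, is to break ties on the value so the result is the same for any arrival order. `sortByScore` is a hypothetical helper name.

```typescript
type ScoredValue = { value: string; score: number };

// Sort a copy by score (highest first); ties fall back to comparing the value.
function sortByScore(values: ScoredValue[]): ScoredValue[] {
  return [...values].sort((a, b) => {
    if (b.score !== a.score) {
      return b.score - a.score;
    }
    return a.value < b.value ? -1 : a.value > b.value ? 1 : 0;
  });
}

const b: ScoredValue = { value: "I'm B", score: 0.5 };
const c: ScoredValue = { value: "I'm C", score: 0.5 };
const d: ScoredValue = { value: "I'm D", score: 0.9 };

// The same three updates in two different arrival orders.
const orderOne = sortByScore([c, b, d]).map((v) => v.value);
const orderTwo = sortByScore([b, d, c]).map((v) => v.value);

console.log(orderOne); // [ "I'm D", "I'm B", "I'm C" ]
console.log(orderTwo); // [ "I'm D", "I'm B", "I'm C" ]
```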