跳到内容

如何延迟节点执行

先决条件

本指南假定您熟悉以下内容:

当您希望延迟节点的执行直到所有其他待处理任务都完成后,延迟节点执行会很有用。这在分支长度不同时尤其重要,例如在 Map-Reduce 流程等工作流中很常见。

import { StateGraph, Annotation, START, END } from "@langchain/langgraph";

const StateAnnotation = Annotation.Root({
  aggregate: Annotation<string[]>({
    default: () => [],
    reducer: (acc, value) => [...acc, ...value],
  }),
});

const graph = new StateGraph(StateAnnotation)
  .addNode("a", (state) => {
    console.log(`Adding "A" to ${state.aggregate.join(", ")}`);
    return { aggregate: ["A"] };
  })
  .addNode("b", (state) => {
    console.log(`Adding "B" to ${state.aggregate.join(", ")}`);
    return { aggregate: ["B"] };
  })
  .addNode("b_2", (state) => {
    console.log(`Adding "B_2" to ${state.aggregate.join(", ")}`);
    return { aggregate: ["B_2"] };
  })
  .addNode("c", (state) => {
    console.log(`Adding "C" to ${state.aggregate.join(", ")}`);
    return { aggregate: ["C"] };
  })
  .addNode(
    "d",
    (state) => {
      console.log(`Adding "D" to ${state.aggregate.join(", ")}`);
      return { aggregate: ["D"] };
    },
    { defer: true }
  )
  .addEdge(START, "a")
  .addEdge("a", "b")
  .addEdge("a", "c")
  .addEdge("b", "b_2")
  .addEdge("b_2", "d")
  .addEdge("c", "d")
  .addEdge("d", END)
  .compile();
import * as tslab from "tslab";

const drawableGraph = graph.getGraph();
const image = await drawableGraph.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();

await tslab.display.png(new Uint8Array(arrayBuffer));

await graph.invoke({"aggregate": []})
Adding "A" to 
Adding "B" to A
Adding "C" to A
Adding "B_2" to A, B, C
Adding "D" to A, B, C, B_2
{ aggregate: [ 'A', 'B', 'C', 'B_2', 'D' ] }
在上面的示例中,节点 "b""c" 在同一个超步中并发执行。我们将节点 "d" 设置为 { defer: true },这样它将不会执行,直到所有待处理任务都完成。在这种情况下,这意味着 "d" 会等待整个分支 "b" 完成。