如何添加动态断点¶
定义图¶
在 [1]
已复制!
import {
Annotation,
MemorySaver,
NodeInterrupt,
StateGraph,
} from "@langchain/langgraph";
const StateAnnotation = Annotation.Root({
input: Annotation<string>,
});
const step1 = async (state: typeof StateAnnotation.State) => {
console.log("---Step 1---");
return state;
};
const step2 = async (state: typeof StateAnnotation.State) => {
// Let's optionally raise a NodeInterrupt
// if the length of the input is longer than 5 characters
if (state.input?.length > 5) {
throw new NodeInterrupt(`Received input that is longer than 5 characters: ${state.input}`);
}
console.log("---Step 2---");
return state;
};
const step3 = async (state: typeof StateAnnotation.State) => {
console.log("---Step 3---");
return state;
};
const checkpointer = new MemorySaver();
const graph = new StateGraph(StateAnnotation)
.addNode("step1", step1)
.addNode("step2", step2)
.addNode("step3", step3)
.addEdge("__start__", "step1")
.addEdge("step1", "step2")
.addEdge("step2", "step3")
.addEdge("step3", "__end__")
.compile({ checkpointer });
import { Annotation, MemorySaver, NodeInterrupt, StateGraph, } from "@langchain/langgraph"; const StateAnnotation = Annotation.Root({ input: Annotation, }); const step1 = async (state: typeof StateAnnotation.State) => { console.log("---Step 1---"); return state; }; const step2 = async (state: typeof StateAnnotation.State) => { // 让我们有条件地抛出 NodeInterrupt // 如果输入的长度超过 5 个字符 if (state.input?.length > 5) { throw new NodeInterrupt(`收到的输入长度超过 5 个字符:${state.input}`); } console.log("---Step 2---"); return state; }; const step3 = async (state: typeof StateAnnotation.State) => { console.log("---Step 3---"); return state; }; const checkpointer = new MemorySaver(); const graph = new StateGraph(StateAnnotation) .addNode("step1", step1) .addNode("step2", step2) .addNode("step3", step3) .addEdge("__start__", "step1") .addEdge("step1", "step2") .addEdge("step2", "step3") .addEdge("step3", "__end__") .compile({ checkpointer });
在 [2]
已复制!
import * as tslab from "tslab";
const representation = graph.getGraph();
const image = await representation.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();
await tslab.display.png(new Uint8Array(arrayBuffer));
import * as tslab from "tslab"; const representation = graph.getGraph(); const image = await representation.drawMermaidPng(); const arrayBuffer = await image.arrayBuffer(); await tslab.display.png(new Uint8Array(arrayBuffer));
使用动态中断运行图¶
首先,让我们使用长度 <= 5 个字符的输入运行图。这应该安全地忽略我们定义的中断条件,并在图执行结束时返回原始输入。
在 [3]
已复制!
const initialInput = { input: "hello" };
const config = {
configurable: {
thread_id: "1",
},
streamMode: "values" as const,
};
const stream = await graph.stream(initialInput, config);
for await (const event of stream) {
console.log(event);
}
const initialInput = { input: "hello" }; const config = { configurable: { thread_id: "1", }, streamMode: "values" as const, }; const stream = await graph.stream(initialInput, config); for await (const event of stream) { console.log(event); }
{ input: 'hello' } ---Step 1--- { input: 'hello' } ---Step 2--- { input: 'hello' } ---Step 3--- { input: 'hello' }
如果我们现在检查图,我们可以看到没有更多任务要执行,并且图确实完成了执行。
在 [4]
已复制!
const state = await graph.getState(config);
console.log(state.next);
console.log(state.tasks);
const state = await graph.getState(config); console.log(state.next); console.log(state.tasks);
[] []
现在,让我们使用长度超过 5 个字符的输入运行图。这应该触发我们在 step2
节点内部通过抛出 NodeInterrupt
错误定义的动态中断。
在 [5]
已复制!
const longInput = { input: "hello world" };
const config2 = {
configurable: {
thread_id: "2",
},
streamMode: "values" as const,
};
const streamWithInterrupt = await graph.stream(longInput, config2);
for await (const event of streamWithInterrupt) {
console.log(event);
}
const longInput = { input: "hello world" }; const config2 = { configurable: { thread_id: "2", }, streamMode: "values" as const, }; const streamWithInterrupt = await graph.stream(longInput, config2); for await (const event of streamWithInterrupt) { console.log(event); }
{ input: 'hello world' } ---Step 1--- { input: 'hello world' }
我们可以看到图现在在执行 step2
时停止了。如果我们现在检查图状态,我们可以看到有关哪个节点设置为下一个执行节点(step2
)的信息,以及哪个节点引发了中断(也是 step2
),以及有关中断的额外信息。
在 [6]
已复制!
const state2 = await graph.getState(config2);
console.log(state2.next);
console.log(JSON.stringify(state2.tasks, null, 2));
const state2 = await graph.getState(config2); console.log(state2.next); console.log(JSON.stringify(state2.tasks, null, 2));
[ 'step2' ] [ { "id": "c91a38f7-2aec-5c38-a3f0-60fba6efe73c", "name": "step2", "interrupts": [ { "value": "Received input that is longer than 5 characters: hello world", "when": "during" } ] } ]
如果我们尝试从断点恢复图,我们将再次中断,因为我们的输入和图状态没有改变。
在 [7]
已复制!
// NOTE: to resume the graph from a dynamic interrupt we use the same syntax as
// regular interrupts -- we pass null as the input
const resumedStream = await graph.stream(null, config2);
for await (const event of resumedStream) {
console.log(event);
}
// 注意:要从动态中断恢复图,我们使用与 // 常规中断相同的语法 -- 我们将 null 作为输入传递 const resumedStream = await graph.stream(null, config2); for await (const event of resumedStream) { console.log(event); }
在 [8]
已复制!
const state3 = await graph.getState(config2);
console.log(state3.next);
console.log(JSON.stringify(state2.tasks, null, 2));
const state3 = await graph.getState(config2); console.log(state3.next); console.log(JSON.stringify(state2.tasks, null, 2));
[ 'step2' ] [ { "id": "c91a38f7-2aec-5c38-a3f0-60fba6efe73c", "name": "step2", "interrupts": [ { "value": "Received input that is longer than 5 characters: hello world", "when": "during" } ] } ]
更新图状态¶
为了解决这个问题,我们可以做几件事。
首先,我们可以简单地使用更短的输入在另一个线程上运行图,就像我们在开始时那样。或者,如果我们想从断点恢复图执行,我们可以更新状态以具有长度小于 5 个字符的输入(我们的中断条件)。
在 [9]
已复制!
// NOTE: this update will be applied as of the last successful node before the interrupt,
// i.e. `step1`, right before the node with an interrupt
await graph.updateState(config2, { input: "short" });
const updatedStream = await graph.stream(null, config2);
for await (const event of updatedStream) {
console.log(event);
}
const state4 = await graph.getState(config2);
console.log(state4.next);
console.log(state4.values);
// 注意:此更新将从中断之前最后一个成功节点开始应用, // 例如 `step1`,就在中断节点之前 await graph.updateState(config2, { input: "short" }); const updatedStream = await graph.stream(null, config2); for await (const event of updatedStream) { console.log(event); } const state4 = await graph.getState(config2); console.log(state4.next); console.log(state4.values);
---Step 2--- { input: 'short' } ---Step 3--- { input: 'short' } [] { input: 'short' }
您也可以 **作为节点 step2
**(中断节点)更新状态,这将完全跳过该节点
在 [10]
已复制!
const config3 = {
configurable: {
thread_id: "3",
},
streamMode: "values" as const,
};
const skipStream = await graph.stream({ input: "hello world" }, config3);
// Run the graph until the first interruption
for await (const event of skipStream) {
console.log(event);
}
const config3 = { configurable: { thread_id: "3", }, streamMode: "values" as const, }; const skipStream = await graph.stream({ input: "hello world" }, config3); // 运行图直到第一次中断 for await (const event of skipStream) { console.log(event); }
{ input: 'hello world' } ---Step 1--- { input: 'hello world' }
在 [11]
已复制!
// NOTE: this update will skip the node `step2` entirely
await graph.updateState(config3, undefined, "step2");
// Resume the stream
for await (const event of await graph.stream(null, config3)) {
console.log(event);
}
const state5 = await graph.getState(config3);
console.log(state5.next);
console.log(state5.values);
// 注意:此更新将完全跳过节点 `step2` await graph.updateState(config3, undefined, "step2"); // 恢复流 for await (const event of await graph.stream(null, config3)) { console.log(event); } const state5 = await graph.getState(config3); console.log(state5.next); console.log(state5.values);
---Step 3--- { input: 'hello world' } [] { input: 'hello world' }