Building a customer support chatbot with a small model¶
Below is an example of modeling a customer support chatbot as a state machine. It is designed to work with smaller models by giving them context about what stage of the interaction they're in, which narrows the decision space of any given LLM call and keeps it focused.
The entrypoint is a node containing a chain that we have prompted to answer basic questions, but to delegate questions related to billing or technical support to other "teams". Depending on this entry node's response, the edge from that node will use an LLM call to determine whether to respond directly to the user or to invoke either the billing_support or technical_support node.
- The technical support node will attempt to answer the user's question with a more focused prompt.
- The billing agent can choose to answer the user's question, or can call out to a human for approval of a refund, using a dynamic breakpoint.
This is intended as an illustrative proof-of-concept architecture; you could extend this example by giving individual nodes the ability to perform retrieval, use other tools, delegate to more powerful models at deeper stages, and more.
Let's dive in!
Setup¶
First, we need to install the required packages. We'll use a relatively small model, Llama 3.1 8B hosted on Together AI, to run the required inference. You will also need to set an environment variable named TOGETHER_AI_API_KEY, which you can obtain from your Together dashboard.
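As a rough sketch, installation and setup might look something like the following (package names are inferred from the imports used later in this tutorial; pin versions as appropriate for your project):

// npm install @langchain/langgraph @langchain/community @langchain/core zod zod-to-json-schema

// Make the Together AI API key available before initializing the model
process.env.TOGETHER_AI_API_KEY = "YOUR_API_KEY_HERE";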
Initializing the model¶
First, let's define the LLM and the LangGraph state we'll use for all of our calls.
import { ChatTogetherAI } from "@langchain/community/chat_models/togetherai";
const model = new ChatTogetherAI({
model: "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
temperature: 0,
});
Building the graph¶
Now we can start defining the logic for our nodes. Each node's return value will be added to the graph state. We'll start with the prebuilt MessagesAnnotation, which is designed to manage formatting and edge cases around messages returned from nodes. We'll also add two more state values: a string defining the next representative, and a boolean determining whether a human has authorized a refund for the given conversation. This combined state is passed to the next node that executes, or is returned if execution has finished. Defining the state looks like this:
import { Annotation, MessagesAnnotation } from "@langchain/langgraph";
const StateAnnotation = Annotation.Root({
...MessagesAnnotation.spec,
nextRepresentative: Annotation<string>,
refundAuthorized: Annotation<boolean>,
});
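One note on how this state behaves: MessagesAnnotation includes a reducer, so messages returned from a node are appended to the existing messages in state rather than replacing them, while plain channels like nextRepresentative are simply overwritten. A minimal sketch (exampleNode is purely illustrative and not part of the graph we're building):

const exampleNode = async (state: typeof StateAnnotation.State) => {
  return {
    // Appended to state.messages by the MessagesAnnotation reducer
    messages: [{ role: "assistant", content: "Hello!" }],
    // Overwrites any previous value of this channel
    nextRepresentative: "RESPOND",
  };
};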
We calculate the value of nextRepresentative within a node to make resuming from a given checkpoint fully deterministic; if an LLM were used within an edge instead, resuming from a given state would carry some undesirable randomness.
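To illustrate the pattern, here's a simplified sketch of what our conditional edges will look like: the LLM call happens inside the node, and the edge is a pure function of state (routeFromState is an illustrative name; the full version appears in the "Connecting the nodes" section below):

// Because the edge only reads a precomputed value from state,
// resuming from a checkpoint always makes the same routing decision.
const routeFromState = (state: typeof StateAnnotation.State) => {
  return state.nextRepresentative === "BILLING" ? "billing_support" : "__end__";
};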
Now, let's define our entrypoint node. It will mimic a front-desk receptionist who can field incoming questions and either respond conversationally or route to a more specialized team.
import { z } from "zod";
import { zodToJsonSchema } from "zod-to-json-schema";
const initialSupport = async (state: typeof StateAnnotation.State) => {
const SYSTEM_TEMPLATE =
`You are frontline support staff for LangCorp, a company that sells computers.
Be concise in your responses.
You can chat with customers and help them with basic questions, but if the customer is having a billing or technical problem,
do not try to answer the question directly or gather information.
Instead, immediately transfer them to the billing or technical team by asking the user to hold for a moment.
Otherwise, just respond conversationally.`;
const supportResponse = await model.invoke([
{ role: "system", content: SYSTEM_TEMPLATE },
...state.messages,
]);
const CATEGORIZATION_SYSTEM_TEMPLATE = `You are an expert customer support routing system.
Your job is to detect whether a customer support representative is routing a user to a billing team or a technical team, or if they are just responding conversationally.`;
const CATEGORIZATION_HUMAN_TEMPLATE =
`The previous conversation is an interaction between a customer support representative and a user.
Extract whether the representative is routing the user to a billing or technical team, or whether they are just responding conversationally.
Respond with a JSON object containing a single key called "nextRepresentative" with one of the following values:
If they want to route the user to the billing team, respond only with the word "BILLING".
If they want to route the user to the technical team, respond only with the word "TECHNICAL".
Otherwise, respond only with the word "RESPOND".`;
const categorizationResponse = await model.invoke([{
role: "system",
content: CATEGORIZATION_SYSTEM_TEMPLATE,
},
...state.messages,
{
role: "user",
content: CATEGORIZATION_HUMAN_TEMPLATE,
}],
{
response_format: {
type: "json_object",
schema: zodToJsonSchema(
z.object({
nextRepresentative: z.enum(["BILLING", "TECHNICAL", "RESPOND"]),
})
)
}
});
// Some chat models can return complex content, but Together will not
const categorizationOutput = JSON.parse(categorizationResponse.content as string);
// Will append the response message to the current interaction state
return { messages: [supportResponse], nextRepresentative: categorizationOutput.nextRepresentative };
};
We used Together AI's JSON mode above to guarantee a parseable output when deciding the next representative.
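Since we already have a zod schema, one optional hardening step (not part of the node above, just a sketch) would be to validate the parsed JSON against it rather than trusting the raw output:

import { z } from "zod";

const categorizationSchema = z.object({
  nextRepresentative: z.enum(["BILLING", "TECHNICAL", "RESPOND"]),
});

// Throws if the model returned JSON that doesn't match the schema,
// instead of silently propagating a bad routing value.
const categorization = categorizationSchema.parse(
  JSON.parse(categorizationResponse.content as string)
);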
Next, our billing and technical support nodes. We give special instructions in the billing prompt that it can choose to authorize refunds by routing to another agent.
const billingSupport = async (state: typeof StateAnnotation.State) => {
const SYSTEM_TEMPLATE =
`You are an expert billing support specialist for LangCorp, a company that sells computers.
Help the user to the best of your ability, but be concise in your responses.
You have the ability to authorize refunds, which you can do by transferring the user to another agent who will collect the required information.
If you do, assume the other agent has all necessary information about the customer and their order.
You do not need to ask the user for more information.
Help the user to the best of your ability, but be concise in your responses.`;
let trimmedHistory = state.messages;
// Make the user's question the most recent message in the history.
// This helps small models stay focused.
if (trimmedHistory.at(-1)?._getType() === "ai") {
trimmedHistory = trimmedHistory.slice(0, -1);
}
const billingRepResponse = await model.invoke([
{
role: "system",
content: SYSTEM_TEMPLATE,
},
...trimmedHistory,
]);
const CATEGORIZATION_SYSTEM_TEMPLATE =
`Your job is to detect whether a billing support representative wants to refund the user.`;
const CATEGORIZATION_HUMAN_TEMPLATE =
`The following text is a response from a customer support representative.
Extract whether they want to refund the user or not.
Respond with a JSON object containing a single key called "nextRepresentative" with one of the following values:
If they want to refund the user, respond only with the word "REFUND".
Otherwise, respond only with the word "RESPOND".
Here is the text:
<text>
${billingRepResponse.content}
</text>.`;
const categorizationResponse = await model.invoke([
{
role: "system",
content: CATEGORIZATION_SYSTEM_TEMPLATE,
},
{
role: "user",
content: CATEGORIZATION_HUMAN_TEMPLATE,
}
], {
response_format: {
type: "json_object",
schema: zodToJsonSchema(
z.object({
nextRepresentative: z.enum(["REFUND", "RESPOND"]),
})
)
}
});
const categorizationOutput = JSON.parse(categorizationResponse.content as string);
return {
messages: billingRepResponse,
nextRepresentative: categorizationOutput.nextRepresentative,
};
};
const technicalSupport = async (state: typeof StateAnnotation.State) => {
const SYSTEM_TEMPLATE =
`You are an expert at diagnosing technical computer issues. You work for a company called LangCorp that sells computers.
Help the user to the best of your ability, but be concise in your responses.`;
let trimmedHistory = state.messages;
// Make the user's question the most recent message in the history.
// This helps small models stay focused.
if (trimmedHistory.at(-1)?._getType() === "ai") {
trimmedHistory = trimmedHistory.slice(0, -1);
}
const response = await model.invoke([
{
role: "system",
content: SYSTEM_TEMPLATE,
},
...trimmedHistory,
]);
return {
messages: response,
};
};
Finally, a node that will handle refunds. The logic here is stubbed out since this isn't a real system, but in practice you could add a real tool here that requires a human to approve. We use a special error called NodeInterrupt so that graph execution can be resumed later, after a human has examined the state and confirmed that the refund is appropriate.
import { NodeInterrupt } from "@langchain/langgraph";
const handleRefund = async (state: typeof StateAnnotation.State) => {
if (!state.refundAuthorized) {
console.log("--- HUMAN AUTHORIZATION REQUIRED FOR REFUND ---");
throw new NodeInterrupt("Human authorization required.");
}
return {
messages: {
role: "assistant",
content: "Refund processed!",
},
};
};
Now we can start building our graph by adding all of the above functions as nodes and setting initial_support as the starting node:
import { StateGraph } from "@langchain/langgraph";
let builder = new StateGraph(StateAnnotation)
.addNode("initial_support", initialSupport)
.addNode("billing_support", billingSupport)
.addNode("technical_support", technicalSupport)
.addNode("handle_refund", handleRefund)
.addEdge("__start__", "initial_support");
Connecting the nodes¶
Great! Now let's move on to the edges. These edges will evaluate the current state of the graph, as created by the return values of the individual nodes, and route execution accordingly. First, we want the initial_support node to either delegate to the billing node, the technical node, or just respond directly to the user. Here's one example of how that might look:
builder = builder.addConditionalEdges("initial_support", async (state: typeof StateAnnotation.State) => {
if (state.nextRepresentative.includes("BILLING")) {
return "billing";
} else if (state.nextRepresentative.includes("TECHNICAL")) {
return "technical";
} else {
return "conversational";
}
}, {
billing: "billing_support",
technical: "technical_support",
conversational: "__end__",
});
console.log("Added edges!");
Let's keep going. We add an edge making the technical support node always end, since it has no tools to call. The billing support node uses a conditional edge, since it can either call the refund tool or end.
builder = builder
.addEdge("technical_support", "__end__")
.addConditionalEdges("billing_support", async (state) => {
if (state.nextRepresentative.includes("REFUND")) {
return "refund";
} else {
return "__end__";
}
}, {
refund: "handle_refund",
__end__: "__end__",
})
.addEdge("handle_refund", "__end__");
console.log("Added edges!");
Finally, let's call .compile() to finish laying out the graph. We'll also use an in-memory checkpointer to store state:
import { MemorySaver } from "@langchain/langgraph";
const checkpointer = new MemorySaver();
const graph = builder.compile({
checkpointer,
});
Here's a representation of the graph as currently built:
import * as tslab from "tslab";
const representation = graph.getGraph();
const image = await representation.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();
await tslab.display.png(new Uint8Array(arrayBuffer));
And now let's test it!
We can get the values returned by nodes as they execute using the .stream() runnable method (we could also go even more granular and get output as it is generated using .streamEvents(), but this requires a bit more parsing).
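For reference, a minimal sketch of the more granular .streamEvents() approach might look like the following, filtering for chat model token events via LangChain's v2 streaming events API (the input and thread_id here are illustrative):

const eventStream = await graph.streamEvents({
  messages: [{ role: "user", content: "Hi there!" }],
}, {
  version: "v2",
  configurable: { thread_id: "event_streaming_id" },
});

for await (const { event, data } of eventStream) {
  // Emitted for each token generated by an underlying chat model call
  if (event === "on_chat_model_stream") {
    console.log(data.chunk.content);
  }
}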
Here's an example with a billing-related refund query. Because of how we've defined our state, the input must be a message (or a list of messages) representing the user's question:
const stream = await graph.stream({
messages: [
{
role: "user",
content: "I've changed my mind and I want a refund for order #182818!",
}
]
}, {
configurable: {
thread_id: "refund_testing_id",
}
});
for await (const value of stream) {
console.log("---STEP---");
console.log(value);
console.log("---END STEP---");
}
---STEP---
{
initial_support: {
messages: AIMessage {
"id": "8beb633a396c67fd-SJC",
"content": "I'd be happy to help you with that. However, I need to check on our refund policy for you. Can you please hold for just a moment while I transfer you to our billing team? They'll be able to assist you with the refund process.",
"additional_kwargs": {},
"response_metadata": {
"tokenUsage": {
"completionTokens": 53,
"promptTokens": 116,
"totalTokens": 169
},
"finish_reason": "eos"
},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 116,
"output_tokens": 53,
"total_tokens": 169
}
},
nextRepresentative: 'BILLING'
}
}
---END STEP---
---STEP---
{
billing_support: {
messages: AIMessage {
"id": "8beb634908a12500-SJC",
"content": "I'd be happy to assist you with a refund. I'll transfer you to our Refunds Team, who will guide you through the process. Please hold for just a moment.\n\n(Transfer to Refunds Team)\n\nRefunds Team: Hi, I'm here to help with your refund request for order #182818. Can you please confirm your refund amount and reason for return?",
"additional_kwargs": {},
"response_metadata": {
"tokenUsage": {
"completionTokens": 77,
"promptTokens": 139,
"totalTokens": 216
},
"finish_reason": "eos"
},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 139,
"output_tokens": 77,
"total_tokens": 216
}
},
nextRepresentative: 'REFUND'
}
}
---END STEP---
--- HUMAN AUTHORIZATION REQUIRED FOR REFUND ---
---STEP---
{}
---END STEP---
The query routed through to billing_support, but then our dynamic interrupt was triggered, since refundAuthorized was never set in the graph state. We can see this by inspecting the current state of the graph and noting that there is an interrupt when running handle_refund:
const currentState = await graph.getState({ configurable: { thread_id: "refund_testing_id" } });
console.log("CURRENT TASKS", JSON.stringify(currentState.tasks, null, 2));
CURRENT TASKS [
{
"id": "5ab19c8b-c947-5bf7-a3aa-4edae60c1a96",
"name": "handle_refund",
"interrupts": [
{
"value": "Human authorization required.",
"when": "during"
}
]
}
]
If we resumed the graph now, it would attempt to run handle_refund again, but this would trigger the interrupt once more because refundAuthorized is still unset. If we instead update the state to set refundAuthorized to true, then resume the graph by running it with the same thread_id and passing null as the input, execution will continue and the refund will be processed:
await graph.updateState({ configurable: { thread_id: "refund_testing_id" } }, {
refundAuthorized: true,
});
const resumedStream = await graph.stream(null, { configurable: { thread_id: "refund_testing_id" }});
for await (const value of resumedStream) {
console.log(value);
}
Now, let's try a technical question:
const technicalStream = await graph.stream({
messages: [{
role: "user",
content: "My LangCorp computer isn't turning on because I dropped it in water.",
}]
}, {
configurable: {
thread_id: "technical_testing_id"
}
});
for await (const value of technicalStream) {
console.log(value);
}
{
initial_support: {
messages: AIMessage {
"id": "8beb66886c0c15d8-SJC",
"content": "Oh no, sorry to hear that! Water damage can be a real challenge. Have you tried unplugging it and letting it dry out for a bit? Sometimes, it's just a matter of giving it some time to recover.",
"additional_kwargs": {},
"response_metadata": {
"tokenUsage": {
"completionTokens": 47,
"promptTokens": 115,
"totalTokens": 162
},
"finish_reason": "eos"
},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 115,
"output_tokens": 47,
"total_tokens": 162
}
},
nextRepresentative: 'TECHNICAL'
}
}
{
technical_support: {
messages: AIMessage {
"id": "8beb66986df91701-SJC",
"content": "Sorry to hear that. Water damage can be a real challenge. Let's try to troubleshoot the issue.\n\nCan you tell me:\n\n1. How long was the computer submerged in water?\n2. Did you turn it off before it got wet, or was it on at the time?\n3. Have you tried unplugging the power cord and pressing the power button for 30 seconds to discharge any residual power?\n\nThis will help me narrow down the possible causes and suggest the next steps.",
"additional_kwargs": {},
"response_metadata": {
"tokenUsage": {
"completionTokens": 99,
"promptTokens": 70,
"totalTokens": 169
},
"finish_reason": "eos"
},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 70,
"output_tokens": 99,
"total_tokens": 169
}
}
}
}
We can see that the query was correctly routed to the technical support node!
Finally, let's try a simple conversational response:
const conversationalStream = await graph.stream({
messages: [{
role: "user",
content: "How are you? I'm Cobb."
}]
}, {
configurable: {
thread_id: "conversational_testing_id"
}
});
for await (const value of conversationalStream) {
console.log(value);
}
{
initial_support: {
messages: AIMessage {
"id": "8beb6712294915e3-SJC",
"content": "Hi Cobb! I'm doing great, thanks for asking. How can I help you today? Are you looking to purchase a new computer or just have a question about our products?",
"additional_kwargs": {},
"response_metadata": {
"tokenUsage": {
"completionTokens": 37,
"promptTokens": 108,
"totalTokens": 145
},
"finish_reason": "eos"
},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 108,
"output_tokens": 37,
"total_tokens": 145
}
},
nextRepresentative: 'RESPOND'
}
}
This time, the initial_support node handled the query on its own, with no routing to the technical or billing support nodes.
Further reading¶
You may have noticed that each node's response appends a message to the state history, and as a result we end up with runs of consecutive assistant messages corresponding to the different customer support personas the LLM played. With MessagesAnnotation, this state can be trimmed by returning a message modifier containing the same id as the message you want removed. See this guide for more information.
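As a minimal sketch of what that could look like, using the RemoveMessage modifier from @langchain/core (the node name trimHistory and where you'd wire it into the graph are illustrative, not part of the tutorial above):

import { RemoveMessage } from "@langchain/core/messages";

// Hypothetical node that deletes every message except the most recent one.
// Returning a RemoveMessage whose id matches an existing message instructs
// the MessagesAnnotation reducer to drop that message from state.
const trimHistory = async (state: typeof StateAnnotation.State) => {
  const staleMessages = state.messages.slice(0, -1);
  return {
    messages: staleMessages.map((m) => new RemoveMessage({ id: m.id! })),
  };
};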