跳到内容

如何创建用于并行执行的 Map-Reduce 分支

Map-reduce 操作对于高效的任务分解和并行处理至关重要。这种方法包括将任务分解为更小的子任务,并行处理每个子任务,并聚合所有已完成子任务的结果。

考虑以下示例:给定用户提出的一个通用主题,生成一个相关主题列表,为每个主题生成一个笑话,并从结果列表中选择最佳笑话。在这种设计模式中,第一个节点可以生成一个对象列表(例如,相关主题),我们希望将另一个节点(例如,生成笑话)应用于所有这些对象(例如,主题)。然而,出现了两个主要挑战。

(1)对象(例如,主题)的数量可能事先未知(意味着当我们布局图时,边的数量可能未知),以及(2)下游节点的输入状态应该是不同的(每个生成的对象一个)。

LangGraph 通过其 Send API 解决了这些挑战。通过利用条件边,Send 可以将不同的状态(例如,主题)分发到节点的多个实例(例如,笑话生成)。重要的是,发送的状态可以与核心图的状态不同,从而实现灵活和动态的工作流程管理。

Screenshot 2024-07-12 at 9.45.40 AM.png

设置

此示例将需要一些依赖项。首先,安装 LangGraph 库,以及 @langchain/anthropic 包,因为我们将在本示例中使用 Anthropic LLM

npm install @langchain/langgraph @langchain/anthropic @langchain/core

接下来,设置您的 Anthropic API 密钥

process.env.ANTHROPIC_API_KEY = 'YOUR_API_KEY'
import { z } from "zod";
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, END, START, Annotation, Send } from "@langchain/langgraph";

/* Model and prompts */

// Define model and prompts we will use
const subjectsPrompt = "Generate a comma separated list of between 2 and 5 examples related to: {topic}."
const jokePrompt = "Generate a joke about {subject}"
const bestJokePrompt = `Below are a bunch of jokes about {topic}. Select the best one! Return the ID (index) of the best one.

{jokes}`

// Zod schemas for getting structured output from the LLM
const Subjects = z.object({
  subjects: z.array(z.string()),
});
const Joke = z.object({
  joke: z.string(),
});
const BestJoke = z.object({
  id: z.number(),
});

const model = new ChatAnthropic({
  model: "claude-3-5-sonnet-20240620",
});

/* Graph components: define the components that will make up the graph */

// This will be the overall state of the main graph.
// It will contain a topic (which we expect the user to provide)
// and then will generate a list of subjects, and then a joke for
// each subject
const OverallState = Annotation.Root({
  topic: Annotation<string>,
  subjects: Annotation<string[]>,
  // Notice here we pass a reducer function.
  // This is because we want combine all the jokes we generate
  // from individual nodes back into one list.
  jokes: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
  }),
  bestSelectedJoke: Annotation<string>,
});

// This will be the state of the node that we will "map" all
// subjects to in order to generate a joke
interface JokeState {
  subject: string;
}

// This is the function we will use to generate the subjects of the jokes
const generateTopics = async (
  state: typeof OverallState.State
): Promise<Partial<typeof OverallState.State>> => {
  const prompt = subjectsPrompt.replace("topic", state.topic);
  const response = await model
    .withStructuredOutput(Subjects, { name: "subjects" })
    .invoke(prompt);
  return { subjects: response.subjects };
};

// Function to generate a joke
const generateJoke = async (state: JokeState): Promise<{ jokes: string[] }> => {
  const prompt = jokePrompt.replace("subject", state.subject);
  const response = await model
    .withStructuredOutput(Joke, { name: "joke" })
    .invoke(prompt);
  return { jokes: [response.joke] };
};

// Here we define the logic to map out over the generated subjects
// We will use this an edge in the graph
const continueToJokes = (state: typeof OverallState.State) => {
  // We will return a list of `Send` objects
  // Each `Send` object consists of the name of a node in the graph
  // as well as the state to send to that node
  return state.subjects.map((subject) => new Send("generateJoke", { subject }));
};

// Here we will judge the best joke
const bestJoke = async (
  state: typeof OverallState.State
): Promise<Partial<typeof OverallState.State>> => {
  const jokes = state.jokes.join("\n\n");
  const prompt = bestJokePrompt
    .replace("jokes", jokes)
    .replace("topic", state.topic);
  const response = await model
    .withStructuredOutput(BestJoke, { name: "best_joke" })
    .invoke(prompt);
  return { bestSelectedJoke: state.jokes[response.id] };
};

// Construct the graph: here we put everything together to construct our graph
const graph = new StateGraph(OverallState)
  .addNode("generateTopics", generateTopics)
  .addNode("generateJoke", generateJoke)
  .addNode("bestJoke", bestJoke)
  .addEdge(START, "generateTopics")
  .addConditionalEdges("generateTopics", continueToJokes)
  .addEdge("generateJoke", "bestJoke")
  .addEdge("bestJoke", END);

const app = graph.compile();
import * as tslab from "tslab";

const representation = app.getGraph();
const image = await representation.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();

tslab.display.png(new Uint8Array(arrayBuffer));

// Call the graph: here we call it to generate a list of jokes
for await (const s of await app.stream({ topic: "animals" })) {
  console.log(s);
}
{
  generateTopics: { subjects: [ 'lion', 'elephant', 'penguin', 'dolphin' ] }
}
{
  generateJoke: {
    jokes: [ "Why don't lions like fast food? Because they can't catch it!" ]
  }
}
{
  generateJoke: {
    jokes: [
      "Why don't elephants use computers? Because they're afraid of the mouse!"
    ]
  }
}
{
  generateJoke: {
    jokes: [
      "Why don't dolphins use smartphones? They're afraid of phishing!"
    ]
  }
}
{
  generateJoke: {
    jokes: [
      "Why don't you see penguins in Britain? Because they're afraid of Wales!"
    ]
  }
}
{
  bestJoke: {
    bestSelectedJoke: "Why don't elephants use computers? Because they're afraid of the mouse!"
  }
}