How to create map-reduce branches for parallel execution¶
Map-reduce operations are essential for efficient task decomposition and parallel processing. This approach involves breaking a task into smaller sub-tasks, processing each sub-task in parallel, and aggregating the results across all of the completed sub-tasks.
Consider this example: given a general topic from the user, generate a list of related subjects, generate a joke for each subject, and select the best joke from the resulting list. In this design pattern, a first node may generate a list of objects (e.g., related subjects), and we want to apply some other node (e.g., joke generation) to all of those objects (e.g., subjects). However, two main challenges arise:
(1) the number of objects (e.g., subjects) may be unknown ahead of time (meaning the number of edges may not be known) when we lay out the graph, and (2) the input state to the downstream node should be different (one for each generated object).
LangGraph addresses these challenges through its Send
API. By utilizing conditional edges, Send
can distribute different states (e.g., subjects) to multiple instances of a node (e.g., joke generation). Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management.
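Before wiring this up in LangGraph, the map-reduce shape itself can be sketched in plain Python (a dependency-free stand-in; no `Send` objects or reducers from LangGraph are involved, and the joke strings are placeholders):

```python
import operator
from functools import reduce

# "Map": the number of subjects is only known at runtime, so we fan out
# one task per subject, each producing a partial update of its own.
subjects = ["lions", "penguins", "owls"]
partial_updates = [{"jokes": [f"a joke about {s}"]} for s in subjects]

# "Reduce": merge the partial lists back into one, the way an
# operator.add reducer on a state key would.
jokes = reduce(operator.add, (u["jokes"] for u in partial_updates), [])
print(jokes)
```

The fan-out width (here 3) is driven entirely by the data, which is exactly the property the `Send` API provides inside a graph.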
Setup¶
First, let's install the required packages and set our API keys
%%capture --no-stderr
%pip install -U langchain-anthropic langgraph
import os
import getpass


def _set_env(name: str):
    if not os.getenv(name):
        os.environ[name] = getpass.getpass(f"{name}: ")


_set_env("ANTHROPIC_API_KEY")
Define the graph¶
Using Pydantic with LangChain
This notebook uses Pydantic v2 BaseModel
, which requires langchain-core >= 0.3
. Using langchain-core < 0.3
will result in errors due to mixing of Pydantic v1 and v2 BaseModels
.
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langchain_anthropic import ChatAnthropic
from langgraph.types import Send
from langgraph.graph import END, StateGraph, START
from pydantic import BaseModel, Field
# Model and prompts
# Define model and prompts we will use
subjects_prompt = """Generate a comma separated list of between 2 and 5 examples related to: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.
{jokes}"""
class Subjects(BaseModel):
    subjects: list[str]


class Joke(BaseModel):
    joke: str


class BestJoke(BaseModel):
    id: int = Field(description="Index of the best joke, starting with 0", ge=0)


model = ChatAnthropic(model="claude-3-5-sonnet-20240620")

# Graph components: define the components that will make up the graph


# This will be the overall state of the main graph.
# It will contain a topic (which we expect the user to provide)
# and then will generate a list of subjects, and then a joke for
# each subject
class OverallState(TypedDict):
    topic: str
    subjects: list
    # Notice here we use operator.add
    # This is because we want to combine all the jokes we generate
    # from individual nodes back into one list - this is essentially
    # the "reduce" part
    jokes: Annotated[list, operator.add]
    best_selected_joke: str


# This will be the state of the node that we will "map" all
# subjects to in order to generate a joke
class JokeState(TypedDict):
    subject: str


# This is the function we will use to generate the subjects of the jokes
def generate_topics(state: OverallState):
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}


# Here we generate a joke, given a subject
def generate_joke(state: JokeState):
    prompt = joke_prompt.format(subject=state["subject"])
    response = model.with_structured_output(Joke).invoke(prompt)
    return {"jokes": [response.joke]}


# Here we define the logic to map out over the generated subjects
# We will use this as an edge in the graph
def continue_to_jokes(state: OverallState):
    # We will return a list of `Send` objects
    # Each `Send` object consists of the name of a node in the graph
    # as well as the state to send to that node
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]


# Here we will judge the best joke
def best_joke(state: OverallState):
    jokes = "\n\n".join(state["jokes"])
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)
    return {"best_selected_joke": state["jokes"][response.id]}
# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
graph.add_edge(START, "generate_topics")
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
app = graph.compile()
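The fan-out logic of `continue_to_jokes` can be exercised in isolation without a model or a compiled graph. In this minimal emulation, plain `(node_name, state)` tuples stand in for `langgraph.types.Send` objects:

```python
# Emulation only: tuples stand in for langgraph.types.Send objects.
def continue_to_jokes(state):
    # One "send" per subject, each carrying its own private state,
    # which differs from the overall graph state (no "topic" key here).
    return [("generate_joke", {"subject": s}) for s in state["subjects"]]


sends = continue_to_jokes({"topic": "animals", "subjects": ["cats", "dogs"]})
print(sends)
```

Note that each send carries only the `subject` it needs, which is how the downstream node receives a different input state per object.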
from IPython.display import Image
Image(app.get_graph().draw_mermaid_png())
Use the graph¶
# Call the graph: here we call it to generate a list of jokes
for s in app.stream({"topic": "animals"}):
print(s)
{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ["Why don't dolphins use smartphones? Because they're afraid of phishing!"]}}
{'generate_joke': {'jokes': ["Why don't you see penguins in Britain? Because they're afraid of Wales!"]}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'best_joke': {'best_selected_joke': "Why don't dolphins use smartphones? Because they're afraid of phishing!"}}
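Each `generate_joke` update above contributes a one-element list to the `jokes` key, and the `operator.add` reducer concatenates them. That accumulation can be replayed in plain Python (a sketch with abbreviated placeholder updates, not LangGraph internals):

```python
import operator

# Abbreviated replay of the streamed per-node updates shown above.
updates = [
    {"generate_joke": {"jokes": ["elephant joke"]}},
    {"generate_joke": {"jokes": ["dolphin joke"]}},
]

# Apply the operator.add reducer for the "jokes" channel by hand.
jokes = []
for update in updates:
    for node_output in update.values():
        jokes = operator.add(jokes, node_output["jokes"])
print(jokes)
```

This is the "reduce" half of the pattern: regardless of how many joke nodes the conditional edge fanned out, their partial lists converge into one list before `best_joke` runs.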