设置¶
首先,让我们安装所需的软件包
在 [1]
已复制!
%%capture --no-stderr
%pip install -U langgraph
%%capture --no-stderr %pip install -U langgraph
简单示例¶
让我们考虑一个玩具示例:一个接受日志并执行两个独立子任务的系统。首先,它将对它们进行总结。其次,它将总结日志中捕获的任何故障模式。这两个操作将由两个不同的子图执行。
最重要的是要认识到图之间信息传递。入口图
是父图,两个子图中的每一个都定义为入口图
中的节点。两个子图都继承自父图入口图
的状态;我只需在子图状态中指定它即可访问每个子图中的docs
(参见图)。每个子图都可以拥有自己的私有状态。并且任何我想传播回父图入口图
的值(用于最终报告)只需要在我的入口图
状态中定义即可(例如,摘要报告
和故障报告
)。
定义子图¶
在 [2]
已复制!
from typing import Optional, Annotated
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
# The structure of the logs
class Logs(TypedDict):
id: str
question: str
answer: str
grade: Optional[int]
feedback: Optional[str]
# Define custom reducer (see more on this in the "Custom reducer" section below)
def add_logs(left: list[Logs], right: list[Logs]) -> list[Logs]:
if not left:
left = []
if not right:
right = []
logs = left.copy()
left_id_to_idx = {log["id"]: idx for idx, log in enumerate(logs)}
# update if the new logs are already in the state, otherwise append
for log in right:
idx = left_id_to_idx.get(log["id"])
if idx is not None:
logs[idx] = log
else:
logs.append(log)
return logs
# Failure Analysis Subgraph
class FailureAnalysisState(TypedDict):
# keys shared with the parent graph (EntryGraphState)
logs: Annotated[list[Logs], add_logs]
failure_report: str
# subgraph key
failures: list[Logs]
def get_failures(state: FailureAnalysisState):
failures = [log for log in state["logs"] if log["grade"] == 0]
return {"failures": failures}
def generate_summary(state: FailureAnalysisState):
failures = state["failures"]
# NOTE: you can implement custom summarization logic here
failure_ids = [log["id"] for log in failures]
fa_summary = f"Poor quality of retrieval for document IDs: {', '.join(failure_ids)}"
return {"failure_report": fa_summary}
fa_builder = StateGraph(FailureAnalysisState)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)
# Summarization subgraph
class QuestionSummarizationState(TypedDict):
# keys that are shared with the parent graph (EntryGraphState)
summary_report: str
logs: Annotated[list[Logs], add_logs]
# subgraph keys
summary: str
def generate_summary(state: QuestionSummarizationState):
docs = state["logs"]
# NOTE: you can implement custom summarization logic here
summary = "Questions focused on usage of ChatOllama and Chroma vector store."
return {"summary": summary}
def send_to_slack(state: QuestionSummarizationState):
summary = state["summary"]
# NOTE: you can implement custom logic here, for example sending the summary generated in the previous step to Slack
return {"summary_report": summary}
qs_builder = StateGraph(QuestionSummarizationState)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)
from typing import Optional, Annotated from typing_extensions import TypedDict from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import StateGraph, START, END # 日志的结构 class Logs(TypedDict): id: str question: str answer: str grade: Optional[int] feedback: Optional[str] # 定义自定义 Reducer(请参阅下面“自定义 Reducer”部分中的更多信息) def add_logs(left: list[Logs], right: list[Logs]) -> list[Logs]: if not left: left = [] if not right: right = [] logs = left.copy() left_id_to_idx = {log["id"]: idx for idx, log in enumerate(logs)} # 更新如果新日志已存在于状态中,否则追加 for log in right: idx = left_id_to_idx.get(log["id"]) if idx is not None: logs[idx] = log else: logs.append(log) return logs # 故障分析子图 class FailureAnalysisState(TypedDict): # 与父图(EntryGraphState)共享的键 logs: Annotated[list[Logs], add_logs] failure_report: str # 子图键 failures: list[Logs] def get_failures(state: FailureAnalysisState): failures = [log for log in state["logs"] if log["grade"] == 0] return {"failures": failures} def generate_summary(state: FailureAnalysisState): failures = state["failures"] # 注意:您可以在此处实现自定义摘要逻辑 failure_ids = [log["id"] for log in failures] fa_summary = f"检索文档 ID 的质量差:{', '.join(failure_ids)}" return {"failure_report": fa_summary} fa_builder = StateGraph(FailureAnalysisState) fa_builder.add_node("get_failures", get_failures) fa_builder.add_node("generate_summary", generate_summary) fa_builder.add_edge(START, "get_failures") fa_builder.add_edge("get_failures", "generate_summary") fa_builder.add_edge("generate_summary", END) # 摘要子图 class QuestionSummarizationState(TypedDict): # 与父图(EntryGraphState)共享的键 summary_report: str logs: Annotated[list[Logs], add_logs] # 子图键 summary: str def generate_summary(state: QuestionSummarizationState): docs = state["logs"] # 注意:您可以在此处实现自定义摘要逻辑 summary = "问题集中在 ChatOllama 和 Chroma 向量存储的使用上。" return {"summary": summary} def send_to_slack(state: QuestionSummarizationState): summary = state["summary"] # 注意:您可以在此处实现自定义逻辑,例如将上一步生成的摘要发送到 Slack return {"summary_report": summary} qs_builder = StateGraph(QuestionSummarizationState) qs_builder.add_node("generate_summary", generate_summary) qs_builder.add_node("send_to_slack", send_to_slack) qs_builder.add_edge(START, "generate_summary") qs_builder.add_edge("generate_summary", "send_to_slack") qs_builder.add_edge("send_to_slack", END)
请注意,每个子图都有自己的状态,QuestionSummarizationState
和 FailureAnalysisState
。
定义每个子图后,我们将所有内容放在一起。
定义父图¶
在 [3]
已复制!
# Dummy logs
dummy_logs = [
Logs(
id="1",
question="How can I import ChatOllama?",
grade=1,
answer="To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'",
),
Logs(
id="2",
question="How can I use Chroma vector store?",
answer="To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).",
grade=0,
feedback="The retrieved documents discuss vector stores in general, but not Chroma specifically",
),
Logs(
id="3",
question="How do I create react agent in langgraph?",
answer="from langgraph.prebuilt import create_react_agent",
),
]
# Entry Graph
class EntryGraphState(TypedDict):
raw_logs: Annotated[list[Logs], add_logs]
logs: Annotated[list[Logs], add_logs] # This will be used in subgraphs
failure_report: str # This will be generated in the FA subgraph
summary_report: str # This will be generated in the QS subgraph
def select_logs(state):
return {"logs": [log for log in state["raw_logs"] if "grade" in log]}
entry_builder = StateGraph(EntryGraphState)
entry_builder.add_node("select_logs", select_logs)
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())
entry_builder.add_edge(START, "select_logs")
entry_builder.add_edge("select_logs", "failure_analysis")
entry_builder.add_edge("select_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)
graph = entry_builder.compile()
from IPython.display import Image, display
# Setting xray to 1 will show the internal structure of the nested graph
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
# 虚拟日志 dummy_logs = [ Logs( id="1", question="如何导入 ChatOllama?", grade=1, answer="要导入 ChatOllama,请使用:'from langchain_community.chat_models import ChatOllama。'", ), Logs( id="2", question="如何使用 Chroma 向量存储?", answer="要使用 Chroma,请定义:rag_chain = create_retrieval_chain(retriever, question_answer_chain)。", grade=0, feedback="检索到的文档总体上讨论了向量存储,但没有具体讨论 Chroma", ), Logs( id="3", question="如何在 langgraph 中创建 React Agent?", answer="from langgraph.prebuilt import create_react_agent", ), ] # 入口图 class EntryGraphState(TypedDict): raw_logs: Annotated[list[Logs], add_logs] logs: Annotated[list[Logs], add_logs] # 这将在子图中使用 failure_report: str # 这将在 FA 子图中生成 summary_report: str # 这将在 QS 子图中生成 def select_logs(state): return {"logs": [log for log in state["raw_logs"] if "grade" in log]} entry_builder = StateGraph(EntryGraphState) entry_builder.add_node("select_logs", select_logs) entry_builder.add_node("question_summarization", qs_builder.compile()) entry_builder.add_node("failure_analysis", fa_builder.compile()) entry_builder.add_edge(START, "select_logs") entry_builder.add_edge("select_logs", "failure_analysis") entry_builder.add_edge("select_logs", "question_summarization") entry_builder.add_edge("failure_analysis", END) entry_builder.add_edge("question_summarization", END) graph = entry_builder.compile() from IPython.display import Image, display # 将 xray 设置为 1 将显示嵌套图的内部结构 display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
在 [4]
已复制!
graph.invoke({"raw_logs": dummy_logs}, debug=False)
graph.invoke({"raw_logs": dummy_logs}, debug=False)
Out[4]
{'raw_logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}, {'id': '3', 'question': 'How do I create react agent in langgraph?', 'answer': 'from langgraph.prebuilt import create_react_agent'}], 'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'failure_report': 'Poor quality of retrieval for document IDs: 2', 'summary_report': 'Questions focused on usage of ChatOllama and Chroma vector store.'}
在 [5]
已复制!
from typing import Annotated
from typing_extensions import TypedDict
# define a simple reducer
def reduce_list(left: list, right: list) -> list:
if not left:
left = []
if not right:
right = []
return left + right
# define parent and child state
class ChildState(TypedDict):
name: str
path: Annotated[list[str], reduce_list]
class ParentState(TypedDict):
name: str
path: Annotated[list[str], reduce_list]
# define a helper to build the graph
def make_graph(parent_schema, child_schema):
child_builder = StateGraph(child_schema)
child_builder.add_node("child_start", lambda state: {"path": ["child_start"]})
child_builder.add_edge(START, "child_start")
child_builder.add_node("child_middle", lambda state: {"path": ["child_middle"]})
child_builder.add_node("child_end", lambda state: {"path": ["child_end"]})
child_builder.add_edge("child_start", "child_middle")
child_builder.add_edge("child_middle", "child_end")
child_builder.add_edge("child_end", END)
builder = StateGraph(parent_schema)
builder.add_node("grandparent", lambda state: {"path": ["grandparent"]})
builder.add_edge(START, "grandparent")
builder.add_node("parent", lambda state: {"path": ["parent"]})
builder.add_node("child", child_builder.compile())
builder.add_node("sibling", lambda state: {"path": ["sibling"]})
builder.add_node("fin", lambda state: {"path": ["fin"]})
# Add connections
builder.add_edge("grandparent", "parent")
builder.add_edge("parent", "child")
builder.add_edge("parent", "sibling")
builder.add_edge("child", "fin")
builder.add_edge("sibling", "fin")
builder.add_edge("fin", END)
graph = builder.compile()
return graph
graph = make_graph(ParentState, ChildState)
from typing import Annotated from typing_extensions import TypedDict # 定义一个简单的 Reducer def reduce_list(left: list, right: list) -> list: if not left: left = [] if not right: right = [] return left + right # 定义父和子状态 class ChildState(TypedDict): name: str path: Annotated[list[str], reduce_list] class ParentState(TypedDict): name: str path: Annotated[list[str], reduce_list] # 定义一个构建图的辅助函数 def make_graph(parent_schema, child_schema): child_builder = StateGraph(child_schema) child_builder.add_node("child_start", lambda state: {"path": ["child_start"]}) child_builder.add_edge(START, "child_start") child_builder.add_node("child_middle", lambda state: {"path": ["child_middle"]}) child_builder.add_node("child_end", lambda state: {"path": ["child_end"]}) child_builder.add_edge("child_start", "child_middle") child_builder.add_edge("child_middle", "child_end") child_builder.add_edge("child_end", END) builder = StateGraph(parent_schema) builder.add_node("grandparent", lambda state: {"path": ["grandparent"]}) builder.add_edge(START, "grandparent") builder.add_node("parent", lambda state: {"path": ["parent"]}) builder.add_node("child", child_builder.compile()) builder.add_node("sibling", lambda state: {"path": ["sibling"]}) builder.add_node("fin", lambda state: {"path": ["fin"]}) # 添加连接 builder.add_edge("grandparent", "parent") builder.add_edge("parent", "child") builder.add_edge("parent", "sibling") builder.add_edge("child", "fin") builder.add_edge("sibling", "fin") builder.add_edge("fin", END) graph = builder.compile() return graph graph = make_graph(ParentState, ChildState)
在 [6]
已复制!
from IPython.display import Image, display
# Setting xray to 1 will show the internal structure of the nested graph
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
from IPython.display import Image, display # 将 xray 设置为 1 将显示嵌套图的内部结构 display(Image(graph.get_graph(xray=1).draw_mermaid_png()))