在 [ ]
已复制!
%%capture --no-stderr
%pip install -U langgraph langchain-openai
%%capture --no-stderr %pip install -U langgraph langchain-openai
在 [1]
已复制!
import getpass
import os
def _set_env(var: str):
    """Ensure the environment variable *var* has a value.

    If ``os.environ`` already holds a non-empty value for *var*, nothing
    happens. Otherwise the user is prompted (without echo) and the entered
    text is stored in ``os.environ[var]``.
    """
    if os.environ.get(var):
        return
    os.environ[var] = getpass.getpass(f"{var}: ")
# Prompt for OPENAI_API_KEY only when it is not already set in the environment.
_set_env("OPENAI_API_KEY")
import getpass import os def _set_env(var: str): if not os.environ.get(var): os.environ[var] = getpass.getpass(f"{var}: ") _set_env("OPENAI_API_KEY")
在 [1]
已复制!
from typing import Optional, Annotated
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
# The structure of the logs
class _LogsRequired(TypedDict):
    """Keys that every log entry carries."""

    id: str
    question: str
    answer: str


class Logs(_LogsRequired, total=False):
    """A single question/answer log entry.

    ``id``, ``question`` and ``answer`` are always present. ``grade`` and
    ``feedback`` may be omitted entirely: ungraded logs simply do not carry
    them, which is why downstream code checks ``"grade" in log``.
    """

    grade: Optional[int]
    feedback: Optional[str]
# Define custom reducer (see more on this in the "Custom reducer" section below)
def add_logs(left: list[Logs], right: list[Logs]) -> list[Logs]:
    """Merge *right* into *left*, keyed by each log's ``"id"``.

    A log in *right* whose id already exists replaces the existing entry in
    place; a log with a new id is appended. Either argument may be ``None``
    or empty. *left* is never mutated; a new list is returned.
    """
    merged = list(left) if left else []
    index_by_id = {log["id"]: idx for idx, log in enumerate(merged)}

    # update if the new logs are already in the state, otherwise append
    for log in right or []:
        idx = index_by_id.get(log["id"])
        if idx is not None:
            merged[idx] = log
        else:
            # Record the position of the appended log so that a later entry
            # in `right` with the same id replaces it instead of duplicating.
            index_by_id[log["id"]] = len(merged)
            merged.append(log)
    return merged
# Failure Analysis Subgraph
class FailureAnalysisState(TypedDict):
    """State schema of the failure-analysis subgraph."""

    # Shared with the parent graph (EntryGraphState): the logs to inspect,
    # merged through the add_logs reducer, and the resulting report.
    logs: Annotated[list[Logs], add_logs]
    failure_report: str
    # Private to this subgraph: the logs picked out as failures.
    failures: list[Logs]
def get_failures(state: FailureAnalysisState):
    """Collect the logs whose grade is exactly ``0``.

    Logs without a ``"grade"`` key are skipped rather than raising
    ``KeyError``, so the node also works on unfiltered logs.

    Returns:
        A state update of the form ``{"failures": [...]}``.
    """
    failures = [log for log in state["logs"] if log.get("grade") == 0]
    return {"failures": failures}
def generate_summary(state: FailureAnalysisState):
    """Build the failure report from the ids of the failed logs.

    Returns:
        A state update ``{"failure_report": <text>}`` whose text lists the
        ids in ``state["failures"]``, comma-separated.
    """
    # NOTE: you can implement custom summarization logic here
    joined_ids = ", ".join([entry["id"] for entry in state["failures"]])
    return {
        "failure_report": f"Poor quality of retrieval for document IDs: {joined_ids}"
    }
# Wire the failure-analysis subgraph as a straight line:
# START -> get_failures -> generate_summary -> END.
fa_builder = StateGraph(FailureAnalysisState)
fa_builder.add_node("get_failures", get_failures)
# `generate_summary` is redefined further down for the summarization subgraph;
# the function object registered here is the failure-analysis version because
# add_node is called before that redefinition.
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)
# Summarization subgraph
class QuestionSummarizationState(TypedDict):
    """State schema of the question-summarization subgraph."""

    # Shared with the parent graph (EntryGraphState): the final report and
    # the logs, which are merged through the add_logs reducer.
    summary_report: str
    logs: Annotated[list[Logs], add_logs]
    # Private to this subgraph: the intermediate summary text.
    summary: str
def generate_summary(state: QuestionSummarizationState):
    """Produce a summary of the logged questions.

    This is a placeholder that returns a fixed sentence and does not read
    *state*.

    Returns:
        A state update of the form ``{"summary": <text>}``.
    """
    # NOTE: you can implement custom summarization logic here, e.g. based on
    # the entries in state["logs"]
    summary = "Questions focused on usage of ChatOllama and Chroma vector store."
    return {"summary": summary}
def send_to_slack(state: QuestionSummarizationState):
    """Publish the summary as the subgraph's report.

    Returns:
        A state update ``{"summary_report": state["summary"]}``.
    """
    # NOTE: you can implement custom logic here, for example sending the summary generated in the previous step to Slack
    return {"summary_report": state["summary"]}
# Wire the question-summarization subgraph as a straight line:
# START -> generate_summary -> send_to_slack -> END.
qs_builder = StateGraph(QuestionSummarizationState)
# At this point `generate_summary` refers to the summarization version
# defined directly above, which replaced the earlier function of the same name.
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)
from typing import Optional, Annotated from typing_extensions import TypedDict from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import StateGraph, START, END # 日志的结构 class Logs(TypedDict): id: str question: str answer: str grade: Optional[int] feedback: Optional[str] # 定义自定义 reducer(有关此内容的更多信息,请参阅下面的“自定义 reducer”部分) def add_logs(left: list[Logs], right: list[Logs]) -> list[Logs]: if not left: left = [] if not right: right = [] logs = left.copy() left_id_to_idx = {log["id"]: idx for idx, log in enumerate(logs)} # 如果新日志已存在于状态中,则更新,否则追加 for log in right: idx = left_id_to_idx.get(log["id"]) if idx is not None: logs[idx] = log else: logs.append(log) return logs # 故障分析子图 class FailureAnalysisState(TypedDict): # 与父图共享的键(EntryGraphState) logs: Annotated[list[Logs], add_logs] failure_report: str # 子图键 failures: list[Logs] def get_failures(state: FailureAnalysisState): failures = [log for log in state["logs"] if log["grade"] == 0] return {"failures": failures} def generate_summary(state: FailureAnalysisState): failures = state["failures"] # 注意:您可以在此处实现自定义摘要逻辑 failure_ids = [log["id"] for log in failures] fa_summary = f"检索文档 ID 质量低下:{', '.join(failure_ids)}" return {"failure_report": fa_summary} fa_builder = StateGraph(FailureAnalysisState) fa_builder.add_node("get_failures", get_failures) fa_builder.add_node("generate_summary", generate_summary) fa_builder.add_edge(START, "get_failures") fa_builder.add_edge("get_failures", "generate_summary") fa_builder.add_edge("generate_summary", END) # 摘要子图 class QuestionSummarizationState(TypedDict): # 与父图共享的键(EntryGraphState) summary_report: str logs: Annotated[list[Logs], add_logs] # 子图键 summary: str def generate_summary(state: QuestionSummarizationState): docs = state["logs"] # 注意:您可以在此处实现自定义摘要逻辑 summary = "问题集中于 ChatOllama 和 Chroma 向量存储的使用。" return {"summary": summary} def send_to_slack(state: QuestionSummarizationState): summary = state["summary"] # 注意:您可以在此处实现自定义逻辑,例如将上一步骤生成的摘要发送到 Slack return 
{"summary_report": summary} qs_builder = StateGraph(QuestionSummarizationState) qs_builder.add_node("generate_summary", generate_summary) qs_builder.add_node("send_to_slack", send_to_slack) qs_builder.add_edge(START, "generate_summary") qs_builder.add_edge("generate_summary", "send_to_slack") qs_builder.add_edge("send_to_slack", END)
定义父图¶
在 [2]
已复制!
# Entry Graph
class EntryGraphState(TypedDict):
    """State schema of the parent (entry) graph."""

    # Unfiltered logs handed to the graph as input.
    raw_logs: Annotated[list[Logs], add_logs]
    # This will be used in subgraphs
    logs: Annotated[list[Logs], add_logs]
    # This will be generated in the FA subgraph
    failure_report: str
    # This will be generated in the QS subgraph
    summary_report: str
def select_logs(state):
    """Keep only the raw logs that carry a ``"grade"`` key.

    Returns:
        A state update ``{"logs": [...]}`` with the graded logs in their
        original order.
    """
    graded = []
    for entry in state["raw_logs"]:
        if "grade" in entry:
            graded.append(entry)
    return {"logs": graded}
# Build the parent graph: select_logs feeds both compiled subgraphs, and each
# subgraph has its own edge to END.
entry_builder = StateGraph(EntryGraphState)
entry_builder.add_node("select_logs", select_logs)
# The compiled subgraphs are registered as ordinary nodes of the parent graph.
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())
entry_builder.add_edge(START, "select_logs")
entry_builder.add_edge("select_logs", "failure_analysis")
entry_builder.add_edge("select_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)
graph = entry_builder.compile()
from IPython.display import Image, display
# Setting xray to 1 will show the internal structure of the nested graph
display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
# 入口图 class EntryGraphState(TypedDict): raw_logs: Annotated[list[Logs], add_logs] logs: Annotated[list[Logs], add_logs] # 这将在子图中使用 failure_report: str # 这将在 FA 子图中生成 summary_report: str # 这将在 QS 子图中生成 def select_logs(state): return {"logs": [log for log in state["raw_logs"] if "grade" in log]} entry_builder = StateGraph(EntryGraphState) entry_builder.add_node("select_logs", select_logs) entry_builder.add_node("question_summarization", qs_builder.compile()) entry_builder.add_node("failure_analysis", fa_builder.compile()) entry_builder.add_edge(START, "select_logs") entry_builder.add_edge("select_logs", "failure_analysis") entry_builder.add_edge("select_logs", "question_summarization") entry_builder.add_edge("failure_analysis", END) entry_builder.add_edge("question_summarization", END) graph = entry_builder.compile() from IPython.display import Image, display # 将 xray 设置为 1 将显示嵌套图的内部结构 display(Image(graph.get_graph(xray=1).draw_mermaid_png()))
在 [4]
已复制!
# Dummy logs
# Three sample entries: one graded 1, one graded 0 (with feedback), and one
# without a grade.
dummy_logs = [
Logs(
id="1",
question="How can I import ChatOllama?",
grade=1,
answer="To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'",
),
Logs(
id="2",
question="How can I use Chroma vector store?",
answer="To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).",
grade=0,
feedback="The retrieved documents discuss vector stores in general, but not Chroma specifically",
),
# No "grade" key here, so select_logs drops this entry.
Logs(
id="3",
question="How do I create react agent in langgraph?",
answer="from langgraph.prebuilt import create_react_agent",
),
]
# NOTE(review): `input` shadows the builtin of the same name. The later
# graph.stream(...) calls reference this name, so a rename has to update them too.
input = {"raw_logs": dummy_logs}
# 虚拟日志 dummy_logs = [ Logs( id="1", question="如何导入 ChatOllama?", grade=1, answer="要导入 ChatOllama,请使用:'from langchain_community.chat_models import ChatOllama.'", ), Logs( id="2", question="如何使用 Chroma 向量存储?", answer="要使用 Chroma,请定义:rag_chain = create_retrieval_chain(retriever, question_answer_chain).", grade=0, feedback="检索到的文档讨论了一般向量存储,但没有专门讨论 Chroma", ), Logs( id="3", question="如何在 langgraph 中创建 react 代理?", answer="from langgraph.prebuilt import create_react_agent", ), ] input = {"raw_logs": dummy_logs}
正常流式传输¶
首先,让我们检查正常流式传输的输出
在 [5]
已复制!
# Stream updates without subgraph details and print each one under a header
# naming the node that produced it.
for chunk in graph.stream(input, stream_mode="updates"):
    # Only the first key of each update dict is reported.
    node_name = tuple(chunk)[0]
    print(
        f"---------- Update from node {node_name} ---------",
        chunk[node_name],
        sep="\n",
    )
for chunk in graph.stream(input, stream_mode="updates"): node_name = list(chunk.keys())[0] print(f"---------- 来自节点 {node_name} 的更新 ---------") print(chunk[node_name])
---------- Update from node select_logs --------- {'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}]} ---------- Update from node failure_analysis --------- {'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'failure_report': 'Poor quality of retrieval for document IDs: 2'} ---------- Update from node question_summarization --------- {'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'summary_report': 'Questions focused on usage of ChatOllama and Chroma vector store.'}
在 [12]
已复制!
# Format the namespace slightly nicer
def format_namespace(namespace):
    """Return a readable label for a stream namespace.

    An empty namespace yields ``"parent graph"``. Otherwise the last element
    is cut at its first ``":"`` and suffixed with ``" subgraph"``.
    """
    if len(namespace) == 0:
        return "parent graph"
    subgraph_name, _, _ = namespace[-1].partition(":")
    return subgraph_name + " subgraph"
# With subgraphs=True every streamed item is a (namespace, update) pair, so
# the header can also say which graph the node belongs to.
for namespace, chunk in graph.stream(input, stream_mode="updates", subgraphs=True):
    # Only the first key of each update dict is reported.
    node_name = tuple(chunk)[0]
    print(
        f"---------- Update from node {node_name} in {format_namespace(namespace)} ---------",
        chunk[node_name],
        sep="\n",
    )
# 格式化命名空间更友好一些 def format_namespace(namespace): return ( namespace[-1].split(":")[0] + " 子图" if len(namespace) > 0 else "父图" ) for namespace, chunk in graph.stream(input, stream_mode="updates", subgraphs=True): node_name = list(chunk.keys())[0] print( f"---------- 来自节点 {node_name} 的更新,位于 {format_namespace(namespace)} 中 ---------" ) print(chunk[node_name])
---------- Update from node select_logs in parent graph --------- {'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}]} ---------- Update from node get_failures in failure_analysis subgraph --------- {'failures': [{'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}]} ---------- Update from node generate_summary in failure_analysis subgraph --------- {'failure_report': 'Poor quality of retrieval for document IDs: 2'} ---------- Update from node failure_analysis in parent graph --------- {'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'failure_report': 'Poor quality of retrieval for document IDs: 2'} ---------- Update from node generate_summary in question_summarization subgraph --------- {'summary': 'Questions focused on usage of ChatOllama and Chroma vector store.'} ---------- Update from node send_to_slack in question_summarization subgraph --------- {'summary_report': 'Questions focused on usage of ChatOllama and Chroma vector store.'} 
---------- Update from node question_summarization in parent graph --------- {'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'summary_report': 'Questions focused on usage of ChatOllama and Chroma vector store.'}
您会注意到的第一件事是,我们不再只收到块,我们还收到命名空间,这些命名空间告诉我们当前位于哪个子图中。
如果您仔细查看日志,您会发现我们现在收到了每个子图内部节点进行的更新,例如,我们现在看到了 get_failures
节点对 failures
状态通道进行的更新,该节点位于 failure_analysis
子图中。当我们没有设置 subgraphs=True
时,我们只看到子图 failure_analysis
进行的整体更新。