跳到内容

如何使用图 API

本指南演示了 LangGraph 的 Graph API 的基础知识。它将介绍状态,以及如何组合常见的图结构,例如序列分支循环。它还涵盖了 LangGraph 的控制功能,包括用于 Map-Reduce 工作流的Send API,以及用于结合状态更新和节点间“跳转”的Command API

设置

安装 langgraph

pip install -U langgraph

设置 LangSmith 以便更好地调试

注册 LangSmith 以快速发现问题并提高您的 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控您使用 LangGraph 构建的 LLM 应用程序——在文档中了解更多入门信息。

定义和更新状态

在这里,我们将展示如何在 LangGraph 中定义和更新状态。我们将演示

  1. 如何使用状态定义图的模式
  2. 如何使用Reducer控制状态更新的处理方式。

定义状态

LangGraph 中的状态可以是 TypedDictPydantic 模型或数据类。下面我们将使用 TypedDict。有关使用 Pydantic 的详细信息,请参阅本节

默认情况下,图将具有相同的输入和输出模式,并且状态决定了该模式。有关如何定义不同的输入和输出模式,请参阅本节

让我们考虑一个使用消息的简单示例。这代表了许多 LLM 应用程序的一种通用状态表述。有关更多详细信息,请参阅我们的概念页面

API 参考:AnyMessage

from langchain_core.messages import AnyMessage
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[AnyMessage]
    extra_field: int

此状态跟踪一个消息对象列表,以及一个额外的整数字段。

更新状态

让我们构建一个带有一个节点的示例图。我们的节点只是一个 Python 函数,它读取图的状态并对其进行更新。此函数的第一个参数将始终是状态

API 参考:AIMessage

from langchain_core.messages import AIMessage

def node(state: State):
    messages = state["messages"]
    new_message = AIMessage("Hello!")
    return {"messages": messages + [new_message], "extra_field": 10}

此节点只是将一条消息附加到我们的消息列表中,并填充一个额外的字段。

重要

节点应直接返回状态更新,而不是修改状态。

接下来,我们定义一个包含此节点的简单图。我们使用StateGraph来定义一个在此状态上操作的图。然后我们使用add_node来填充我们的图。

API 参考:StateGraph

from langgraph.graph import StateGraph

builder = StateGraph(State)
builder.add_node(node)
builder.set_entry_point("node")
graph = builder.compile()

LangGraph 提供了用于可视化您的图的内置实用程序。让我们检查一下我们的图。有关可视化的详细信息,请参阅本节

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Simple graph with single node

在这种情况下,我们的图只执行一个节点。让我们进行一次简单的调用

API 参考:HumanMessage

from langchain_core.messages import HumanMessage

result = graph.invoke({"messages": [HumanMessage("Hi")]})
result
{'messages': [HumanMessage(content='Hi'), AIMessage(content='Hello!')], 'extra_field': 10}

请注意

  • 我们通过更新状态的单个键来启动调用。
  • 我们在调用结果中接收到整个状态。

为方便起见,我们经常通过美观打印来检查消息对象的内容

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

使用 Reducer 处理状态更新

状态中的每个键都可以有自己独立的Reducer函数,它控制如何应用来自节点的更新。如果未明确指定 Reducer 函数,则假定对键的所有更新都应覆盖它。

对于 TypedDict 状态模式,我们可以通过使用 Reducer 函数注释状态的相应字段来定义 Reducer。

在前面的示例中,我们的节点通过向状态中的 "messages" 键附加消息来更新它。下面,我们为此键添加一个 Reducer,以便更新自动附加

from typing_extensions import Annotated

def add(left, right):
    """Can also import `add` from the `operator` built-in."""
    return left + right

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add]
    extra_field: int

现在我们的节点可以简化了

def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}
API 参考:START

from langgraph.graph import START

graph = StateGraph(State).add_node(node).add_edge(START, "node").compile()

result = graph.invoke({"messages": [HumanMessage("Hi")]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

MessagesState

在实践中,更新消息列表还有其他考虑因素

LangGraph 包含一个内置的 Reducer add_messages 来处理这些考虑因素

API 参考:add_messages

from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    extra_field: int

def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}

graph = StateGraph(State).add_node(node).set_entry_point("node").compile()

input_message = {"role": "user", "content": "Hi"}

result = graph.invoke({"messages": [input_message]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

这是用于涉及聊天模型的应用程序的多功能状态表示。LangGraph 为了方便起见,包含了一个预构建的 MessagesState,因此我们可以拥有

from langgraph.graph import MessagesState

class State(MessagesState):
    extra_field: int

定义输入和输出模式

默认情况下,StateGraph 使用单个模式进行操作,所有节点都期望使用该模式进行通信。但是,也可以为图定义不同的输入和输出模式。

当指定不同的模式时,内部模式仍将用于节点之间的通信。输入模式确保提供的输入与预期结构匹配,而输出模式则过滤内部数据,以根据定义的输出模式仅返回相关信息。

下面,我们将看到如何定义不同的输入和输出模式。

API 参考:StateGraph | START | END

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# Define the schema for the input
class InputState(TypedDict):
    question: str

# Define the schema for the output
class OutputState(TypedDict):
    answer: str

# Define the overall schema, combining both input and output
class OverallState(InputState, OutputState):
    pass

# Define the node that processes the input and generates an answer
def answer_node(state: InputState):
    # Example answer and an extra key
    return {"answer": "bye", "question": state["question"]}

# Build the graph with input and output schemas specified
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node)  # Add the answer node
builder.add_edge(START, "answer_node")  # Define the starting edge
builder.add_edge("answer_node", END)  # Define the ending edge
graph = builder.compile()  # Compile the graph

# Invoke the graph with an input and print the result
print(graph.invoke({"question": "hi"}))
{'answer': 'bye'}

请注意,调用的输出仅包含输出模式。

在节点间传递私有状态

在某些情况下,您可能希望节点交换对中间逻辑至关重要但无需成为图主模式一部分的信息。此私有数据与图的整体输入/输出无关,应仅在特定节点之间共享。

下面,我们将创建一个由三个节点(node_1、node_2 和 node_3)组成的示例顺序图,其中私有数据在第一步和第二步(node_1 和 node_2)之间传递,而第三步(node_3)仅能访问公共整体状态。

API 参考:StateGraph | START | END

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# The overall state of the graph (this is the public state shared across nodes)
class OverallState(TypedDict):
    a: str

# Output from node_1 contains private data that is not part of the overall state
class Node1Output(TypedDict):
    private_data: str

# The private data is only shared between node_1 and node_2
def node_1(state: OverallState) -> Node1Output:
    output = {"private_data": "set by node_1"}
    print(f"Entered node `node_1`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Node 2 input only requests the private data available after node_1
class Node2Input(TypedDict):
    private_data: str

def node_2(state: Node2Input) -> OverallState:
    output = {"a": "set by node_2"}
    print(f"Entered node `node_2`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Node 3 only has access to the overall state (no access to private data from node_1)
def node_3(state: OverallState) -> OverallState:
    output = {"a": "set by node_3"}
    print(f"Entered node `node_3`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Connect nodes in a sequence
# node_2 accepts private data from node_1, whereas
# node_3 does not see the private data.
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()

# Invoke the graph with the initial state
response = graph.invoke(
    {
        "a": "set at start",
    }
)

print()
print(f"Output of graph invocation: {response}")
Entered node `node_1`:
    Input: {'a': 'set at start'}.
    Returned: {'private_data': 'set by node_1'}
Entered node `node_2`:
    Input: {'private_data': 'set by node_1'}.
    Returned: {'a': 'set by node_2'}
Entered node `node_3`:
    Input: {'a': 'set by node_2'}.
    Returned: {'a': 'set by node_3'}

Output of graph invocation: {'a': 'set by node_3'}

将 Pydantic 模型用于图状态

在初始化时,StateGraph 接受一个 state_schema 参数,该参数指定图中节点可以访问和更新的状态的“形状”。

在我们的示例中,我们通常将 python 原生 TypedDict 用于 state_schema,但 state_schema 可以是任何类型

在这里,我们将看到如何将 Pydantic BaseModel 用于 state_schema,以便对输入添加运行时验证。

已知限制

  • 目前,图的输出将不是 Pydantic 模型实例。
  • 运行时验证仅发生在节点输入上,而不发生在输出上。
  • Pydantic 的验证错误跟踪不显示错误发生在哪一个节点。

API 参考:StateGraph | START | END

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from pydantic import BaseModel

# The overall state of the graph (this is the public state shared across nodes)
class OverallState(BaseModel):
    a: str

def node(state: OverallState):
    return {"a": "goodbye"}

# Build the state graph
builder = StateGraph(OverallState)
builder.add_node(node)  # node_1 is the first node
builder.add_edge(START, "node")  # Start the graph with node_1
builder.add_edge("node", END)  # End the graph after node_1
graph = builder.compile()

# Test the graph with a valid input
graph.invoke({"a": "hello"})

使用无效输入调用图

try:
    graph.invoke({"a": 123})  # Should be a string
except Exception as e:
    print("An exception was raised because `a` is an integer rather than a string.")
    print(e)
An exception was raised because `a` is an integer rather than a string.
1 validation error for OverallState
a
  Input should be a valid string [type=string_type, input_value=123, input_type=int]
    For further information visit https://errors.pydantic.dev/2.9/v/string_type

有关 Pydantic 模型状态的其他功能,请参阅下文

序列化行为

当使用 Pydantic 模型作为状态模式时,理解序列化如何工作非常重要,尤其是在:- 将 Pydantic 对象作为输入传递时 - 从图中接收输出时 - 使用嵌套 Pydantic 模型时

让我们看看这些行为的实际应用。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class NestedModel(BaseModel):
    value: str

class ComplexState(BaseModel):
    text: str
    count: int
    nested: NestedModel

def process_node(state: ComplexState):
    # Node receives a validated Pydantic object
    print(f"Input state type: {type(state)}")
    print(f"Nested type: {type(state.nested)}")
    # Return a dictionary update
    return {"text": state.text + " processed", "count": state.count + 1}

# Build the graph
builder = StateGraph(ComplexState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()

# Create a Pydantic instance for input
input_state = ComplexState(text="hello", count=0, nested=NestedModel(value="test"))
print(f"Input object type: {type(input_state)}")

# Invoke graph with a Pydantic instance
result = graph.invoke(input_state)
print(f"Output type: {type(result)}")
print(f"Output content: {result}")

# Convert back to Pydantic model if needed
output_model = ComplexState(**result)
print(f"Converted back to Pydantic: {type(output_model)}")
运行时类型强制转换

Pydantic 对某些数据类型执行运行时类型强制转换。这可能很有用,但如果您不了解它,也可能导致意外行为。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class CoercionExample(BaseModel):
    # Pydantic will coerce string numbers to integers
    number: int
    # Pydantic will parse string booleans to bool
    flag: bool

def inspect_node(state: CoercionExample):
    print(f"number: {state.number} (type: {type(state.number)})")
    print(f"flag: {state.flag} (type: {type(state.flag)})")
    return {}

builder = StateGraph(CoercionExample)
builder.add_node("inspect", inspect_node)
builder.add_edge(START, "inspect")
builder.add_edge("inspect", END)
graph = builder.compile()

# Demonstrate coercion with string inputs that will be converted
result = graph.invoke({"number": "42", "flag": "true"})

# This would fail with a validation error
try:
    graph.invoke({"number": "not-a-number", "flag": "true"})
except Exception as e:
    print(f"\nExpected validation error: {e}")
使用消息模型

在状态模式中使用 LangChain 消息类型时,序列化有一些重要的考虑因素。在使用消息对象进行传输时,您应该使用 AnyMessage(而不是 BaseMessage)以实现正确的序列化/反序列化。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
from langchain_core.messages import HumanMessage, AIMessage, AnyMessage
from typing import List

class ChatState(BaseModel):
    messages: List[AnyMessage]
    context: str

def add_message(state: ChatState):
    return {"messages": state.messages + [AIMessage(content="Hello there!")]}

builder = StateGraph(ChatState)
builder.add_node("add_message", add_message)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", END)
graph = builder.compile()

# Create input with a message
initial_state = ChatState(
    messages=[HumanMessage(content="Hi")], context="Customer support chat"
)

result = graph.invoke(initial_state)
print(f"Output: {result}")

# Convert back to Pydantic model to see message types
output_model = ChatState(**result)
for i, msg in enumerate(output_model.messages):
    print(f"Message {i}: {type(msg).__name__} - {msg.content}")

添加运行时配置

有时您希望在调用图时能够对其进行配置。例如,您可能希望能够在运行时指定要使用的 LLM 或系统提示,而不会用这些参数污染图状态

添加运行时配置

  1. 为您的配置指定一个模式
  2. 将配置添加到节点或条件边的函数签名中
  3. 将配置传递到图中。

请参阅下面的简单示例

API 参考:RunnableConfig | END | StateGraph | START

from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict

# 1. Specify config schema
class ConfigSchema(TypedDict):
    my_runtime_value: str

# 2. Define a graph that accesses the config in a node
class State(TypedDict):
    my_state_value: str

def node(state: State, config: RunnableConfig):
    if config["configurable"]["my_runtime_value"] == "a":
        return {"my_state_value": 1}
    elif config["configurable"]["my_runtime_value"] == "b":
        return {"my_state_value": 2}
    else:
        raise ValueError("Unknown values.")

builder = StateGraph(State, config_schema=ConfigSchema)
builder.add_node(node)
builder.add_edge(START, "node")
builder.add_edge("node", END)

graph = builder.compile()

# 3. Pass in configuration at runtime:
print(graph.invoke({}, {"configurable": {"my_runtime_value": "a"}}))
print(graph.invoke({}, {"configurable": {"my_runtime_value": "b"}}))
{'my_state_value': 1}
{'my_state_value': 2}

扩展示例:在运行时指定 LLM

下面我们演示一个实际示例,其中我们配置在运行时使用哪个 LLM。我们将使用 OpenAI 和 Anthropic 模型。

from langchain.chat_models import init_chat_model
from langchain_core.runnables import RunnableConfig
from langgraph.graph import MessagesState
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict

class ConfigSchema(TypedDict):
    model: str

MODELS = {
    "anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
    "openai": init_chat_model("openai:gpt-4.1-mini"),
}

def call_model(state: MessagesState, config: RunnableConfig):
    model = config["configurable"].get("model", "anthropic")
    model = MODELS[model]
    response = model.invoke(state["messages"])
    return {"messages": [response]}

builder = StateGraph(MessagesState, config_schema=ConfigSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# Usage
input_message = {"role": "user", "content": "hi"}
# With no configuration, uses default (Anthropic)
response_1 = graph.invoke({"messages": [input_message]})["messages"][-1]
# Or, can set OpenAI
config = {"configurable": {"model": "openai"}}
response_2 = graph.invoke({"messages": [input_message]}, config=config)["messages"][-1]

print(response_1.response_metadata["model_name"])
print(response_2.response_metadata["model_name"])
claude-3-5-haiku-20241022
gpt-4.1-mini-2025-04-14

扩展示例:在运行时指定模型和系统消息

下面我们演示一个实际示例,其中我们配置两个参数:在运行时使用的 LLM 和系统消息。

from typing import Optional
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, MessagesState, StateGraph, START
from typing_extensions import TypedDict

class ConfigSchema(TypedDict):
    model: Optional[str]
    system_message: Optional[str]

MODELS = {
    "anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
    "openai": init_chat_model("openai:gpt-4.1-mini"),
}

def call_model(state: MessagesState, config: RunnableConfig):
    model = config["configurable"].get("model", "anthropic")
    model = MODELS[model]
    messages = state["messages"]
    if system_message := config["configurable"].get("system_message"):
        messages = [SystemMessage(system_message)] + messages
    response = model.invoke(messages)
    return {"messages": [response]}

builder = StateGraph(MessagesState, config_schema=ConfigSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# Usage
input_message = {"role": "user", "content": "hi"}
config = {"configurable": {"model": "openai", "system_message": "Respond in Italian."}}
response = graph.invoke({"messages": [input_message]}, config)
for message in response["messages"]:
    message.pretty_print()
================================ Human Message ================================

hi
================================== Ai Message ==================================

Ciao! Come posso aiutarti oggi?

添加重试策略

在许多用例中,您可能希望您的节点具有自定义重试策略,例如,如果您正在调用 API、查询数据库或调用 LLM 等。LangGraph 允许您向节点添加重试策略。

要配置重试策略,请将 retry_policy 参数传递给 add_noderetry_policy 参数接受一个 RetryPolicy 命名元组对象。下面我们使用默认参数实例化一个 RetryPolicy 对象并将其与一个节点关联

from langgraph.pregel import RetryPolicy

builder.add_node(
    "node_name",
    node_function,
    retry_policy=RetryPolicy(),
)

默认情况下,retry_on 参数使用 default_retry_on 函数,该函数对除以下异常之外的任何异常进行重试

  • ValueError
  • TypeError
  • ArithmeticError
  • ImportError
  • LookupError
  • NameError
  • SyntaxError
  • RuntimeError
  • ReferenceError
  • StopIteration
  • StopAsyncIteration
  • OSError

此外,对于来自流行的 http 请求库(例如 requestshttpx)的异常,它仅对 5xx 状态码进行重试。

扩展示例:自定义重试策略

考虑一个我们正在从 SQL 数据库读取的示例。下面我们将两种不同的重试策略传递给节点

import sqlite3
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.pregel import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage

db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("anthropic:claude-3-5-haiku-latest")

def query_database(state: MessagesState):
    query_result = db.run("SELECT * FROM Artist LIMIT 10;")
    return {"messages": [AIMessage(content=query_result)]}

def call_model(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# Define a new graph
builder = StateGraph(MessagesState)
builder.add_node(
    "query_database",
    query_database,
    retry_policy=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry_policy=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)
graph = builder.compile()

添加节点缓存

节点缓存对于您希望避免重复操作的情况很有用,例如执行耗时或耗费成本的操作时。LangGraph 允许您向图中的节点添加个性化缓存策略。

要配置缓存策略,请将 cache_policy 参数传递给 add_node 函数。在以下示例中,一个 CachePolicy 对象被实例化,其生存时间为 120 秒,并使用默认的 key_func 生成器。然后将其与一个节点关联

from langgraph.types import CachePolicy

builder.add_node(
    "node_name",
    node_function,
    cache_policy=CachePolicy(ttl=120),
)

然后,要为图启用节点级缓存,请在编译图时设置 cache 参数。以下示例使用 InMemoryCache 设置带有内存缓存的图,但 SqliteCache 也可用。

from langgraph.cache.memory import InMemoryCache

graph = builder.compile(cache=InMemoryCache())

创建步骤序列

先决条件

本指南假定您熟悉上面关于状态的部分。

在这里,我们演示如何构建一个简单的步骤序列。我们将展示

  1. 如何构建顺序图
  2. 用于构建类似图的内置简写。

要添加节点序列,我们使用我们的.add_node.add_edge 方法

API 参考:START | StateGraph

from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")

我们还可以使用内置的简写 .add_sequence

builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")
为什么使用 LangGraph 将应用程序步骤拆分为序列?

LangGraph 使得为您的应用程序添加底层持久化层变得容易。这允许在节点执行之间对状态进行检查点,因此您的 LangGraph 节点管理

它们还决定了执行步骤如何流式传输,以及如何使用LangGraph Studio可视化和调试您的应用程序。

让我们演示一个端到端示例。我们将创建三个步骤的序列

  1. 在状态的键中填充一个值
  2. 更新相同的值
  3. 填充不同的值

让我们首先定义我们的状态。这管理图的模式,并且还可以指定如何应用更新。有关更多详细信息,请参阅本节

在我们的例子中,我们将只跟踪两个值

from typing_extensions import TypedDict

class State(TypedDict):
    value_1: str
    value_2: int

我们的节点只是 Python 函数,它们读取图的状态并对其进行更新。此函数的第一个参数将始终是状态

def step_1(state: State):
    return {"value_1": "a"}

def step_2(state: State):
    current_value_1 = state["value_1"]
    return {"value_1": f"{current_value_1} b"}

def step_3(state: State):
    return {"value_2": 10}

注意

请注意,在发布状态更新时,每个节点只需指定它希望更新的键的值。

默认情况下,这将覆盖相应键的值。您还可以使用Reducer来控制更新的处理方式——例如,您可以将连续的更新附加到键而不是覆盖它。有关更多详细信息,请参阅本节

最后,我们定义图。我们使用StateGraph来定义一个在此状态上操作的图。

然后,我们将使用add_nodeadd_edge来填充我们的图并定义其控制流。

API 参考:START | StateGraph

from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")

指定自定义名称

您可以使用 .add_node 为节点指定自定义名称

builder.add_node("my_node", step_1)

请注意

  • .add_edge 接受节点名称,对于函数,默认为 node.__name__
  • 我们必须指定图的入口点。为此,我们添加一条与START 节点的边。
  • 当没有更多节点可执行时,图将停止。

接下来,我们编译我们的图。这提供了一些关于图结构的基本检查(例如,识别孤立节点)。如果我们要通过检查点为我们的应用程序添加持久性,它也会在这里传递。

graph = builder.compile()

LangGraph 提供了用于可视化您的图的内置实用程序。让我们检查一下我们的序列。有关可视化的详细信息,请参阅本指南

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Sequence of steps graph

让我们进行一次简单的调用

graph.invoke({"value_1": "c"})
{'value_1': 'a b', 'value_2': 10}

请注意

  • 我们通过为单个状态键提供值来启动调用。我们必须始终为至少一个键提供值。
  • 我们传入的值被第一个节点覆盖了。
  • 第二个节点更新了值。
  • 第三个节点填充了不同的值。

内置简写

langgraph>=0.2.46 包含一个内置的简写 add_sequence,用于添加节点序列。您可以按如下方式编译相同的图

builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")

graph = builder.compile()

graph.invoke({"value_1": "c"})    

创建分支

节点的并行执行对于加速整体图操作至关重要。LangGraph 提供对节点并行执行的原生支持,这可以显著提高基于图的工作流的性能。这种并行化通过扇出和扇入机制实现,同时利用标准边和条件边。下面是一些示例,展示如何添加创建适合您的分支数据流。

并行运行图节点

在此示例中,我们从 Node A 扇出到 B 和 C,然后扇入到 D。通过我们的状态,我们指定 Reducer 添加操作。这将组合或累积状态中特定键的值,而不是简单地覆盖现有值。对于列表,这意味着将新列表与现有列表连接。有关使用 Reducer 更新状态的更多详细信息,请参阅上面关于状态 Reducer的部分。

API 参考:StateGraph | START | END

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Parallel execution graph

使用 Reducer,您可以看到每个节点中添加的值都被累积。

graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']

注意

在上面的示例中,节点 "b""c" 在同一个超步中并发执行。因为它们在同一个步骤中,所以节点 "d""b""c" 都完成后执行。

重要的是,来自并行超步的更新可能无法保持一致的顺序。如果您需要来自并行超步的一致、预定顺序的更新,您应该将输出与用于排序的值一起写入状态的单独字段中。

异常处理?

LangGraph 在超步中执行节点,这意味着虽然并行分支是并行执行的,但整个超步是事务性的。如果这些分支中的任何一个引发异常,则所有更新都不会应用于状态(整个超步都会出错)。

重要的是,当使用检查点时,超步内成功节点的结果会被保存,并且在恢复时不会重复。

如果您容易出错(可能希望处理不稳定的 API 调用),LangGraph 提供了两种方法来解决此问题

  1. 您可以在节点内编写常规 Python 代码来捕获和处理异常。
  2. 您可以设置一个重试策略来指示图重试引发某些类型异常的节点。只有失败的分支会被重试,因此您不必担心执行冗余工作。

总之,这些允许您执行并行执行并完全控制异常处理。

延迟节点执行

当您希望延迟节点的执行直到所有其他待处理任务完成时,延迟节点执行很有用。这在分支具有不同长度时尤其相关,这在诸如 map-reduce 流等工作流中很常见。

上面的示例展示了当每个路径只有一个步骤时如何扇出和扇入。但是如果一个分支有多个步骤呢?让我们在 "b" 分支中添加一个节点 "b_2"

API 参考:StateGraph | START | END

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def b_2(state: State):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d, defer=True)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Deferred execution graph

graph.invoke({"aggregate": []})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']

在上面的示例中,节点 "b""c" 在同一个超步中并发执行。我们将节点 d 上的 defer=True 设置为 true,这样它就不会执行,直到所有待处理任务完成。在这种情况下,这意味着 "d" 会等待执行,直到整个 "b" 分支完成。

条件分支

如果您的扇出应根据状态在运行时变化,您可以使用 add_conditional_edges 使用图状态选择一个或多个路径。请参阅下面的示例,其中节点 a 生成一个状态更新,该更新决定了以下节点。

API 参考:StateGraph | START | END

import operator
from typing import Annotated, Literal, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    # Add a key to the state. We will set this key to determine
    # how we branch.
    which: str

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"], "which": "c"}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)

def conditional_edge(state: State) -> Literal["b", "c"]:
    # Fill in arbitrary logic here that uses the state
    # to determine the next node
    return state["which"]

builder.add_conditional_edges("a", conditional_edge)

graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Conditional branching graph

result = graph.invoke({"aggregate": []})
print(result)
Adding "A" to []
Adding "C" to ['A']
{'aggregate': ['A', 'C'], 'which': 'c'}

提示

您的条件边可以路由到多个目标节点。例如

def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]

Map-Reduce 和 Send API

LangGraph 使用 Send API 支持 Map-Reduce 和其他高级分支模式。以下是其使用示例

API 参考:StateGraph | START | END | Send

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from typing_extensions import TypedDict

class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    jokes: list[str]
    best_selected_joke: str

def generate_topics(state: OverallState):
    return {"subjects": ["lions", "elephants", "penguins"]}

def generate_joke(state: OverallState):
    joke_map = {
        "lions": "Why don't lions like fast food? Because they can't catch it!",
        "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
        "penguins": "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
    }
    return {"jokes": [joke_map[state["subject"]]]}

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def best_joke(state: OverallState):
    return {"best_selected_joke": "penguins"}

builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
builder.add_edge("generate_topics", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Map-reduce graph with fanout

# Call the graph: here we call it to generate a list of jokes
for step in graph.stream({"topic": "animals"}):
    print(step)
{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ['Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice.']}}
{'best_joke': {'best_selected_joke': 'penguins'}}

创建和控制循环

在创建带有循环的图时,我们需要一种终止执行的机制。这通常通过添加一条条件边来实现,该边在达到某个终止条件后将路由到END节点。

您还可以在调用或流式传输图时设置图的递归限制。递归限制设置了图在引发错误之前允许执行的超步数。在此处阅读有关递归限制概念的更多信息这里

让我们考虑一个带循环的简单图,以更好地理解这些机制是如何工作的。

提示

要返回状态的最后一个值而不是收到递归限制错误,请参阅下一节

创建循环时,您可以包含指定终止条件的条件边

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

def route(state: State) -> Literal["b", END]:
    if termination_condition(state):
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

要控制递归限制,请在配置中指定 "recursion_limit"。这将引发一个 GraphRecursionError,您可以捕获并处理它

from langgraph.errors import GraphRecursionError

try:
    graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:
    print("Recursion Error")

让我们定义一个带有简单循环的图。请注意,我们使用条件边来实现终止条件。

API 参考:StateGraph | START | END

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# Define edges
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Simple loop graph

这种架构类似于一个ReAct 代理,其中节点 "a" 是一个工具调用模型,节点 "b" 代表工具。

在我们的 route 条件边中,我们指定当状态中的 "aggregate" 列表长度超过阈值时,我们应该结束。

调用图时,我们看到在达到终止条件之前,我们在节点 "a""b" 之间交替。

graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']

施加递归限制

在某些应用程序中,我们可能无法保证达到给定的终止条件。在这些情况下,我们可以设置图的递归限制。这将在给定数量的超步之后引发 GraphRecursionError。然后我们可以捕获并处理此异常

from langgraph.errors import GraphRecursionError

try:
    graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error

扩展示例:达到递归限制时返回状态

除了引发 GraphRecursionError 之外,我们还可以向状态引入一个新键,用于跟踪到达递归限制之前剩余的步数。然后我们可以使用此键来确定是否应该结束运行。

LangGraph 实现了一个特殊的 RemainingSteps 注释。在底层,它创建了一个 ManagedValue 通道——一个状态通道,它将在我们的图运行期间存在,并且之后不再存在。

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    remaining_steps: RemainingSteps

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# Define edges
def route(state: State) -> Literal["b", END]:
    if state["remaining_steps"] <= 2:
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

# Test it out
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
{'aggregate': ['A', 'B', 'A']}

扩展示例:带分支的循环

为了更好地理解递归限制的工作原理,让我们考虑一个更复杂的示例。下面我们实现一个循环,但其中一个步骤扇出到两个节点

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Node C sees {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Node D sees {state["aggregate"]}')
    return {"aggregate": ["D"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)

# Define edges
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Complex loop graph with branches

这个图看起来很复杂,但可以概念化为超步的循环

  1. 节点 A
  2. 节点 B
  3. 节点 C 和 D
  4. 节点 A
  5. ...

我们有一个由四个超步组成的循环,其中节点 C 和 D 并发执行。

像之前一样调用图,我们看到我们在达到终止条件之前完成了两次完整的“循环”

result = graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node D sees ['A', 'B']
Node C sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']

然而,如果我们把递归限制设置为四,我们只完成一个循环,因为每个循环有四个超步

from langgraph.errors import GraphRecursionError

try:
    result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error

异步

使用异步编程范式可以在并发运行I/O 密集型代码(例如,向聊天模型提供商发出并发 API 请求)时产生显著的性能改进。

要将图的 同步 实现转换为 异步 实现,您需要

  1. 更新 nodes 使用 async def 而不是 def
  2. 更新内部代码以适当地使用 await
  3. 根据需要使用 .ainvoke.astream 调用图。

因为许多 LangChain 对象都实现了Runnable 协议,该协议具有所有 同步 方法的 异步 变体,所以将 同步 图升级到 异步 图通常非常快。

请参见下面的示例。为了演示底层 LLM 的异步调用,我们将包含一个聊天模型

pip install -U "langchain[openai]"
import os
from langchain.chat_models import init_chat_model

os.environ["OPENAI_API_KEY"] = "sk-..."

llm = init_chat_model("openai:gpt-4.1")

👉 阅读 OpenAI 集成文档

pip install -U "langchain[anthropic]"
import os
from langchain.chat_models import init_chat_model

os.environ["ANTHROPIC_API_KEY"] = "sk-..."

llm = init_chat_model("anthropic:claude-3-5-sonnet-latest")

👉 阅读 Anthropic 集成文档

pip install -U "langchain[openai]"
import os
from langchain.chat_models import init_chat_model

os.environ["AZURE_OPENAI_API_KEY"] = "..."
os.environ["AZURE_OPENAI_ENDPOINT"] = "..."
os.environ["OPENAI_API_VERSION"] = "2025-03-01-preview"

llm = init_chat_model(
    "azure_openai:gpt-4.1",
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
)

👉 阅读 Azure 集成文档

pip install -U "langchain[google-genai]"
import os
from langchain.chat_models import init_chat_model

os.environ["GOOGLE_API_KEY"] = "..."

llm = init_chat_model("google_genai:gemini-2.0-flash")

👉 阅读 Google GenAI 集成文档

pip install -U "langchain[aws]"
from langchain.chat_models import init_chat_model

# Follow the steps here to configure your credentials:
# https://docs.aws.amazon.com/bedrock/latest/userguide/getting-started.html

llm = init_chat_model(
    "anthropic.claude-3-5-sonnet-20240620-v1:0",
    model_provider="bedrock_converse",
)

👉 阅读 AWS Bedrock 集成文档

API 参考:init_chat_model | StateGraph

from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph

async def node(state: MessagesState): # (1)!
    new_message = await llm.ainvoke(state["messages"]) # (2)!
    return {"messages": [new_message]}

builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
graph = builder.compile()

input_message = {"role": "user", "content": "Hello"}
result = await graph.ainvoke({"messages": [input_message]}) # (3)!
  1. 将节点声明为异步函数。
  2. 在节点内部可用时使用异步调用。
  3. 在图对象本身上使用异步调用。

异步流式传输

有关异步流式传输的示例,请参阅流式传输指南

结合控制流和状态更新与 Command

结合控制流(边)和状态更新(节点)可能很有用。例如,您可能希望在同一个节点中既执行状态更新又决定下一步要转到哪个节点。LangGraph 通过从节点函数返回一个Command对象来提供实现此目的的方法

def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        # state update
        update={"foo": "bar"},
        # control flow
        goto="my_other_node"
    )

下面我们展示一个端到端示例。让我们创建一个包含 3 个节点的简单图:A、B 和 C。我们将首先执行节点 A,然后根据节点 A 的输出决定下一步是转到节点 B 还是节点 C。

API 参考:StateGraph | START | Command

import random
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START
from langgraph.types import Command

# Define graph state
class State(TypedDict):
    foo: str

# Define the nodes

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    print("Called A")
    value = random.choice(["a", "b"])
    # this is a replacement for a conditional edge function
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"

    # note how Command allows you to BOTH update the graph state AND route to the next node
    return Command(
        # this is the state update
        update={"foo": value},
        # this is a replacement for an edge
        goto=goto,
    )

def node_b(state: State):
    print("Called B")
    return {"foo": state["foo"] + "b"}

def node_c(state: State):
    print("Called C")
    return {"foo": state["foo"] + "c"}

现在我们可以用上面的节点创建 StateGraph。请注意,该图没有用于路由的条件边!这是因为控制流是使用 node_a 内部的 Command 定义的。

builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# NOTE: there are no edges between nodes A, B and C!

graph = builder.compile()

重要

您可能已经注意到,我们将 Command 用作返回类型注释,例如 Command[Literal["node_b", "node_c"]]。这对于图渲染是必需的,并且告诉 LangGraph node_a 可以导航到 node_bnode_c

from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))

Command-based graph navigation

如果我们多次运行图,我们会看到它根据节点 A 中的随机选择采用不同的路径(A -> B 或 A -> C)。

graph.invoke({"foo": ""})
Called A
Called C

如果您正在使用子图,您可能希望从子图中的一个节点导航到不同的子图(即父图中的不同节点)。为此,您可以在 Command 中指定 graph=Command.PARENT

def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # where `other_subgraph` is a node in the parent graph
        graph=Command.PARENT
    )

让我们使用上面的示例来演示这一点。我们将通过将上面示例中的 node_a 更改为一个单节点图来实现,然后将其作为子图添加到我们的父图中。

使用 Command.PARENT 的状态更新

当您将更新从子图节点发送到父图节点时,对于父图和子图状态模式共享的键,您必须为父图状态中正在更新的键定义一个Reducer。请参阅下面的示例。

import operator
from typing_extensions import Annotated

class State(TypedDict):
    # NOTE: we define a reducer here
    foo: Annotated[str, operator.add]

def node_a(state: State):
    print("Called A")
    value = random.choice(["a", "b"])
    # this is a replacement for a conditional edge function
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"

    # note how Command allows you to BOTH update the graph state AND route to the next node
    return Command(
        update={"foo": value},
        goto=goto,
        # this tells LangGraph to navigate to node_b or node_c in the parent graph
        # NOTE: this will navigate to the closest parent graph relative to the subgraph
        graph=Command.PARENT,
    )

subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()

def node_b(state: State):
    print("Called B")
    # NOTE: since we've defined a reducer, we don't need to manually append
    # new characters to existing 'foo' value. instead, reducer will append these
    # automatically (via operator.add)
    return {"foo": "b"}

def node_c(state: State):
    print("Called C")
    return {"foo": "c"}

builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)

graph = builder.compile()

graph.invoke({"foo": ""})
Called A
Called C

在工具内部使用

一个常见的用例是从工具内部更新图状态。例如,在客户支持应用程序中,您可能希望在对话开始时根据客户的帐号或 ID 查找客户信息。要从工具更新图状态,您可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]})

@tool
def lookup_user_info(tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig):
    """Use this to look up user information to better assist them with their questions."""
    user_info = get_user_info(config.get("configurable", {}).get("user_id"))
    return Command(
        update={
            # update the state keys
            "user_info": user_info,
            # update the message history
            "messages": [ToolMessage("Successfully looked up user information", tool_call_id=tool_call_id)]
        }
    )

重要

当从工具返回 Command 时,您必须Command.update 中包含 messages(或用于消息历史记录的任何状态键),并且 messages 中的消息列表必须包含一个 ToolMessage。这对于生成有效消息历史记录是必需的(LLM 提供商要求带有工具调用的 AI 消息后面跟着工具结果消息)。

如果您使用通过 Command 更新状态的工具,我们建议使用预构建的 ToolNode,它会自动处理返回 Command 对象的工具并将它们传播到图状态。如果您正在编写调用工具的自定义节点,则需要手动将工具返回的 Command 对象作为来自节点的更新进行传播。

可视化您的图

在这里,我们演示如何可视化您创建的图。

您可以可视化任何任意的,包括StateGraph。让我们通过绘制分形来找点乐子 :).

API 参考:StateGraph | START | END | add_messages

import random
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

class MyNode:
    def __init__(self, name: str):
        self.name = name
    def __call__(self, state: State):
        return {"messages": [("assistant", f"Called node {self.name}")]}

def route(state) -> Literal["entry_node", "__end__"]:
    if len(state["messages"]) > 10:
        return "__end__"
    return "entry_node"

def add_fractal_nodes(builder, current_node, level, max_level):
    if level > max_level:
        return
    # Number of nodes to create at this level
    num_nodes = random.randint(1, 3)  # Adjust randomness as needed
    for i in range(num_nodes):
        nm = ["A", "B", "C"][i]
        node_name = f"node_{current_node}_{nm}"
        builder.add_node(node_name, MyNode(node_name))
        builder.add_edge(current_node, node_name)
        # Recursively add more nodes
        r = random.random()
        if r > 0.2 and level + 1 < max_level:
            add_fractal_nodes(builder, node_name, level + 1, max_level)
        elif r > 0.05:
            builder.add_conditional_edges(node_name, route, node_name)
        else:
            # End
            builder.add_edge(node_name, "__end__")

def build_fractal_graph(max_level: int):
    builder = StateGraph(State)
    entry_point = "entry_node"
    builder.add_node(entry_point, MyNode(entry_point))
    builder.add_edge(START, entry_point)
    add_fractal_nodes(builder, entry_point, 1, max_level)
    # Optional: set a finish point if required
    builder.add_edge(entry_point, END)  # or any specific node
    return builder.compile()

app = build_fractal_graph(3)

Mermaid

我们还可以将图类转换为 Mermaid 语法。

print(app.get_graph().draw_mermaid())
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    __start__([<p>__start__</p>]):::first
    entry_node(entry_node)
    node_entry_node_A(node_entry_node_A)
    node_entry_node_B(node_entry_node_B)
    node_node_entry_node_B_A(node_node_entry_node_B_A)
    node_node_entry_node_B_B(node_node_entry_node_B_B)
    node_node_entry_node_B_C(node_node_entry_node_B_C)
    __end__([<p>__end__</p>]):::last
    __start__ --> entry_node;
    entry_node --> __end__;
    entry_node --> node_entry_node_A;
    entry_node --> node_entry_node_B;
    node_entry_node_B --> node_node_entry_node_B_A;
    node_entry_node_B --> node_node_entry_node_B_B;
    node_entry_node_B --> node_node_entry_node_B_C;
    node_entry_node_A -.-> entry_node;
    node_entry_node_A -.-> __end__;
    node_node_entry_node_B_A -.-> entry_node;
    node_node_entry_node_B_A -.-> __end__;
    node_node_entry_node_B_B -.-> entry_node;
    node_node_entry_node_B_B -.-> __end__;
    node_node_entry_node_B_C -.-> entry_node;
    node_node_entry_node_B_C -.-> __end__;
    classDef default fill:#f2f0ff,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#bfb6fc

PNG

如果需要,我们可以将图渲染为 .png 文件。这里我们可以使用三种选项

  • 使用 Mermaid.ink API(不需要额外的包)
  • 使用 Mermaid + Pyppeteer(需要 pip install pyppeteer
  • 使用 graphviz(需要 pip install graphviz

使用 Mermaid.Ink

默认情况下,draw_mermaid_png() 使用 Mermaid.Ink 的 API 生成图表。

API 参考:CurveStyle | MermaidDrawMethod | NodeStyles

from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles

display(Image(app.get_graph().draw_mermaid_png()))

Fractal graph visualization

使用 Mermaid + Pyppeteer

import nest_asyncio

nest_asyncio.apply()  # Required for Jupyter Notebook to run async functions

display(
    Image(
        app.get_graph().draw_mermaid_png(
            curve_style=CurveStyle.LINEAR,
            node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
            wrap_label_n_words=9,
            output_file_path=None,
            draw_method=MermaidDrawMethod.PYPPETEER,
            background_color="white",
            padding=10,
        )
    )
)

使用 Graphviz

try:
    display(Image(app.get_graph().draw_png()))
except ImportError:
    print(
        "You likely need to install dependencies for pygraphviz, see more here https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt"
    )