跳到内容

图定义

Graph

add_conditional_edges(source: str, path: Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], path_map: Optional[Union[dict[Hashable, str], list[str]]] = None, then: Optional[str] = None) -> Self

从起始节点向任意数量的目标节点添加条件边。

参数

  • source (str) –

    起始节点。此条件边将在退出此节点时运行。

  • path (Union[Callable, Runnable]) –

    确定下一个节点或多个节点的可调用对象。如果不指定 path_map,它应该返回一个或多个节点。如果它返回 END,图的执行将停止。

  • path_map (Optional[dict[Hashable, str]], default: None ) –

    路径到节点名称的可选映射。如果省略,则 path 返回的路径应为节点名称。

  • then (Optional[str], default: None ) –

    在执行 path 选择的节点之后要执行的节点名称。

返回

  • Self ( Self ) –

    图的实例,允许方法链式调用。

path 函数的返回值上没有类型提示的情况下 (例如, -> Literal["foo", "__end__"]:)

或 path_map,图可视化会假定边可以转换到图中的任何节点。

set_entry_point(key: str) -> Self

指定图中要调用的第一个节点。

等效于调用 add_edge(START, key)

参数

  • key (str) –

    要设置为入口点的节点键。

返回

  • Self ( Self ) –

    图的实例,允许方法链式调用。

set_conditional_entry_point(path: Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], path_map: Optional[Union[dict[Hashable, str], list[str]]] = None, then: Optional[str] = None) -> Self

在图中设置条件入口点。

参数

  • path (Union[Callable, Runnable]) –

    确定下一个节点或多个节点的可调用对象。如果不指定 path_map,它应该返回一个或多个节点。如果它返回 END,图的执行将停止。

  • path_map (Optional[dict[str, str]], default: None ) –

    路径到节点名称的可选映射。如果省略,则 path 返回的路径应为节点名称。

  • then (Optional[str], default: None ) –

    在执行 path 选择的节点之后要执行的节点名称。

返回

  • Self ( Self ) –

    图的实例,允许方法链式调用。

set_finish_point(key: str) -> Self

将节点标记为图的完成点。

如果图到达此节点,它将停止执行。

参数

  • key (str) –

    要设置为完成点的节点键。

返回

  • Self ( Self ) –

    图的实例,允许方法链式调用。

CompiledGraph

基类: Pregel

stream_mode: StreamMode = stream_mode class-attribute instance-attribute

流式输出模式,默认为 'values'。

stream_eager: bool = stream_eager class-attribute instance-attribute

是否强制急切地发出流事件,对于 stream_mode "messages" 和 "custom" 自动开启。

stream_channels: Optional[Union[str, Sequence[str]]] = stream_channels class-attribute instance-attribute

要流式传输的通道,默认为不在保留通道中的所有通道

step_timeout: Optional[float] = step_timeout class-attribute instance-attribute

等待步骤完成的最长时间(秒)。默认为 None。

debug: bool = debug if debug is not None else get_debug() instance-attribute

是否在执行期间打印调试信息。默认为 False。

checkpointer: Checkpointer = checkpointer class-attribute instance-attribute

用于保存和加载图状态的检查点。默认为 None。

store: Optional[BaseStore] = store class-attribute instance-attribute

用于 SharedValues 的内存存储。默认为 None。

retry_policy: Optional[RetryPolicy] = retry_policy class-attribute instance-attribute

运行任务时使用的重试策略。设置为 None 以禁用。

get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot

获取图的当前状态。

aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot async

获取图的当前状态。

update_state(config: RunnableConfig, values: Optional[Union[dict[str, Any], Any]], as_node: Optional[str] = None) -> RunnableConfig

使用给定值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,则会将其设置为上次更新状态的节点(如果不是不明确的)。

aupdate_state(config: RunnableConfig, values: dict[str, Any] | Any, as_node: Optional[str] = None) -> RunnableConfig async

使用给定值异步更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,则会将其设置为上次更新状态的节点(如果不是不明确的)。

stream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> Iterator[Union[dict[str, Any], Any]]

为单个输入流式传输图步骤。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入。

  • config (Optional[RunnableConfig], default: None ) –

    运行中使用的配置。

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    流式输出的模式,默认为 self.stream_mode。选项包括

    • "values":在每个步骤之后发出状态中的所有值。与函数式 API 一起使用时,值在工作流程结束时发出一次。
    • "updates":仅在每个步骤之后发出节点或任务名称以及节点或任务返回的更新。如果在同一步骤中进行多次更新(例如,运行多个节点),则这些更新将分别发出。
    • "custom":使用 StreamWriter 从节点或任务内部发出自定义数据。
    • "messages":与元数据一起逐个令牌地发出 LLM 消息,用于节点或任务内部的任何 LLM 调用。
    • "debug":为每个步骤发出尽可能多信息的调试事件。
  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    要流式传输的键,默认为所有非上下文通道。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    要在之前中断的节点,默认为图中的所有节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    要在之后中断的节点,默认为图中的所有节点。

  • debug (Optional[bool], default: None ) –

    是否在执行期间打印调试信息,默认为 False。

  • subgraphs (bool, default: False ) –

    是否流式传输子图,默认为 False。

产生

  • Union[dict[str, Any], Any]

    图中每个步骤的输出。输出形状取决于 stream_mode。

示例

使用不同流模式的图

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
使用 stream_mode="values"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
使用 stream_mode="updates"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
使用 stream_mode="debug"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}

使用 stream_mode="custom"

>>> from langgraph.types import StreamWriter
...
>>> def node_a(state: State, writer: StreamWriter):
...     writer({"custom_data": "foo"})
...     return {"alist": ["hi"]}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
...
>>> for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
...     print(event)
{'custom_data': 'foo'}

使用 stream_mode="messages"

>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
>>> from langchain_openai import ChatOpenAI
...
>>> llm = ChatOpenAI(model="gpt-4o-mini")
...
>>> class State(TypedDict):
...     question: str
...     answer: str
...
>>> def node_a(state: State):
...     response = llm.invoke(state["question"])
...     return {"answer": response.content}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()

>>> for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
...     print(event)
(AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
(AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
(AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

astream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> AsyncIterator[Union[dict[str, Any], Any]] async

为单个输入流式传输图步骤。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入。

  • config (Optional[RunnableConfig], default: None ) –

    运行中使用的配置。

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    流式输出的模式,默认为 self.stream_mode。选项包括

    • "values":在每个步骤之后发出状态中的所有值。与函数式 API 一起使用时,值在工作流程结束时发出一次。
    • "updates":仅在每个步骤之后发出节点或任务名称以及节点或任务返回的更新。如果在同一步骤中进行多次更新(例如,运行多个节点),则这些更新将分别发出。
    • "custom":使用 StreamWriter 从节点或任务内部发出自定义数据。
    • "messages":与元数据一起逐个令牌地发出 LLM 消息,用于节点或任务内部的任何 LLM 调用。
    • "debug":为每个步骤发出尽可能多信息的调试事件。
  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    要流式传输的键,默认为所有非上下文通道。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    要在之前中断的节点,默认为图中的所有节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    要在之后中断的节点,默认为图中的所有节点。

  • debug (Optional[bool], default: None ) –

    是否在执行期间打印调试信息,默认为 False。

  • subgraphs (bool, default: False ) –

    是否流式传输子图,默认为 False。

产生

  • AsyncIterator[Union[dict[str, Any], Any]]

    图中每个步骤的输出。输出形状取决于 stream_mode。

示例

使用不同流模式的图

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
使用 stream_mode="values"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
使用 stream_mode="updates"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
使用 stream_mode="debug"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}

使用 stream_mode="custom"

>>> from langgraph.types import StreamWriter
...
>>> async def node_a(state: State, writer: StreamWriter):
...     writer({"custom_data": "foo"})
...     return {"alist": ["hi"]}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
...
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
...     print(event)
{'custom_data': 'foo'}

使用 stream_mode="messages"

>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
>>> from langchain_openai import ChatOpenAI
...
>>> llm = ChatOpenAI(model="gpt-4o-mini")
...
>>> class State(TypedDict):
...     question: str
...     answer: str
...
>>> async def node_a(state: State):
...     response = await llm.ainvoke(state["question"])
...     return {"answer": response.content}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()

>>> for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
...     print(event)
(AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
(AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
(AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

invoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any]

使用单个输入和配置运行图。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入数据。它可以是字典或任何其他类型。

  • config (Optional[RunnableConfig], default: None ) –

    可选。图运行的配置。

  • stream_mode (StreamMode, default: 'values' ) –

    Optional[str]。图运行的流模式。默认为 "values"。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    可选。要从图运行中检索的输出键。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在之前中断图运行的节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在之后中断图运行的节点。

  • debug (Optional[bool], default: None ) –

    可选。启用图运行的调试模式。

  • **kwargs (Any, default: {} ) –

    要传递给图运行的其他关键字参数。

返回

  • Union[dict[str, Any], Any]

    图运行的输出。如果 stream_mode 为 "values",它将返回最新的输出。

  • Union[dict[str, Any], Any]

    如果 stream_mode 不是 "values",它将返回输出块的列表。

ainvoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any] async

在单个输入上异步调用图。

参数

  • input (Union[dict[str, Any], Any]) –

    计算的输入数据。它可以是字典或任何其他类型。

  • config (Optional[RunnableConfig], default: None ) –

    可选。计算的配置。

  • stream_mode (StreamMode, default: 'values' ) –

    可选。计算的流模式。默认为 "values"。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    可选。要包含在结果中的输出键。默认为 None。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在之前中断的节点。默认为 None。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在之后中断的节点。默认为 None。

  • debug (Optional[bool], default: None ) –

    可选。是否启用调试模式。默认为 None。

  • **kwargs (Any, default: {} ) –

    其他关键字参数。

返回

  • Union[dict[str, Any], Any]

    计算结果。如果 stream_mode 为 "values",它将返回最新的值。

  • Union[dict[str, Any], Any]

    如果 stream_mode 为 "chunks",它将返回块的列表。

get_graph(config: Optional[RunnableConfig] = None, *, xray: Union[int, bool] = False) -> DrawableGraph

返回计算图的可绘制表示。

StateGraph

基类: Graph

一个图,其节点通过读取和写入共享状态进行通信。每个节点的签名是 State -> Partial.

每个状态键都可以选择使用归约函数进行注释,该函数将用于聚合从多个节点接收到的该键的值。归约函数的签名是 (Value, Value) -> Value。

参数

  • state_schema (Type[Any], default: None ) –

    定义状态的模式类。

  • config_schema (Optional[Type[Any]], default: None ) –

    定义配置的模式类。使用此项可在您的 API 中公开可配置的参数。

示例

>>> from langchain_core.runnables import RunnableConfig
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.checkpoint.memory import MemorySaver
>>> from langgraph.graph import StateGraph
>>>
>>> def reducer(a: list, b: int | None) -> list:
...     if b is not None:
...         return a + [b]
...     return a
>>>
>>> class State(TypedDict):
...     x: Annotated[list, reducer]
>>>
>>> class ConfigSchema(TypedDict):
...     r: float
>>>
>>> graph = StateGraph(State, config_schema=ConfigSchema)
>>>
>>> def node(state: State, config: RunnableConfig) -> dict:
...     r = config["configurable"].get("r", 1.0)
...     x = state["x"][-1]
...     next_value = x * r * (1 - x)
...     return {"x": next_value}
>>>
>>> graph.add_node("A", node)
>>> graph.set_entry_point("A")
>>> graph.set_finish_point("A")
>>> compiled = graph.compile()
>>>
>>> print(compiled.config_specs)
[ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]
>>>
>>> step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
>>> print(step1)
{'x': [0.5, 0.75]}

set_entry_point(key: str) -> Self

指定图中要调用的第一个节点。

等效于调用 add_edge(START, key)

参数

  • key (str) –

    要设置为入口点的节点键。

返回

  • Self ( Self ) –

    图的实例,允许方法链式调用。

set_conditional_entry_point(path: Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], path_map: Optional[Union[dict[Hashable, str], list[str]]] = None, then: Optional[str] = None) -> Self

在图中设置条件入口点。

参数

  • path (Union[Callable, Runnable]) –

    确定下一个节点或多个节点的可调用对象。如果不指定 path_map,它应该返回一个或多个节点。如果它返回 END,图的执行将停止。

  • path_map (Optional[dict[str, str]], default: None ) –

    路径到节点名称的可选映射。如果省略,则 path 返回的路径应为节点名称。

  • then (Optional[str], default: None ) –

    在执行 path 选择的节点之后要执行的节点名称。

返回

  • Self ( Self ) –

    图的实例,允许方法链式调用。

set_finish_point(key: str) -> Self

将节点标记为图的完成点。

如果图到达此节点,它将停止执行。

参数

  • key (str) –

    要设置为完成点的节点键。

返回

  • Self ( Self ) –

    图的实例,允许方法链式调用。

add_node(node: Union[str, RunnableLike], action: Optional[RunnableLike] = None, *, metadata: Optional[dict[str, Any]] = None, input: Optional[Type[Any]] = None, retry: Optional[RetryPolicy] = None, destinations: Optional[Union[dict[str, str], tuple[str]]] = None) -> Self

向状态图中添加新节点。

将采用函数/可运行对象的名称作为节点名称。

参数

  • node (Union[str, RunnableLike]) –

    此节点将运行的函数或可运行对象。

  • action (Optional[RunnableLike], default: None ) –

    与节点关联的操作。(默认值:None)

  • metadata (Optional[dict[str, Any]], default: None ) –

    与节点关联的元数据。(默认值:None)

  • input (Optional[Type[Any]], default: None ) –

    节点的输入模式。(默认值:图的输入模式)

  • retry (Optional[RetryPolicy], default: None ) –

    节点重试策略。(默认值:None)

  • destinations (Optional[Union[dict[str, str], tuple[str]]], default: None ) –

    指示节点可以路由到何处的目的地。这对于使用返回 Command 对象的节点的无边图很有用。如果提供了字典,则键将用作目标节点名称,值将用作边的标签。如果提供了元组,则值将用作目标节点名称。注意:这仅用于图渲染,对图执行没有任何影响。

Raises: ValueError: 如果键已被用作状态键。

示例

>>> from langgraph.graph import START, StateGraph
...
>>> def my_node(state, config):
...    return {"x": state["x"] + 1}
...
>>> builder = StateGraph(dict)
>>> builder.add_node(my_node)  # node name will be 'my_node'
>>> builder.add_edge(START, "my_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}
自定义名称

>>> builder = StateGraph(dict)
>>> builder.add_node("my_fair_node", my_node)
>>> builder.add_edge(START, "my_fair_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}

返回

  • Self ( Self ) –

    状态图的实例,允许方法链式调用。

add_edge(start_key: Union[str, list[str]], end_key: str) -> Self

从起始节点(或起始节点列表)向结束节点添加有向边。

当提供单个起始节点时,图将等待该节点完成,然后再执行结束节点。当提供多个起始节点时,图将等待所有起始节点完成,然后再执行结束节点。

参数

  • start_key (Union[str, list[str]]) –

    边的起始节点的键。

  • end_key (str) –

    边的结束节点的键。

Raises

  • ValueError

    如果起始键为“END”,或者起始键或结束键在图中不存在。

返回

  • Self ( Self ) –

    状态图的实例,允许方法链式调用。

add_conditional_edges(source: str, path: Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], path_map: Optional[Union[dict[Hashable, str], list[str]]] = None, then: Optional[str] = None) -> Self

从起始节点向任意数量的目标节点添加条件边。

参数

  • source (str) –

    起始节点。此条件边将在退出此节点时运行。

  • path (Union[Callable, Runnable]) –

    确定下一个节点或多个节点的可调用对象。如果不指定 path_map,它应该返回一个或多个节点。如果它返回 END,图的执行将停止。

  • path_map (Optional[dict[Hashable, str]], default: None ) –

    路径到节点名称的可选映射。如果省略,则 path 返回的路径应为节点名称。

  • then (Optional[str], default: None ) –

    在执行 path 选择的节点之后要执行的节点名称。

返回

  • Self ( Self ) –

    图的实例,允许方法链式调用。

path 函数的返回值上没有类型提示的情况下 (例如, -> Literal["foo", "__end__"]:)

或 path_map,图可视化会假定边可以转换到图中的任何节点。

add_sequence(nodes: Sequence[Union[RunnableLike, tuple[str, RunnableLike]]]) -> Self

添加将按提供的顺序执行的节点序列。

参数

  • nodes (Sequence[Union[RunnableLike, tuple[str, RunnableLike]]]) –

    RunnableLike 对象(例如 LangChain Runnable 或可调用对象)或 (name, RunnableLike) 元组的序列。如果未提供名称,则名称将从节点对象(例如 runnable 或可调用名称)推断出来。每个节点将按提供的顺序执行。

Raises

  • ValueError

    如果序列为空。

  • ValueError

    如果序列包含重复的节点名称。

返回

  • Self ( Self ) –

    状态图的实例,允许方法链式调用。

compile(checkpointer: Checkpointer = None, *, store: Optional[BaseStore] = None, interrupt_before: Optional[Union[All, list[str]]] = None, interrupt_after: Optional[Union[All, list[str]]] = None, debug: bool = False, name: Optional[str] = None) -> CompiledStateGraph

将状态图编译为 CompiledGraph 对象。

编译后的图实现了 Runnable 接口,可以被调用、流式传输、批量处理和异步运行。

参数

  • checkpointer (Optional[Union[Checkpointer, Literal[False]]], default: None ) –

    检查点保存器对象或标志。如果提供,此 Checkpointer 将充当图的完全版本化的“短期记忆”,使其可以从任何点暂停、恢复和重放。如果为 None,则当用作子图时,它可能会继承父图的检查点保存器。如果为 False,它将不使用或继承任何检查点保存器。

  • interrupt_before (Optional[Sequence[str]], default: None ) –

    要在之前中断的可选节点名称列表。

  • interrupt_after (Optional[Sequence[str]], default: None ) –

    要在之后中断的可选节点名称列表。

  • debug (bool, default: False ) –

    指示是否启用调试模式的标志。

返回

  • CompiledStateGraph ( CompiledStateGraph ) –

    编译后的状态图。

CompiledStateGraph

基类: CompiledGraph

stream_mode: StreamMode = stream_mode class-attribute instance-attribute

流式输出模式,默认为 'values'。

stream_eager: bool = stream_eager class-attribute instance-attribute

是否强制急切地发出流事件,对于 stream_mode "messages" 和 "custom" 自动开启。

stream_channels: Optional[Union[str, Sequence[str]]] = stream_channels class-attribute instance-attribute

要流式传输的通道,默认为不在保留通道中的所有通道

step_timeout: Optional[float] = step_timeout class-attribute instance-attribute

等待步骤完成的最长时间(秒)。默认为 None。

debug: bool = debug if debug is not None else get_debug() instance-attribute

是否在执行期间打印调试信息。默认为 False。

checkpointer: Checkpointer = checkpointer class-attribute instance-attribute

用于保存和加载图状态的检查点。默认为 None。

store: Optional[BaseStore] = store class-attribute instance-attribute

用于 SharedValues 的内存存储。默认为 None。

retry_policy: Optional[RetryPolicy] = retry_policy class-attribute instance-attribute

运行任务时使用的重试策略。设置为 None 以禁用。

get_graph(config: Optional[RunnableConfig] = None, *, xray: Union[int, bool] = False) -> DrawableGraph

返回计算图的可绘制表示。

get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot

获取图的当前状态。

aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot async

获取图的当前状态。

update_state(config: RunnableConfig, values: Optional[Union[dict[str, Any], Any]], as_node: Optional[str] = None) -> RunnableConfig

使用给定值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,则会将其设置为上次更新状态的节点(如果不是不明确的)。

aupdate_state(config: RunnableConfig, values: dict[str, Any] | Any, as_node: Optional[str] = None) -> RunnableConfig async

使用给定值异步更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,则会将其设置为上次更新状态的节点(如果不是不明确的)。

stream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> Iterator[Union[dict[str, Any], Any]]

为单个输入流式传输图步骤。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入。

  • config (Optional[RunnableConfig], default: None ) –

    运行中使用的配置。

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    流式输出的模式,默认为 self.stream_mode。选项包括

    • "values":在每个步骤之后发出状态中的所有值。与函数式 API 一起使用时,值在工作流程结束时发出一次。
    • "updates":仅在每个步骤之后发出节点或任务名称以及节点或任务返回的更新。如果在同一步骤中进行多次更新(例如,运行多个节点),则这些更新将分别发出。
    • "custom":使用 StreamWriter 从节点或任务内部发出自定义数据。
    • "messages":与元数据一起逐个令牌地发出 LLM 消息,用于节点或任务内部的任何 LLM 调用。
    • "debug":为每个步骤发出尽可能多信息的调试事件。
  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    要流式传输的键,默认为所有非上下文通道。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    要在之前中断的节点,默认为图中的所有节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    要在之后中断的节点,默认为图中的所有节点。

  • debug (Optional[bool], default: None ) –

    是否在执行期间打印调试信息,默认为 False。

  • subgraphs (bool, default: False ) –

    是否流式传输子图,默认为 False。

产生

  • Union[dict[str, Any], Any]

    图中每个步骤的输出。输出形状取决于 stream_mode。

示例

使用不同流模式的图

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
使用 stream_mode="values"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
使用 stream_mode="updates"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
使用 stream_mode="debug"

>>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}

使用 stream_mode="custom"

>>> from langgraph.types import StreamWriter
...
>>> def node_a(state: State, writer: StreamWriter):
...     writer({"custom_data": "foo"})
...     return {"alist": ["hi"]}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
...
>>> for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
...     print(event)
{'custom_data': 'foo'}

使用 stream_mode="messages"

>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
>>> from langchain_openai import ChatOpenAI
...
>>> llm = ChatOpenAI(model="gpt-4o-mini")
...
>>> class State(TypedDict):
...     question: str
...     answer: str
...
>>> def node_a(state: State):
...     response = llm.invoke(state["question"])
...     return {"answer": response.content}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()

>>> for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
...     print(event)
(AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
(AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
(AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

astream(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None, output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, subgraphs: bool = False) -> AsyncIterator[Union[dict[str, Any], Any]] async

为单个输入流式传输图步骤。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入。

  • config (Optional[RunnableConfig], default: None ) –

    运行中使用的配置。

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    流式输出的模式,默认为 self.stream_mode。选项包括

    • "values":在每个步骤之后发出状态中的所有值。与函数式 API 一起使用时,值在工作流程结束时发出一次。
    • "updates":仅在每个步骤之后发出节点或任务名称以及节点或任务返回的更新。如果在同一步骤中进行多次更新(例如,运行多个节点),则这些更新将分别发出。
    • "custom":使用 StreamWriter 从节点或任务内部发出自定义数据。
    • "messages":与元数据一起逐个令牌地发出 LLM 消息,用于节点或任务内部的任何 LLM 调用。
    • "debug":为每个步骤发出尽可能多信息的调试事件。
  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    要流式传输的键,默认为所有非上下文通道。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    要在之前中断的节点,默认为图中的所有节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    要在之后中断的节点,默认为图中的所有节点。

  • debug (Optional[bool], default: None ) –

    是否在执行期间打印调试信息,默认为 False。

  • subgraphs (bool, default: False ) –

    是否流式传输子图,默认为 False。

产生

  • AsyncIterator[Union[dict[str, Any], Any]]

    图中每个步骤的输出。输出形状取决于 stream_mode。

示例

使用不同流模式的图

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
使用 stream_mode="values"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
使用 stream_mode="updates"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
使用 stream_mode="debug"

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}

使用 stream_mode="custom"

>>> from langgraph.types import StreamWriter
...
>>> async def node_a(state: State, writer: StreamWriter):
...     writer({"custom_data": "foo"})
...     return {"alist": ["hi"]}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
...
>>> async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
...     print(event)
{'custom_data': 'foo'}

使用 stream_mode="messages"

>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph, START
>>> from langchain_openai import ChatOpenAI
...
>>> llm = ChatOpenAI(model="gpt-4o-mini")
...
>>> class State(TypedDict):
...     question: str
...     answer: str
...
>>> async def node_a(state: State):
...     response = await llm.ainvoke(state["question"])
...     return {"answer": response.content}
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", node_a)
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()

>>> for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
...     print(event)
(AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
(AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
(AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
(AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

invoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any]

使用单个输入和配置运行图。

参数

  • input (Union[dict[str, Any], Any]) –

    图的输入数据。它可以是字典或任何其他类型。

  • config (Optional[RunnableConfig], default: None ) –

    可选。图运行的配置。

  • stream_mode (StreamMode, default: 'values' ) –

    Optional[str]。图运行的流模式。默认为 "values"。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    可选。要从图运行中检索的输出键。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在之前中断图运行的节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在之后中断图运行的节点。

  • debug (Optional[bool], default: None ) –

    可选。启用图运行的调试模式。

  • **kwargs (Any, default: {} ) –

    要传递给图运行的其他关键字参数。

返回

  • Union[dict[str, Any], Any]

    图运行的输出。如果 stream_mode 为 "values",它将返回最新的输出。

  • Union[dict[str, Any], Any]

    如果 stream_mode 不是 "values",它将返回输出块的列表。

ainvoke(input: Union[dict[str, Any], Any], config: Optional[RunnableConfig] = None, *, stream_mode: StreamMode = 'values', output_keys: Optional[Union[str, Sequence[str]]] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, debug: Optional[bool] = None, **kwargs: Any) -> Union[dict[str, Any], Any] async

在单个输入上异步调用图。

参数

  • input (Union[dict[str, Any], Any]) –

    计算的输入数据。它可以是字典或任何其他类型。

  • config (Optional[RunnableConfig], default: None ) –

    可选。计算的配置。

  • stream_mode (StreamMode, default: 'values' ) –

    可选。计算的流模式。默认为 "values"。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    可选。要包含在结果中的输出键。默认为 None。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在之前中断的节点。默认为 None。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    可选。要在之后中断的节点。默认为 None。

  • debug (Optional[bool], default: None ) –

    可选。是否启用调试模式。默认为 None。

  • **kwargs (Any, default: {} ) –

    其他关键字参数。

返回

  • Union[dict[str, Any], Any]

    计算结果。如果 stream_mode 为 "values",它将返回最新的值。

  • Union[dict[str, Any], Any]

    如果 stream_mode 为 "chunks",它将返回块的列表。

add_messages(left: Messages, right: Messages, *, format: Optional[Literal['langchain-openai']] = None) -> Messages

合并两个消息列表,通过 ID 更新现有消息。

默认情况下,除非新消息与现有消息具有相同的 ID,否则这将确保状态为“仅追加”。

参数

  • left (Messages) –

    消息的基础列表。

  • right (Messages) –

    要合并到基础列表中的消息列表(或单条消息)。

  • format (Optional[Literal['langchain-openai']], 默认值: None ) –

    返回消息的格式。如果为 None,则消息将按原样返回。如果为 'langchain-openai',则消息将作为 BaseMessage 对象返回,其内容格式化为匹配 OpenAI 消息格式,这意味着内容可以是字符串、“text”块或“image_url”块,工具响应将作为其自己的 ToolMessages 返回。

    要求:必须安装 langchain-core>=0.3.11 才能使用此功能。

返回

  • Messages

    一个新消息列表,其中 right 中的消息已合并到 left 中。

  • Messages

    如果 right 中的消息与 left 中的消息具有相同的 ID,则

  • Messages

    来自 right 的消息将替换来自 left 的消息。

示例

>>> from langchain_core.messages import AIMessage, HumanMessage
>>> msgs1 = [HumanMessage(content="Hello", id="1")]
>>> msgs2 = [AIMessage(content="Hi there!", id="2")]
>>> add_messages(msgs1, msgs2)
[HumanMessage(content='Hello', id='1'), AIMessage(content='Hi there!', id='2')]

>>> msgs1 = [HumanMessage(content="Hello", id="1")]
>>> msgs2 = [HumanMessage(content="Hello again", id="1")]
>>> add_messages(msgs1, msgs2)
[HumanMessage(content='Hello again', id='1')]

>>> from typing import Annotated
>>> from typing_extensions import TypedDict
>>> from langgraph.graph import StateGraph
>>>
>>> class State(TypedDict):
...     messages: Annotated[list, add_messages]
...
>>> builder = StateGraph(State)
>>> builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
>>> builder.set_entry_point("chatbot")
>>> builder.set_finish_point("chatbot")
>>> graph = builder.compile()
>>> graph.invoke({})
{'messages': [AIMessage(content='Hello', id=...)]}

>>> from typing import Annotated
>>> from typing_extensions import TypedDict
>>> from langgraph.graph import StateGraph, add_messages
>>>
>>> class State(TypedDict):
...     messages: Annotated[list, add_messages(format='langchain-openai')]
...
>>> def chatbot_node(state: State) -> list:
...     return {"messages": [
...         {
...             "role": "user",
...             "content": [
...                 {
...                     "type": "text",
...                     "text": "Here's an image:",
...                     "cache_control": {"type": "ephemeral"},
...                 },
...                 {
...                     "type": "image",
...                     "source": {
...                         "type": "base64",
...                         "media_type": "image/jpeg",
...                         "data": "1234",
...                     },
...                 },
...             ]
...         },
...     ]}
>>> builder = StateGraph(State)
>>> builder.add_node("chatbot", chatbot_node)
>>> builder.set_entry_point("chatbot")
>>> builder.set_finish_point("chatbot")
>>> graph = builder.compile()
>>> graph.invoke({"messages": []})
{
    'messages': [
        HumanMessage(
            content=[
                {"type": "text", "text": "Here's an image:"},
                {
                    "type": "image_url",
                    "image_url": {"url": ""},
                },
            ],
        ),
    ]
}

..versionchanged:: 0.2.61

Support for 'format="langchain-openai"' flag added.

评论