跳到内容

RemoteGraph

名称 描述
RemoteGraph

RemoteGraph 类是用于调用远程 API 的客户端实现

RemoteGraph

基类: PregelProtocol

RemoteGraph 类是用于调用实现 LangGraph 服务器 API 规范的远程 API 的客户端实现。

例如,RemoteGraph 类可用于调用 LangGraph Cloud 上部署的 API。

RemoteGraph 的行为与 Graph 相同,可以直接用作另一个 Graph 中的节点。

方法

名称 描述
__init__

指定 urlapi_key 和/或 headers 以创建默认的同步和异步客户端。

get_graph

按图名称获取图。

aget_graph

按图名称获取图。

get_state

获取线程的状态。

aget_state

获取线程的状态。

get_state_history

获取线程的状态历史。

aget_state_history

获取线程的状态历史。

update_state

更新线程的状态。

aupdate_state

更新线程的状态。

stream

创建一个运行并流式传输结果。

astream

创建一个运行并流式传输结果。

invoke

创建一个运行,等待其完成并返回最终状态。

ainvoke

创建一个运行,等待其完成并返回最终状态。

get_name

获取 Runnable 的名称。

get_input_schema

获取可用于验证 Runnable 输入的 pydantic 模型。

get_input_jsonschema

获取表示 Runnable 输入的 JSON Schema。

get_output_schema

获取可用于验证 Runnable 输出的 pydantic 模型。

get_output_jsonschema

获取表示 Runnable 输出的 JSON Schema。

config_schema

此 Runnable 接受的配置类型,指定为 pydantic 模型。

get_config_jsonschema

获取表示 Runnable 配置的 JSON Schema。

get_prompts

返回此 Runnable 使用的提示列表。

__or__

将此 Runnable 与另一个对象组合以创建 RunnableSequence。

__ror__

将此 Runnable 与另一个对象组合以创建 RunnableSequence。

pipe

将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。

pick

从此 Runnable 的输出字典中选择单个键。

assign

为此 Runnable 的字典输出分配新字段。

batch

默认实现使用线程池执行器并行运行 invoke。

batch_as_completed

在输入列表上并行运行 invoke。

abatch

默认实现使用 asyncio.gather 并行运行 ainvoke。

abatch_as_completed

在输入列表上并行运行 ainvoke。

astream_log

从 Runnable 流式传输所有输出,如报告给回调系统。

transform

transform 的默认实现,它会缓冲输入并调用 astream。

atransform

atransform 的默认实现,它会缓冲输入并调用 astream。

bind

将参数绑定到 Runnable,返回一个新的 Runnable。

with_listeners

将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

with_alisteners

将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

with_types

将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。

with_retry

创建一个新的 Runnable,它在发生异常时会重试原始 Runnable。

map

返回一个新的 Runnable,它将输入列表映射到输出列表。

with_fallbacks

为 Runnable 添加回退,返回一个新的 Runnable。

as_tool

从 Runnable 创建一个 BaseTool。

属性

名称 类型 描述
InputType type[Input]

此 Runnable 接受的输入类型,指定为类型注解。

OutputType type[Output]

此 Runnable 生成的输出类型,指定为类型注解。

input_schema type[BaseModel]

此 Runnable 接受的输入类型,指定为 pydantic 模型。

output_schema type[BaseModel]

此 Runnable 生成的输出类型,指定为 pydantic 模型。

config_specs list[ConfigurableFieldSpec]

此 Runnable 的可配置字段列表。

InputType property

InputType: type[Input]

此 Runnable 接受的输入类型,指定为类型注解。

OutputType property

OutputType: type[Output]

此 Runnable 生成的输出类型,指定为类型注解。

input_schema property

input_schema: type[BaseModel]

此 Runnable 接受的输入类型,指定为 pydantic 模型。

output_schema property

output_schema: type[BaseModel]

此 Runnable 生成的输出类型,指定为 pydantic 模型。

config_specs property

config_specs: list[ConfigurableFieldSpec]

此 Runnable 的可配置字段列表。

__init__

__init__(
    name: str,
    /,
    *,
    url: Optional[str] = None,
    api_key: Optional[str] = None,
    headers: Optional[dict[str, str]] = None,
    client: Optional[LangGraphClient] = None,
    sync_client: Optional[SyncLangGraphClient] = None,
    config: Optional[RunnableConfig] = None,
)

指定 urlapi_key 和/或 headers 以创建默认的同步和异步客户端。

如果提供了 clientsync_client,将使用它们而不是默认客户端。有关默认客户端的详细信息,请参阅 LangGraphClientSyncLangGraphClient。必须至少提供 urlclientsync_client 中的一个。

参数

名称 类型 描述 默认值
name str

图的名称。

必需
url Optional[str]

远程 API 的 URL。

None
api_key Optional[str]

用于身份验证的 API 密钥。如果未提供,将从环境变量中读取(LANGGRAPH_API_KEYLANGSMITH_API_KEYLANGCHAIN_API_KEY)。

None
headers Optional[dict[str, str]]

请求中包含的额外头部信息。

None
client Optional[LangGraphClient]

要使用的 LangGraphClient 实例,而不是创建默认客户端。

None
sync_client Optional[SyncLangGraphClient]

要使用的 SyncLangGraphClient 实例,而不是创建默认客户端。

None
config Optional[RunnableConfig]

一个可选的 RunnableConfig 实例,包含附加配置。

None

get_graph

get_graph(
    config: Optional[RunnableConfig] = None,
    *,
    xray: Union[int, bool] = False
) -> Graph

按图名称获取图。

此方法调用 GET /assistants/{assistant_id}/graph

参数

名称 类型 描述 默认值
config Optional[RunnableConfig]

此参数未使用。

None
xray Union[int, bool]

包含子图的图表示。如果提供整数值,则仅包含深度小于或等于该值的子图。

False

返回值

类型 描述
Graph

JSON 格式的助手图信息。

aget_graph async

aget_graph(
    config: Optional[RunnableConfig] = None,
    *,
    xray: Union[int, bool] = False
) -> Graph

按图名称获取图。

此方法调用 GET /assistants/{assistant_id}/graph

参数

名称 类型 描述 默认值
config Optional[RunnableConfig]

此参数未使用。

None
xray Union[int, bool]

包含子图的图表示。如果提供整数值,则仅包含深度小于或等于该值的子图。

False

返回值

类型 描述
Graph

JSON 格式的助手图信息。

get_state

get_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot

获取线程的状态。

如果 config 中指定了检查点,此方法调用 POST /threads/{thread_id}/state/checkpoint,如果未指定检查点,则调用 GET /threads/{thread_id}/state

参数

名称 类型 描述 默认值
config RunnableConfig

一个 RunnableConfig,其中包含 thread_idconfigurable 字段中。

必需
subgraphs bool

在状态中包含子图。

False

返回值

类型 描述
StateSnapshot

线程的最新状态。

aget_state async

aget_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot

获取线程的状态。

如果 config 中指定了检查点,此方法调用 POST /threads/{thread_id}/state/checkpoint,如果未指定检查点,则调用 GET /threads/{thread_id}/state

参数

名称 类型 描述 默认值
config RunnableConfig

一个 RunnableConfig,其中包含 thread_idconfigurable 字段中。

必需
subgraphs bool

在状态中包含子图。

False

返回值

类型 描述
StateSnapshot

线程的最新状态。

get_state_history

get_state_history(
    config: RunnableConfig,
    *,
    filter: Optional[dict[str, Any]] = None,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None
) -> Iterator[StateSnapshot]

获取线程的状态历史。

此方法调用 POST /threads/{thread_id}/history

参数

名称 类型 描述 默认值
config RunnableConfig

一个 RunnableConfig,其中包含 thread_idconfigurable 字段中。

必需
filter Optional[dict[str, Any]]

用于过滤的元数据。

None
before Optional[RunnableConfig]

一个包含检查点元数据的 RunnableConfig

None
limit Optional[int]

返回的最大状态数。

None

返回值

类型 描述
Iterator[StateSnapshot]

线程的状态。

aget_state_history async

aget_state_history(
    config: RunnableConfig,
    *,
    filter: Optional[dict[str, Any]] = None,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None
) -> AsyncIterator[StateSnapshot]

获取线程的状态历史。

此方法调用 POST /threads/{thread_id}/history

参数

名称 类型 描述 默认值
config RunnableConfig

一个 RunnableConfig,其中包含 thread_idconfigurable 字段中。

必需
filter Optional[dict[str, Any]]

用于过滤的元数据。

None
before Optional[RunnableConfig]

一个包含检查点元数据的 RunnableConfig

None
limit Optional[int]

返回的最大状态数。

None

返回值

类型 描述
AsyncIterator[StateSnapshot]

线程的状态。

update_state

update_state(
    config: RunnableConfig,
    values: Optional[Union[dict[str, Any], Any]],
    as_node: Optional[str] = None,
) -> RunnableConfig

更新线程的状态。

此方法调用 POST /threads/{thread_id}/state

参数

名称 类型 描述 默认值
config RunnableConfig

一个 RunnableConfig,其中包含 thread_idconfigurable 字段中。

必需
values Optional[Union[dict[str, Any], Any]]

要更新到状态的值。

必需
as_node Optional[str]

更新状态,就像刚刚执行了此节点一样。

None

返回值

类型 描述
RunnableConfig

更新后的线程的 RunnableConfig

aupdate_state async

aupdate_state(
    config: RunnableConfig,
    values: Optional[Union[dict[str, Any], Any]],
    as_node: Optional[str] = None,
) -> RunnableConfig

更新线程的状态。

此方法调用 POST /threads/{thread_id}/state

参数

名称 类型 描述 默认值
config RunnableConfig

一个 RunnableConfig,其中包含 thread_idconfigurable 字段中。

必需
values Optional[Union[dict[str, Any], Any]]

要更新到状态的值。

必需
as_node Optional[str]

更新状态,就像刚刚执行了此节点一样。

None

返回值

类型 描述
RunnableConfig

更新后的线程的 RunnableConfig

stream

stream(
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[
        Union[StreamMode, list[StreamMode]]
    ] = None,
    interrupt_before: Optional[
        Union[All, Sequence[str]]
    ] = None,
    interrupt_after: Optional[
        Union[All, Sequence[str]]
    ] = None,
    subgraphs: bool = False,
    **kwargs: Any
) -> Iterator[Union[dict[str, Any], Any]]

创建一个运行并流式传输结果。

如果在 config 的 configurable 字段中指定了 thread_id,此方法调用 POST /threads/{thread_id}/runs/stream,否则调用 POST /runs/stream

参数

名称 类型 描述 默认值
input Union[dict[str, Any], Any]]

图的输入。

必需
config Optional[RunnableConfig]

用于图调用的 RunnableConfig

None
stream_mode Optional[Union[StreamMode, list[StreamMode]]]

要使用的流模式。

None
interrupt_before Optional[Union[All, Sequence[str]]]

在这些节点之前中断图。

None
interrupt_after Optional[Union[All, Sequence[str]]]

在这些节点之后中断图。

None
subgraphs bool

从子图流式传输。

False
**kwargs Any

要传递给 client.runs.stream 的附加参数。

{}

生成值

类型 描述
Union[dict[str, Any], Any]]

图的输出。

astream async

astream(
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[
        Union[StreamMode, list[StreamMode]]
    ] = None,
    interrupt_before: Optional[
        Union[All, Sequence[str]]
    ] = None,
    interrupt_after: Optional[
        Union[All, Sequence[str]]
    ] = None,
    subgraphs: bool = False,
    **kwargs: Any
) -> AsyncIterator[Union[dict[str, Any], Any]]

创建一个运行并流式传输结果。

如果在 config 的 configurable 字段中指定了 thread_id,此方法调用 POST /threads/{thread_id}/runs/stream,否则调用 POST /runs/stream

参数

名称 类型 描述 默认值
input Union[dict[str, Any], Any]]

图的输入。

必需
config Optional[RunnableConfig]

用于图调用的 RunnableConfig

None
stream_mode Optional[Union[StreamMode, list[StreamMode]]]

要使用的流模式。

None
interrupt_before Optional[Union[All, Sequence[str]]]

在这些节点之前中断图。

None
interrupt_after Optional[Union[All, Sequence[str]]]

在这些节点之后中断图。

None
subgraphs bool

从子图流式传输。

False
**kwargs Any

要传递给 client.runs.stream 的附加参数。

{}

生成值

类型 描述
AsyncIterator[Union[dict[str, Any], Any]]

图的输出。

invoke

invoke(
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    interrupt_before: Optional[
        Union[All, Sequence[str]]
    ] = None,
    interrupt_after: Optional[
        Union[All, Sequence[str]]
    ] = None,
    **kwargs: Any
) -> Union[dict[str, Any], Any]

创建一个运行,等待其完成并返回最终状态。

参数

名称 类型 描述 默认值
input Union[dict[str, Any], Any]]

图的输入。

必需
config Optional[RunnableConfig]

用于图调用的 RunnableConfig

None
interrupt_before Optional[Union[All, Sequence[str]]]

在这些节点之前中断图。

None
interrupt_after Optional[Union[All, Sequence[str]]]

在这些节点之后中断图。

None
**kwargs Any

要传递给 RemoteGraph.stream 的附加参数。

{}

返回值

类型 描述
Union[dict[str, Any], Any]]

图的输出。

ainvoke async

ainvoke(
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    interrupt_before: Optional[
        Union[All, Sequence[str]]
    ] = None,
    interrupt_after: Optional[
        Union[All, Sequence[str]]
    ] = None,
    **kwargs: Any
) -> Union[dict[str, Any], Any]

创建一个运行,等待其完成并返回最终状态。

参数

名称 类型 描述 默认值
input Union[dict[str, Any], Any]]

图的输入。

必需
config Optional[RunnableConfig]

用于图调用的 RunnableConfig

None
interrupt_before Optional[Union[All, Sequence[str]]]

在这些节点之前中断图。

None
interrupt_after Optional[Union[All, Sequence[str]]]

在这些节点之后中断图。

None
**kwargs Any

要传递给 RemoteGraph.astream 的附加参数。

{}

返回值

类型 描述
Union[dict[str, Any], Any]]

图的输出。

get_name

get_name(
    suffix: Optional[str] = None,
    *,
    name: Optional[str] = None
) -> str

获取 Runnable 的名称。

get_input_schema

get_input_schema(
    config: Optional[RunnableConfig] = None,
) -> type[BaseModel]

获取可用于验证 Runnable 输入的 pydantic 模型。

利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 将具有动态输入 schema,具体取决于调用 Runnable 时使用的配置。

此方法允许获取特定配置的输入 schema。

参数

名称 类型 描述 默认值
config Optional[RunnableConfig]

用于生成 schema 的配置。

None

返回值

类型 描述
type[BaseModel]

可用于验证输入的 pydantic 模型。

get_input_jsonschema

get_input_jsonschema(
    config: Optional[RunnableConfig] = None,
) -> dict[str, Any]

获取表示 Runnable 输入的 JSON Schema。

参数

名称 类型 描述 默认值
config Optional[RunnableConfig]

用于生成 schema 的配置。

None

返回值

类型 描述
dict[str, Any]

表示 Runnable 输入的 JSON Schema。

示例

.. code-block:: python

    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(add_one)

    print(runnable.get_input_jsonschema())

.. versionadded:: 0.3.0

get_output_schema

get_output_schema(
    config: Optional[RunnableConfig] = None,
) -> type[BaseModel]

获取可用于验证 Runnable 输出的 pydantic 模型。

利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 将具有动态输出 schema,具体取决于调用 Runnable 时使用的配置。

此方法允许获取特定配置的输出 schema。

参数

名称 类型 描述 默认值
config Optional[RunnableConfig]

用于生成 schema 的配置。

None

返回值

类型 描述
type[BaseModel]

可用于验证输出的 pydantic 模型。

get_output_jsonschema

get_output_jsonschema(
    config: Optional[RunnableConfig] = None,
) -> dict[str, Any]

获取表示 Runnable 输出的 JSON Schema。

参数

名称 类型 描述 默认值
config Optional[RunnableConfig]

用于生成 schema 的配置。

None

返回值

类型 描述
dict[str, Any]

表示 Runnable 输出的 JSON Schema。

示例

.. code-block:: python

    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(add_one)

    print(runnable.get_output_jsonschema())

.. versionadded:: 0.3.0

config_schema

config_schema(
    *, include: Optional[Sequence[str]] = None
) -> type[BaseModel]

此 Runnable 接受的配置类型,指定为 pydantic 模型。

要将字段标记为可配置,请参阅 configurable_fieldsconfigurable_alternatives 方法。

参数

名称 类型 描述 默认值
include Optional[Sequence[str]]

要包含在 config schema 中的字段列表。

None

返回值

类型 描述
type[BaseModel]

可用于验证配置的 pydantic 模型。

get_config_jsonschema

get_config_jsonschema(
    *, include: Optional[Sequence[str]] = None
) -> dict[str, Any]

获取表示 Runnable 配置的 JSON Schema。

参数

名称 类型 描述 默认值
include Optional[Sequence[str]]

要包含在 config schema 中的字段列表。

None

返回值

类型 描述
dict[str, Any]

表示 Runnable 配置的 JSON Schema。

.. versionadded:: 0.3.0

get_prompts

get_prompts(
    config: Optional[RunnableConfig] = None,
) -> list[BasePromptTemplate]

返回此 Runnable 使用的提示列表。

__or__

__or__(
    other: Union[
        Runnable[Any, Other],
        Callable[[Any], Other],
        Callable[[Iterator[Any]], Iterator[Other]],
        Mapping[
            str,
            Union[
                Runnable[Any, Other],
                Callable[[Any], Other],
                Any,
            ],
        ],
    ],
) -> RunnableSerializable[Input, Other]

将此 Runnable 与另一个对象组合以创建 RunnableSequence。

__ror__

__ror__(
    other: Union[
        Runnable[Other, Any],
        Callable[[Other], Any],
        Callable[[Iterator[Other]], Iterator[Any]],
        Mapping[
            str,
            Union[
                Runnable[Other, Any],
                Callable[[Other], Any],
                Any,
            ],
        ],
    ],
) -> RunnableSerializable[Other, Output]

将此 Runnable 与另一个对象组合以创建 RunnableSequence。

pipe

pipe(
    *others: Union[
        Runnable[Any, Other], Callable[[Any], Other]
    ],
    name: Optional[str] = None
) -> RunnableSerializable[Input, Other]

将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。

等同于 RunnableSequence(self, *others)self | others[0] | ...

示例

.. code-block:: python

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]

pick

pick(
    keys: Union[str, list[str]],
) -> RunnableSerializable[Any, Any]

从此 Runnable 的输出字典中选择单个键。

选择单个键

.. code-block:: python

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表

.. code-block:: python

from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")

chain = RunnableMap(
    str=as_str,
    json=as_json,
    bytes=RunnableLambda(as_bytes)
)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

assign

assign(
    **kwargs: Union[
        Runnable[dict[str, Any], Any],
        Callable[[dict[str, Any]], Any],
        Mapping[
            str,
            Union[
                Runnable[dict[str, Any], Any],
                Callable[[dict[str, Any]], Any],
            ],
        ],
    ],
) -> RunnableSerializable[Any, Any]

为此 Runnable 的字典输出分配新字段。

返回一个新的 Runnable。

.. code-block:: python

from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | llm | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}

batch

batch(
    inputs: list[Input],
    config: Optional[
        Union[RunnableConfig, list[RunnableConfig]]
    ] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any]
) -> list[Output]

默认实现使用线程池执行器并行运行 invoke。

batch 的默认实现非常适用于 IO 密集型 runnable。

如果可以更有效地进行批处理,子类应该覆盖此方法;例如,如果底层 Runnable 使用支持批处理模式的 API。

batch_as_completed

batch_as_completed(
    inputs: Sequence[Input],
    config: Optional[
        Union[RunnableConfig, Sequence[RunnableConfig]]
    ] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any]
) -> Iterator[tuple[int, Union[Output, Exception]]]

在输入列表上并行运行 invoke。

结果完成后即生成。

abatch async

abatch(
    inputs: list[Input],
    config: Optional[
        Union[RunnableConfig, list[RunnableConfig]]
    ] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any]
) -> list[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

batch 的默认实现非常适用于 IO 密集型 runnable。

如果可以更有效地进行批处理,子类应该覆盖此方法;例如,如果底层 Runnable 使用支持批处理模式的 API。

参数

名称 类型 描述 默认值
inputs list[Input]

Runnable 的输入列表。

必需
config Optional[Union[RunnableConfig, list[RunnableConfig]]]

调用 Runnable 时使用的配置。配置支持用于跟踪目的的标准键,如 'tags'、'metadata',用于控制并行工作量的 'max_concurrency' 以及其他键。请参阅 RunnableConfig 了解更多详细信息。默认为 None。

None
return_exceptions bool

是否返回异常而不是抛出异常。默认为 False。

False
kwargs Optional[Any]]

要传递给 Runnable 的附加关键字参数。

{}

返回值

类型 描述
list[Output]

Runnable 的输出列表。

abatch_as_completed async

abatch_as_completed(
    inputs: Sequence[Input],
    config: Optional[
        Union[RunnableConfig, Sequence[RunnableConfig]]
    ] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any]
) -> AsyncIterator[tuple[int, Union[Output, Exception]]]

在输入列表上并行运行 ainvoke。

结果完成后即生成。

参数

名称 类型 描述 默认值
inputs Sequence[Input]

Runnable 的输入列表。

必需
config Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]

在调用 Runnable 时使用的配置。配置支持用于追踪目的的标准键,例如 'tags'、'metadata',以及用于控制并行工作量的 'max_concurrency' 和其他键。更多详细信息请参阅 RunnableConfig。默认为 None。默认为 None。

None
return_exceptions bool

是否返回异常而不是抛出异常。默认为 False。

False
kwargs Optional[Any]]

要传递给 Runnable 的附加关键字参数。

{}

生成值

类型 描述
AsyncIterator[tuple[int, Union[Output, Exception]]]

一个元组,包含输入的索引和来自 Runnable 的输出。

astream_log 异步

astream_log(
    input: Any,
    config: Optional[RunnableConfig] = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
    **kwargs: Any
) -> Union[
    AsyncIterator[RunLogPatch], AsyncIterator[RunLog]
]

从 Runnable 流式传输所有输出,如报告给回调系统。

这包括所有 LLM、Retrievers、Tools 等的内部运行。

输出以 Log 对象形式流式传输,其中包含描述运行状态在每一步如何变化的 Jsonpatch 操作列表,以及运行的最终状态。

可以按顺序应用 Jsonpatch 操作来构建状态。

参数

名称 类型 描述 默认值
input Any

Runnable 的输入。

必需
config Optional[RunnableConfig]

用于 Runnable 的配置。

None
diff bool

是否在每一步之间或当前状态之间生成差异。

True
with_streamed_output_list bool

是否生成 streamed_output 列表。

True
include_names Optional[Sequence[str]]

仅包含具有这些名称的日志。

None
include_types Optional[Sequence[str]]

仅包含具有这些类型的日志。

None
include_tags Optional[Sequence[str]]

仅包含具有这些标签的日志。

None
exclude_names Optional[Sequence[str]]

排除具有这些名称的日志。

None
exclude_types Optional[Sequence[str]]

排除具有这些类型的日志。

None
exclude_tags Optional[Sequence[str]]

排除具有这些标签的日志。

None
kwargs Any

要传递给 Runnable 的附加关键字参数。

{}

生成值

类型 描述
Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]

一个 RunLogPatch 或 RunLog 对象。

transform

transform(
    input: Iterator[Input],
    config: Optional[RunnableConfig] = None,
    **kwargs: Optional[Any]
) -> Iterator[Output]

transform 的默认实现,它会缓冲输入并调用 astream。

如果子类可以在输入仍在生成时开始产生输出,则应覆盖此方法。

参数

名称 类型 描述 默认值
input Iterator[Input]

一个 Runnable 输入的迭代器。

必需
config Optional[RunnableConfig]

用于 Runnable 的配置。默认为 None。

None
kwargs Optional[Any]]

要传递给 Runnable 的附加关键字参数。

{}

生成值

类型 描述
输出

Runnable 的输出。

atransform 异步

atransform(
    input: AsyncIterator[Input],
    config: Optional[RunnableConfig] = None,
    **kwargs: Optional[Any]
) -> AsyncIterator[Output]

atransform 的默认实现,它会缓冲输入并调用 astream。

如果子类可以在输入仍在生成时开始产生输出,则应覆盖此方法。

参数

名称 类型 描述 默认值
input AsyncIterator[Input]

一个 Runnable 输入的异步迭代器。

必需
config Optional[RunnableConfig]

用于 Runnable 的配置。默认为 None。

None
kwargs Optional[Any]]

要传递给 Runnable 的附加关键字参数。

{}

生成值

类型 描述
AsyncIterator[Output]

Runnable 的输出。

bind

bind(**kwargs: Any) -> Runnable[Input, Output]

将参数绑定到 Runnable,返回一个新的 Runnable。

当链中的 Runnable 需要一个不在上一个 Runnable 的输出中或不包含在用户输入中的参数时非常有用。

参数

名称 类型 描述 默认值
kwargs Any

要绑定到 Runnable 的参数。

{}

返回值

类型 描述
Runnable[Input, Output]

一个绑定了参数的新 Runnable。

示例

.. code-block:: python

from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import StrOutputParser

llm = ChatOllama(model='llama2')

# Without bind.
chain = (
    llm
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind.
chain = (
    llm.bind(stop=["three"])
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'

with_listeners

with_listeners(
    *,
    on_start: Optional[
        Union[
            Callable[[Run], None],
            Callable[[Run, RunnableConfig], None],
        ]
    ] = None,
    on_end: Optional[
        Union[
            Callable[[Run], None],
            Callable[[Run, RunnableConfig], None],
        ]
    ] = None,
    on_error: Optional[
        Union[
            Callable[[Run], None],
            Callable[[Run, RunnableConfig], None],
        ]
    ] = None
) -> Runnable[Input, Output]

将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

on_start: 在 Runnable 开始运行前调用,传入 Run 对象。 on_end: 在 Runnable 运行结束后调用,传入 Run 对象。 on_error: 如果 Runnable 抛出错误时调用,传入 Run 对象。

Run 对象包含关于运行的信息,包括其 id、类型、输入、输出、错误、开始时间、结束时间,以及添加到运行的任何标签或元数据。

参数

名称 类型 描述 默认值
on_start Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]

在 Runnable 开始运行前调用。默认为 None。

None
on_end Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]

在 Runnable 运行结束后调用。默认为 None。

None
on_error Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]

如果 Runnable 抛出错误时调用。默认为 None。

None

返回值

类型 描述
Runnable[Input, Output]

一个绑定了监听器的新 Runnable。

示例

.. code-block:: python

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep : int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end
)
chain.invoke(2)

with_alisteners

with_alisteners(
    *,
    on_start: Optional[AsyncListener] = None,
    on_end: Optional[AsyncListener] = None,
    on_error: Optional[AsyncListener] = None
) -> Runnable[Input, Output]

将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

on_start: 在 Runnable 开始运行前异步调用。 on_end: 在 Runnable 运行结束后异步调用。 on_error: 如果 Runnable 抛出错误时异步调用。

Run 对象包含关于运行的信息,包括其 id、类型、输入、输出、错误、开始时间、结束时间,以及添加到运行的任何标签或元数据。

参数

名称 类型 描述 默认值
on_start Optional[AsyncListener]

在 Runnable 开始运行前异步调用。默认为 None。

None
on_end Optional[AsyncListener]

在 Runnable 运行结束后异步调用。默认为 None。

None
on_error Optional[AsyncListener]

如果 Runnable 抛出错误时异步调用。默认为 None。

None

返回值

类型 描述
Runnable[Input, Output]

一个绑定了监听器的新 Runnable。

示例

.. code-block:: python

from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio

def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

async def test_runnable(time_to_sleep : int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

async def fn_start(run_obj : Runnable):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj : Runnable):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00

with_types

with_types(
    *,
    input_type: Optional[type[Input]] = None,
    output_type: Optional[type[Output]] = None
) -> Runnable[Input, Output]

将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。

参数

名称 类型 描述 默认值
input_type Optional[type[Input]]

要绑定到 Runnable 的输入类型。默认为 None。

None
output_type Optional[type[Output]]

要绑定到 Runnable 的输出类型。默认为 None。

None

返回值

类型 描述
Runnable[Input, Output]

一个绑定了类型的新 Runnable。

with_retry

with_retry(
    *,
    retry_if_exception_type: tuple[
        type[BaseException], ...
    ] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: Optional[
        ExponentialJitterParams
    ] = None,
    stop_after_attempt: int = 3
) -> Runnable[Input, Output]

创建一个新的 Runnable,它在发生异常时会重试原始 Runnable。

参数

名称 类型 描述 默认值
retry_if_exception_type tuple[type[BaseException], ...]

要重试的异常类型元组。默认为 (Exception,)。

(Exception,)
wait_exponential_jitter bool

是否在重试之间的等待时间中添加抖动。默认为 True。

True
stop_after_attempt int

放弃前的最大尝试次数。默认为 3。

3
exponential_jitter_params Optional[ExponentialJitterParams]

tenacity.wait_exponential_jitter 的参数。即:initial, max, exp_basejitter (均为浮点值)。

None

返回值

类型 描述
Runnable[Input, Output]

一个新的 Runnable,在异常发生时会重试原始 Runnable。

示例

.. code-block:: python

from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
         pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert (count == 2)

map

map() -> Runnable[list[Input], list[Output]]

返回一个新的 Runnable,它将输入列表映射到输出列表。

对每个输入调用 invoke()。

返回值

类型 描述
Runnable[list[Input], list[Output]]

一个将输入列表映射到输出列表的新 Runnable。

示例

.. code-block:: python

        from langchain_core.runnables import RunnableLambda

        def _lambda(x: int) -> int:
            return x + 1

        runnable = RunnableLambda(_lambda)
        print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]

with_fallbacks

with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[
        type[BaseException], ...
    ] = (Exception,),
    exception_key: Optional[str] = None
) -> RunnableWithFallbacks[Input, Output]

为 Runnable 添加回退,返回一个新的 Runnable。

新的 Runnable 会先尝试原始 Runnable,失败后按顺序尝试每个回退 (fallback)。

参数

名称 类型 描述 默认值
fallbacks Sequence[Runnable[Input, Output]]

如果原始 Runnable 失败,则要尝试的一系列 runnables。

必需
exceptions_to_handle tuple[type[BaseException], ...]

要处理的异常类型元组。默认为 (Exception,)。

(Exception,)
exception_key Optional[str]

如果指定了字符串,则处理的异常将作为输入的一部分在指定键下传递给 fallbacks。如果为 None,异常将不会传递给 fallbacks。如果使用此参数,则基础 Runnable 及其 fallbacks 必须接受字典作为输入。默认为 None。

None

返回值

类型 描述
RunnableWithFallbacks[Input, Output]

一个新的 Runnable,它会尝试原始 Runnable,然后在失败时

RunnableWithFallbacks[Input, Output]

按顺序尝试每个回退 (fallback)。

示例

.. code-block:: python

    from typing import Iterator

    from langchain_core.runnables import RunnableGenerator


    def _generate_immediate_error(input: Iterator) -> Iterator[str]:
        raise ValueError()
        yield ""


    def _generate(input: Iterator) -> Iterator[str]:
        yield from "foo bar"


    runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
        [RunnableGenerator(_generate)]
        )
    print(''.join(runnable.stream({}))) #foo bar

参数

名称 类型 描述 默认值
fallbacks Sequence[Runnable[Input, Output]]

如果原始 Runnable 失败,则要尝试的一系列 runnables。

必需
exceptions_to_handle tuple[type[BaseException], ...]

要处理的异常类型元组。

(Exception,)
exception_key Optional[str]

如果指定了字符串,则处理的异常将作为输入的一部分在指定键下传递给 fallbacks。如果为 None,异常将不会传递给 fallbacks。如果使用此参数,则基础 Runnable 及其 fallbacks 必须接受字典作为输入。

None

返回值

类型 描述
RunnableWithFallbacks[Input, Output]

一个新的 Runnable,它会尝试原始 Runnable,然后在失败时

RunnableWithFallbacks[Input, Output]

按顺序尝试每个回退 (fallback)。

as_tool

as_tool(
    args_schema: Optional[type[BaseModel]] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    arg_types: Optional[dict[str, type]] = None
) -> BaseTool

从 Runnable 创建一个 BaseTool。

as_tool 将从 Runnable 实例化一个具有名称、描述和 args_schema 的 BaseTool。在可能的情况下,schema 会从 runnable.get_input_schema 推断。或者(例如,如果 Runnable 接受字典作为输入,并且未对具体的字典键进行类型标注),可以直接使用 args_schema 指定 schema。您也可以通过传入 arg_types 来仅指定必需参数及其类型。

参数

名称 类型 描述 默认值
args_schema Optional[type[BaseModel]]

工具的 schema。默认为 None。

None
name Optional[str]

工具的名称。默认为 None。

None
description Optional[str]

工具的描述。默认为 None。

None
arg_types Optional[dict[str, type]]

参数名称到类型的字典。默认为 None。

None

返回值

类型 描述
BaseTool

一个 BaseTool 实例。

类型标注的字典输入

.. code-block:: python

from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: list[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

字典输入,通过 args_schema 指定 schema

.. code-block:: python

from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

字典输入,通过 arg_types 指定 schema

.. code-block:: python

from typing import Any
from langchain_core.runnables import RunnableLambda

def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

.. code-block:: python

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

.. versionadded:: 0.2.14

评论