Python SDK 参考¶
LangGraph 客户端实现连接到 LangGraph API。
此模块提供异步 (LangGraphClient) 和同步 (SyncLanggraphClient) 客户端,用于与 LangGraph API 的核心资源(例如助手、线程、运行和 Cron 作业)以及其持久文档存储进行交互。
LangGraphClient
¶
LangGraph API 的顶层客户端。
属性
-
–assistants 管理图表的版本化配置。
-
–threads 处理(可能)多轮交互,例如对话线程。
-
–runs 控制图表的单个调用。
-
–crons 管理计划操作。
-
–store 与持久性、共享数据存储进行交互。
HttpClient
¶
处理对 LangGraph API 的异步请求。
在提供的 httpx 客户端之上添加额外的错误消息和内容处理。
属性
-
(client
) –AsyncClient 底层 HTTPX 异步客户端。
get(path: str, *, params: Optional[QueryParamTypes] = None) -> Any
async
¶
发送 GET 请求。
post(path: str, *, json: Optional[dict]) -> Any
async
¶
发送 POST 请求。
put(path: str, *, json: dict) -> Any
async
¶
发送 PUT 请求。
patch(path: str, *, json: dict) -> Any
async
¶
发送 PATCH 请求。
delete(path: str, *, json: Optional[Any] = None) -> None
async
¶
发送 DELETE 请求。
stream(path: str, method: str, *, json: Optional[dict] = None, params: Optional[QueryParamTypes] = None) -> AsyncIterator[StreamPart]
async
¶
使用 SSE 流式传输结果。
AssistantsClient
¶
用于管理 LangGraph 中助手的客户端。
此类提供与助手交互的方法,助手是图表的版本化配置。
示例
client = get_client()
assistant = await client.assistants.get("assistant_id_123")
get(assistant_id: str) -> Assistant
async
¶
通过 ID 获取助手。
参数
-
assistant_id
(
) –str 要获取的助手的 ID。
返回
-
Assistant
(
) –Assistant 助手对象。
使用示例
assistant = await client.assistants.get(
assistant_id="my_assistant_id"
)
print(assistant)
----------------------------------------------------
{
'assistant_id': 'my_assistant_id',
'graph_id': 'agent',
'created_at': '2024-06-25T17:10:33.109781+00:00',
'updated_at': '2024-06-25T17:10:33.109781+00:00',
'config': {},
'metadata': {'created_by': 'system'},
'version': 1,
'name': 'my_assistant'
}
get_graph(assistant_id: str, *, xray: Union[int, bool] = False) -> dict[str, list[dict[str, Any]]]
async
¶
通过 ID 获取助手的图。
参数
-
assistant_id
(
) –str 要获取其图的助手的 ID。
-
xray
(
, default:Union [int ,bool ]False
) –包括子图的图表示。如果提供整数值,则仅包括深度小于或等于该值的子图。
返回
-
Graph
(
) –dict [str ,list [dict [str ,Any ]]]JSON 格式的助手图信息。
使用示例
graph_info = await client.assistants.get_graph(
assistant_id="my_assistant_id"
)
print(graph_info)
--------------------------------------------------------------------------------------------------------------------------
{
'nodes':
[
{'id': '__start__', 'type': 'schema', 'data': '__start__'},
{'id': '__end__', 'type': 'schema', 'data': '__end__'},
{'id': 'agent','type': 'runnable','data': {'id': ['langgraph', 'utils', 'RunnableCallable'],'name': 'agent'}},
],
'edges':
[
{'source': '__start__', 'target': 'agent'},
{'source': 'agent','target': '__end__'}
]
}
get_schemas(assistant_id: str) -> GraphSchema
async
¶
通过 ID 获取助手的模式。
参数
-
assistant_id
(
) –str 要获取其模式的助手的 ID。
返回
-
GraphSchema
(
) –GraphSchema 助手的图模式。
使用示例
schema = await client.assistants.get_schemas(
assistant_id="my_assistant_id"
)
print(schema)
----------------------------------------------------------------------------------------------------------------------------
{
'graph_id': 'agent',
'state_schema':
{
'title': 'LangGraphInput',
'$ref': '#/definitions/AgentState',
'definitions':
{
'BaseMessage':
{
'title': 'BaseMessage',
'description': 'Base abstract Message class. Messages are the inputs and outputs of ChatModels.',
'type': 'object',
'properties':
{
'content':
{
'title': 'Content',
'anyOf': [
{'type': 'string'},
{'type': 'array','items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}
]
},
'additional_kwargs':
{
'title': 'Additional Kwargs',
'type': 'object'
},
'response_metadata':
{
'title': 'Response Metadata',
'type': 'object'
},
'type':
{
'title': 'Type',
'type': 'string'
},
'name':
{
'title': 'Name',
'type': 'string'
},
'id':
{
'title': 'Id',
'type': 'string'
}
},
'required': ['content', 'type']
},
'AgentState':
{
'title': 'AgentState',
'type': 'object',
'properties':
{
'messages':
{
'title': 'Messages',
'type': 'array',
'items': {'$ref': '#/definitions/BaseMessage'}
}
},
'required': ['messages']
}
}
},
'config_schema':
{
'title': 'Configurable',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
}
}
get_subgraphs(assistant_id: str, namespace: Optional[str] = None, recurse: bool = False) -> Subgraphs
async
¶
通过 ID 获取助手的模式。
参数
-
assistant_id
(
) –str 要获取其模式的助手的 ID。
返回
-
Subgraphs
(
) –Subgraphs 助手的图模式。
create(graph_id: Optional[str], config: Optional[Config] = None, *, metadata: Json = None, assistant_id: Optional[str] = None, if_exists: Optional[OnConflictBehavior] = None, name: Optional[str] = None) -> Assistant
async
¶
创建一个新的助手。
当图表可配置并且您想要基于不同的配置创建不同的助手时,此功能非常有用。
参数
-
graph_id
(
) –Optional [str ]助手应使用的图表的 ID。图表 ID 通常在您的 langgraph.json 配置文件中设置。
-
config
(
, default:Optional [Config ]None
) –用于图表的配置。
-
metadata
(
, default:Json None
) –要添加到助手的元数据。
-
assistant_id
(
, default:Optional [str ]None
) –要使用的助手 ID,如果未提供,则默认为随机 UUID。
-
if_exists
(
, default:Optional [OnConflictBehavior ]None
) –如何处理重复创建。 默认在底层设置为“raise”。 必须为“raise”(如果重复则引发错误)或“do_nothing”(返回现有助手)。
-
name
(
, default:Optional [str ]None
) –助手的名称。 默认在底层设置为“Untitled”。
返回
-
Assistant
(
) –Assistant 创建的助手。
使用示例
assistant = await client.assistants.create(
graph_id="agent",
config={"configurable": {"model_name": "openai"}},
metadata={"number":1},
assistant_id="my-assistant-id",
if_exists="do_nothing",
name="my_name"
)
update(assistant_id: str, *, graph_id: Optional[str] = None, config: Optional[Config] = None, metadata: Json = None, name: Optional[str] = None) -> Assistant
async
¶
更新助手。
使用此功能可以指向不同的图表、更新配置或更改助手的元数据。
参数
-
assistant_id
(
) –str 要更新的助手。
-
graph_id
(
, default:Optional [str ]None
) –助手应使用的图表的 ID。 图表 ID 通常在您的 langgraph.json 配置文件中设置。 如果为 None,助手将继续指向同一图表。
-
config
(
, default:Optional [Config ]None
) –用于图表的配置。
-
metadata
(
, default:Json None
) –要与现有助手元数据合并的元数据。
-
name
(
, default:Optional [str ]None
) –助手的新名称。
返回
-
Assistant
(
) –Assistant 更新后的助手。
使用示例
assistant = await client.assistants.update(
assistant_id='e280dad7-8618-443f-87f1-8e41841c180f',
graph_id="other-graph",
config={"configurable": {"model_name": "anthropic"}},
metadata={"number":2}
)
delete(assistant_id: str) -> None
async
¶
删除助手。
参数
-
assistant_id
(
) –str 要删除的助手 ID。
返回
-
None
–无
使用示例
await client.assistants.delete(
assistant_id="my_assistant_id"
)
search(*, metadata: Json = None, graph_id: Optional[str] = None, limit: int = 10, offset: int = 0) -> list[Assistant]
async
¶
搜索助手。
参数
-
metadata
(
, default:Json None
) –要按其筛选的元数据。 每个 KV 对的精确匹配过滤器。
-
graph_id
(
, default:Optional [str ]None
) –要按其筛选的图表的 ID。 图表 ID 通常在您的 langgraph.json 配置文件中设置。
-
limit
(
, default:int 10
) –要返回的最大结果数。
-
offset
(
, default:int 0
) –要跳过的结果数。
返回
-
–list [Assistant ]list[Assistant]:助手列表。
使用示例
assistants = await client.assistants.search(
metadata = {"name":"my_name"},
graph_id="my_graph_id",
limit=5,
offset=5
)
get_versions(assistant_id: str, metadata: Json = None, limit: int = 10, offset: int = 0) -> list[AssistantVersion]
async
¶
列出助手的所有版本。
参数
-
assistant_id
(
) –str 要获取版本的助手 ID。
-
metadata
(
, default:Json None
) –要按其筛选版本的元数据。 每个 KV 对的精确匹配过滤器。
-
limit
(
, default:int 10
) –要返回的最大版本数。
-
offset
(
, default:int 0
) –要跳过的版本数。
返回
-
–list [AssistantVersion ]list[AssistantVersion]:助手版本列表。
使用示例
assistant_versions = await client.assistants.get_versions(
assistant_id="my_assistant_id"
)
set_latest(assistant_id: str, version: int) -> Assistant
async
¶
更改助手的版本。
参数
-
assistant_id
(
) –str 要删除的助手 ID。
-
version
(
) –int 要更改到的版本。
返回
-
Assistant
(
) –Assistant 助手对象。
使用示例
new_version_assistant = await client.assistants.set_latest(
assistant_id="my_assistant_id",
version=3
)
ThreadsClient
¶
用于管理 LangGraph 中线程的客户端。
线程维护图表在多次交互/调用(又名运行)中的状态。 它累积并持久化图表的状态,从而允许在图表的单独调用之间保持连续性。
示例
client = get_client()
new_thread = await client.threads.create(metadata={"user_id": "123"})
get(thread_id: str) -> Thread
async
¶
通过 ID 获取线程。
参数
-
thread_id
(
) –str 要获取的线程的 ID。
返回
-
Thread
(
) –Thread 线程对象。
使用示例
thread = await client.threads.get(
thread_id="my_thread_id"
)
print(thread)
-----------------------------------------------------
{
'thread_id': 'my_thread_id',
'created_at': '2024-07-18T18:35:15.540834+00:00',
'updated_at': '2024-07-18T18:35:15.540834+00:00',
'metadata': {'graph_id': 'agent'}
}
create(*, metadata: Json = None, thread_id: Optional[str] = None, if_exists: Optional[OnConflictBehavior] = None) -> Thread
async
¶
创建一个新的线程。
参数
-
metadata
(
, default:Json None
) –添加到线程的元数据。
-
thread_id
(
, 默认:Optional [str ]None
) –线程的ID。如果为 None,ID 将随机生成 UUID。
-
if_exists
(
, default:Optional [OnConflictBehavior ]None
) –如何处理重复创建。默认在底层使用 'raise'。必须是 'raise' (如果重复则引发错误) 或 'do_nothing' (返回现有线程)。
返回
-
Thread
(
) –Thread 创建的线程。
使用示例
thread = await client.threads.create(
metadata={"number":1},
thread_id="my-thread-id",
if_exists="raise"
)
update(thread_id: str, *, metadata: dict[str, Any]) -> Thread
async
¶
更新一个线程。
参数
-
thread_id
(
) –str 要更新的线程的 ID。
-
metadata
(
) –dict [str ,Any ]要与现有线程元数据合并的元数据。
返回
-
Thread
(
) –Thread 创建的线程。
使用示例
thread = await client.threads.update(
thread_id="my-thread-id",
metadata={"number":1},
)
delete(thread_id: str) -> None
async
¶
删除一个线程。
参数
-
thread_id
(
) –str 要删除的线程的 ID。
返回
-
None
–无
使用示例
await client.threads.delete(
thread_id="my_thread_id"
)
search(*, metadata: Json = None, values: Json = None, status: Optional[ThreadStatus] = None, limit: int = 10, offset: int = 0) -> list[Thread]
async
¶
搜索线程。
参数
-
metadata
(
, default:Json None
) –要基于其进行过滤的线程元数据。
-
values
(
, 默认:Json None
) –要基于其进行过滤的状态值。
-
status
(
, 默认:Optional [ThreadStatus ]None
) –要基于其进行过滤的线程状态。必须是 'idle'、'busy'、'interrupted' 或 'error' 之一。
-
limit
(
, default:int 10
) –要返回的线程数的限制。
-
offset
(
, default:int 0
) –从线程表开始搜索的偏移量。
返回
-
–list [Thread ]list[Thread]: 匹配搜索参数的线程列表。
使用示例
threads = await client.threads.search(
metadata={"number":1},
status="interrupted",
limit=15,
offset=5
)
copy(thread_id: str) -> None
async
¶
复制一个线程。
参数
-
thread_id
(
) –str 要复制的线程的 ID。
返回
-
None
–无
使用示例
await client.threads.copy(
thread_id="my_thread_id"
)
get_state(thread_id: str, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None, *, subgraphs: bool = False) -> ThreadState
async
¶
获取一个线程的状态。
参数
-
thread_id
(
) –str 要获取状态的线程的 ID。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要获取状态的检查点。
-
subgraphs
(
, 默认:bool False
) –包含子图状态。
返回
-
ThreadState
(
) –ThreadState 线程的状态。
使用示例
thread_state = await client.threads.get_state(
thread_id="my_thread_id",
checkpoint_id="my_checkpoint_id"
)
print(thread_state)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'values': {
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
},
'next': [],
'checkpoint':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1'
}
'metadata':
{
'step': 1,
'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2',
'source': 'loop',
'writes':
{
'agent':
{
'messages': [
{
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'name': None,
'type': 'ai',
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'example': False,
'tool_calls': [],
'usage_metadata': None,
'additional_kwargs': {},
'response_metadata': {},
'invalid_tool_calls': []
}
]
}
},
'user_id': None,
'graph_id': 'agent',
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'created_by': 'system',
'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
'created_at': '2024-07-25T15:35:44.184703+00:00',
'parent_config':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-d80d-6fa7-8000-9300467fad0f'
}
}
update_state(thread_id: str, values: Optional[Union[dict, Sequence[dict]]], *, as_node: Optional[str] = None, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None) -> ThreadUpdateStateResponse
async
¶
更新一个线程的状态。
参数
-
thread_id
(
) –str 要更新的线程的 ID。
-
values
(
) –Optional [Union [dict ,Sequence [dict ]]]用于更新状态的值。
-
as_node
(
, 默认:Optional [str ]None
) –就像此节点刚执行过一样更新状态。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要更新状态的检查点。
返回
-
ThreadUpdateStateResponse
(
) –ThreadUpdateStateResponse 更新线程状态后的响应。
使用示例
response = await client.threads.update_state(
thread_id="my_thread_id",
values={"messages":[{"role": "user", "content": "hello!"}]},
as_node="my_node",
)
print(response)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'checkpoint': {
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1',
'checkpoint_map': {}
}
}
get_history(thread_id: str, *, limit: int = 10, before: Optional[str | Checkpoint] = None, metadata: Optional[dict] = None, checkpoint: Optional[Checkpoint] = None) -> list[ThreadState]
async
¶
获取一个线程的状态历史记录。
参数
-
thread_id
(
) –str 要获取状态历史记录的线程的 ID。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –返回此子图的状态。如果为空,则默认为根图。
-
limit
(
, default:int 10
) –要返回的最大状态数。
-
before
(
, 默认:Optional [str |Checkpoint ]None
) –返回此检查点之前的状态。
-
metadata
(
, 默认:Optional [dict ]None
) –按元数据键值对过滤状态。
返回
-
–list [ThreadState ]list[ThreadState]: 线程的状态历史记录。
使用示例
thread_state = await client.threads.get_history(
thread_id="my_thread_id",
limit=5,
)
RunsClient
¶
用于管理 LangGraph 中运行的客户端。
一次运行是助手的一次调用,带有可选的输入、配置和元数据。此客户端管理运行,运行可以是有状态的(在线程上)或无状态的。
示例
client = get_client()
run = await client.runs.create(assistant_id="asst_123", thread_id="thread_456", input={"query": "Hello"})
stream(thread_id: Optional[str], assistant_id: str, *, input: Optional[dict] = None, command: Optional[Command] = None, stream_mode: Union[StreamMode, Sequence[StreamMode]] = 'values', stream_subgraphs: bool = False, metadata: Optional[dict] = None, config: Optional[Config] = None, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, feedback_keys: Optional[Sequence[str]] = None, on_disconnect: Optional[DisconnectMode] = None, on_completion: Optional[OnCompletionBehavior] = None, webhook: Optional[str] = None, multitask_strategy: Optional[MultitaskStrategy] = None, if_not_exists: Optional[IfNotExists] = None, after_seconds: Optional[int] = None) -> AsyncIterator[StreamPart]
¶
创建一个运行并流式传输结果。
参数
-
thread_id
(
) –Optional [str ]要分配给线程的线程 ID。如果为 None,将创建一个无状态运行。
-
assistant_id
(
) –str 要从中流式传输的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
command
(
, 默认:Optional [Command ]None
) –要执行的命令。不能与输入组合使用。
-
stream_mode
(
, 默认:Union [StreamMode ,Sequence [StreamMode ]]'values'
) –要使用的流模式。
-
stream_subgraphs
(
, 默认:bool False
) –是否从子图流式传输输出。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要从其恢复的检查点。
-
interrupt_before
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之后立即中断的节点。
-
feedback_keys
(
, 默认:Optional [Sequence [str ]]None
) –要分配给运行的反馈键。
-
on_disconnect
(
, 默认:Optional [DisconnectMode ]None
) –要使用的断开连接模式。必须是 'cancel' 或 'continue' 之一。
-
on_completion
(
, 默认:Optional [OnCompletionBehavior ]None
) –是否删除或保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
multitask_strategy
(
, 默认:Optional [MultitaskStrategy ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
-
if_not_exists
(
, 默认:Optional [IfNotExists ]None
) –如何处理丢失的线程。默认为 'reject'。必须是 'reject' (如果丢失则引发错误) 或 'create' (创建新线程)。
-
after_seconds
(
, 默认:Optional [int ]None
) –在开始运行之前要等待的秒数。用于计划将来的运行。
返回
-
–AsyncIterator [StreamPart ]AsyncIterator[StreamPart]: 流结果的异步迭代器。
使用示例
async for chunk in client.runs.stream(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
stream_mode=["values","debug"],
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
):
print(chunk)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}]})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}, {'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]})
StreamPart(event='end', data=None)
create(thread_id: Optional[str], assistant_id: str, *, input: Optional[dict] = None, command: Optional[Command] = None, stream_mode: Union[StreamMode, Sequence[StreamMode]] = 'values', stream_subgraphs: bool = False, metadata: Optional[dict] = None, config: Optional[Config] = None, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, webhook: Optional[str] = None, multitask_strategy: Optional[MultitaskStrategy] = None, if_not_exists: Optional[IfNotExists] = None, on_completion: Optional[OnCompletionBehavior] = None, after_seconds: Optional[int] = None) -> Run
async
¶
创建一个后台运行。
参数
-
thread_id
(
) –Optional [str ]要分配给线程的线程 ID。如果为 None,将创建一个无状态运行。
-
assistant_id
(
) –str 要从中流式传输的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
command
(
, 默认:Optional [Command ]None
) –要执行的命令。不能与输入组合使用。
-
stream_mode
(
, 默认:Union [StreamMode ,Sequence [StreamMode ]]'values'
) –要使用的流模式。
-
stream_subgraphs
(
, 默认:bool False
) –是否从子图流式传输输出。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要从其恢复的检查点。
-
interrupt_before
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之后立即中断的节点。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
multitask_strategy
(
, 默认:Optional [MultitaskStrategy ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
-
on_completion
(
, 默认:Optional [OnCompletionBehavior ]None
) –是否删除或保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
-
if_not_exists
(
, 默认:Optional [IfNotExists ]None
) –如何处理丢失的线程。默认为 'reject'。必须是 'reject' (如果丢失则引发错误) 或 'create' (创建新线程)。
-
after_seconds
(
, 默认:Optional [int ]None
) –在开始运行之前要等待的秒数。用于计划将来的运行。
返回
-
Run
(
) –Run 创建的后台运行。
使用示例
background_run = await client.runs.create(
thread_id="my_thread_id",
assistant_id="my_assistant_id",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(background_run)
--------------------------------------------------------------------------------
{
'run_id': 'my_run_id',
'thread_id': 'my_thread_id',
'assistant_id': 'my_assistant_id',
'created_at': '2024-07-25T15:35:42.598503+00:00',
'updated_at': '2024-07-25T15:35:42.598503+00:00',
'metadata': {},
'status': 'pending',
'kwargs':
{
'input':
{
'messages': [
{
'role': 'user',
'content': 'how are you?'
}
]
},
'config':
{
'metadata':
{
'created_by': 'system'
},
'configurable':
{
'run_id': 'my_run_id',
'user_id': None,
'graph_id': 'agent',
'thread_id': 'my_thread_id',
'checkpoint_id': None,
'model_name': "openai",
'assistant_id': 'my_assistant_id'
}
},
'webhook': "https://my.fake.webhook.com",
'temporary': False,
'stream_mode': ['values'],
'feedback_keys': None,
'interrupt_after': ["node_to_stop_after_1","node_to_stop_after_2"],
'interrupt_before': ["node_to_stop_before_1","node_to_stop_before_2"]
},
'multitask_strategy': 'interrupt'
}
create_batch(payloads: list[RunCreate]) -> list[Run]
async
¶
创建一批无状态后台运行。
wait(thread_id: Optional[str], assistant_id: str, *, input: Optional[dict] = None, command: Optional[Command] = None, metadata: Optional[dict] = None, config: Optional[Config] = None, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, webhook: Optional[str] = None, on_disconnect: Optional[DisconnectMode] = None, on_completion: Optional[OnCompletionBehavior] = None, multitask_strategy: Optional[MultitaskStrategy] = None, if_not_exists: Optional[IfNotExists] = None, after_seconds: Optional[int] = None, raise_error: bool = True) -> Union[list[dict], dict[str, Any]]
async
¶
创建一个运行,等待其完成并返回最终状态。
参数
-
thread_id
(
) –Optional [str ]要在其上创建运行的线程 ID。如果为 None,将创建一个无状态运行。
-
assistant_id
(
) –str 要运行的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
command
(
, 默认:Optional [Command ]None
) –要执行的命令。不能与输入组合使用。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要从其恢复的检查点。
-
interrupt_before
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之后立即中断的节点。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
on_disconnect
(
, 默认:Optional [DisconnectMode ]None
) –要使用的断开连接模式。必须是 'cancel' 或 'continue' 之一。
-
on_completion
(
, 默认:Optional [OnCompletionBehavior ]None
) –是否删除或保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
-
multitask_strategy
(
, 默认:Optional [MultitaskStrategy ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
-
if_not_exists
(
, 默认:Optional [IfNotExists ]None
) –如何处理丢失的线程。默认为 'reject'。必须是 'reject' (如果丢失则引发错误) 或 'create' (创建新线程)。
-
after_seconds
(
, 默认:Optional [int ]None
) –在开始运行之前要等待的秒数。用于计划将来的运行。
返回
-
–Union [list [dict ],dict [str ,Any ]]Union[list[dict], dict[str, Any]]: 运行的输出。
使用示例
final_state_of_run = await client.runs.wait(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(final_state_of_run)
-------------------------------------------------------------------------------------------------------------------------------------------
{
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'f51a862c-62fe-4866-863b-b0863e8ad78a',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-bf1cd3c6-768f-4c16-b62d-ba6f17ad8b36',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
list(thread_id: str, *, limit: int = 10, offset: int = 0, status: Optional[RunStatus] = None) -> List[Run]
async
¶
列出运行。
参数
-
thread_id
(
) –str 要列出运行的线程 ID。
-
limit
(
, default:int 10
) –要返回的最大结果数。
-
offset
(
, default:int 0
) –要跳过的结果数。
-
status
(
, 默认:Optional [RunStatus ]None
) –要按其过滤的运行状态。
返回
-
–List [Run ]List[Run]: 线程的运行列表。
使用示例
await client.runs.list(
thread_id="thread_id",
limit=5,
offset=5,
)
get(thread_id: str, run_id: str) -> Run
async
¶
获取一个运行。
参数
-
thread_id
(
) –str 要获取的线程 ID。
-
run_id
(
) –str 要获取的运行 ID。
返回
-
Run
(
) –Run 运行对象。
使用示例
run = await client.runs.get(
thread_id="thread_id_to_delete",
run_id="run_id_to_delete",
)
cancel(thread_id: str, run_id: str, *, wait: bool = False, action: CancelAction = 'interrupt') -> None
async
¶
获取一个运行。
参数
-
thread_id
(
) –str 要取消的线程 ID。
-
run_id
(
) –str 要取消的运行 ID。
-
wait
(
, 默认:bool False
) –是否等待运行完成。
-
action
(
, 默认:CancelAction 'interrupt'
) –取消运行时要采取的操作。可能的值为
interrupt
或rollback
。默认为interrupt
。
返回
-
None
–无
使用示例
await client.runs.cancel(
thread_id="thread_id_to_cancel",
run_id="run_id_to_cancel",
wait=True,
action="interrupt"
)
join(thread_id: str, run_id: str) -> dict
async
¶
阻塞直到运行完成。返回线程的最终状态。
参数
-
thread_id
(
) –str 要加入的线程 ID。
-
run_id
(
) –str 要加入的运行 ID。
返回
-
–dict 无
使用示例
result =await client.runs.join(
thread_id="thread_id_to_join",
run_id="run_id_to_join"
)
join_stream(thread_id: str, run_id: str, *, cancel_on_disconnect: bool = False, stream_mode: Optional[Union[StreamMode, Sequence[StreamMode]]] = None) -> AsyncIterator[StreamPart]
¶
实时从运行中流式输出,直到运行完成。输出不会被缓冲,因此在此调用之前产生的任何输出将不会在此处接收。
参数
-
thread_id
(
) –str 要加入的线程 ID。
-
run_id
(
) –str 要加入的运行 ID。
-
cancel_on_disconnect
(
, 默认值:bool False
) –当流断开连接时是否取消运行。
-
stream_mode
(
, 默认值:Optional [Union [StreamMode ,Sequence [StreamMode ]]]None
) –要使用的流模式。必须是创建运行时传递的流模式的子集。后台运行默认为所有流模式的并集。
返回
-
–AsyncIterator [StreamPart ]无
使用示例
await client.runs.join_stream(
thread_id="thread_id_to_join",
run_id="run_id_to_join",
stream_mode=["values", "debug"]
)
delete(thread_id: str, run_id: str) -> None
async
¶
删除运行。
参数
-
thread_id
(
) –str 要删除的线程 ID。
-
run_id
(
) –str 要删除的运行 ID。
返回
-
None
–无
使用示例
await client.runs.delete(
thread_id="thread_id_to_delete",
run_id="run_id_to_delete"
)
CronClient
¶
用于管理 LangGraph 中周期性运行(cron 任务)的客户端。
运行是助手的一次调用,带有可选的输入和配置。此客户端允许安排定期运行以自动发生。
示例
client = get_client()
cron_job = await client.crons.create_for_thread(
thread_id="thread_123",
assistant_id="asst_456",
schedule="0 9 * * *",
input={"message": "Daily update"}
)
create_for_thread(thread_id: str, assistant_id: str, *, schedule: str, input: Optional[dict] = None, metadata: Optional[dict] = None, config: Optional[Config] = None, interrupt_before: Optional[Union[All, list[str]]] = None, interrupt_after: Optional[Union[All, list[str]]] = None, webhook: Optional[str] = None, multitask_strategy: Optional[str] = None) -> Run
async
¶
为线程创建 cron 任务。
参数
-
thread_id
(
) –str 要在其上运行 cron 任务的线程 ID。
-
assistant_id
(
) –str 用于 cron 任务的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
schedule
(
) –str 执行此作业的 cron 计划。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给 cron 任务运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
interrupt_before
(
, 默认值:Optional [Union [All ,list [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认值:Optional [Union [All ,list [str ]]]None
) –要在节点执行之后立即中断的节点。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
multitask_strategy
(
, 默认值:Optional [str ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
返回
-
Run
(
) –Run cron 任务运行。
使用示例
cron_run = await client.crons.create_for_thread(
thread_id="my-thread-id",
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
create(assistant_id: str, *, schedule: str, input: Optional[dict] = None, metadata: Optional[dict] = None, config: Optional[Config] = None, interrupt_before: Optional[Union[All, list[str]]] = None, interrupt_after: Optional[Union[All, list[str]]] = None, webhook: Optional[str] = None, multitask_strategy: Optional[str] = None) -> Run
async
¶
创建 cron 任务运行。
参数
-
assistant_id
(
) –str 用于 cron 任务的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
schedule
(
) –str 执行此作业的 cron 计划。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给 cron 任务运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
interrupt_before
(
, 默认值:Optional [Union [All ,list [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认值:Optional [Union [All ,list [str ]]]None
) –要在节点执行之后立即中断的节点。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
multitask_strategy
(
, 默认值:Optional [str ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
返回
-
Run
(
) –Run cron 任务运行。
使用示例
cron_run = client.crons.create(
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
delete(cron_id: str) -> None
async
¶
删除 cron 任务。
参数
-
cron_id
(
) –str 要删除的 cron 任务 ID。
返回
-
None
–无
使用示例
await client.crons.delete(
cron_id="cron_to_delete"
)
search(*, assistant_id: Optional[str] = None, thread_id: Optional[str] = None, limit: int = 10, offset: int = 0) -> list[Cron]
async
¶
获取 cron 任务列表。
参数
-
assistant_id
(
, default:Optional [str ]None
) –要搜索的助手 ID 或图名称。
-
thread_id
(
, 默认:Optional [str ]None
) –要搜索的线程 ID。
-
limit
(
, default:int 10
) –要返回的最大结果数。
-
offset
(
, default:int 0
) –要跳过的结果数。
返回
-
–list [Cron ]list[Cron]: 搜索返回的 cron 任务列表,
使用示例
cron_jobs = await client.crons.search(
assistant_id="my_assistant_id",
thread_id="my_thread_id",
limit=5,
offset=5,
)
print(cron_jobs)
----------------------------------------------------------
[
{
'cron_id': '1ef3cefa-4c09-6926-96d0-3dc97fd5e39b',
'assistant_id': 'my_assistant_id',
'thread_id': 'my_thread_id',
'user_id': None,
'payload':
{
'input': {'start_time': ''},
'schedule': '4 * * * *',
'assistant_id': 'my_assistant_id'
},
'schedule': '4 * * * *',
'next_run_date': '2024-07-25T17:04:00+00:00',
'end_time': None,
'created_at': '2024-07-08T06:02:23.073257+00:00',
'updated_at': '2024-07-08T06:02:23.073257+00:00'
}
]
StoreClient
¶
用于与图的共享存储交互的客户端。
Store 提供了一个键值存储系统,用于在图执行之间持久化数据,从而允许跨线程进行有状态操作和数据共享。
示例
client = get_client()
await client.store.put_item(["users", "user123"], "mem-123451342", {"name": "Alice", "score": 100})
put_item(namespace: Sequence[str], /, key: str, value: dict[str, Any], index: Optional[Union[Literal[False], list[str]]] = None, ttl: Optional[int] = None) -> None
async
¶
存储或更新一个条目。
参数
-
namespace
(
) –Sequence [str ]表示命名空间路径的字符串列表。
-
key
(
) –str 命名空间内条目的唯一标识符。
-
value
(
) –dict [str ,Any ]包含条目数据的字典。
-
index
(
, 默认值:Optional [Union [Literal [False],list [str ]]]None
) –控制搜索索引 - None(使用默认值)、False(禁用)或要索引的字段路径列表。
-
ttl
(
, 默认值:Optional [int ]None
) –条目的可选生存时间(分钟),如果为 None 则表示永不过期。
返回
-
None
–无
使用示例
await client.store.put_item(
["documents", "user123"],
key="item456",
value={"title": "My Document", "content": "Hello World"}
)
get_item(namespace: Sequence[str], /, key: str, *, refresh_ttl: Optional[bool] = None) -> Item
async
¶
检索单个条目。
参数
-
key
(
) –str 条目的唯一标识符。
-
namespace
(
) –Sequence [str ]表示命名空间路径的可选字符串列表。
-
refresh_ttl
(
, 默认值:Optional [bool ]None
) –是否在此读取操作时刷新 TTL。如果为 None,则使用存储的默认行为。
返回
-
Item
(
) –Item 检索到的条目。
使用示例
item = await client.store.get_item(
["documents", "user123"],
key="item456",
)
print(item)
----------------------------------------------------------------
{
'namespace': ['documents', 'user123'],
'key': 'item456',
'value': {'title': 'My Document', 'content': 'Hello World'},
'created_at': '2024-07-30T12:00:00Z',
'updated_at': '2024-07-30T12:00:00Z'
}
delete_item(namespace: Sequence[str], /, key: str) -> None
async
¶
删除一个条目。
参数
-
key
(
) –str 条目的唯一标识符。
-
namespace
(
) –Sequence [str ]表示命名空间路径的可选字符串列表。
返回
-
None
–无
使用示例
await client.store.delete_item(
["documents", "user123"],
key="item456",
)
search_items(namespace_prefix: Sequence[str], /, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0, query: Optional[str] = None, refresh_ttl: Optional[bool] = None) -> SearchItemsResponse
async
¶
在命名空间前缀中搜索条目。
参数
-
namespace_prefix
(
) –Sequence [str ]表示命名空间前缀的字符串列表。
-
filter
(
, 默认值:Optional [dict [str ,Any ]]None
) –用于过滤结果的可选键值对字典。
-
limit
(
, default:int 10
) –要返回的最大条目数(默认为 10)。
-
offset
(
, default:int 0
) –在返回结果之前要跳过的条目数(默认为 0)。
-
query
(
, 默认值:Optional [str ]None
) –用于自然语言搜索的可选查询。
-
refresh_ttl
(
, 默认值:Optional [bool ]None
) –是否刷新此搜索返回的条目的 TTL。如果为 None,则使用存储的默认行为。
返回
-
–SearchItemsResponse List[Item]: 与搜索条件匹配的条目列表。
使用示例
items = await client.store.search_items(
["documents"],
filter={"author": "John Doe"},
limit=5,
offset=0
)
print(items)
----------------------------------------------------------------
{
"items": [
{
"namespace": ["documents", "user123"],
"key": "item789",
"value": {
"title": "Another Document",
"author": "John Doe"
},
"created_at": "2024-07-30T12:00:00Z",
"updated_at": "2024-07-30T12:00:00Z"
},
# ... additional items ...
]
}
list_namespaces(prefix: Optional[List[str]] = None, suffix: Optional[List[str]] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> ListNamespaceResponse
async
¶
列出命名空间,带有可选的匹配条件。
参数
-
prefix
(
, 默认值:Optional [List [str ]]None
) –表示要过滤命名空间的前缀的可选字符串列表。
-
suffix
(
, 默认值:Optional [List [str ]]None
) –表示要过滤命名空间的后缀的可选字符串列表。
-
max_depth
(
, 默认值:Optional [int ]None
) –指定要返回的命名空间最大深度的可选整数。
-
limit
(
, 默认值:int 100
) –要返回的最大命名空间数(默认为 100)。
-
offset
(
, default:int 0
) –在返回结果之前要跳过的命名空间数(默认为 0)。
返回
-
–ListNamespaceResponse List[List[str]]: 与条件匹配的命名空间列表。
使用示例
namespaces = await client.store.list_namespaces(
prefix=["documents"],
max_depth=3,
limit=10,
offset=0
)
print(namespaces)
----------------------------------------------------------------
[
["documents", "user123", "reports"],
["documents", "user456", "invoices"],
...
]
SyncLangGraphClient
¶
用于与 LangGraph API 同步交互的客户端。
此类提供对 LangGraph API 端点的同步访问,用于管理助手、线程、运行、cron 任务和数据存储。
示例
client = get_sync_client()
assistant = client.assistants.get("asst_123")
SyncHttpClient
¶
get(path: str, *, params: Optional[QueryParamTypes] = None) -> Any
¶
发送 GET 请求。
post(path: str, *, json: Optional[dict]) -> Any
¶
发送 POST 请求。
put(path: str, *, json: dict) -> Any
¶
发送 PUT 请求。
patch(path: str, *, json: dict) -> Any
¶
发送 PATCH 请求。
delete(path: str, *, json: Optional[Any] = None) -> None
¶
发送 DELETE 请求。
stream(path: str, method: str, *, json: Optional[dict] = None, params: Optional[QueryParamTypes] = None) -> Iterator[StreamPart]
¶
使用 SSE 流式传输请求的结果。
SyncAssistantsClient
¶
用于同步管理 LangGraph 中助手的客户端。
此类提供与助手交互的方法,助手是图表的版本化配置。
示例
client = get_client()
assistant = client.assistants.get("assistant_id_123")
get(assistant_id: str) -> Assistant
¶
通过 ID 获取助手。
参数
-
assistant_id
(
) –str 要获取的助手的 ID。
返回
-
Assistant
(
) –Assistant 助手对象。
使用示例
assistant = client.assistants.get(
assistant_id="my_assistant_id"
)
print(assistant)
----------------------------------------------------
{
'assistant_id': 'my_assistant_id',
'graph_id': 'agent',
'created_at': '2024-06-25T17:10:33.109781+00:00',
'updated_at': '2024-06-25T17:10:33.109781+00:00',
'config': {},
'metadata': {'created_by': 'system'}
}
get_graph(assistant_id: str, *, xray: Union[int, bool] = False) -> dict[str, list[dict[str, Any]]]
¶
通过 ID 获取助手的图。
参数
-
assistant_id
(
) –str 要获取其图的助手的 ID。
-
xray
(
, default:Union [int ,bool ]False
) –包括子图的图表示。如果提供整数值,则仅包括深度小于或等于该值的子图。
返回
-
Graph
(
) –dict [str ,list [dict [str ,Any ]]]JSON 格式的助手图信息。
使用示例
graph_info = client.assistants.get_graph(
assistant_id="my_assistant_id"
)
print(graph_info)
--------------------------------------------------------------------------------------------------------------------------
{
'nodes':
[
{'id': '__start__', 'type': 'schema', 'data': '__start__'},
{'id': '__end__', 'type': 'schema', 'data': '__end__'},
{'id': 'agent','type': 'runnable','data': {'id': ['langgraph', 'utils', 'RunnableCallable'],'name': 'agent'}},
],
'edges':
[
{'source': '__start__', 'target': 'agent'},
{'source': 'agent','target': '__end__'}
]
}
get_schemas(assistant_id: str) -> GraphSchema
¶
通过 ID 获取助手的模式。
参数
-
assistant_id
(
) –str 要获取其模式的助手的 ID。
返回
-
GraphSchema
(
) –GraphSchema 助手的图模式。
使用示例
schema = client.assistants.get_schemas(
assistant_id="my_assistant_id"
)
print(schema)
----------------------------------------------------------------------------------------------------------------------------
{
'graph_id': 'agent',
'state_schema':
{
'title': 'LangGraphInput',
'$ref': '#/definitions/AgentState',
'definitions':
{
'BaseMessage':
{
'title': 'BaseMessage',
'description': 'Base abstract Message class. Messages are the inputs and outputs of ChatModels.',
'type': 'object',
'properties':
{
'content':
{
'title': 'Content',
'anyOf': [
{'type': 'string'},
{'type': 'array','items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}
]
},
'additional_kwargs':
{
'title': 'Additional Kwargs',
'type': 'object'
},
'response_metadata':
{
'title': 'Response Metadata',
'type': 'object'
},
'type':
{
'title': 'Type',
'type': 'string'
},
'name':
{
'title': 'Name',
'type': 'string'
},
'id':
{
'title': 'Id',
'type': 'string'
}
},
'required': ['content', 'type']
},
'AgentState':
{
'title': 'AgentState',
'type': 'object',
'properties':
{
'messages':
{
'title': 'Messages',
'type': 'array',
'items': {'$ref': '#/definitions/BaseMessage'}
}
},
'required': ['messages']
}
}
},
'config_schema':
{
'title': 'Configurable',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
}
}
get_subgraphs(assistant_id: str, namespace: Optional[str] = None, recurse: bool = False) -> Subgraphs
¶
通过 ID 获取助手的模式。
参数
-
assistant_id
(
) –str 要获取其模式的助手的 ID。
返回
-
Subgraphs
(
) –Subgraphs 助手的图模式。
create(graph_id: Optional[str], config: Optional[Config] = None, *, metadata: Json = None, assistant_id: Optional[str] = None, if_exists: Optional[OnConflictBehavior] = None, name: Optional[str] = None) -> Assistant
¶
创建一个新的助手。
当图表可配置并且您想要基于不同的配置创建不同的助手时,此功能非常有用。
参数
-
graph_id
(
) –Optional [str ]助手应使用的图表的 ID。图表 ID 通常在您的 langgraph.json 配置文件中设置。
-
config
(
, default:Optional [Config ]None
) –用于图表的配置。
-
metadata
(
, default:Json None
) –要添加到助手的元数据。
-
assistant_id
(
, default:Optional [str ]None
) –要使用的助手 ID,如果未提供,则默认为随机 UUID。
-
if_exists
(
, default:Optional [OnConflictBehavior ]None
) –如何处理重复创建。 默认在底层设置为“raise”。 必须为“raise”(如果重复则引发错误)或“do_nothing”(返回现有助手)。
-
name
(
, default:Optional [str ]None
) –助手的名称。 默认在底层设置为“Untitled”。
返回
-
Assistant
(
) –Assistant 创建的助手。
使用示例
assistant = client.assistants.create(
graph_id="agent",
config={"configurable": {"model_name": "openai"}},
metadata={"number":1},
assistant_id="my-assistant-id",
if_exists="do_nothing",
name="my_name"
)
update(assistant_id: str, *, graph_id: Optional[str] = None, config: Optional[Config] = None, metadata: Json = None, name: Optional[str] = None) -> Assistant
¶
更新助手。
使用此功能可以指向不同的图表、更新配置或更改助手的元数据。
参数
-
assistant_id
(
) –str 要更新的助手。
-
graph_id
(
, default:Optional [str ]None
) –助手应使用的图表的 ID。 图表 ID 通常在您的 langgraph.json 配置文件中设置。 如果为 None,助手将继续指向同一图表。
-
config
(
, default:Optional [Config ]None
) –用于图表的配置。
-
metadata
(
, default:Json None
) –要与现有助手元数据合并的元数据。
-
name
(
, default:Optional [str ]None
) –助手的新名称。
返回
-
Assistant
(
) –Assistant 更新后的助手。
使用示例
assistant = client.assistants.update(
assistant_id='e280dad7-8618-443f-87f1-8e41841c180f',
graph_id="other-graph",
config={"configurable": {"model_name": "anthropic"}},
metadata={"number":2}
)
delete(assistant_id: str) -> None
¶
删除助手。
参数
-
assistant_id
(
) –str 要删除的助手 ID。
返回
-
None
–无
使用示例
client.assistants.delete(
assistant_id="my_assistant_id"
)
search(*, metadata: Json = None, graph_id: Optional[str] = None, limit: int = 10, offset: int = 0) -> list[Assistant]
¶
搜索助手。
参数
-
metadata
(
, default:Json None
) –要按其筛选的元数据。 每个 KV 对的精确匹配过滤器。
-
graph_id
(
, default:Optional [str ]None
) –要按其筛选的图表的 ID。 图表 ID 通常在您的 langgraph.json 配置文件中设置。
-
limit
(
, default:int 10
) –要返回的最大结果数。
-
offset
(
, default:int 0
) –要跳过的结果数。
返回
-
–list [Assistant ]list[Assistant]:助手列表。
使用示例
assistants = client.assistants.search(
metadata = {"name":"my_name"},
graph_id="my_graph_id",
limit=5,
offset=5
)
get_versions(assistant_id: str, metadata: Json = None, limit: int = 10, offset: int = 0) -> list[AssistantVersion]
¶
列出助手的所有版本。
参数
-
assistant_id
(
) –str 要获取版本的助手 ID。
-
metadata
(
, default:Json None
) –要按其筛选版本的元数据。 每个 KV 对的精确匹配过滤器。
-
limit
(
, default:int 10
) –要返回的最大版本数。
-
offset
(
, default:int 0
) –要跳过的版本数。
返回
-
–list [AssistantVersion ]list[Assistant]:助手列表。
使用示例
assistant_versions = await client.assistants.get_versions(
assistant_id="my_assistant_id"
)
set_latest(assistant_id: str, version: int) -> Assistant
¶
更改助手的版本。
参数
-
assistant_id
(
) –str 要删除的助手 ID。
-
version
(
) –int 要更改到的版本。
返回
-
Assistant
(
) –Assistant 助手对象。
使用示例
new_version_assistant = await client.assistants.set_latest(
assistant_id="my_assistant_id",
version=3
)
SyncThreadsClient
¶
用于同步管理 LangGraph 中线程的客户端。
此类提供创建、检索和管理线程的方法,线程代表会话或有状态交互。
示例
client = get_sync_client()
thread = client.threads.create(metadata={"user_id": "123"})
get(thread_id: str) -> Thread
¶
通过 ID 获取线程。
参数
-
thread_id
(
) –str 要获取的线程的 ID。
返回
-
Thread
(
) –Thread 线程对象。
使用示例
thread = client.threads.get(
thread_id="my_thread_id"
)
print(thread)
-----------------------------------------------------
{
'thread_id': 'my_thread_id',
'created_at': '2024-07-18T18:35:15.540834+00:00',
'updated_at': '2024-07-18T18:35:15.540834+00:00',
'metadata': {'graph_id': 'agent'}
}
create(*, metadata: Json = None, thread_id: Optional[str] = None, if_exists: Optional[OnConflictBehavior] = None) -> Thread
¶
创建一个新的线程。
参数
-
metadata
(
, default:Json None
) –添加到线程的元数据。
-
thread_id
(
, 默认:Optional [str ]None
) –线程的ID。如果为 None,ID 将随机生成 UUID。
-
if_exists
(
, default:Optional [OnConflictBehavior ]None
) –如何处理重复创建。默认在底层使用 'raise'。必须是 'raise' (如果重复则引发错误) 或 'do_nothing' (返回现有线程)。
返回
-
Thread
(
) –Thread 创建的线程。
使用示例
thread = client.threads.create(
metadata={"number":1},
thread_id="my-thread-id",
if_exists="raise"
)
update(thread_id: str, *, metadata: dict[str, Any]) -> Thread
¶
更新一个线程。
参数
-
thread_id
(
) –str 要更新的线程的 ID。
-
metadata
(
) –dict [str ,Any ]要与现有线程元数据合并的元数据。
返回
-
Thread
(
) –Thread 创建的线程。
使用示例
thread = client.threads.update(
thread_id="my-thread-id",
metadata={"number":1},
)
delete(thread_id: str) -> None
¶
删除一个线程。
参数
-
thread_id
(
) –str 要删除的线程的 ID。
返回
-
None
–无
使用示例
client.threads.delete(
thread_id="my_thread_id"
)
search(*, metadata: Json = None, values: Json = None, status: Optional[ThreadStatus] = None, limit: int = 10, offset: int = 0) -> list[Thread]
¶
搜索线程。
参数
-
metadata
(
, default:Json None
) –要基于其进行过滤的线程元数据。
-
values
(
, 默认:Json None
) –要基于其进行过滤的状态值。
-
status
(
, 默认:Optional [ThreadStatus ]None
) –要基于其进行过滤的线程状态。必须是 'idle'、'busy'、'interrupted' 或 'error' 之一。
-
limit
(
, default:int 10
) –要返回的线程数的限制。
-
offset
(
, default:int 0
) –从线程表开始搜索的偏移量。
返回
-
–list [Thread ]list[Thread]: 匹配搜索参数的线程列表。
使用示例
threads = client.threads.search(
metadata={"number":1},
status="interrupted",
limit=15,
offset=5
)
copy(thread_id: str) -> None
¶
复制一个线程。
参数
-
thread_id
(
) –str 要复制的线程的 ID。
返回
-
None
–无
使用示例
client.threads.copy(
thread_id="my_thread_id"
)
get_state(thread_id: str, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None, *, subgraphs: bool = False) -> ThreadState
¶
获取一个线程的状态。
参数
-
thread_id
(
) –str 要获取状态的线程的 ID。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要获取状态的检查点。
-
subgraphs
(
, 默认:bool False
) –包含子图状态。
返回
-
ThreadState
(
) –ThreadState 线程的状态。
使用示例
thread_state = client.threads.get_state(
thread_id="my_thread_id",
checkpoint_id="my_checkpoint_id"
)
print(thread_state)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'values': {
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
},
'next': [],
'checkpoint':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1'
}
'metadata':
{
'step': 1,
'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2',
'source': 'loop',
'writes':
{
'agent':
{
'messages': [
{
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'name': None,
'type': 'ai',
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'example': False,
'tool_calls': [],
'usage_metadata': None,
'additional_kwargs': {},
'response_metadata': {},
'invalid_tool_calls': []
}
]
}
},
'user_id': None,
'graph_id': 'agent',
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'created_by': 'system',
'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
'created_at': '2024-07-25T15:35:44.184703+00:00',
'parent_config':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-d80d-6fa7-8000-9300467fad0f'
}
}
update_state(thread_id: str, values: Optional[Union[dict, Sequence[dict]]], *, as_node: Optional[str] = None, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None) -> ThreadUpdateStateResponse
¶
更新一个线程的状态。
参数
-
thread_id
(
) –str 要更新的线程的 ID。
-
values
(
) –Optional [Union [dict ,Sequence [dict ]]]用于更新状态的值。
-
as_node
(
, 默认:Optional [str ]None
) –就像此节点刚执行过一样更新状态。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要更新状态的检查点。
返回
-
ThreadUpdateStateResponse
(
) –ThreadUpdateStateResponse 更新线程状态后的响应。
使用示例
response = client.threads.update_state(
thread_id="my_thread_id",
values={"messages":[{"role": "user", "content": "hello!"}]},
as_node="my_node",
)
print(response)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'checkpoint': {
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1',
'checkpoint_map': {}
}
}
get_history(thread_id: str, *, limit: int = 10, before: Optional[str | Checkpoint] = None, metadata: Optional[dict] = None, checkpoint: Optional[Checkpoint] = None) -> list[ThreadState]
¶
获取一个线程的状态历史记录。
参数
-
thread_id
(
) –str 要获取状态历史记录的线程的 ID。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –返回此子图的状态。如果为空,则默认为根图。
-
limit
(
, default:int 10
) –要返回的最大状态数。
-
before
(
, 默认:Optional [str |Checkpoint ]None
) –返回此检查点之前的状态。
-
metadata
(
, 默认:Optional [dict ]None
) –按元数据键值对过滤状态。
返回
-
–list [ThreadState ]list[ThreadState]: 线程的状态历史记录。
使用示例
thread_state = client.threads.get_history(
thread_id="my_thread_id",
limit=5,
before="my_timestamp",
metadata={"name":"my_name"}
)
SyncRunsClient
¶
LangGraph 中用于管理运行的同步客户端。
此类提供了创建、检索和管理运行的方法,运行代表图的单独执行。
示例
client = get_sync_client()
run = client.runs.create(thread_id="thread_123", assistant_id="asst_456")
stream(thread_id: Optional[str], assistant_id: str, *, input: Optional[dict] = None, command: Optional[Command] = None, stream_mode: Union[StreamMode, Sequence[StreamMode]] = 'values', stream_subgraphs: bool = False, metadata: Optional[dict] = None, config: Optional[Config] = None, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, feedback_keys: Optional[Sequence[str]] = None, on_disconnect: Optional[DisconnectMode] = None, on_completion: Optional[OnCompletionBehavior] = None, webhook: Optional[str] = None, multitask_strategy: Optional[MultitaskStrategy] = None, if_not_exists: Optional[IfNotExists] = None, after_seconds: Optional[int] = None) -> Iterator[StreamPart]
¶
创建一个运行并流式传输结果。
参数
-
thread_id
(
) –Optional [str ]要分配给线程的线程 ID。如果为 None,将创建一个无状态运行。
-
assistant_id
(
) –str 要从中流式传输的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
command
(
, 默认:Optional [Command ]None
) –要执行的命令。
-
stream_mode
(
, 默认:Union [StreamMode ,Sequence [StreamMode ]]'values'
) –要使用的流模式。
-
stream_subgraphs
(
, 默认:bool False
) –是否从子图流式传输输出。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要从其恢复的检查点。
-
interrupt_before
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之后立即中断的节点。
-
feedback_keys
(
, 默认:Optional [Sequence [str ]]None
) –要分配给运行的反馈键。
-
on_disconnect
(
, 默认:Optional [DisconnectMode ]None
) –要使用的断开连接模式。必须是 'cancel' 或 'continue' 之一。
-
on_completion
(
, 默认:Optional [OnCompletionBehavior ]None
) –是否删除或保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
multitask_strategy
(
, 默认:Optional [MultitaskStrategy ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
-
if_not_exists
(
, 默认:Optional [IfNotExists ]None
) –如何处理丢失的线程。默认为 'reject'。必须是 'reject' (如果丢失则引发错误) 或 'create' (创建新线程)。
-
after_seconds
(
, 默认:Optional [int ]None
) –在开始运行之前要等待的秒数。用于计划将来的运行。
返回
-
–Iterator [StreamPart ]Iterator[StreamPart]: 流结果的迭代器。
使用示例
async for chunk in client.runs.stream(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
stream_mode=["values","debug"],
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
):
print(chunk)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}]})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}, {'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]})
StreamPart(event='end', data=None)
create(thread_id: Optional[str], assistant_id: str, *, input: Optional[dict] = None, command: Optional[Command] = None, stream_mode: Union[StreamMode, Sequence[StreamMode]] = 'values', stream_subgraphs: bool = False, metadata: Optional[dict] = None, config: Optional[Config] = None, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, webhook: Optional[str] = None, multitask_strategy: Optional[MultitaskStrategy] = None, on_completion: Optional[OnCompletionBehavior] = None, if_not_exists: Optional[IfNotExists] = None, after_seconds: Optional[int] = None) -> Run
¶
创建一个后台运行。
参数
-
thread_id
(
) –Optional [str ]要分配给线程的线程 ID。如果为 None,将创建一个无状态运行。
-
assistant_id
(
) –str 要从中流式传输的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
command
(
, 默认:Optional [Command ]None
) –要执行的命令。
-
stream_mode
(
, 默认:Union [StreamMode ,Sequence [StreamMode ]]'values'
) –要使用的流模式。
-
stream_subgraphs
(
, 默认:bool False
) –是否从子图流式传输输出。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要从其恢复的检查点。
-
interrupt_before
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之后立即中断的节点。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
multitask_strategy
(
, 默认:Optional [MultitaskStrategy ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
-
on_completion
(
, 默认:Optional [OnCompletionBehavior ]None
) –是否删除或保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
-
if_not_exists
(
, 默认:Optional [IfNotExists ]None
) –如何处理丢失的线程。默认为 'reject'。必须是 'reject' (如果丢失则引发错误) 或 'create' (创建新线程)。
-
after_seconds
(
, 默认:Optional [int ]None
) –在开始运行之前要等待的秒数。用于计划将来的运行。
返回
-
Run
(
) –Run 创建的后台运行。
使用示例
background_run = client.runs.create(
thread_id="my_thread_id",
assistant_id="my_assistant_id",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(background_run)
--------------------------------------------------------------------------------
{
'run_id': 'my_run_id',
'thread_id': 'my_thread_id',
'assistant_id': 'my_assistant_id',
'created_at': '2024-07-25T15:35:42.598503+00:00',
'updated_at': '2024-07-25T15:35:42.598503+00:00',
'metadata': {},
'status': 'pending',
'kwargs':
{
'input':
{
'messages': [
{
'role': 'user',
'content': 'how are you?'
}
]
},
'config':
{
'metadata':
{
'created_by': 'system'
},
'configurable':
{
'run_id': 'my_run_id',
'user_id': None,
'graph_id': 'agent',
'thread_id': 'my_thread_id',
'checkpoint_id': None,
'model_name': "openai",
'assistant_id': 'my_assistant_id'
}
},
'webhook': "https://my.fake.webhook.com",
'temporary': False,
'stream_mode': ['values'],
'feedback_keys': None,
'interrupt_after': ["node_to_stop_after_1","node_to_stop_after_2"],
'interrupt_before': ["node_to_stop_before_1","node_to_stop_before_2"]
},
'multitask_strategy': 'interrupt'
}
create_batch(payloads: list[RunCreate]) -> list[Run]
¶
创建一批无状态后台运行。
wait(thread_id: Optional[str], assistant_id: str, *, input: Optional[dict] = None, command: Optional[Command] = None, metadata: Optional[dict] = None, config: Optional[Config] = None, checkpoint: Optional[Checkpoint] = None, checkpoint_id: Optional[str] = None, interrupt_before: Optional[Union[All, Sequence[str]]] = None, interrupt_after: Optional[Union[All, Sequence[str]]] = None, webhook: Optional[str] = None, on_disconnect: Optional[DisconnectMode] = None, on_completion: Optional[OnCompletionBehavior] = None, multitask_strategy: Optional[MultitaskStrategy] = None, if_not_exists: Optional[IfNotExists] = None, after_seconds: Optional[int] = None) -> Union[list[dict], dict[str, Any]]
¶
创建一个运行,等待其完成并返回最终状态。
参数
-
thread_id
(
) –Optional [str ]要在其上创建运行的线程 ID。如果为 None,将创建一个无状态运行。
-
assistant_id
(
) –str 要运行的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
command
(
, 默认:Optional [Command ]None
) –要执行的命令。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
checkpoint
(
, 默认:Optional [Checkpoint ]None
) –要从其恢复的检查点。
-
interrupt_before
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认:Optional [Union [All ,Sequence [str ]]]None
) –要在节点执行之后立即中断的节点。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
on_disconnect
(
, 默认:Optional [DisconnectMode ]None
) –要使用的断开连接模式。必须是 'cancel' 或 'continue' 之一。
-
on_completion
(
, 默认:Optional [OnCompletionBehavior ]None
) –是否删除或保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
-
multitask_strategy
(
, 默认:Optional [MultitaskStrategy ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
-
if_not_exists
(
, 默认:Optional [IfNotExists ]None
) –如何处理丢失的线程。默认为 'reject'。必须是 'reject' (如果丢失则引发错误) 或 'create' (创建新线程)。
-
after_seconds
(
, 默认:Optional [int ]None
) –在开始运行之前要等待的秒数。用于计划将来的运行。
返回
-
–Union [list [dict ],dict [str ,Any ]]Union[list[dict], dict[str, Any]]: 运行的输出。
使用示例
final_state_of_run = client.runs.wait(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(final_state_of_run)
-------------------------------------------------------------------------------------------------------------------------------------------
{
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'f51a862c-62fe-4866-863b-b0863e8ad78a',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-bf1cd3c6-768f-4c16-b62d-ba6f17ad8b36',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
list(thread_id: str, *, limit: int = 10, offset: int = 0) -> List[Run]
¶
列出运行。
参数
-
thread_id
(
) –str 要列出运行的线程 ID。
-
limit
(
, default:int 10
) –要返回的最大结果数。
-
offset
(
, default:int 0
) –要跳过的结果数。
返回
-
–List [Run ]List[Run]: 线程的运行列表。
使用示例
client.runs.list(
thread_id="thread_id",
limit=5,
offset=5,
)
get(thread_id: str, run_id: str) -> Run
¶
获取一个运行。
参数
-
thread_id
(
) –str 要获取的线程 ID。
-
run_id
(
) –str 要获取的运行 ID。
返回
-
Run
(
) –Run 运行对象。
使用示例
run = client.runs.get(
thread_id="thread_id_to_delete",
run_id="run_id_to_delete",
)
cancel(thread_id: str, run_id: str, *, wait: bool = False, action: CancelAction = 'interrupt') -> None
¶
获取一个运行。
参数
-
thread_id
(
) –str 要取消的线程 ID。
-
run_id
(
) –str 要取消的运行 ID。
-
wait
(
, 默认:bool False
) –是否等待运行完成。
-
action
(
, 默认:CancelAction 'interrupt'
) –取消运行时要采取的操作。可能的值为
interrupt
或rollback
。默认为interrupt
。
返回
-
None
–无
使用示例
client.runs.cancel(
thread_id="thread_id_to_cancel",
run_id="run_id_to_cancel",
wait=True,
action="interrupt"
)
join(thread_id: str, run_id: str) -> dict
¶
阻塞直到运行完成。返回线程的最终状态。
参数
-
thread_id
(
) –str 要加入的线程 ID。
-
run_id
(
) –str 要加入的运行 ID。
返回
-
–dict 无
使用示例
client.runs.join(
thread_id="thread_id_to_join",
run_id="run_id_to_join"
)
join_stream(thread_id: str, run_id: str, *, stream_mode: Optional[Union[StreamMode, Sequence[StreamMode]]] = None, cancel_on_disconnect: bool = False) -> Iterator[StreamPart]
¶
实时从运行中流式输出,直到运行完成。输出不会被缓冲,因此在此调用之前产生的任何输出将不会在此处接收。
参数
-
thread_id
(
) –str 要加入的线程 ID。
-
run_id
(
) –str 要加入的运行 ID。
-
stream_mode
(
, 默认值:Optional [Union [StreamMode ,Sequence [StreamMode ]]]None
) –要使用的流模式。必须是创建运行时传递的流模式的子集。后台运行默认为所有流模式的并集。
-
cancel_on_disconnect
(
, 默认值:bool False
) –当流断开连接时是否取消运行。
返回
-
–Iterator [StreamPart ]无
使用示例
client.runs.join_stream(
thread_id="thread_id_to_join",
run_id="run_id_to_join",
stream_mode=["values", "debug"]
)
delete(thread_id: str, run_id: str) -> None
¶
删除运行。
参数
-
thread_id
(
) –str 要删除的线程 ID。
-
run_id
(
) –str 要删除的运行 ID。
返回
-
None
–无
使用示例
client.runs.delete(
thread_id="thread_id_to_delete",
run_id="run_id_to_delete"
)
SyncCronClient
¶
LangGraph 中用于管理 cron 作业的同步客户端。
此类提供了创建和管理计划任务(cron 作业)的方法,用于自动图执行。
示例
client = get_sync_client()
cron_job = client.crons.create_for_thread(thread_id="thread_123", assistant_id="asst_456", schedule="0 * * * *")
create_for_thread(thread_id: str, assistant_id: str, *, schedule: str, input: Optional[dict] = None, metadata: Optional[dict] = None, config: Optional[Config] = None, interrupt_before: Optional[Union[All, list[str]]] = None, interrupt_after: Optional[Union[All, list[str]]] = None, webhook: Optional[str] = None, multitask_strategy: Optional[str] = None) -> Run
¶
为线程创建 cron 任务。
参数
-
thread_id
(
) –str 要在其上运行 cron 任务的线程 ID。
-
assistant_id
(
) –str 用于 cron 任务的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
schedule
(
) –str 执行此作业的 cron 计划。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给 cron 任务运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
interrupt_before
(
, 默认值:Optional [Union [All ,list [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认值:Optional [Union [All ,list [str ]]]None
) –要在节点执行之后立即中断的节点。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
multitask_strategy
(
, 默认值:Optional [str ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
返回
-
Run
(
) –Run cron 任务运行。
使用示例
cron_run = client.crons.create_for_thread(
thread_id="my-thread-id",
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
create(assistant_id: str, *, schedule: str, input: Optional[dict] = None, metadata: Optional[dict] = None, config: Optional[Config] = None, interrupt_before: Optional[Union[All, list[str]]] = None, interrupt_after: Optional[Union[All, list[str]]] = None, webhook: Optional[str] = None, multitask_strategy: Optional[str] = None) -> Run
¶
创建 cron 任务运行。
参数
-
assistant_id
(
) –str 用于 cron 任务的助手 ID 或图名称。如果使用图名称,将默认为从该图创建的第一个助手。
-
schedule
(
) –str 执行此作业的 cron 计划。
-
input
(
, 默认:Optional [dict ]None
) –图的输入。
-
metadata
(
, 默认:Optional [dict ]None
) –要分配给 cron 任务运行的元数据。
-
config
(
, default:Optional [Config ]None
) –助手的配置。
-
interrupt_before
(
, 默认值:Optional [Union [All ,list [str ]]]None
) –要在节点执行之前立即中断的节点。
-
interrupt_after
(
, 默认值:Optional [Union [All ,list [str ]]]None
) –要在节点执行之后立即中断的节点。
-
webhook
(
, 默认:Optional [str ]None
) –LangGraph API 调用完成后要调用的 Webhook。
-
multitask_strategy
(
, 默认值:Optional [str ]None
) –要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
返回
-
Run
(
) –Run cron 任务运行。
使用示例
cron_run = client.crons.create(
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
delete(cron_id: str) -> None
¶
删除 cron 任务。
参数
-
cron_id
(
) –str 要删除的 cron 任务 ID。
返回
-
None
–无
使用示例
client.crons.delete(
cron_id="cron_to_delete"
)
search(*, assistant_id: Optional[str] = None, thread_id: Optional[str] = None, limit: int = 10, offset: int = 0) -> list[Cron]
¶
获取 cron 任务列表。
参数
-
assistant_id
(
, default:Optional [str ]None
) –要搜索的助手 ID 或图名称。
-
thread_id
(
, 默认:Optional [str ]None
) –要搜索的线程 ID。
-
limit
(
, default:int 10
) –要返回的最大结果数。
-
offset
(
, default:int 0
) –要跳过的结果数。
返回
-
–list [Cron ]list[Cron]: 搜索返回的 cron 任务列表,
使用示例
cron_jobs = client.crons.search(
assistant_id="my_assistant_id",
thread_id="my_thread_id",
limit=5,
offset=5,
)
print(cron_jobs)
----------------------------------------------------------
[
{
'cron_id': '1ef3cefa-4c09-6926-96d0-3dc97fd5e39b',
'assistant_id': 'my_assistant_id',
'thread_id': 'my_thread_id',
'user_id': None,
'payload':
{
'input': {'start_time': ''},
'schedule': '4 * * * *',
'assistant_id': 'my_assistant_id'
},
'schedule': '4 * * * *',
'next_run_date': '2024-07-25T17:04:00+00:00',
'end_time': None,
'created_at': '2024-07-08T06:02:23.073257+00:00',
'updated_at': '2024-07-08T06:02:23.073257+00:00'
}
]
SyncStoreClient
¶
用于键值存储上同步操作的客户端。
提供与远程键值存储交互的方法,允许在命名空间层级结构中存储和检索项目。
示例
client = get_sync_client()
client.store.put_item(["users", "profiles"], "user123", {"name": "Alice", "age": 30})
put_item(namespace: Sequence[str], /, key: str, value: dict[str, Any], index: Optional[Union[Literal[False], list[str]]] = None, ttl: Optional[int] = None) -> None
¶
存储或更新一个条目。
参数
-
namespace
(
) –Sequence [str ]表示命名空间路径的字符串列表。
-
key
(
) –str 命名空间内条目的唯一标识符。
-
value
(
) –dict [str ,Any ]包含条目数据的字典。
-
index
(
, 默认值:Optional [Union [Literal [False],list [str ]]]None
) –控制搜索索引 - None(使用默认值)、False(禁用)或要索引的字段路径列表。
-
ttl
(
, 默认值:Optional [int ]None
) –条目的可选生存时间(分钟),如果为 None 则表示永不过期。
返回:None
使用示例
client.store.put_item(
["documents", "user123"],
key="item456",
value={"title": "My Document", "content": "Hello World"}
)
get_item(namespace: Sequence[str], /, key: str, *, refresh_ttl: Optional[bool] = None) -> Item
¶
检索单个条目。
参数
-
key
(
) –str 条目的唯一标识符。
-
namespace
(
) –Sequence [str ]表示命名空间路径的可选字符串列表。
-
refresh_ttl
(
, 默认值:Optional [bool ]None
) –是否在此读取操作时刷新 TTL。如果为 None,则使用存储的默认行为。
返回
-
Item
(
) –Item 检索到的条目。
使用示例
item = client.store.get_item(
["documents", "user123"],
key="item456",
)
print(item)
----------------------------------------------------------------
{
'namespace': ['documents', 'user123'],
'key': 'item456',
'value': {'title': 'My Document', 'content': 'Hello World'},
'created_at': '2024-07-30T12:00:00Z',
'updated_at': '2024-07-30T12:00:00Z'
}
delete_item(namespace: Sequence[str], /, key: str) -> None
¶
删除一个条目。
参数
-
key
(
) –str 条目的唯一标识符。
-
namespace
(
) –Sequence [str ]表示命名空间路径的可选字符串列表。
返回
-
None
–无
使用示例
client.store.delete_item(
["documents", "user123"],
key="item456",
)
search_items(namespace_prefix: Sequence[str], /, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0, query: Optional[str] = None, refresh_ttl: Optional[bool] = None) -> SearchItemsResponse
¶
在命名空间前缀中搜索条目。
参数
-
namespace_prefix
(
) –Sequence [str ]表示命名空间前缀的字符串列表。
-
filter
(
, 默认值:Optional [dict [str ,Any ]]None
) –用于过滤结果的可选键值对字典。
-
limit
(
, default:int 10
) –要返回的最大条目数(默认为 10)。
-
offset
(
, default:int 0
) –在返回结果之前要跳过的条目数(默认为 0)。
-
query
(
, 默认值:Optional [str ]None
) –用于自然语言搜索的可选查询。
-
refresh_ttl
(
, 默认值:Optional [bool ]None
) –是否刷新此搜索返回的条目的 TTL。如果为 None,则使用存储的默认行为。
返回
-
–SearchItemsResponse List[Item]: 与搜索条件匹配的条目列表。
使用示例
items = client.store.search_items(
["documents"],
filter={"author": "John Doe"},
limit=5,
offset=0
)
print(items)
----------------------------------------------------------------
{
"items": [
{
"namespace": ["documents", "user123"],
"key": "item789",
"value": {
"title": "Another Document",
"author": "John Doe"
},
"created_at": "2024-07-30T12:00:00Z",
"updated_at": "2024-07-30T12:00:00Z"
},
# ... additional items ...
]
}
list_namespaces(prefix: Optional[List[str]] = None, suffix: Optional[List[str]] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> ListNamespaceResponse
¶
列出命名空间,带有可选的匹配条件。
参数
-
prefix
(
, 默认值:Optional [List [str ]]None
) –表示要过滤命名空间的前缀的可选字符串列表。
-
suffix
(
, 默认值:Optional [List [str ]]None
) –表示要过滤命名空间的后缀的可选字符串列表。
-
max_depth
(
, 默认值:Optional [int ]None
) –指定要返回的命名空间最大深度的可选整数。
-
limit
(
, 默认值:int 100
) –要返回的最大命名空间数(默认为 100)。
-
offset
(
, default:int 0
) –在返回结果之前要跳过的命名空间数(默认为 0)。
返回
-
–ListNamespaceResponse List[List[str]]: 与条件匹配的命名空间列表。
使用示例
namespaces = client.store.list_namespaces(
prefix=["documents"],
max_depth=3,
limit=10,
offset=0
)
print(namespaces)
----------------------------------------------------------------
[
["documents", "user123", "reports"],
["documents", "user456", "invoices"],
...
]
get_headers(api_key: Optional[str], custom_headers: Optional[dict[str, str]]) -> dict[str, str]
¶
组合 api_key 和用户提供的自定义标头。
get_client(*, url: Optional[str] = None, api_key: Optional[str] = None, headers: Optional[dict[str, str]] = None) -> LangGraphClient
¶
获取 LangGraphClient 实例。
参数
-
url
(
, 默认:Optional [str ]None
) –LangGraph API 的 URL。
-
api_key
(
, 默认:Optional [str ]None
) –API 密钥。如果未提供,将从环境中读取。优先级:1. 显式参数 2. LANGGRAPH_API_KEY 3. LANGSMITH_API_KEY 4. LANGCHAIN_API_KEY
-
headers
(
, 默认:Optional [dict [str ,str ]]None
) –可选的自定义标头
返回
-
LangGraphClient
(
) –LangGraphClient 用于访问 AssistantsClient、ThreadsClient、RunsClient 和 CronClient 的顶级客户端。
-
–LangGraphClient ThreadsClient、RunsClient 和 CronClient。
示例
from langgraph_sdk import get_client
# get top-level LangGraphClient
client = get_client(url="http://localhost:8123")
# example usage: client.<model>.<method_name>()
assistants = await client.assistants.get(assistant_id="some_uuid")
get_sync_client(*, url: Optional[str] = None, api_key: Optional[str] = None, headers: Optional[dict[str, str]] = None) -> SyncLangGraphClient
¶
获取同步 LangGraphClient 实例。
参数
-
url
(
, 默认:Optional [str ]None
) –LangGraph API 的 URL。
-
api_key
(
, 默认:Optional [str ]None
) –API 密钥。如果未提供,将从环境中读取。优先级:1. 显式参数 2. LANGGRAPH_API_KEY 3. LANGSMITH_API_KEY 4. LANGCHAIN_API_KEY
-
headers
(
, 默认:Optional [dict [str ,str ]]None
) –可选的自定义标头
返回: SyncLangGraphClient: 用于访问 AssistantsClient、ThreadsClient、RunsClient 和 CronClient 的顶层同步客户端。
示例
from langgraph_sdk import get_sync_client
# get top-level synchronous LangGraphClient
client = get_sync_client(url="http://localhost:8123")
# example usage: client.<model>.<method_name>()
assistant = client.assistants.get(assistant_id="some_uuid")
用于与 LangGraph API 交互的数据模型。
Json = Optional[dict[str, Any]]
module-attribute
¶
表示类似 JSON 的结构,可以是 None 或包含字符串键和任意值的字典。
RunStatus = Literal['pending', 'error', 'success', 'timeout', 'interrupted']
module-attribute
¶
表示运行状态: - "pending":运行正在等待开始。 - "error":运行遇到错误并停止。 - "success":运行成功完成。 - "timeout":运行超过了时间限制。 - "interrupted":运行被手动停止或中断。
ThreadStatus = Literal['idle', 'busy', 'interrupted', 'error']
module-attribute
¶
表示线程状态: - "idle":线程当前未处理任何任务。 - "busy":线程正在积极处理任务。 - "interrupted":线程的执行被中断。 - "error":任务处理期间发生异常。
StreamMode = Literal['values', 'messages', 'updates', 'events', 'debug', 'custom', 'messages-tuple']
module-attribute
¶
定义流式传输模式: - "values":仅流式传输值。 - "messages":流式传输完整消息。 - "updates":流式传输状态更新。 - "events":流式传输执行期间发生的事件。 - "debug":流式传输详细的调试信息。 - "custom":流式传输自定义事件。
DisconnectMode = Literal['cancel', 'continue']
module-attribute
¶
指定断开连接时的行为: - "cancel":断开连接时取消操作。 - "continue":即使断开连接也继续操作。
MultitaskStrategy = Literal['reject', 'interrupt', 'rollback', 'enqueue']
module-attribute
¶
定义如何处理多任务: - "reject":繁忙时拒绝新任务。 - "interrupt":中断当前任务以执行新任务。 - "rollback":回滚当前任务并开始新任务。 - "enqueue":将新任务排队以供稍后执行。
OnConflictBehavior = Literal['raise', 'do_nothing']
module-attribute
¶
指定冲突时的行为: - "raise":发生冲突时引发异常。 - "do_nothing":忽略冲突并继续。
OnCompletionBehavior = Literal['delete', 'keep']
module-attribute
¶
定义完成后的操作: - "delete":完成后删除资源。 - "keep":完成后保留资源。
All = Literal['*']
module-attribute
¶
表示通配符或“全部”选择器。
IfNotExists = Literal['create', 'reject']
module-attribute
¶
指定线程不存在时的行为: - "create":如果线程不存在,则创建一个新线程。 - "reject":如果线程不存在,则拒绝操作。
CancelAction = Literal['interrupt', 'rollback']
module-attribute
¶
取消运行时的操作。 - "interrupt":简单地取消运行。 - "rollback":取消运行。然后删除运行和关联的检查点。
Config
¶
基类:
调用的配置选项。
tags: list[str]
instance-attribute
¶
此调用和任何子调用的标签(例如,Chain 调用 LLM)。您可以使用这些标签来筛选调用。
recursion_limit: int
instance-attribute
¶
调用可以递归的最大次数。如果未提供,则默认为 25。
configurable: dict[str, Any]
instance-attribute
¶
先前通过 .configurable_fields() 或 .configurable_alternatives() 在此 Runnable 或子 Runnables 上配置的属性的运行时值。查看 .output_schema() 以获取已配置属性的描述。
Checkpoint
¶
GraphSchema
¶
基类:
定义图的结构和属性。
graph_id: str
instance-attribute
¶
图的 ID。
input_schema: Optional[dict]
instance-attribute
¶
图输入的模式。如果无法从图中生成 JSON 模式,则缺失。
output_schema: Optional[dict]
instance-attribute
¶
图输出的模式。如果无法从图中生成 JSON 模式,则缺失。
state_schema: Optional[dict]
instance-attribute
¶
图状态的模式。如果无法从图中生成 JSON 模式,则缺失。
config_schema: Optional[dict]
instance-attribute
¶
图配置的模式。如果无法从图中生成 JSON 模式,则缺失。
AssistantBase
¶
基类:
助手的基本模型。
assistant_id: str
instance-attribute
¶
助手的 ID。
graph_id: str
instance-attribute
¶
图的 ID。
config: Config
instance-attribute
¶
助手配置。
created_at: datetime
instance-attribute
¶
助手创建的时间。
metadata: Json
instance-attribute
¶
助手元数据。
version: int
instance-attribute
¶
助手的版本。
name: str
instance-attribute
¶
助手的名称。
AssistantVersion
¶
基类:
表示助手的特定版本。
assistant_id: str
instance-attribute
¶
助手的 ID。
graph_id: str
instance-attribute
¶
图的 ID。
config: Config
instance-attribute
¶
助手配置。
created_at: datetime
instance-attribute
¶
助手创建的时间。
metadata: Json
instance-attribute
¶
助手元数据。
version: int
instance-attribute
¶
助手的版本。
name: str
instance-attribute
¶
助手的名称。
Assistant
¶
基类:
表示具有附加属性的助手。
assistant_id: str
instance-attribute
¶
助手的 ID。
graph_id: str
instance-attribute
¶
图的 ID。
config: Config
instance-attribute
¶
助手配置。
created_at: datetime
instance-attribute
¶
助手创建的时间。
metadata: Json
instance-attribute
¶
助手元数据。
version: int
instance-attribute
¶
助手的版本。
name: str
instance-attribute
¶
助手的名称。
updated_at: datetime
instance-attribute
¶
助手最后更新的时间。
Interrupt
¶
Thread
¶
基类:
表示对话线程。
thread_id: str
instance-attribute
¶
线程的 ID。
created_at: datetime
instance-attribute
¶
线程创建的时间。
updated_at: datetime
instance-attribute
¶
线程最后更新的时间。
metadata: Json
instance-attribute
¶
线程元数据。
status: ThreadStatus
instance-attribute
¶
线程的状态,为 'idle'、'busy'、'interrupted' 之一。
values: Json
instance-attribute
¶
线程的当前状态。
interrupts: Dict[str, list[Interrupt]]
instance-attribute
¶
在此线程中抛出的中断。
ThreadTask
¶
基类:
表示线程内的任务。
ThreadState
¶
基类:
表示线程的状态。
values: Union[list[dict], dict[str, Any]]
instance-attribute
¶
状态值。
next: Sequence[str]
instance-attribute
¶
要执行的下一个节点。如果为空,则线程已完成,直到收到新的输入。
checkpoint: Checkpoint
instance-attribute
¶
检查点的 ID。
metadata: Json
instance-attribute
¶
此状态的元数据
created_at: Optional[str]
instance-attribute
¶
状态创建的时间戳
parent_checkpoint: Optional[Checkpoint]
instance-attribute
¶
父检查点的 ID。如果缺失,则为根检查点。
tasks: Sequence[ThreadTask]
instance-attribute
¶
此步骤要执行的任务。如果已经尝试过,可能包含错误。
ThreadUpdateStateResponse
¶
Run
¶
基类:
表示单个执行运行。
run_id: str
instance-attribute
¶
运行的 ID。
thread_id: str
instance-attribute
¶
线程的 ID。
assistant_id: str
instance-attribute
¶
用于此运行的助手。
created_at: datetime
instance-attribute
¶
运行创建的时间。
updated_at: datetime
instance-attribute
¶
上次更新运行的时间。
status: RunStatus
instance-attribute
¶
运行的状态。为 'pending'、'running'、"error"、'success'、"timeout"、"interrupted" 之一。
metadata: Json
instance-attribute
¶
运行元数据。
multitask_strategy: MultitaskStrategy
instance-attribute
¶
处理同一线程上并发运行的策略。
Cron
¶
基类:
表示计划任务。
cron_id: str
instance-attribute
¶
Cron 的 ID。
thread_id: Optional[str]
instance-attribute
¶
线程的 ID。
end_time: Optional[datetime]
instance-attribute
¶
停止运行 cron 的结束日期。
schedule: str
instance-attribute
¶
要运行的计划,cron 格式。
created_at: datetime
instance-attribute
¶
Cron 创建的时间。
updated_at: datetime
instance-attribute
¶
上次更新 cron 的时间。
payload: dict
instance-attribute
¶
用于创建新运行的运行有效负载。
RunCreate
¶
基类:
定义启动后台运行的参数。
thread_id: Optional[str]
instance-attribute
¶
要运行的线程的标识符。如果未提供,则运行是无状态的。
assistant_id: str
instance-attribute
¶
用于此运行的助手的标识符。
input: Optional[dict]
instance-attribute
¶
运行的初始输入数据。
metadata: Optional[dict]
instance-attribute
¶
与运行关联的其他元数据。
config: Optional[Config]
instance-attribute
¶
运行的配置选项。
checkpoint_id: Optional[str]
instance-attribute
¶
要从其恢复的检查点的标识符。
interrupt_before: Optional[list[str]]
instance-attribute
¶
要在执行之前中断的节点名称列表。
interrupt_after: Optional[list[str]]
instance-attribute
¶
要在执行之后中断的节点名称列表。
webhook: Optional[str]
instance-attribute
¶
用于发送关于运行进度的 webhook 通知的 URL。
multitask_strategy: Optional[MultitaskStrategy]
instance-attribute
¶
用于处理同一线程上并发运行的策略。
Item
¶
基类:
表示图中 Store 中的单个文档或数据条目。
Item 用于存储跨线程的记忆。
namespace: list[str]
instance-attribute
¶
Item 的命名空间。命名空间类似于文档的目录。
key: str
instance-attribute
¶
Item 在其命名空间内的唯一标识符。
通常,键不需要是全局唯一的。
value: dict[str, Any]
instance-attribute
¶
存储在 item 中的值。这是文档本身。
created_at: datetime
instance-attribute
¶
Item 创建时的时间戳。
updated_at: datetime
instance-attribute
¶
Item 上次更新时的时间戳。
ListNamespaceResponse
¶
基类:
用于列出命名空间的响应结构。
namespaces: list[list[str]]
instance-attribute
¶
命名空间路径的列表,其中每个路径都是字符串列表。
SearchItem
¶
基类:
带有来自搜索操作的可选相关性分数的 Item。
属性
-
(score
) –Optional [float ]相关性/相似度分数。当使用自然语言查询搜索兼容的存储时包含。
namespace: list[str]
instance-attribute
¶
Item 的命名空间。命名空间类似于文档的目录。
key: str
instance-attribute
¶
Item 在其命名空间内的唯一标识符。
通常,键不需要是全局唯一的。
value: dict[str, Any]
instance-attribute
¶
存储在 item 中的值。这是文档本身。
created_at: datetime
instance-attribute
¶
Item 创建时的时间戳。
updated_at: datetime
instance-attribute
¶
Item 上次更新时的时间戳。
SearchItemsResponse
¶
StreamPart
¶
Send
¶
Command
¶
基类:
表示控制图执行流程和状态的一个或多个命令。
此类型定义了可由节点返回以影响图执行的控制命令。它允许您导航到其他节点、更新图状态以及从中断处恢复。
goto: Union[Send, str, Sequence[Union[Send, str]]]
instance-attribute
¶
指定执行应继续的位置。可以是
- 要导航到的字符串节点名称
- 一个 Send 对象,用于使用特定输入执行节点
- 要按顺序执行的节点名称或 Send 对象序列
update: Union[dict[str, Any], Sequence[Tuple[str, Any]]]
instance-attribute
¶
要应用于图状态的更新。可以是
- 要合并的状态更新字典
- 用于有序更新的 (键, 值) 元组序列
resume: Any
instance-attribute
¶
在中断后恢复执行的值。与 interrupt() 结合使用以实现控制流。
Auth
¶
为您的 LangGraph 应用程序添加自定义身份验证和授权管理。
Auth 类为 LangGraph 应用程序中处理身份验证和授权提供了一个统一的系统。它支持自定义用户身份验证协议和针对不同资源和操作的细粒度授权规则。
要使用,请创建一个单独的 python 文件,并将该文件的路径添加到您的 LangGraph API 配置文件 (langgraph.json
)。在该文件中,创建 Auth 类的实例,并根据需要注册身份验证和授权处理程序。
示例 langgraph.json
文件
{
"dependencies": ["."],
"graphs": {
"agent": "./my_agent/agent.py:graph"
},
"env": ".env",
"auth": {
"path": "./auth.py:my_auth"
}
然后,LangGraph 服务器将加载您的 auth 文件,并在每次收到请求时在服务器端运行它。
基本用法
from langgraph_sdk import Auth
my_auth = Auth()
async def verify_token(token: str) -> str:
# Verify token and return user_id
# This would typically be a call to your auth server
return "user_id"
@auth.authenticate
async def authenticate(authorization: str) -> str:
# Verify token and return user_id
result = await verify_token(authorization)
if result != "user_id":
raise Auth.exceptions.HTTPException(
status_code=401, detail="Unauthorized"
)
return result
# Global fallback handler
@auth.on
async def authorize_default(params: Auth.on.value):
return False # Reject all requests (default behavior)
@auth.on.threads.create
async def authorize_thread_create(params: Auth.on.threads.create.value):
# Allow the allowed user to create a thread
assert params.get("metadata", {}).get("owner") == "allowed_user"
@auth.on.store
async def authorize_store(ctx: Auth.types.AuthContext, value: Auth.types.on):
assert ctx.user.identity in value["namespace"], "Not authorized"
请求处理流程
- 身份验证(您的
@auth.authenticate
处理程序)首先在**每个请求**上执行 - 对于授权,将调用最具体的匹配处理程序
- 如果存在针对确切资源和操作的处理程序,则使用它(例如,
@auth.on.threads.create
) - 否则,如果存在针对资源的任何操作的处理程序,则使用它(例如,
@auth.on.threads
) - 最后,如果没有特定的处理程序匹配,则使用全局处理程序(例如,
@auth.on
) - 如果未设置全局处理程序,则请求被接受
- 如果存在针对确切资源和操作的处理程序,则使用它(例如,
这允许您使用全局处理程序设置默认行为,同时根据需要覆盖特定路由。
types = types
class-attribute
instance-attribute
¶
对身份验证类型定义的引用。
提供对身份验证系统中使用的所有类型定义的访问,例如 ThreadsCreate、AssistantsRead 等。
exceptions = exceptions
class-attribute
instance-attribute
¶
对身份验证异常定义的引用。
提供对身份验证系统中使用的所有异常定义的访问,例如 HTTPException 等。
on = _On(self)
instance-attribute
¶
用于控制对特定资源访问的授权处理程序的入口点。
on 类提供了一种灵活的方式来为您的应用程序中的不同资源和操作定义授权规则。它支持三种主要使用模式
- 为所有资源和操作运行的全局处理程序
- 为资源上的所有操作运行的特定于资源的处理程序
- 用于细粒度控制的特定于资源和操作的处理程序
每个处理程序都必须是一个异步函数,它接受两个参数
- ctx (AuthContext): 包含请求上下文和已验证用户信息的上下文
- value: 正在授权的数据(类型因端点而异)
处理程序应返回以下之一
- None or True: Accept the request
- False: Reject with 403 error
- FilterType: Apply filtering rules to the response
示例
所有请求的全局处理程序
@auth.on
async def reject_unhandled_requests(ctx: AuthContext, value: Any) -> None:
print(f"Request to {ctx.path} by {ctx.user.identity}")
return False
特定于资源的处理程序。对于 threads
资源上的所有操作,这将优先于全局处理程序
@auth.on.threads
async def check_thread_access(ctx: AuthContext, value: Any) -> bool:
# Allow access only to threads created by the user
return value.get("created_by") == ctx.user.identity
特定于资源和操作的处理程序
@auth.on.threads.delete
async def prevent_thread_deletion(ctx: AuthContext, value: Any) -> bool:
# Only admins can delete threads
return "admin" in ctx.user.permissions
多个资源或操作
@auth.on(resources=["threads", "runs"], actions=["create", "update"])
async def rate_limit_writes(ctx: AuthContext, value: Any) -> bool:
# Implement rate limiting for write operations
return await check_rate_limit(ctx.user.identity)
store
资源的 Auth 有些不同,因为它的结构是开发者定义的。您通常希望在命名空间中强制执行用户凭据。Y
authenticate(fn: AH) -> AH
¶
注册身份验证处理程序函数。
身份验证处理程序负责验证凭据并返回用户范围。它可以按名称接受以下任何参数
- request (Request): The raw ASGI request object
- body (dict): The parsed request body
- path (str): The request path, e.g., "/threads/abcd-1234-abcd-1234/runs/abcd-1234-abcd-1234/stream"
- method (str): The HTTP method, e.g., "GET"
- path_params (dict[str, str]): URL path parameters, e.g., {"thread_id": "abcd-1234-abcd-1234", "run_id": "abcd-1234-abcd-1234"}
- query_params (dict[str, str]): URL query parameters, e.g., {"stream": "true"}
- headers (dict[bytes, bytes]): Request headers
- authorization (str | None): The Authorization header value (e.g., "Bearer <token>")
参数
-
fn
(
) –Callable 要注册的身份验证处理程序函数。必须返回用户的表示形式。这可以是: - 字符串(用户 ID) - 包含 {"identity": str, "permissions": list[str]} 的字典 - 或具有 identity 和 permissions 属性的对象。权限可以由您的下游处理程序可选地使用。
返回
-
–AH 注册的处理程序函数。
引发
-
–ValueError 如果已注册身份验证处理程序。
示例
基本令牌身份验证
@auth.authenticate
async def authenticate(authorization: str) -> str:
user_id = verify_token(authorization)
return user_id
接受完整的请求上下文
@auth.authenticate
async def authenticate(
method: str,
path: str,
headers: dict[str, bytes]
) -> str:
user = await verify_request(method, path, headers)
return user
返回用户名和权限
@auth.authenticate
async def authenticate(
method: str,
path: str,
headers: dict[str, bytes]
) -> Auth.types.MinimalUserDict:
permissions, user = await verify_request(method, path, headers)
# Permissions could be things like ["runs:read", "runs:write", "threads:read", "threads:write"]
return {
"identity": user["id"],
"permissions": permissions,
"display_name": user["name"],
}
LangGraph 的身份验证和授权类型。
此模块定义了 LangGraph 中用于身份验证、授权和请求处理的核心类型。它包括用户协议、身份验证上下文以及各种 API 操作的类型化字典。
注意
所有 typing.TypedDict 类都使用 total=False,以默认使所有字段 typing.Optional。
RunStatus = typing.Literal['pending', 'error', 'success', 'timeout', 'interrupted']
module-attribute
¶
运行执行的状态。
值
- pending: 运行已排队或正在进行中
- error: 运行失败并出现错误
- success: 运行成功完成
- timeout: 运行超出时间限制
- interrupted: 运行被手动中断
MultitaskStrategy = typing.Literal['reject', 'rollback', 'interrupt', 'enqueue']
module-attribute
¶
处理多个并发任务的策略。
值
- reject: 当一个任务正在进行时,拒绝新任务
- rollback: 取消当前任务并开始新任务
- interrupt: 中断当前任务并开始新任务
- enqueue: 将新任务排队,在当前任务完成后运行
OnConflictBehavior = typing.Literal['raise', 'do_nothing']
module-attribute
¶
遇到冲突时的行为。
值
- raise: 当冲突发生时,抛出异常
- do_nothing: 静默地忽略冲突
IfNotExists = typing.Literal['create', 'reject']
module-attribute
¶
当实体不存在时的行为。
值
- create: 创建实体
- reject: 拒绝操作
FilterType = typing.Union[typing.Dict[str, typing.Union[str, typing.Dict[typing.Literal['$eq', '$contains'], str]]], typing.Dict[str, str]]
module-attribute
¶
授权处理程序的响应类型。
支持精确匹配和运算符
- 精确匹配简写: {"field": "value"}
- 精确匹配: {"field": {"$eq": "value"}}
- 包含: {"field": {"$contains": "value"}}
ThreadStatus = typing.Literal['idle', 'busy', 'interrupted', 'error']
module-attribute
¶
线程的状态。
值
- idle: 线程可用于工作
- busy: 线程当前正在处理
- interrupted: 线程被中断
- error: 线程遇到错误
MetadataInput = typing.Dict[str, typing.Any]
module-attribute
¶
HandlerResult = typing.Union[None, bool, FilterType]
module-attribute
¶
处理程序的结果可以是: * None | True: 接受请求。 * False: 拒绝请求并返回 403 错误 * FilterType: 要应用的过滤器
Authenticator = Callable[..., Awaitable[typing.Union[MinimalUser, str, BaseUser, MinimalUserDict, typing.Mapping[str, typing.Any]],]]
module-attribute
¶
身份验证函数的类型。
身份验证器可以返回以下任一项: 1. 字符串 (user_id) 2. 包含 {"identity": str, "permissions": list[str]} 的字典 3. 具有 identity 和 permissions 属性的对象
权限可以被下游的授权逻辑使用,以确定对不同资源的访问权限。
如果以下任何参数包含在您的函数签名中,则身份验证装饰器将自动按名称注入它们
参数
-
request
(
) –Request 原始 ASGI 请求对象
-
body
(
) –dict 已解析的请求体
-
path
(
) –str 请求路径
-
method
(
) –str HTTP 方法 (GET, POST 等)
-
path_params
(
) –dict [str ,str ] | NoneURL 路径参数
-
query_params
(
) –dict [str ,str ] | NoneURL 查询参数
-
headers
(
) –dict [str ,bytes ] | None请求头
-
authorization
(
) –str | NoneAuthorization 标头值(例如 "Bearer"
")
示例
使用令牌的基本身份验证
from langgraph_sdk import Auth
auth = Auth()
@auth.authenticate
async def authenticate1(authorization: str) -> Auth.types.MinimalUserDict:
return await get_user(authorization)
使用多个参数进行身份验证
@auth.authenticate
async def authenticate2(
method: str,
path: str,
headers: dict[str, bytes]
) -> Auth.types.MinimalUserDict:
# Custom auth logic using method, path and headers
user = verify_request(method, path, headers)
return user
接受原始 ASGI 请求
MY_SECRET = "my-secret-key"
@auth.authenticate
async def get_current_user(request: Request) -> Auth.types.MinimalUserDict:
try:
token = (request.headers.get("authorization") or "").split(" ", 1)[1]
payload = jwt.decode(token, MY_SECRET, algorithms=["HS256"])
except (IndexError, InvalidTokenError):
raise HTTPException(
status_code=401,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.myauth-provider.com/auth/v1/user",
headers={"Authorization": f"Bearer {MY_SECRET}"}
)
if response.status_code != 200:
raise HTTPException(status_code=401, detail="User not found")
user_data = response.json()
return {
"identity": user_data["id"],
"display_name": user_data.get("name"),
"permissions": user_data.get("permissions", []),
"is_authenticated": True,
}
MinimalUser
¶
基类:
用户对象必须至少公开 identity 属性。
identity: str
property
¶
用户的唯一标识符。
这可以是用户名、电子邮件或用于区分系统中不同用户的任何其他唯一标识符。
MinimalUserDict
¶
基类:
用户的字典表示形式。
identity: typing_extensions.Required[str]
instance-attribute
¶
用户必需的唯一标识符。
display_name: str
instance-attribute
¶
用户的 typing.Optional 显示名称。
is_authenticated: bool
instance-attribute
¶
用户是否已通过身份验证。默认为 True。
permissions: Sequence[str]
instance-attribute
¶
与用户关联的权限列表。
您可以在 `@auth.on` 授权逻辑中使用这些权限,以确定对不同资源的访问权限。
BaseUser
¶
StudioUser
¶
从 LangGraph studio 的已验证请求填充的用户对象。
注意:可以在 `langgraph.json` 配置中禁用 Studio 身份验证。
您可以在授权处理程序 (`@auth.on`) 中使用 `isinstance` 检查,以专门控制开发人员从 LangGraph Studio UI 访问实例的权限。
示例
BaseAuthContext
¶
AuthContext
¶
基类:
包含资源和操作信息的完整身份验证上下文。
使用正在访问的特定资源和操作扩展 BaseAuthContext,从而实现细粒度的访问控制决策。
permissions: Sequence[str]
instance-attribute
¶
授予已验证用户的权限。
user: BaseUser
instance-attribute
¶
已验证的用户。
resource: typing.Literal['runs', 'threads', 'crons', 'assistants', 'store']
instance-attribute
¶
正在访问的资源。
action: typing.Literal['create', 'read', 'update', 'delete', 'search', 'create_run', 'put', 'get', 'list_namespaces']
instance-attribute
¶
正在对资源执行的操作。
大多数资源支持以下操作: - create: 创建新资源 - read: 读取有关资源的信息 - update: 更新现有资源 - delete: 删除资源 - search: 搜索资源
存储支持以下操作: - put: 在存储中添加或更新文档 - get: 从存储中获取文档 - list_namespaces: 列出存储中的命名空间
ThreadsCreate
¶
基类:
创建新线程的参数。
示例
ThreadsRead
¶
ThreadsUpdate
¶
ThreadsDelete
¶
ThreadsSearch
¶
基类:
搜索线程的参数。
用于搜索线程或运行。
metadata: MetadataInput
instance-attribute
¶
要过滤的 typing.Optional 元数据。
values: MetadataInput
instance-attribute
¶
要过滤的 typing.Optional 值。
status: typing.Optional[ThreadStatus]
instance-attribute
¶
要过滤的 typing.Optional 状态。
limit: int
instance-attribute
¶
要返回的最大结果数。
offset: int
instance-attribute
¶
分页的偏移量。
thread_id: typing.Optional[UUID]
instance-attribute
¶
要过滤的 typing.Optional 线程 ID。
RunsCreate
¶
基类:
创建运行的有效负载。
示例
create_params = {
"assistant_id": UUID("123e4567-e89b-12d3-a456-426614174000"),
"thread_id": UUID("123e4567-e89b-12d3-a456-426614174001"),
"run_id": UUID("123e4567-e89b-12d3-a456-426614174002"),
"status": "pending",
"metadata": {"owner": "user123"},
"prevent_insert_if_inflight": True,
"multitask_strategy": "reject",
"if_not_exists": "create",
"after_seconds": 10,
"kwargs": {"key": "value"},
"action": "interrupt"
}
assistant_id: typing.Optional[UUID]
instance-attribute
¶
用于此运行的 typing.Optional 助手 ID。
thread_id: typing.Optional[UUID]
instance-attribute
¶
用于此运行的 typing.Optional 线程 ID。
run_id: typing.Optional[UUID]
instance-attribute
¶
用于此运行的 typing.Optional 运行 ID。
status: typing.Optional[RunStatus]
instance-attribute
¶
此运行的 typing.Optional 状态。
metadata: MetadataInput
instance-attribute
¶
运行的 typing.Optional 元数据。
prevent_insert_if_inflight: bool
instance-attribute
¶
如果已有运行正在进行中,则阻止插入新运行。
multitask_strategy: MultitaskStrategy
instance-attribute
¶
此运行的多任务策略。
if_not_exists: IfNotExists
instance-attribute
¶
此运行的 IfNotExists。
after_seconds: int
instance-attribute
¶
创建运行前等待的秒数。
kwargs: typing.Dict[str, typing.Any]
instance-attribute
¶
传递给运行的关键字参数。
action: typing.Optional[typing.Literal['interrupt', 'rollback']]
instance-attribute
¶
如果更新现有运行,则采取的操作。
AssistantsCreate
¶
基类:
创建助手的有效负载。
示例
assistant_id: UUID
instance-attribute
¶
助手的唯一标识符。
graph_id: str
instance-attribute
¶
用于此助手的 Graph ID。
config: typing.Optional[typing.Union[typing.Dict[str, typing.Any], typing.Any]]
instance-attribute
¶
助手的 typing.Optional 配置。
metadata: MetadataInput
instance-attribute
¶
附加到助手的 typing.Optional 元数据。
if_exists: OnConflictBehavior
instance-attribute
¶
当已存在具有相同 ID 的助手时的行为。
name: str
instance-attribute
¶
助手的名称。
AssistantsRead
¶
AssistantsUpdate
¶
基类:
更新助手的有效负载。
示例
assistant_id: UUID
instance-attribute
¶
助手的唯一标识符。
graph_id: typing.Optional[str]
instance-attribute
¶
要更新的 typing.Optional Graph ID。
config: typing.Optional[typing.Union[typing.Dict[str, typing.Any], typing.Any]]
instance-attribute
¶
要更新的 typing.Optional 配置。
metadata: MetadataInput
instance-attribute
¶
要更新的 typing.Optional 元数据。
name: typing.Optional[str]
instance-attribute
¶
typing.Optional 要更新的名称。
version: typing.Optional[int]
instance-attribute
¶
typing.Optional 要更新的版本。
AssistantsSearch
¶
基类:
用于搜索助手的载荷。
示例
CronsCreate
¶
基类:
用于创建 cron 任务的载荷。
示例
payload: typing.Dict[str, typing.Any]
instance-attribute
¶
cron 任务的载荷。
schedule: str
instance-attribute
¶
cron 任务的计划。
cron_id: typing.Optional[UUID]
instance-attribute
¶
typing.Optional cron 任务的唯一标识符。
thread_id: typing.Optional[UUID]
instance-attribute
¶
typing.Optional 用于此 cron 任务的线程 ID。
user_id: typing.Optional[str]
instance-attribute
¶
typing.Optional 用于此 cron 任务的用户 ID。
end_time: typing.Optional[datetime]
instance-attribute
¶
typing.Optional cron 任务的结束时间。
CronsUpdate
¶
基类:
用于更新 cron 任务的载荷。
示例
CronsSearch
¶
基类:
用于搜索 cron 任务的载荷。
示例
StoreGet
¶
StoreSearch
¶
基类:
在指定的命名空间层级结构中搜索项目的操作。
namespace: tuple[str, ...]
instance-attribute
¶
用于定义搜索范围的前缀过滤器。
filter: typing.Optional[dict[str, typing.Any]]
instance-attribute
¶
用于根据精确匹配或比较运算符筛选结果的键值对。
limit: int
instance-attribute
¶
搜索结果中返回的最大项目数。
offset: int
instance-attribute
¶
分页要跳过的匹配项目数。
query: typing.Optional[str]
instance-attribute
¶
用于语义搜索功能的 Naturalj 语言搜索查询。
StoreListNamespaces
¶
基类:
列出和筛选存储中命名空间的操作。
namespace: typing.Optional[tuple[str, ...]]
instance-attribute
¶
前缀筛选命名空间。
suffix: typing.Optional[tuple[str, ...]]
instance-attribute
¶
用于筛选命名空间的可选条件。
max_depth: typing.Optional[int]
instance-attribute
¶
要返回的命名空间层级结构的最大深度。
注意
深度超过此级别的命名空间将被截断。
limit: int
instance-attribute
¶
要返回的最大命名空间数。
offset: int
instance-attribute
¶
分页要跳过的命名空间数。
StorePut
¶
基类:
在存储中存储、更新或删除项目的操作。
namespace: tuple[str, ...]
instance-attribute
¶
标识项目位置的层级路径。
key: str
instance-attribute
¶
项目在其命名空间内的唯一标识符。
value: typing.Optional[dict[str, typing.Any]]
instance-attribute
¶
要存储的数据,或者 None 以标记项目为删除。
index: typing.Optional[typing.Union[typing.Literal[False], list[str]]]
instance-attribute
¶
用于全文搜索的可选索引配置。
StoreDelete
¶
on
¶
不同 API 操作的类型定义的命名空间。
此类组织了跨不同资源(线程、助手、cron 任务)的创建、读取、更新、删除和搜索操作的类型定义。
用法
from langgraph_sdk import Auth
auth = Auth()
@auth.on
def handle_all(params: Auth.on.value):
raise Exception("Not authorized")
@auth.on.threads.create
def handle_thread_create(params: Auth.on.threads.create.value):
# Handle thread creation
pass
@auth.on.assistants.search
def handle_assistant_search(params: Auth.on.assistants.search.value):
# Handle assistant search
pass
auth 系统中使用的异常。
HTTPException
¶
基类:
您可以引发的 HTTP 异常,以返回特定的 HTTP 错误响应。
由于这是在 auth 模块中定义的,因此我们默认使用 401 状态代码。
参数
-
status_code
(
, 默认值:int 401
) –错误的 HTTP 状态代码。默认为 401 “未授权”。
-
detail
(
, 默认值:str | NoneNone
) –详细的错误消息。 如果为 None,则使用基于状态代码的默认消息。
-
headers
(
, 默认值:Mapping [str ,str ] | NoneNone
) –要包含在错误响应中的其他 HTTP 标头。
示例
默认
添加标头
raise HTTPException(headers={"X-Custom-Header": "Custom Value"})
# HTTPException(status_code=401, detail='Unauthorized', headers={"WWW-Authenticate": "Bearer"})
自定义错误