配置
get_store() -> BaseStore
¶
在运行时从图节点或入口点任务内部访问 LangGraph 存储。
只要 StateGraph 或入口点使用存储进行初始化,就可以从任何 StateGraph 节点或函数式 API 任务内部调用,例如:
# with StateGraph
graph = (
StateGraph(...)
...
.compile(store=store)
)
# or with entrypoint
@entrypoint(store=store)
def workflow(inputs):
...
Python 3.11 之前的异步
如果您使用的是 Python 3.11 之前的版本并且正在异步运行 LangGraph,则 get_store()
将无法工作,因为它使用 contextvar 传播(仅在 Python >= 3.11 中可用)。
与 StateGraph 一起使用
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store
store = InMemoryStore()
store.put(("values",), "foo", {"bar": 2})
class State(TypedDict):
foo: int
def my_node(state: State):
my_store = get_store()
stored_value = my_store.get(("values",), "foo").value["bar"]
return {"foo": stored_value + 1}
graph = (
StateGraph(State)
.add_node(my_node)
.add_edge(START, "my_node")
.compile(store=store)
)
graph.invoke({"foo": 1})
与函数式 API 一起使用
from langgraph.func import entrypoint, task
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store
store = InMemoryStore()
store.put(("values",), "foo", {"bar": 2})
@task
def my_task(value: int):
my_store = get_store()
stored_value = my_store.get(("values",), "foo").value["bar"]
return stored_value + 1
@entrypoint(store=store)
def workflow(value: int):
return my_task(value).result()
workflow.invoke(1)
get_stream_writer() -> StreamWriter
¶
在运行时从图节点或入口点任务内部访问 LangGraph StreamWriter。
可以从任何 StateGraph 节点或函数式 API 任务内部调用。
Python 3.11 之前的异步
如果您使用的是 Python 3.11 之前的版本并且正在异步运行 LangGraph,则 get_stream_writer()
将无法工作,因为它使用 contextvar 传播(仅在 Python >= 3.11 中可用)。
与 StateGraph 一起使用
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer
class State(TypedDict):
foo: int
def my_node(state: State):
my_stream_writer = get_stream_writer()
my_stream_writer({"custom_data": "Hello!"})
return {"foo": state["foo"] + 1}
graph = (
StateGraph(State)
.add_node(my_node)
.add_edge(START, "my_node")
.compile(store=store)
)
for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
print(chunk)
与函数式 API 一起使用
from langgraph.func import entrypoint, task
from langgraph.config import get_stream_writer
@task
def my_task(value: int):
my_stream_writer = get_stream_writer()
my_stream_writer({"custom_data": "Hello!"})
return value + 1
@entrypoint(store=store)
def workflow(value: int):
return my_task(value).result()
for chunk in workflow.stream(1, stream_mode="custom"):
print(chunk)