{ "cells": [ { "cell_type": "markdown", "id": "15c4bd28", "metadata": {}, "source": [ "# How to stream custom data\n", "\n", "
\n", "

Prerequisites

\n", "

\n", " This guide assumes familiarity with the following:\n", "

\n", "

\n", "
\n", "\n", "The most common use case for streaming from inside a node is to stream LLM tokens, but you may also want to stream custom data.\n", "\n", "For example, if you have a long-running tool call, you can dispatch custom events between the steps and use these custom events to monitor progress. You could also surface these custom events to an end user of your application to show them how the current task is progressing.\n", "\n", "You can do so in two ways:\n", "* using graph's `.stream` / `.astream` methods with `stream_mode=\"custom\"`\n", "* emitting custom events using [adispatch_custom_events](https://python.langchain.com/docs/how_to/callbacks_custom_events/).\n", "\n", "Below we'll see how to use both APIs.\n", "\n", "## Setup\n", "\n", "First, let's install our required packages" ] }, { "cell_type": "code", "execution_count": 1, "id": "e1a20f31", "metadata": {}, "outputs": [], "source": [ "%%capture --no-stderr\n", "%pip install -U langgraph" ] }, { "cell_type": "markdown", "id": "12297071", "metadata": {}, "source": [ "
\n", "

Set up LangSmith for LangGraph development

\n", "

\n", " Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here. \n", "

\n", "
" ] }, { "cell_type": "markdown", "id": "29814253-ca9b-4844-a8a5-d6b19fbdbdba", "metadata": {}, "source": [ "## Stream custom data using `.stream / .astream`" ] }, { "cell_type": "markdown", "id": "b729644a-b65f-4e69-ad45-f2e88ffb4e9d", "metadata": {}, "source": [ "### Define the graph" ] }, { "cell_type": "code", "execution_count": 2, "id": "9731c40f-5ce7-460d-b2ad-33185529c99d", "metadata": {}, "outputs": [], "source": [ "from langchain_core.messages import AIMessage\n", "from langgraph.graph import START, StateGraph, MessagesState, END\n", "from langgraph.types import StreamWriter\n", "\n", "\n", "async def my_node(\n", " state: MessagesState,\n", " writer: StreamWriter, # <-- provide StreamWriter to write chunks to be streamed\n", "):\n", " chunks = [\n", " \"Four\",\n", " \"score\",\n", " \"and\",\n", " \"seven\",\n", " \"years\",\n", " \"ago\",\n", " \"our\",\n", " \"fathers\",\n", " \"...\",\n", " ]\n", " for chunk in chunks:\n", " # write the chunk to be streamed using stream_mode=custom\n", " writer(chunk)\n", "\n", " return {\"messages\": [AIMessage(content=\" \".join(chunks))]}\n", "\n", "\n", "# Define a new graph\n", "workflow = StateGraph(MessagesState)\n", "\n", "workflow.add_node(\"model\", my_node)\n", "workflow.add_edge(START, \"model\")\n", "workflow.add_edge(\"model\", END)\n", "\n", "app = workflow.compile()" ] }, { "cell_type": "markdown", "id": "ecd69eed-9624-4640-b0af-c9f82b190900", "metadata": {}, "source": [ "### Stream content" ] }, { "cell_type": "code", "execution_count": 5, "id": "00a91b15-82c7-443c-acb6-a7406df15cee", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Four\n", "score\n", "and\n", "seven\n", "years\n", "ago\n", "our\n", "fathers\n", "...\n" ] } ], "source": [ "from langchain_core.messages import HumanMessage\n", "\n", "inputs = [HumanMessage(content=\"What are you thinking about?\")]\n", "async for chunk in app.astream({\"messages\": inputs}, stream_mode=\"custom\"):\n", " print(chunk, flush=True)" ] }, { "cell_type": "markdown", "id": "c7b9f1f0-c170-40dc-9c22-289483dfbc99", "metadata": {}, "source": [ "You will likely need to use [multiple streaming modes](https://langchain-ai.github.io/langgraph/how-tos/stream-multiple/) as you will\n", "want access to both the custom data and the state updates." ] }, { "cell_type": "code", "execution_count": 6, "id": "f8ed22d4-6ce6-4b04-a68b-2ea516e3ab15", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "('custom', 'Four')\n", "('custom', 'score')\n", "('custom', 'and')\n", "('custom', 'seven')\n", "('custom', 'years')\n", "('custom', 'ago')\n", "('custom', 'our')\n", "('custom', 'fathers')\n", "('custom', '...')\n", "('updates', {'model': {'messages': [AIMessage(content='Four score and seven years ago our fathers ...', additional_kwargs={}, response_metadata={})]}})\n" ] } ], "source": [ "from langchain_core.messages import HumanMessage\n", "\n", "inputs = [HumanMessage(content=\"What are you thinking about?\")]\n", "async for chunk in app.astream({\"messages\": inputs}, stream_mode=[\"custom\", \"updates\"]):\n", " print(chunk, flush=True)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ca976d6a-7c64-4603-8bb4-dee95428c33d", "metadata": {}, "source": [ "## Stream custom data using `.astream_events`\n", "\n", "If you are already using graph's `.astream_events` method in your workflow, you can also stream custom data by emitting custom events using `adispatch_custom_event`\n", "\n", "
\n", "

ASYNC IN PYTHON<=3.10

\n", "

\n", "\n", "LangChain cannot automatically propagate configuration, including callbacks necessary for `astream_events()`, to child runnables if you are running async code in python<=3.10. This is a common reason why you may fail to see events being emitted from custom runnables or tools.\n", "\n", "If you are running python<=3.10, you will need to manually propagate the `RunnableConfig` object to the child runnable in async environments. For an example of how to manually propagate the config, see the implementation of the node below with `adispatch_custom_event`.\n", "\n", "If you are running python>=3.11, the `RunnableConfig` will automatically propagate to child runnables in async environment. However, it is still a good idea to propagate the `RunnableConfig` manually if your code may run in other Python versions.\n", "

\n", "
" ] }, { "cell_type": "markdown", "id": "b390a9fe-2d5f-4e82-a1ea-c7c0186b8559", "metadata": {}, "source": [ "### Define the graph" ] }, { "cell_type": "code", "execution_count": 19, "id": "486a01a0", "metadata": {}, "outputs": [], "source": [ "from langchain_core.runnables import RunnableConfig, RunnableLambda\n", "from langchain_core.callbacks.manager import adispatch_custom_event\n", "\n", "\n", "async def my_node(state: MessagesState, config: RunnableConfig):\n", " chunks = [\n", " \"Four\",\n", " \"score\",\n", " \"and\",\n", " \"seven\",\n", " \"years\",\n", " \"ago\",\n", " \"our\",\n", " \"fathers\",\n", " \"...\",\n", " ]\n", " for chunk in chunks:\n", " await adispatch_custom_event(\n", " \"my_custom_event\",\n", " {\"chunk\": chunk},\n", " config=config, # <-- propagate config\n", " )\n", "\n", " return {\"messages\": [AIMessage(content=\" \".join(chunks))]}\n", "\n", "\n", "# Define a new graph\n", "workflow = StateGraph(MessagesState)\n", "\n", "workflow.add_node(\"model\", my_node)\n", "workflow.add_edge(START, \"model\")\n", "workflow.add_edge(\"model\", END)\n", "\n", "app = workflow.compile()" ] }, { "cell_type": "markdown", "id": "7dcded03-6776-405e-afae-005a3212d3e4", "metadata": {}, "source": [ "### Stream content" ] }, { "cell_type": "code", "execution_count": 20, "id": "ce773a40", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Four|score|and|seven|years|ago|our|fathers|...|" ] } ], "source": [ "from langchain_core.messages import HumanMessage\n", "\n", "inputs = [HumanMessage(content=\"What are you thinking about?\")]\n", "async for event in app.astream_events({\"messages\": inputs}, version=\"v2\"):\n", " tags = event.get(\"tags\", [])\n", " if event[\"event\"] == \"on_custom_event\" and event[\"name\"] == \"my_custom_event\":\n", " data = event[\"data\"]\n", " if data:\n", " print(data[\"chunk\"], end=\"|\", flush=True)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.4" } }, "nbformat": 4, "nbformat_minor": 5 }