{ "cells": [ { "cell_type": "markdown", "id": "562ddb82", "metadata": {}, "source": [ "# How to stream LLM tokens from your graph\n", "\n", "In this example, we will stream tokens from the language model powering an\n", "agent. We will use a ReAct agent as an example. The tl;dr is to use\n", "[streamEvents](https://js.langchain.com/v0.2/docs/how_to/chat_streaming/#stream-events)\n", "([API Ref](https://api.js.langchain.com/classes/langchain_core_runnables.Runnable.html#streamEvents)).\n", "\n", "
> **Note**\n", ">\n", "> If you are using a version of `@langchain/core` < 0.2.3, when calling chat models or LLMs you need to call `await model.stream()` within your nodes to get token-by-token streaming events, and aggregate the final output if needed to update the graph state. In later versions of `@langchain/core`, this happens automatically, and you can call `await model.invoke()`.\n", ">\n", "> For more on how to upgrade `@langchain/core`, see the upgrade instructions in the LangChain.js documentation.
\n", "\n", "This how-to guide closely follows the others in this directory, showing how to\n", "incorporate the functionality into a prototypical agent in LangGraph.\n", "\n", "
> **Streaming Support**\n", ">\n", "> Token streaming is supported by many, but not all, chat models. Check the documentation for your LLM integration to see whether it supports token streaming. Note that some integrations may support general token streaming but lack support for streaming tool calls.
\n", "\n", "
> **Note**\n", ">\n", "> In this how-to, we will create our agent from scratch to be transparent (but verbose). You can accomplish similar functionality using the `createReactAgent({ llm, tools })` constructor. This may be more appropriate if you are used to LangChain's `AgentExecutor` class.
\n", "\n", "## Setup\n", "\n", "This guide will use OpenAI's GPT-4o model. We will optionally set our API key\n", "for [LangSmith tracing](https://smith.langchain.com/), which will give us\n", "best-in-class observability.\n", "\n", "---" ] }, { "cell_type": "code", "execution_count": 1, "id": "8e76833b", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "// process.env.OPENAI_API_KEY = \"sk_...\";\n", "\n", "// Optional: add tracing in LangSmith\n", "// process.env.LANGCHAIN_API_KEY = \"ls__...\";\n", "// process.env.LANGCHAIN_CALLBACKS_BACKGROUND = \"true\";\n", "// process.env.LANGCHAIN_TRACING_V2 = \"true\";\n", "// process.env.LANGCHAIN_PROJECT = \"Stream Tokens: LangGraphJS\";" ] }, { "cell_type": "markdown", "id": "ab95dc97", "metadata": {}, "source": [ "## Define the state\n", "\n", "The state is the interface for all of the nodes in our graph: each node receives the current state and returns updates to it.\n" ] }, { "cell_type": "code", "execution_count": 2, "id": "1648124b", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "import { Annotation } from \"@langchain/langgraph\";\n", "import { BaseMessage } from \"@langchain/core/messages\";\n", "\n", "const StateAnnotation = Annotation.Root({\n", " messages: Annotation<BaseMessage[]>({\n", " reducer: (x, y) => x.concat(y),\n", " }),\n", "});" ] }, { "cell_type": "markdown", "id": "da50fbd8", "metadata": {}, "source": [ "## Set up the tools\n", "\n", "First, define the tools you want to use. For this simple example, we'll create a placeholder search engine; see the [custom tools documentation](https://js.langchain.com/v0.2/docs/how_to/custom_tools) to learn how to create your own tools."
] }, { "cell_type": "code", "execution_count": 3, "id": "a8f1ae1c", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "import { tool } from \"@langchain/core/tools\";\n", "import { z } from \"zod\";\n", "\n", "const searchTool = tool((_) => {\n", " // This is a placeholder for the actual implementation\n", " return \"Cold, with a low of 3℃\";\n", "}, {\n", " name: \"search\",\n", " description:\n", " \"Use to surf the web, fetch current information, check the weather, and retrieve other information.\",\n", " schema: z.object({\n", " query: z.string().describe(\"The query to use in your search.\"),\n", " }),\n", "});\n", "\n", "await searchTool.invoke({ query: \"What's the weather like?\" });\n", "\n", "const tools = [searchTool];" ] }, { "cell_type": "markdown", "id": "19b27cb3", "metadata": {}, "source": [ "We can now wrap these tools in a prebuilt\n", "[ToolNode](/langgraphjs/reference/classes/langgraph_prebuilt.ToolNode.html).\n", "This object will actually run the tools (functions) whenever they are invoked by\n", "our LLM." ] }, { "cell_type": "code", "execution_count": 4, "id": "f02278b1", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "import { ToolNode } from \"@langchain/langgraph/prebuilt\";\n", "\n", "const toolNode = new ToolNode(tools);" ] }, { "cell_type": "markdown", "id": "dd55ee5a", "metadata": {}, "source": [ "## Set up the model\n", "\n", "Now load the [chat model](https://js.langchain.com/v0.2/docs/concepts/#chat-models).\n", "\n", "1. It should work with messages. We will represent all agent state in the form\n", " of messages, so it needs to be able to work well with them.\n", "2. It should work with\n", " [tool calling](https://js.langchain.com/v0.2/docs/how_to/tool_calling/#passing-tools-to-llms),\n", " meaning it can return function arguments in its response.\n", "\n", "
> **Note**\n", ">\n", "> These model requirements are not general requirements for using LangGraph; they are just requirements for this one example." ] }, { "cell_type": "code", "execution_count": 5, "id": "9c7210e7", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "import { ChatOpenAI } from \"@langchain/openai\";\n", "\n", "const model = new ChatOpenAI({ model: \"gpt-4o\", temperature: 0 });" ] }, { "cell_type": "markdown", "id": "73e59248", "metadata": {}, "source": [ "Next, we need to make sure the model knows which tools it has available to\n", "call. We can do this by calling\n", "[bindTools](https://v01.api.js.langchain.com/classes/langchain_core_language_models_chat_models.BaseChatModel.html#bindTools)." ] }, { "cell_type": "code", "execution_count": 6, "id": "b4ff23ee", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "const boundModel = model.bindTools(tools);" ] }, { "cell_type": "markdown", "id": "dbe67356", "metadata": {}, "source": [ "## Define the graph\n", "\n", "We can now put it all together." ] }, { "cell_type": "code", "execution_count": 7, "id": "0ba603bb", "metadata": {}, "outputs": [], "source": [ "import { StateGraph, END } from \"@langchain/langgraph\";\n", "import { AIMessage } from \"@langchain/core/messages\";\n", "\n", "const routeMessage = (state: typeof StateAnnotation.State) => {\n", " const { messages } = state;\n", " const lastMessage = messages[messages.length - 1] as AIMessage;\n", " // If no tools are called, we can finish (respond to the user)\n", " if (!lastMessage?.tool_calls?.length) {\n", " return END;\n", " }\n", " // Otherwise, if there are tool calls, route to the tools node\n", " return \"tools\";\n", "};\n", "\n", "const callModel = async (\n", " state: typeof StateAnnotation.State,\n", ") => {\n", " // For versions of @langchain/core < 0.2.3, you must call `.stream()`\n", " // and aggregate the message from chunks instead of calling `.invoke()`.\n", " const { messages } = state;\n", " const responseMessage = await boundModel.invoke(messages);\n", " return { messages: [responseMessage] };\n", "};\n", "\n", 
"const workflow = new StateGraph(StateAnnotation)\n", " .addNode(\"agent\", callModel)\n", " .addNode(\"tools\", toolNode)\n", " .addEdge(\"__start__\", \"agent\")\n", " .addConditionalEdges(\"agent\", routeMessage)\n", " .addEdge(\"tools\", \"agent\");\n", "\n", "const agent = workflow.compile();" ] }, { "cell_type": "code", "execution_count": 8, "id": "a88cf20a", "metadata": {}, "outputs": [ { "data": { "image/png": "" }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import * as tslab from \"tslab\";\n", "\n", "const runnableGraph = agent.getGraph();\n", "const image = await runnableGraph.drawMermaidPng();\n", "const arrayBuffer = await image.arrayBuffer();\n", "\n", "await tslab.display.png(new Uint8Array(arrayBuffer));" ] }, { "cell_type": "markdown", "id": "055aacad", "metadata": {}, "source": [ "## How to stream tool calls\n", "\n", "You can now run your agent. Let's first look at an example of streaming back intermediate tool calls. This is not supported by all providers, but some support token-level streaming of tool invocations.\n", "\n", "To get the partially populated tool calls, you can access the message chunks' `tool_call_chunks` property:" ] }, { "cell_type": "code", "execution_count": 9, "id": "c704d23c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[\n", " {\n", " name: 'search',\n", " args: '',\n", " id: 'call_ziGo5u8fYyqQ78SdLZTEC9Vg',\n", " index: 0,\n", " type: 'tool_call_chunk'\n", " }\n", "]\n", "[\n", " {\n", " name: undefined,\n", " args: '{\"',\n", " id: undefined,\n", " index: 0,\n", " type: 'tool_call_chunk'\n", " }\n", "]\n", "[\n", " {\n", " name: undefined,\n", " args: 'query',\n", " id: undefined,\n", " index: 0,\n", " type: 'tool_call_chunk'\n", " }\n", "]\n", "[\n", " {\n", " name: undefined,\n", " args: '\":\"',\n", " id: undefined,\n", " index: 0,\n", " type: 'tool_call_chunk'\n", " }\n", "]\n", "[\n", " {\n", " name: undefined,\n", " args: 'current',\n", " id: 
undefined,\n", " index: 0,\n", " type: 'tool_call_chunk'\n", " }\n", "]\n", "[\n", " {\n", " name: undefined,\n", " args: ' weather',\n", " id: undefined,\n", " index: 0,\n", " type: 'tool_call_chunk'\n", " }\n", "]\n", "[\n", " {\n", " name: undefined,\n", " args: '\"}',\n", " id: undefined,\n", " index: 0,\n", " type: 'tool_call_chunk'\n", " }\n", "]\n" ] } ], "source": [ "import type { AIMessageChunk } from \"@langchain/core/messages\";\n", "\n", "const eventStream = await agent.streamEvents(\n", " { messages: [{ role: \"user\", content: \"What's the weather like today?\" }] },\n", " {\n", " version: \"v2\",\n", " },\n", ");\n", "\n", "for await (const { event, data } of eventStream) {\n", " if (event === \"on_chat_model_stream\") {\n", " const msg = data.chunk as AIMessageChunk;\n", " if (msg.tool_call_chunks !== undefined && msg.tool_call_chunks.length > 0) {\n", " console.log(msg.tool_call_chunks);\n", " }\n", " }\n", "}" ] }, { "cell_type": "markdown", "id": "1d9b168b", "metadata": {}, "source": [ "Because this is a ReAct-style agent, the loop above only logs the intermediate tool calls, not the final response: once the model no longer needs to gather information by calling tools, it generates a final response with no tool calls.\n", "\n", "## Streaming final responses\n", "\n", "### ReAct agents\n", "\n", "For ReAct-style agents, you know that as soon as you start receiving message chunks with no `tool_call_chunks`, the model is responding directly to the user. 
So we can flip the conditional like this to only log tokens from the final response:" ] }, { "cell_type": "code", "execution_count": 10, "id": "86f843bb", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "\n", "The\n", " weather\n", " today\n", " is\n", " cold\n", ",\n", " with\n", " a\n", " low\n", " of\n", " \n", "3\n", "℃\n", ".\n", "\n" ] } ], "source": [ "const eventStreamFinalRes = await agent.streamEvents(\n", " { messages: [{ role: \"user\", content: \"What's the weather like today?\" }] },\n", " { version: \"v2\" });\n", "\n", "for await (const { event, data } of eventStreamFinalRes) {\n", " if (event === \"on_chat_model_stream\") {\n", " const msg = data.chunk as AIMessageChunk;\n", " if (!msg.tool_call_chunks?.length) {\n", " console.log(msg.content);\n", " }\n", " }\n", "}" ] }, { "cell_type": "markdown", "id": "f13b4790", "metadata": {}, "source": [ "### Other graphs\n", "\n", "If your graph has multiple model calls in multiple nodes and there's one that will always be called last, you can distinguish that model by assigning it a run name or a tag. 
To illustrate this, declare a new graph like this:" ] }, { "cell_type": "code", "execution_count": 11, "id": "0fea2f20", "metadata": {}, "outputs": [], "source": [ "const OtherGraphAnnotation = Annotation.Root({\n", " messages: Annotation<BaseMessage[]>({\n", " reducer: (x, y) => x.concat(y),\n", " }),\n", "});\n", "\n", "const respond = async (state: typeof OtherGraphAnnotation.State) => {\n", " const { messages } = state;\n", " const model = new ChatOpenAI({ model: \"gpt-4o\", temperature: 0 });\n", " const responseMessage = await model.invoke(messages);\n", " return {\n", " messages: [responseMessage],\n", " };\n", "};\n", "\n", "const summarize = async (state: typeof OtherGraphAnnotation.State) => {\n", " const { messages } = state;\n", " // Assign the final model call a run name\n", " const model = new ChatOpenAI({\n", " model: \"gpt-4o\",\n", " temperature: 0\n", " }).withConfig({ runName: \"Summarizer\" });\n", " const userMessage = { role: \"human\", content: \"Now, summarize the above messages\" };\n", " const responseMessage = await model.invoke([\n", " ...messages,\n", " userMessage,\n", " ]);\n", " return {\n", " messages: [userMessage, responseMessage]\n", " };\n", "};\n", "\n", "const otherWorkflow = new StateGraph(OtherGraphAnnotation)\n", " .addNode(\"respond\", respond)\n", " .addNode(\"summarize\", summarize)\n", " .addEdge(\"__start__\", \"respond\")\n", " .addEdge(\"respond\", \"summarize\")\n", " .addEdge(\"summarize\", \"__end__\");\n", "\n", "const otherGraph = otherWorkflow.compile();" ] }, { "cell_type": "code", "execution_count": 12, "id": "2149f527", "metadata": {}, "outputs": [ { "data": { "image/png": "" }, "metadata": {}, "output_type": "display_data" } ], "source": [ "const otherRunnableGraph = otherGraph.getGraph();\n", "const otherImage = await otherRunnableGraph.drawMermaidPng();\n", "const otherArrayBuffer = await otherImage.arrayBuffer();\n", "\n", "await tslab.display.png(new Uint8Array(otherArrayBuffer));" ] }, { "cell_type": "markdown", "id": "5ff9d991", "metadata": {}, "source": [ "Now when we call `streamEvents`, we can filter on the run name to see only the final summary generation for the current chat history:" ] }, { "cell_type": "code", "execution_count": 13, "id": "51381303", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "You\n", " asked\n", " about\n", " the\n", " capital\n", " of\n", " Nepal\n", ",\n", " and\n", " I\n", " responded\n", " that\n", " it\n", " is\n", " Kathmandu\n", ".\n", "\n" ] } ], "source": [ "const otherEventStream = await otherGraph.streamEvents(\n", " { messages: [{ role: \"user\", content: \"What's the capital of Nepal?\" }] },\n", " { version: \"v2\" },\n", " // Only emit events from runs named \"Summarizer\"\n", " { includeNames: [\"Summarizer\"] }\n", ");\n", "\n", "for await (const { event, data } of otherEventStream) {\n", " if (event === \"on_chat_model_stream\") {\n", " console.log(data.chunk.content);\n", " }\n", "}" ] }, { "cell_type": "markdown", "id": "46998967", "metadata": {}, "source": [ "As you can see, the resulting chunks come only from the final summary model call.\n", "\n", "## Next steps\n", "\n", "You've now seen some ways to stream LLM tokens from within your graph. Next, check out the other [how-tos on streaming](/langgraphjs/how-tos/#streaming)." ] } ], "metadata": { "kernelspec": { "display_name": "TypeScript", "language": "typescript", "name": "tslab" }, "language_info": { "codemirror_mode": { "mode": "typescript", "name": "javascript", "typescript": true }, "file_extension": ".ts", "mimetype": "text/typescript", "name": "typescript", "version": "3.7.2" } }, "nbformat": 4, "nbformat_minor": 5 }