跳到内容

使用流式 React

如何将 LangGraph 集成到您的 React 应用程序中# 如何将 LangGraph 集成到您的 React 应用程序中

useStream() React Hook 提供了一种将 LangGraph 无缝集成到您的 React 应用程序中的方式。它处理流式传输、状态管理和分支逻辑的所有复杂性,让您专注于构建出色的聊天体验。

主要功能

  • 消息流式传输:处理消息块流以形成完整的消息
  • 消息、中断、加载状态和错误的自动状态管理
  • 对话分支:从聊天历史中的任意点创建备用对话路径
  • UI 无关设计:使用您自己的组件和样式

让我们探索如何在您的 React 应用程序中使用 useStream()

useStream() 为创建定制的聊天体验提供了坚实的基础。对于预构建的聊天组件和界面,我们还建议查看 CopilotKitassistant-ui

安装

npm install @langchain/langgraph-sdk @langchain/core

示例

"use client";

import { useStream } from "@langchain/langgraph-sdk/react";
import type { Message } from "@langchain/langgraph-sdk";

export default function App() {
  const thread = useStream<{ messages: Message[] }>({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
    messagesKey: "messages",
  });

  return (
    <div>
      <div>
        {thread.messages.map((message) => (
          <div key={message.id}>{message.content as string}</div>
        ))}
      </div>

      <form
        onSubmit={(e) => {
          e.preventDefault();

          const form = e.target as HTMLFormElement;
          const message = new FormData(form).get("message") as string;

          form.reset();
          thread.submit({ messages: [{ type: "human", content: message }] });
        }}
      >
        <input type="text" name="message" />

        {thread.isLoading ? (
          <button key="stop" type="button" onClick={() => thread.stop()}>
            Stop
          </button>
        ) : (
          <button keytype="submit">Send</button>
        )}
      </form>
    </div>
  );
}

自定义您的 UI

useStream() Hook 在幕后负责所有复杂的状体管理,为您提供简单的接口来构建您的 UI。以下是您开箱即用的功能:

  • 线程状态管理
  • 加载和错误状态
  • 中断
  • 消息处理和更新
  • 分支支持

以下是一些如何有效使用这些功能的示例:

加载状态

isLoading 属性告诉您流何时处于活动状态,使您能够:

  • 显示加载指示器
  • 在处理期间禁用输入字段
  • 显示取消按钮
export default function App() {
  const { isLoading, stop } = useStream<{ messages: Message[] }>({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
    messagesKey: "messages",
  });

  return (
    <form>
      {isLoading && (
        <button key="stop" type="button" onClick={() => stop()}>
          Stop
        </button>
      )}
    </form>
  );
}

页面刷新后恢复流

通过设置 reconnectOnMount: trueuseStream() Hook 可以在挂载时自动恢复正在进行的运行。这对于页面刷新后继续流式传输非常有用,确保在停机期间生成的任何消息和事件都不会丢失。

const thread = useStream<{ messages: Message[] }>({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  reconnectOnMount: true,
});

默认情况下,创建的运行 ID 存储在 window.sessionStorage 中,您可以通过在 reconnectOnMount 中传递自定义存储来替换它。此存储用于持久化线程中正在进行的运行 ID(在 lg:stream:${threadId} 键下)。

const thread = useStream<{ messages: Message[] }>({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  reconnectOnMount: () => window.localStorage,
});

您还可以通过使用运行回调来持久化运行元数据,并使用 joinStream 函数来恢复流,从而手动管理恢复过程。请务必在创建运行时传递 streamResumable: true;否则某些事件可能会丢失。

import type { Message } from "@langchain/langgraph-sdk";
import { useStream } from "@langchain/langgraph-sdk/react";
import { useCallback, useState, useEffect, useRef } from "react";

export default function App() {
  const [threadId, onThreadId] = useSearchParam("threadId");

  const thread = useStream<{ messages: Message[] }>({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",

    threadId,
    onThreadId,

    onCreated: (run) => {
      window.sessionStorage.setItem(`resume:${run.thread_id}`, run.run_id);
    },
    onFinish: (_, run) => {
      window.sessionStorage.removeItem(`resume:${run?.thread_id}`);
    },
  });

  // Ensure that we only join the stream once per thread.
  const joinedThreadId = useRef<string | null>(null);
  useEffect(() => {
    if (!threadId) return;

    const resume = window.sessionStorage.getItem(`resume:${threadId}`);
    if (resume && joinedThreadId.current !== threadId) {
      thread.joinStream(resume);
      joinedThreadId.current = threadId;
    }
  }, [threadId]);

  return (
    <form
      onSubmit={(e) => {
        e.preventDefault();
        const form = e.target as HTMLFormElement;
        const message = new FormData(form).get("message") as string;
        thread.submit(
          { messages: [{ type: "human", content: message }] },
          { streamResumable: true }
        );
      }}
    >
      <div>
        {thread.messages.map((message) => (
          <div key={message.id}>{message.content as string}</div>
        ))}
      </div>
      <input type="text" name="message" />
      <button type="submit">Send</button>
    </form>
  );
}

// Utility method to retrieve and persist data in URL as search param
function useSearchParam(key: string) {
  const [value, setValue] = useState<string | null>(() => {
    const params = new URLSearchParams(window.location.search);
    return params.get(key) ?? null;
  });

  const update = useCallback(
    (value: string | null) => {
      setValue(value);

      const url = new URL(window.location.href);
      if (value == null) {
        url.searchParams.delete(key);
      } else {
        url.searchParams.set(key, value);
      }

      window.history.pushState({}, "", url.toString());
    },
    [key]
  );

  return [value, update] as const;
}
```

### Thread Management

Keep track of conversations with built-in thread management. You can access the current thread ID and get notified when new threads are created:

```tsx
const [threadId, setThreadId] = useState<string | null>(null);

const thread = useStream<{ messages: Message[] }>({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",

  threadId: threadId,
  onThreadId: setThreadId,
});

我们建议将 threadId 存储在 URL 的查询参数中,以便用户在页面刷新后恢复对话。

消息处理

useStream() Hook 将跟踪从服务器接收到的消息块,并将它们连接起来形成一个完整的消息。完成的消息块可以通过 messages 属性检索。

默认情况下,messagesKey 设置为 messages,它会将新的消息块附加到 values["messages"]。如果您的消息存储在不同的键中,您可以更改 messagesKey 的值。

import type { Message } from "@langchain/langgraph-sdk";
import { useStream } from "@langchain/langgraph-sdk/react";

export default function HomePage() {
  const thread = useStream<{ messages: Message[] }>({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
    messagesKey: "messages",
  });

  return (
    <div>
      {thread.messages.map((message) => (
        <div key={message.id}>{message.content as string}</div>
      ))}
    </div>
  );
}

在内部,useStream() Hook 将使用 streamMode: "messages-tuple" 从您的图节点内的任何 LangChain 聊天模型调用中接收消息流(即单个 LLM 令牌)。在流式传输指南中了解更多关于消息流式传输的信息。

中断

useStream() Hook 暴露了 interrupt 属性,该属性将填充线程的最后一个中断。您可以使用中断来:

  • 在执行节点之前渲染确认 UI
  • 等待人工输入,允许代理向用户提出澄清问题

如何处理中断指南中了解更多关于中断的信息。

const thread = useStream<{ messages: Message[] }, { InterruptType: string }>({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  messagesKey: "messages",
});

if (thread.interrupt) {
  return (
    <div>
      Interrupted! {thread.interrupt.value}
      <button
        type="button"
        onClick={() => {
          // `resume` can be any value that the agent accepts
          thread.submit(undefined, { command: { resume: true } });
        }}
      >
        Resume
      </button>
    </div>
  );
}

分支

对于每条消息,您可以使用 getMessagesMetadata() 获取消息首次出现的第一个检查点。然后,您可以从第一个检查点之前的检查点创建一个新的运行,以在线程中创建一个新分支。

可以通过以下方式创建分支:

  1. 编辑上一条用户消息。
  2. 请求重新生成上一条助手消息。
"use client";

import type { Message } from "@langchain/langgraph-sdk";
import { useStream } from "@langchain/langgraph-sdk/react";
import { useState } from "react";

function BranchSwitcher({
  branch,
  branchOptions,
  onSelect,
}: {
  branch: string | undefined;
  branchOptions: string[] | undefined;
  onSelect: (branch: string) => void;
}) {
  if (!branchOptions || !branch) return null;
  const index = branchOptions.indexOf(branch);

  return (
    <div className="flex items-center gap-2">
      <button
        type="button"
        onClick={() => {
          const prevBranch = branchOptions[index - 1];
          if (!prevBranch) return;
          onSelect(prevBranch);
        }}
      >
        Prev
      </button>
      <span>
        {index + 1} / {branchOptions.length}
      </span>
      <button
        type="button"
        onClick={() => {
          const nextBranch = branchOptions[index + 1];
          if (!nextBranch) return;
          onSelect(nextBranch);
        }}
      >
        Next
      </button>
    </div>
  );
}

function EditMessage({
  message,
  onEdit,
}: {
  message: Message;
  onEdit: (message: Message) => void;
}) {
  const [editing, setEditing] = useState(false);

  if (!editing) {
    return (
      <button type="button" onClick={() => setEditing(true)}>
        Edit
      </button>
    );
  }

  return (
    <form
      onSubmit={(e) => {
        e.preventDefault();
        const form = e.target as HTMLFormElement;
        const content = new FormData(form).get("content") as string;

        form.reset();
        onEdit({ type: "human", content });
        setEditing(false);
      }}
    >
      <input name="content" defaultValue={message.content as string} />
      <button type="submit">Save</button>
    </form>
  );
}

export default function App() {
  const thread = useStream({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
    messagesKey: "messages",
  });

  return (
    <div>
      <div>
        {thread.messages.map((message) => {
          const meta = thread.getMessagesMetadata(message);
          const parentCheckpoint = meta?.firstSeenState?.parent_checkpoint;

          return (
            <div key={message.id}>
              <div>{message.content as string}</div>

              {message.type === "human" && (
                <EditMessage
                  message={message}
                  onEdit={(message) =>
                    thread.submit(
                      { messages: [message] },
                      { checkpoint: parentCheckpoint }
                    )
                  }
                />
              )}

              {message.type === "ai" && (
                <button
                  type="button"
                  onClick={() =>
                    thread.submit(undefined, { checkpoint: parentCheckpoint })
                  }
                >
                  <span>Regenerate</span>
                </button>
              )}

              <BranchSwitcher
                branch={meta?.branch}
                branchOptions={meta?.branchOptions}
                onSelect={(branch) => thread.setBranch(branch)}
              />
            </div>
          );
        })}
      </div>

      <form
        onSubmit={(e) => {
          e.preventDefault();

          const form = e.target as HTMLFormElement;
          const message = new FormData(form).get("message") as string;

          form.reset();
          thread.submit({ messages: [message] });
        }}
      >
        <input type="text" name="message" />

        {thread.isLoading ? (
          <button key="stop" type="button" onClick={() => thread.stop()}>
            Stop
          </button>
        ) : (
          <button key="submit" type="submit">
            Send
          </button>
        )}
      </form>
    </div>
  );
}

对于高级用例,您可以使用 experimental_branchTree 属性获取线程的树表示,这可用于为非基于消息的图渲染分支控件。

乐观更新

您可以在向代理执行网络请求之前乐观地更新客户端状态,从而立即向用户提供反馈,例如在代理看到请求之前立即显示用户消息。

const stream = useStream({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  messagesKey: "messages",
});

const handleSubmit = (text: string) => {
  const newMessage = { type: "human" as const, content: text };

  stream.submit(
    { messages: [newMessage] },
    {
      optimisticValues(prev) {
        const prevMessages = prev.messages ?? [];
        const newMessages = [...prevMessages, newMessage];
        return { ...prev, messages: newMessages };
      },
    }
  );
};

TypeScript

useStream() Hook 对用 TypeScript 编写的应用程序友好,您可以为状态指定类型以获得更好的类型安全和 IDE 支持。

// Define your types
type State = {
  messages: Message[];
  context?: Record<string, unknown>;
};

// Use them with the hook
const thread = useStream<State>({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  messagesKey: "messages",
});

您还可以选择为不同的场景指定类型,例如:

  • ConfigurableTypeconfig.configurable 属性的类型(默认值:Record<string, unknown>
  • InterruptType:中断值的类型,即 interrupt(...) 函数的内容(默认值:unknown
  • CustomEventType:自定义事件的类型(默认值:unknown
  • UpdateType:提交函数的类型(默认值:Partial<State>
const thread = useStream<
  State,
  {
    UpdateType: {
      messages: Message[] | Message;
      context?: Record<string, unknown>;
    };
    InterruptType: string;
    CustomEventType: {
      type: "progress" | "debug";
      payload: unknown;
    };
    ConfigurableType: {
      model: string;
    };
  }
>({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  messagesKey: "messages",
});

如果您正在使用 LangGraph.js,您还可以重用您的图的注解类型。但是,请确保只导入注解模式的类型,以避免导入整个 LangGraph.js 运行时(即通过 import type { ... } 指令)。

import {
  Annotation,
  MessagesAnnotation,
  type StateType,
  type UpdateType,
} from "@langchain/langgraph/web";

const AgentState = Annotation.Root({
  ...MessagesAnnotation.spec,
  context: Annotation<string>(),
});

const thread = useStream<
  StateType<typeof AgentState.spec>,
  { UpdateType: UpdateType<typeof AgentState.spec> }
>({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  messagesKey: "messages",
});

事件处理

useStream() Hook 提供了几个回调选项来帮助您响应不同的事件:

  • onError:发生错误时调用。
  • onFinish:流完成时调用。
  • onUpdateEvent:收到更新事件时调用。
  • onCustomEvent:收到自定义事件时调用。请参阅流式传输指南,了解如何流式传输自定义事件。
  • onMetadataEvent:收到元数据事件时调用,其中包含运行 ID 和线程 ID。

了解更多