
LLMCompiler

This notebook shows how to implement LLMCompiler, by Kim, et al., in LangGraph.

LLMCompiler is an agent architecture designed to speed up the execution of agentic tasks by eagerly executing tasks within a DAG. It also saves costs on redundant token usage by reducing the number of calls to the LLM. Below is an overview of its computational graph:

LLMCompiler Graph

It has 3 main components:

  1. Planner: streams a DAG of tasks.
  2. Task Fetching Unit: schedules and executes the tasks as soon as they are executable.
  3. Joiner: responds to the user or triggers a second plan.

This notebook walks through each component and shows how to wire them together using LangGraph. The end result will leave a trace like this one.

Setup

First, let's install the required packages and set our API keys:

%%capture --no-stderr
%pip install -U --quiet langchain_openai langsmith langgraph langchain numexpr
import getpass
import os


def _get_pass(var: str):
    if var not in os.environ:
        os.environ[var] = getpass.getpass(f"{var}: ")


_get_pass("OPENAI_API_KEY")

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph. Read more about how to get started here.

Helper Files

Math Tools

Place the following code in a file called math_tools.py and ensure that you can import it into this notebook.
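The file's contents are collapsed in the hosted version of this notebook. If you don't have them handy, the minimal sketch below captures the interface the rest of this notebook relies on: a get_math_tool(llm) factory returning a tool named math that accepts a problem string plus an optional context list, extracts a numerical expression with the LLM, and evaluates it with numexpr. The prompt wording and error message here are illustrative, not the original file.

# math_tools.py -- a minimal sketch, not the original tutorial file.
from typing import List, Optional

import numexpr
from langchain_core.language_models import BaseChatModel
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import StructuredTool

_MATH_PROMPT = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Translate the math problem into a single expression that can be"
            " evaluated with Python's numexpr library. Respond with the"
            " expression only.",
        ),
        ("user", "Context (may be empty):\n{context}\n\nProblem: {problem}"),
    ]
)


def get_math_tool(llm: BaseChatModel) -> StructuredTool:
    extractor = _MATH_PROMPT | llm | StrOutputParser()

    def calculate_expression(
        problem: str, context: Optional[List[str]] = None
    ) -> str:
        expression = extractor.invoke(
            {"problem": problem, "context": "\n".join(context or [])}
        ).strip()
        try:
            # numexpr returns a numpy scalar; .item() unwraps it.
            return str(numexpr.evaluate(expression).item())
        except Exception as e:
            # Return the error as text so the joiner can decide to replan.
            return (
                f'Failed to evaluate "{expression}". Raised error: {repr(e)}.'
                " Please try again with a valid numerical expression"
            )

    return StructuredTool.from_function(
        name="math",
        func=calculate_expression,
        description="math(problem: str, context: Optional[list[str]]) -> float:"
        " - Solves the provided math problem, optionally using context.",
    )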

Output Parser
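The parser's source (output_parser.py) is likewise collapsed in the hosted notebook. The planner below imports LLMCompilerPlanParser and a Task type from it. Judging from how tasks are consumed later in this notebook, Task is a dict-like record of roughly the following shape; this is an inferred sketch, not the original definition:

# Sketch of the Task interface used throughout this notebook, inferred
# from how tasks are accessed below (not the original output_parser.py).
from typing import Any, Dict, List, Union

from langchain_core.tools import BaseTool
from typing_extensions import TypedDict


class Task(TypedDict):
    idx: int                    # strictly increasing task ID
    tool: Union[BaseTool, str]  # join() arrives as the plain string "join"
    args: Union[str, Dict[str, Any]]
    dependencies: List[int]     # IDs of tasks whose outputs this task needs

LLMCompilerPlanParser streams these Task dicts while the LLM is still generating the plan, which is what lets the scheduler start work before planning finishes.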

Define Tools

We'll first define the tools for the agent to use in our demo. We'll give it the search engine + calculator combo.

If you don't want to sign up for Tavily, you can replace it with the free DuckDuckGo; a minimal swap is sketched after the tool definitions below.

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI
from math_tools import get_math_tool

_get_pass("TAVILY_API_KEY")

calculate = get_math_tool(ChatOpenAI(model="gpt-4-turbo-preview"))
search = TavilySearchResults(
    max_results=1,
    description='tavily_search_results_json(query="the search query") - a search engine.',
)

tools = [search, calculate]

API Reference: TavilySearchResults | ChatOpenAI
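If you'd rather skip the Tavily signup, a hypothetical DuckDuckGo swap might look like this (assumes the duckduckgo-search package is installed; the description string simply mirrors the Tavily one above):

# Optional alternative to Tavily (sketch; requires `pip install duckduckgo-search`).
from langchain_community.tools import DuckDuckGoSearchResults

search = DuckDuckGoSearchResults(
    description='duckduckgo_results_json(query="the search query") - a search engine.',
)
tools = [search, calculate]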

calculate.invoke(
    {
        "problem": "What's the temp of sf + 5?",
        "context": ["Thet empreature of sf is 32 degrees"],
    }
)
'37'

Planner

The planner, largely adapted from the original source code, accepts the input question and generates a task list to execute.

If it is provided with a previous plan, it is instructed to re-plan, which is useful if, upon completing the first batch of tasks, the agent must take more actions.

The code below composes the planner's prompt template and combines it with the LLM and the output parser defined in output_parser.py. The output parser handles task lists of the following form:

1. tool_1(arg1="arg1", arg2=3.5, ...)
Thought: I then want to find out Y by using tool_2
2. tool_2(arg1="", arg2="${1}")
3. join()<END_OF_PLAN>

The "Thought" lines are optional. The ${#} placeholders are variables, used to route tool (task) outputs into other tools. For example, if task 1 returns 32, an argument written as "${1} + 5" in task 2 resolves to "32 + 5" before task 2 executes.

from typing import Sequence

from langchain import hub
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import (
    BaseMessage,
    FunctionMessage,
    HumanMessage,
    SystemMessage,
)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI
from output_parser import LLMCompilerPlanParser, Task

prompt = hub.pull("wfh/llm-compiler")
prompt.pretty_print()

API Reference: BaseChatModel | BaseMessage | FunctionMessage | HumanMessage | SystemMessage | ChatPromptTemplate | RunnableBranch | BaseTool | ChatOpenAI

================================ System Message ================================

Given a user query, create a plan to solve it with the utmost parallelizability. Each plan should comprise an action from the following {num_tools} types:
{tool_descriptions}
{num_tools}. join(): Collects and combines results from prior actions.

 - An LLM agent is called upon invoking join() to either finalize the user query or wait until the plans are executed.
 - join should always be the last action in the plan, and will be called in two scenarios:
   (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.
   (b) if the answer cannot be determined in the planning phase before you execute the plans. Guidelines:
 - Each action described above contains input/output types and description.
    - You must strictly adhere to the input and output types for each action.
    - The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.
 - Each action in the plan should strictly be one of the above types. Follow the Python conventions for each action.
 - Each action MUST have a unique ID, which is strictly increasing.
 - Inputs for actions can either be constants or outputs from preceding actions. In the latter case, use the format $id to denote the ID of the previous action whose output will be the input.
 - Always call join as the last action in the plan. Say '<END_OF_PLAN>' after you call join
 - Ensure the plan maximizes parallelizability.
 - Only use the provided action types. If a query cannot be addressed using these, invoke the join action for the next steps.
 - Never introduce new actions other than the ones provided.

============================= Messages Placeholder =============================

{messages}

================================ System Message ================================

Remember, ONLY respond with the task list in the correct format! E.g.:
idx. tool(arg_name=args)

def create_planner(
    llm: BaseChatModel, tools: Sequence[BaseTool], base_prompt: ChatPromptTemplate
):
    tool_descriptions = "\n".join(
        f"{i+1}. {tool.description}\n"
        for i, tool in enumerate(
            tools
        )  # +1 to offset the 0-based index; we want the tools numbered from 1.
    )
    planner_prompt = base_prompt.partial(
        replan="",
        num_tools=len(tools)
        + 1,  # Add one because we're adding the join() tool at the end.
        tool_descriptions=tool_descriptions,
    )
    replanner_prompt = base_prompt.partial(
        replan=' - You are given "Previous Plan" which is the plan that the previous agent created along with the execution results '
        "(given as Observation) of each plan and a general thought (given as Thought) about the executed results."
        'You MUST use this information to create the next plan under "Current Plan".\n'
        ' - When starting the Current Plan, you should start with "Thought" that outlines the strategy for the next plan.\n'
        " - In the Current Plan, you should NEVER repeat the actions that are already executed in the Previous Plan.\n"
        " - You must continue the task index from the end of the previous one. Do not repeat task indices.",
        num_tools=len(tools) + 1,
        tool_descriptions=tool_descriptions,
    )

    def should_replan(state: list):
        # Context is passed as a system message
        return isinstance(state[-1], SystemMessage)

    def wrap_messages(state: list):
        return {"messages": state}

    def wrap_and_get_last_index(state: list):
        next_task = 0
        for message in state[::-1]:
            if isinstance(message, FunctionMessage):
                next_task = message.additional_kwargs["idx"] + 1
                break
        state[-1].content = state[-1].content + f" - Begin counting at: {next_task}"
        return {"messages": state}

    return (
        RunnableBranch(
            (should_replan, wrap_and_get_last_index | replanner_prompt),
            wrap_messages | planner_prompt,
        )
        | llm
        | LLMCompilerPlanParser(tools=tools)
    )


llm = ChatOpenAI(model="gpt-4-turbo-preview")
# This is the primary "agent" in our application
planner = create_planner(llm, tools, prompt)

example_question = "What's the temperature in SF raised to the 3rd power?"

for task in planner.stream([HumanMessage(content=example_question)]):
    print(task["tool"], task["args"])
    print("---")
description='tavily_search_results_json(query="the search query") - a search engine.' max_results=1 api_wrapper=TavilySearchAPIWrapper(tavily_api_key=SecretStr('**********')) {'query': 'current temperature in San Francisco'}
---
name='math' description='math(problem: str, context: Optional[list[str]]) -> float:\n - Solves the provided math problem.\n - `problem` can be either a simple math problem (e.g. "1 + 3") or a word problem (e.g. "how many apples are there if there are 3 apples and 2 apples").\n - You cannot calculate multiple expressions in one call. For instance, `math(\'1 + 3, 2 + 4\')` does not work. If you need to calculate multiple expressions, you need to call them separately like `math(\'1 + 3\')` and then `math(\'2 + 4\')`\n - Minimize the number of `math` actions as much as possible. For instance, instead of calling 2. math("what is the 10% of $1") and then call 3. math("$1 + $2"), you MUST call 2. math("what is the 110% of $1") instead, which will reduce the number of math actions.\n - You can optionally provide a list of strings as `context` to help the agent solve the problem. If there are multiple contexts you need to answer the question, you can provide them as a list of strings.\n - `math` action will not see the output of the previous actions unless you provide it as `context`. You MUST provide the output of the previous actions as `context` if you need to do math on it.\n - You MUST NEVER provide `search` type action\'s outputs as a variable in the `problem` argument. This is because `search` returns a text blob that contains the information about the entity, not a number or value. Therefore, when you need to provide an output of `search` action, you MUST provide it as a `context` argument to `math` action. For example, 1. search("Barack Obama") and then 2. math("age of $1") is NEVER allowed. Use 2. math("age of Barack Obama", context=["$1"]) instead.\n - When you ask a question about `context`, specify the units. For instance, "what is xx in height?" or "what is xx in millions?" instead of "what is xx?"' args_schema=<class 'langchain_core.utils.pydantic.math'> func=<function get_math_tool.<locals>.calculate_expression at 0x11bed0fe0> {'problem': 'x ** 3', 'context': ['$1']}
---
join ()
---

Task Fetching Unit

This component schedules the tasks. It receives a stream of tools in the following format:

{
    tool: BaseTool,
    dependencies: number[],
}
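Concretely, for the example question above, the first streamed entry would look roughly like this (illustrative values, based on the Task fields the scheduler below reads):

# Illustrative shape of one streamed task (not actual notebook output):
{
    "idx": 1,
    "tool": search,  # the TavilySearchResults instance defined earlier
    "args": {"query": "current temperature in San Francisco"},
    "dependencies": [],
}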

The basic idea is to begin executing tools as soon as their dependencies are met. This is accomplished through multithreading. We will combine the task fetching unit and executor below:

diagram

import re
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Dict, Iterable, List, Union

from langchain_core.runnables import (
    chain as as_runnable,
)
from typing_extensions import TypedDict


def _get_observations(messages: List[BaseMessage]) -> Dict[int, Any]:
    # Get all previous tool responses
    results = {}
    for message in messages[::-1]:
        if isinstance(message, FunctionMessage):
            results[int(message.additional_kwargs["idx"])] = message.content
    return results


class SchedulerInput(TypedDict):
    messages: List[BaseMessage]
    tasks: Iterable[Task]


def _execute_task(task, observations, config):
    tool_to_use = task["tool"]
    if isinstance(tool_to_use, str):
        return tool_to_use
    args = task["args"]
    try:
        if isinstance(args, str):
            resolved_args = _resolve_arg(args, observations)
        elif isinstance(args, dict):
            resolved_args = {
                key: _resolve_arg(val, observations) for key, val in args.items()
            }
        else:
            # This will likely fail
            resolved_args = args
    except Exception as e:
        return (
            f"ERROR(Failed to call {tool_to_use.name} with args {args}.)"
            f" Args could not be resolved. Error: {repr(e)}"
        )
    try:
        return tool_to_use.invoke(resolved_args, config)
    except Exception as e:
        return (
            f"ERROR(Failed to call {tool_to_use.name} with args {args}."
            + f" Args resolved to {resolved_args}. Error: {repr(e)})"
        )


def _resolve_arg(arg: Union[str, Any], observations: Dict[int, Any]):
    # $1 or ${1} -> 1
    ID_PATTERN = r"\$\{?(\d+)\}?"

    def replace_match(match):
        # If the string is ${123}, match.group(0) is ${123}, and match.group(1) is 123.

        # Return the match group, in this case the index, from the string. This is the index
        # number we get back.
        idx = int(match.group(1))
        return str(observations.get(idx, match.group(0)))

    # For dependencies on other tasks
    if isinstance(arg, str):
        return re.sub(ID_PATTERN, replace_match, arg)
    elif isinstance(arg, list):
        return [_resolve_arg(a, observations) for a in arg]
    else:
        return str(arg)


@as_runnable
def schedule_task(task_inputs, config):
    task: Task = task_inputs["task"]
    observations: Dict[int, Any] = task_inputs["observations"]
    try:
        observation = _execute_task(task, observations, config)
    except Exception:
        import traceback

        # format_exc() captures the active exception as a formatted string.
        observation = traceback.format_exc()
    observations[task["idx"]] = observation


def schedule_pending_task(
    task: Task, observations: Dict[int, Any], retry_after: float = 0.2
):
    while True:
        deps = task["dependencies"]
        if deps and (any([dep not in observations for dep in deps])):
            # Dependencies not yet satisfied
            time.sleep(retry_after)
            continue
        schedule_task.invoke({"task": task, "observations": observations})
        break


@as_runnable
def schedule_tasks(scheduler_input: SchedulerInput) -> List[FunctionMessage]:
    """Group the tasks into a DAG schedule."""
    # For streaming, we are making a few simplifying assumptions:
    # 1. The LLM does not create cyclic dependencies
    # 2. The LLM will not generate tasks with future dependencies
    # If these cease to be good assumptions, you can either
    # adjust to do a proper (non-streaming) topological sort
    # or use a more complicated data structure
    tasks = scheduler_input["tasks"]
    args_for_tasks = {}
    messages = scheduler_input["messages"]
    # If we are re-planning, we may have calls that depend on previous
    # plans. Start with those.
    observations = _get_observations(messages)
    task_names = {}
    originals = set(observations)
    # ^^ We assume each task inserts a different key above to
    # avoid race conditions...
    futures = []
    retry_after = 0.25  # Retry every quarter second
    with ThreadPoolExecutor() as executor:
        for task in tasks:
            deps = task["dependencies"]
            task_names[task["idx"]] = (
                task["tool"] if isinstance(task["tool"], str) else task["tool"].name
            )
            args_for_tasks[task["idx"]] = task["args"]
            if (
                # Depends on other tasks
                deps and (any([dep not in observations for dep in deps]))
            ):
                futures.append(
                    executor.submit(
                        schedule_pending_task, task, observations, retry_after
                    )
                )
            else:
                # No deps or all deps satisfied
                # can schedule now
                schedule_task.invoke(dict(task=task, observations=observations))
                # futures.append(executor.submit(schedule_task.invoke, dict(task=task, observations=observations)))

        # All tasks have been submitted or enqueued
        # Wait for them to complete
        wait(futures)
    # Convert observations to new tool messages to add to the state
    new_observations = {
        k: (task_names[k], args_for_tasks[k], observations[k])
        for k in sorted(observations.keys() - originals)
    }
    tool_messages = [
        FunctionMessage(
            name=name,
            content=str(obs),
            additional_kwargs={"idx": k, "args": task_args},
            tool_call_id=k,
        )
        for k, (name, task_args, obs) in new_observations.items()
    ]
    return tool_messages

API Reference: chain
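As a quick, illustrative sanity check of the placeholder substitution implemented in _resolve_arg above (the observation values are made up):

# Illustrative only: how $1 / ${1} placeholders get resolved.
print(_resolve_arg("${1} + 5", {1: "32"}))  # -> "32 + 5"
print(_resolve_arg("age of $2", {2: 7}))  # -> "age of 7"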

import itertools


@as_runnable
def plan_and_schedule(state):
    messages = state["messages"]
    tasks = planner.stream(messages)
    # Begin executing the planner immediately
    try:
        tasks = itertools.chain([next(tasks)], tasks)
    except StopIteration:
        # Handle the case where tasks is empty.
        tasks = iter([])
    scheduled_tasks = schedule_tasks.invoke(
        {
            "messages": messages,
            "tasks": tasks,
        }
    )
    return {"messages": scheduled_tasks}

Example Plan

We still haven't introduced any cycles into our computation graph, so all of this can easily be expressed in LCEL.

tool_messages = plan_and_schedule.invoke(
    {"messages": [HumanMessage(content=example_question)]}
)["messages"]
tool_messages
[FunctionMessage(content="[{'url': 'https://www.accuweather.com/en/us/san-francisco/94103/current-weather/347629', 'content': 'Get the latest weather information for San Francisco, CA, including temperature, wind, humidity, pressure, and UV index. See hourly, daily, and monthly forecasts, as ...'}]", additional_kwargs={'idx': 1, 'args': {'query': 'current temperature in San Francisco'}}, response_metadata={}, name='tavily_search_results_json', tool_call_id=1),
 FunctionMessage(content='ValueError(\'Failed to evaluate "No specific value for \\\'x\\\' provided.". Raised error: SyntaxError(\\\'invalid syntax\\\', (\\\'<expr>\\\', 1, 4, "No specific value for \\\'x\\\' provided.", 1, 12)). Please try again with a valid numerical expression\')', additional_kwargs={'idx': 2, 'args': {'problem': 'x^3', 'context': ['$1']}}, response_metadata={}, name='math', tool_call_id=2),
 FunctionMessage(content='join', additional_kwargs={'idx': 3, 'args': ()}, response_metadata={}, name='join', tool_call_id=3)]

Joiner

So now we've composed the planning and initial execution. We need a component to process these outputs and either:

  1. Respond with the correct answer.
  2. Loop with a new plan.

The paper refers to this as the "joiner". It's another LLM call. We use function calling to improve parsing reliability.

Using Pydantic with LangChain

This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing Pydantic v1 and v2 BaseModels.
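A quick way to confirm which line you're on:

import langchain_core

# The Pydantic v2 models below require langchain-core >= 0.3.
print(langchain_core.__version__)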

from langchain_core.messages import AIMessage

from pydantic import BaseModel, Field


class FinalResponse(BaseModel):
    """The final response/answer."""

    response: str


class Replan(BaseModel):
    feedback: str = Field(
        description="Analysis of the previous attempts and recommendations on what needs to be fixed."
    )


class JoinOutputs(BaseModel):
    """Decide whether to replan or whether you can return the final response."""

    thought: str = Field(
        description="The chain of thought reasoning for the selected action"
    )
    action: Union[FinalResponse, Replan]


joiner_prompt = hub.pull("wfh/llm-compiler-joiner").partial(
    examples=""
)  # You can optionally add examples
llm = ChatOpenAI(model="gpt-4-turbo-preview")

runnable = joiner_prompt | llm.with_structured_output(
    JoinOutputs, method="function_calling"
)

API Reference: AIMessage

We will select only the most recent messages in the state, and format the output to be more useful for the planner, should the agent need to loop.

def _parse_joiner_output(decision: JoinOutputs) -> dict:
    response = [AIMessage(content=f"Thought: {decision.thought}")]
    if isinstance(decision.action, Replan):
        return {
            "messages": response
            + [
                SystemMessage(
                    content=f"Context from last attempt: {decision.action.feedback}"
                )
            ]
        }
    else:
        return {"messages": response + [AIMessage(content=decision.action.response)]}


def select_recent_messages(state) -> dict:
    messages = state["messages"]
    selected = []
    for msg in messages[::-1]:
        selected.append(msg)
        if isinstance(msg, HumanMessage):
            break
    return {"messages": selected[::-1]}


joiner = select_recent_messages | runnable | _parse_joiner_output
input_messages = [HumanMessage(content=example_question)] + tool_messages
joiner.invoke({"messages": input_messages})
{'messages': [AIMessage(content='Thought: Since the temperature in San Francisco was not provided, I cannot calculate its value raised to the 3rd power. The search result did not include specific temperature information, and the subsequent action to calculate the power raised the error due to lack of numerical input.', additional_kwargs={}, response_metadata={}),
  SystemMessage(content="Context from last attempt: To answer the user's question, we need the current temperature in San Francisco. Please include a step to find the current temperature in San Francisco and then calculate its value raised to the 3rd power.", additional_kwargs={}, response_metadata={})]}

Composing with LangGraph

We'll define the agent as a stateful graph, with the main nodes being:

  1. Plan and execute (the DAG from the first step above)
  2. Join: determine if we should finish or replan
  3. Recontextualize: update the graph state based on the output from the joiner

from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import add_messages
from typing import Annotated


class State(TypedDict):
    messages: Annotated[list, add_messages]


graph_builder = StateGraph(State)

# 1.  Define vertices
# We defined plan_and_schedule above already
# Assign each node to a state variable to update
graph_builder.add_node("plan_and_schedule", plan_and_schedule)
graph_builder.add_node("join", joiner)


## Define edges
graph_builder.add_edge("plan_and_schedule", "join")

### This condition determines looping logic


def should_continue(state):
    messages = state["messages"]
    if isinstance(messages[-1], AIMessage):
        return END
    return "plan_and_schedule"


graph_builder.add_conditional_edges(
    "join",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)
graph_builder.add_edge(START, "plan_and_schedule")
chain = graph_builder.compile()

API Reference: END | StateGraph | START | add_messages

Simple question

Let's ask the agent a simple question.

for step in chain.stream(
    {"messages": [HumanMessage(content="What's the GDP of New York?")]}
):
    print(step)
    print("---")
{'plan_and_schedule': {'messages': [FunctionMessage(content="[{'url': 'https://www.investopedia.com/articles/investing/011516/new-yorks-economy-6-industries-driving-gdp-growth.asp', 'content': 'The manufacturing sector is a leader in railroad rolling stock, as many of the earliest railroads were financed or founded in New York; garments, as New York City is the fashion capital of the U.S.; elevator parts; glass; and many other products.\\n Educational Services\\nThough not typically thought of as a leading industry, the educational sector in New York nonetheless has a substantial impact on the state and its residents, and in attracting new talent that eventually enters the New York business scene. New York has seen a large uptick in college attendees, both young and old, over the 21st century, and an increasing number of new employees in other New York sectors were educated in the state. New York City is the leading job hub for banking, finance, and communication in the U.S. New York is also a major manufacturing center and shipping port, and it has a thriving technological sector.\\n The state of New York has the third-largest economy in the United States with a gross domestic product (GDP) of $1.7 trillion, trailing only Texas and California.'}]", additional_kwargs={'idx': 1, 'args': {'query': 'GDP of New York'}}, response_metadata={}, name='tavily_search_results_json', tool_call_id=1)]}}
---
{'join': {'messages': [AIMessage(content='Thought: The search result provides the specific information requested. It states that the state of New York has the third-largest economy in the United States with a GDP of $1.7 trillion.', additional_kwargs={}, response_metadata={}, id='63af07a6-f931-43e9-8fdc-4f2b8c7b7663'), AIMessage(content='The GDP of New York is $1.7 trillion.', additional_kwargs={}, response_metadata={}, id='7cfc50e6-e041-4985-a5f4-ebf2e097826e')]}}
---

# Final answer
print(step["join"]["messages"][-1].content)
The GDP of New York is $1.7 trillion.

Multi-hop question

This question requires that the agent perform multiple searches.

steps = chain.stream(
    {
        "messages": [
            HumanMessage(
                content="What's the oldest parrot alive, and how much longer is that than the average?"
            )
        ]
    },
    {
        "recursion_limit": 100,
    },
)
for step in steps:
    print(step)
    print("---")
{'plan_and_schedule': {'messages': [FunctionMessage(content='[{\'url\': \'https://en.wikipedia.org/wiki/Cookie_(cockatoo)\', \'content\': \'He was one of the longest-lived birds on record[4] and was recognised by the Guinness World Records as the oldest living parrot in the world.[5]\\nThe next-oldest pink cockatoo to be found in a zoological setting was a 31-year-old female bird located at Paradise Wildlife Sanctuary, England.[3] Information published by the World Parrot Trust states longevity for Cookie\\\'s species in captivity is on average 40–60 years.[6]\\nLife[edit]\\nCookie was Brookfield Zoo\\\'s oldest resident and the last surviving member of the animal collection from the time of the zoo\\\'s opening in 1934, having arrived from Taronga Zoo of Sydney, New South Wales, Australia, in the same year and judged to be one year old at the time.[7]\\nIn the 1950s an attempt was made to introduce Cookie to a female pink cockatoo, but Cookie rejected her as "she was not nice to him".[8]\\n In 2007, Cookie was diagnosed with, and placed on medication and nutritional supplements for, osteoarthritis and osteoporosis\\xa0– medical conditions which occur commonly in aging animals and humans alike,[7] although it is believed that the latter may also have been brought on as a result of being fed a seed-only diet for the first 40 years of his life, in the years before the dietary requirements of his species were fully understood.[9]\\nCookie was "retired" from exhibition at the zoo in 2009 (following a few months of weekend-only appearances) in order to preserve his health, after it was noticed by staff that his appetite, demeanor and stress levels improved markedly when not on public display. age.[11] A memorial at the zoo was unveiled in September 2017.[12]\\nIn 2020, Cookie became the subject of a poetry collection by Barbara Gregorich entitled Cookie the Cockatoo: Everything Changes.[13]\\nSee also[edit]\\nReferences[edit]\\nExternal links[edit] He was believed to be the oldest member of his species alive in captivity, at the age of 82 in June 2015,[1]\[2] having significantly exceeded the average lifespan for his kind.[3] He was moved to a permanent residence in the keepers\\\' office of the zoo\\\'s Perching Bird House, although he made occasional appearances for special events, such as his birthday celebration, which was held each June.[3]\'}]', additional_kwargs={'idx': 1, 'args': {'query': 'oldest parrot alive'}}, response_metadata={}, name='tavily_search_results_json', tool_call_id=1), FunctionMessage(content="[{'url': 'https://www.birdzilla.com/learn/how-long-do-parrots-live/', 'content': 'In captivity, they can easily live to be ten or even 18 years of age. In general, most wild parrot species live only half the numbers of years they would live in captivity. For example, adopted African Gray Parrots might live to be 60, whereas wild birds have an average lifespan of 30 or 40 at the very most.'}]", additional_kwargs={'idx': 2, 'args': {'query': 'average lifespan of a parrot'}}, response_metadata={}, name='tavily_search_results_json', tool_call_id=2), FunctionMessage(content='join', additional_kwargs={'idx': 3, 'args': ()}, response_metadata={}, name='join', tool_call_id=3)]}}
---
{'join': {'messages': [AIMessage(content="Thought: The information from Wikipedia about Cookie, the cockatoo, indicates that he was recognized as the oldest living parrot, reaching the age of 82. This significantly exceeds the average lifespan for his species, which is noted to be 40-60 years in captivity. The information from Birdzilla provides a more general perspective on parrot lifespans, indicating that, in captivity, parrots can easily live to be ten or even 18 years of age, with some species like the African Gray Parrot potentially living up to 60 years. However, it does not provide a specific average lifespan for all parrot species, making it challenging to provide a precise comparison for Cookie's age beyond his species' average lifespan.", additional_kwargs={}, response_metadata={}, id='f00a464e-c273-42b9-8d1b-edd27bde8687'), AIMessage(content="Cookie the cockatoo was recognized as the oldest living parrot, reaching the age of 82, which is significantly beyond the average lifespan for his species, noted to be between 40-60 years in captivity. While general information for parrots suggests varying lifespans with some capable of living up to 60 years in captivity, Cookie's age far exceeded these averages, highlighting his exceptional longevity.", additional_kwargs={}, response_metadata={}, id='dc62a826-5528-446e-8797-6854abdeb94c')]}}
---

# Final answer
print(step["join"]["messages"][-1].content)
Cookie the cockatoo was recognized as the oldest living parrot, reaching the age of 82, which is significantly beyond the average lifespan for his species, noted to be between 40-60 years in captivity. While general information for parrots suggests varying lifespans with some capable of living up to 60 years in captivity, Cookie's age far exceeded these averages, highlighting his exceptional longevity.

Multi-step math

for step in chain.stream(
    {
        "messages": [
            HumanMessage(
                content="What's ((3*(4+5)/0.5)+3245) + 8? What's 32/4.23? What's the sum of those two values?"
            )
        ]
    }
):
    print(step)
{'plan_and_schedule': {'messages': [FunctionMessage(content='3307.0', additional_kwargs={'idx': 1, 'args': {'problem': '((3*(4+5)/0.5)+3245) + 8'}}, response_metadata={}, name='math', tool_call_id=1), FunctionMessage(content='7.565011820330969', additional_kwargs={'idx': 2, 'args': {'problem': '32/4.23'}}, response_metadata={}, name='math', tool_call_id=2), FunctionMessage(content='join', additional_kwargs={'idx': 3, 'args': ()}, response_metadata={}, name='join', tool_call_id=3)]}}
{'join': {'messages': [AIMessage(content="Thought: The calculations for both the expressions provided by the user have been successfully completed, with the results being 3307.0 for the first expression and 7.565011820330969 for the second. Therefore, we have all the necessary information to answer the user's question.", additional_kwargs={}, response_metadata={}, id='2dd394b3-468a-4abc-b7d2-02f7b803a8b6'), AIMessage(content='The result of the first calculation ((3*(4+5)/0.5)+3245) + 8 is 3307.0, and the result of the second calculation (32/4.23) is approximately 7.57. The sum of those two values is 3307.0 + 7.57 = approximately 3314.57.', additional_kwargs={}, response_metadata={}, id='83eb8e01-7a0a-4f79-8475-fad5bc83e645')]}}

# Final answer
print(step["join"]["messages"][-1].content)
The result of the first calculation ((3*(4+5)/0.5)+3245) + 8 is 3307.0, and the result of the second calculation (32/4.23) is approximately 7.57. The sum of those two values is 3307.0 + 7.57 = approximately 3314.57.

Complex Replanning Example

This question is likely to trigger the replanning functionality, but it may need to be run multiple times to see it in action.

for step in chain.stream(
    {
        "messages": [
            HumanMessage(
                content="Find the current temperature in Tokyo, then, respond with a flashcard summarizing this information"
            )
        ]
    }
):
    print(step)
{'plan_and_schedule': {'messages': [FunctionMessage(content="[{'url': 'https://www.timeanddate.com/weather/japan/tokyo/ext', 'content': 'Tokyo 14 Day Extended Forecast. Weather Today Weather Hourly 14 Day Forecast Yesterday/Past Weather Climate (Averages) Currently: 84 °F. Partly sunny. (Weather station: Tokyo, Japan). See more current weather.'}]", additional_kwargs={'idx': 1, 'args': {'query': 'current temperature in Tokyo'}}, response_metadata={}, name='tavily_search_results_json', tool_call_id=1), FunctionMessage(content='join', additional_kwargs={'idx': 2, 'args': ()}, response_metadata={}, name='join', tool_call_id=2)]}}
{'join': {'messages': [AIMessage(content='Thought: The extracted information provides the current temperature in Tokyo, which is 84 °F and describes the weather as partly sunny. This information is sufficient to create a flashcard summary for the user.', additional_kwargs={}, response_metadata={}, id='e9a1af40-ca06-4eb8-b4bb-24429cf8c689'), AIMessage(content='**Flashcard: Current Temperature in Tokyo**\n\n- **Temperature:** 84 °F\n- **Weather Conditions:** Partly sunny\n\n*Note: This information is based on the latest available data and may change.*', additional_kwargs={}, response_metadata={}, id='92bb42bc-e9b9-4b98-8936-8f74ff111504')]}}

Conclusion

Congratulations on building your first LLMCompiler agent! I'll leave you with some known limitations of the implementation above:

  1. The planner's output parsing format is fragile if your functions require more than 1 or 2 arguments. We could make it more robust by using streaming tool calling.
  2. Variable substitution is fragile in the example above. It could be made more robust by using a fine-tuned model and a more robust syntax (using e.g., Lark or a tool-calling schema).
  3. The state can grow quite long if you require multiple replanning runs. To handle this, you could add a message compressor once you go past a certain token limit; a sketch follows below.
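On the third point, a hypothetical compressor could be composed in front of the planner in the same style as select_recent_messages above. The sketch below is illustrative (the token threshold and prompt are made up; it reuses the llm defined earlier):

# Hypothetical message compressor (illustrative; not part of the notebook).
# Keeps the original question and replaces the rest of the scratchpad with
# an LLM-written summary once the transcript exceeds a token budget.
from langchain_core.messages import HumanMessage, SystemMessage


def compress_messages(state: State, max_tokens: int = 4000) -> dict:
    messages = state["messages"]
    if llm.get_num_tokens_from_messages(messages) <= max_tokens:
        return {"messages": messages}
    summary = llm.invoke(
        [
            HumanMessage(
                content="Summarize this scratchpad, preserving every"
                " observation needed to finish the task:\n"
                + "\n".join(str(m.content) for m in messages[1:])
            )
        ]
    )
    return {"messages": [messages[0], SystemMessage(content=summary.content)]}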
