算法竞赛¶
在本教程中,你将构建一个计算机奥林匹克智能体,该智能体利用三种互补技术来提升性能:反思、检索和人在回路协作。这些技术和数据均改编自 Quan Shi、Michael Tang、Karthik Narasimhan 和 Shunyu Yao 的论文《语言模型能解决奥林匹克编程问题吗?》。你可以在以下链接查看他们的论文
你将构建一个智能体图,它能够回答难度不断增加的编程问题。
- 反思:在第1部分中,你将创建一个零样本工具调用智能体,并提示它反思测试用例结果,以纠正其初始错误。这类似于论文中报告的在美国计算奥林匹克竞赛(USACO)基准测试中通过率为12.38的智能体。
- 检索:在第2部分中,你将实现一个初始检索步骤,作为智能体的“情景记忆”,从我们的编程问题语料库中检索高质量的少样本示例,以帮助解决青铜级别的问题。该智能体类似于论文中基准测试通过率为20.2的智能体。
- 人在回路:在第3部分中,你将使用
interrupt_after
让用户与智能体协同工作,以获得更好的答案。此时的基准性能仅受与之配对的人类竞争力的限制。
你的最终智能体图将结构如下面的图表所示
第1部分和第2部分分别对应于论文中基准测试通过率为12.38和20.2的系统。
虽然大型语言模型(LLMs)目前还不能自主解决所有这些问题,但我们可以设计一个系统,其解决这些问题的能力远远超过基本的ReAct智能体。
在深入研究之前,让我们先设置好机器。这包括安装依赖项、获取数据集以及定义一个实用函数。
设置¶
对于本教程,我们需要安装一些依赖项,获取奥林匹克竞赛数据集,并定义一个实用函数来帮助运行候选解决方案,以查看它们是否通过测试用例。
首先,让我们安装所需的软件包并设置API密钥
import getpass
import os
def _get_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_get_env("ANTHROPIC_API_KEY")
为 LangGraph 开发设置 LangSmith
注册 LangSmith,快速发现问题并提高你的 LangGraph 项目性能。LangSmith 允许你使用跟踪数据来调试、测试和监控你使用 LangGraph 构建的 LLM 应用 — 在此处了解更多入门信息。
数据¶
使用下面的实用工具获取 USACO 基准测试数据
import os
import zipfile
import datasets
import requests
usaco_url = "https://storage.googleapis.com/benchmarks-artifacts/usaco/usaco_sampled_with_tests.zip"
zip_path = "usaco.zip"
extract_path = "usaco_datasets"
response = requests.get(usaco_url)
with open(zip_path, "wb") as file:
file.write(response.content)
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_path)
os.remove(zip_path)
ds = datasets.load_from_disk(os.path.join(extract_path, "usaco_v3_sampled_with_tests"))
测试评估工具¶
我们还需要一种方法来评估我们生成的代码。我们将使用这个不安全的代码执行程序,对照我们的测试用例运行生成的代码。注意:下面的代码将在你的本地机器上运行任意代码!请谨慎操作。
import multiprocessing
import queue
import subprocess
import sys
import time
import traceback
multiprocessing.set_start_method("fork", force=True)
# WARNING
# This program exists to execute untrusted model-generated code. Although
# it is highly unlikely that model-generated code will do something overtly
# malicious in response to this test suite, model-generated code may act
# destructively due to a lack of model capability or alignment.
# Users are strongly encouraged to sandbox this evaluation suite so that it
# does not perform destructive actions on their host or network.
# Proceed at your own risk:
def exec_program(q, program, input_data, expected_output, timeout):
try:
start_time = time.time()
process = subprocess.Popen(
[sys.executable, "-c", program],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdout, stderr = process.communicate(input=input_data, timeout=timeout)
if time.time() - start_time > timeout:
raise TimeoutError("Execution timed out.")
if process.returncode != 0:
q.put(f"failed: {stderr}")
else:
if stdout.strip() == expected_output.strip():
q.put("passed")
else:
q.put(f"wrong answer. Expected '{expected_output}', got '{stdout}'")
except subprocess.TimeoutExpired:
process.kill()
q.put("timed out")
except Exception:
q.put(f"failed: {traceback.format_exc()}")
def check_correctness(
program: str, input_data: str, expected_output: str, timeout: float
) -> str:
q = multiprocessing.Queue()
process = multiprocessing.Process(
target=exec_program, args=(q, program, input_data, expected_output, timeout)
)
process.start()
process.join(timeout=timeout + 1)
if process.is_alive():
process.terminate()
process.join()
result = "timed out"
else:
try:
result = q.get_nowait()
except queue.Empty:
result = "no result returned"
return result
让我们检查一个示例程序和输出,看看它是如何工作的
program_code = "print('hello, world!')"
input_data = ""
expected_output = "hello, world!"
timeout = 2
test_result = check_correctness(program_code, input_data, expected_output, timeout)
print("Example 1: ", test_result)
test_result = check_correctness("print('goodbye')", input_data, "hi there", timeout)
print("Example 2: ", test_result)
第1部分:零样本与反思¶
在第一部分,我们将构建一个简单的零样本工具调用智能体来尝试解决这些问题。我们将通过添加一个“推理”(reasoning)字段,直接在智能体的工具调用模式中整合一种简单的反思形式。此外,Claude 在调用任何工具之前接受了使用自由文本进行“推理”的训练。总的来说,这应该会引导出具有反思性的“思维链”(chain-of-thought)提示。
注意:这与论文的实现有些不同,论文使用了显式的反思步骤以及 Reflexion 提示的变体。
到本节结束时,我们将构建一个反思性零样本编程智能体,其结构如下面系统图中标注“Part 1”的部分所示
状态¶
LangGraph 的主要原语是 StateGraph
,你使用它来将智能体定义为一个可控的状态机。图包含执行工作的 node
(Python 函数)和定义如何在节点之间路由的 edge
。State
定义了每个节点之间的接口,并携带智能体所需的所有信息。
下面,为我们的编程奥林匹克智能体定义一个 State
。messages
将作为聊天历史记录跟踪提交序列(以及测试用例反馈)。如果提交通过所有测试用例,status
字段将从 in_progress
变为 success
。其他字段(test_cases, runtime_limit)由 evaluation
节点用于测试智能体的提交。智能体本身看不到这些值。
API 参考:add_messages
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
class TestCase(TypedDict):
inputs: str
outputs: str
class State(TypedDict):
# Append-only chat memory so the agent can try to recover from initial mistakes.
messages: Annotated[list[AnyMessage], add_messages]
# From the dataset. These are used for testing.
test_cases: list[TestCase]
runtime_limit: int
status: str
现在,将数据集转换为我们的图将接受的输入。
input_states = [
{
"messages": [("user", row["description"])],
"test_cases": row["test_cases"],
"runtime_limit": row["runtime_limit"],
"status": "in_progress",
"problem_level": row["problem_level"],
}
for row in ds
]
节点 1:求解器¶
创建一个 solver
节点,提示 LLM“智能体”使用 writePython 工具生成提交的代码。
在 LangChain 中使用 Pydantic
本 notebook 使用 Pydantic v2 BaseModel
,需要 langchain-core >= 0.3
。使用 langchain-core < 0.3
将导致 Pydantic v1 和 v2 BaseModel
混用而产生错误。
API 参考:BaseChatModel | ChatPromptTemplate
from langchain_core.language_models import BaseChatModel
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
class writePython(BaseModel):
"""Write python code that resolves the problem."""
reasoning: str = Field(..., description="Conceptual solution.")
pseudocode: str = Field(..., description="Detailed English pseudocode.")
code: str = Field(..., description="Valid Python 3 solution to the problem")
class Solver:
def __init__(self, llm: BaseChatModel, prompt: ChatPromptTemplate):
self.runnable = prompt | llm.bind_tools([writePython])
def __call__(self, state: State) -> dict:
# Our agent only can see the "messages" and will ignore the test info
return {"messages": [self.runnable.invoke({"messages": state["messages"]})]}
现在,在下方创建求解器。我们将使用 Claude Opus
API 参考:ChatAnthropic
from langchain import hub
from langchain_anthropic import ChatAnthropic
# For this section, we are testing zero-shot performance and won't have
# any examples. Partial them out to pre-fill the template.
prompt = hub.pull("wfh/usaco-draft-solver").partial(examples="")
print("*" * 35 + "Prompt" + "*" * 35)
prompt.pretty_print()
# Use Haiku if you want to save $$ while (almost) never correctly answering the question
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-opus-20240229")
solver = Solver(llm, prompt)
***********************************Prompt***********************************
================================ System Message ================================
You are a world-class competitive programmer.
Please reply with a Python 3 solution to the problem below.
First, reason through the problem and conceptualize a solution.
Then write detailed pseudocode to uncover any potential logical errors or omissions.
Finally output the working Python code for your solution, ensuring to fix any errors uncovered while writing pseudocode.
No outside libraries are allowed.{examples}
============================= Messages Placeholder =============================
{messages}
print("*" * 34 + " Example " + "*" * 34)
result = solver(
{
"messages": [
(
"user",
"How do I get a perfectly random sample from an infinite stream",
)
]
}
)
result["messages"][0].pretty_print()
# Could expand to include (1)
# 1. Restate the problem in plain English
# 2. Closely following the explanation, restate and explain the solution in plain English
# 3. Write a pseudocode solution
# 4. Output the final Python solution with your solution steps in comments.
********************************** Example **********************************
================================== Ai Message ==================================
[{'text': "<thinking>\nTo address this problem, we need to use the writePython function, which requires the following parameters:\n- reasoning: a conceptual solution to the problem\n- pseudocode: detailed pseudocode for the solution\n- code: working Python code implementing the solution\n\nThe key aspects to address in the solution are:\n1. We have an infinite stream, so we can't store all elements. Need an online algorithm.\n2. Need to ensure each element has an equal probability of being in the final sample.\n\nI believe I have enough information to provide values for all the required parameters.\n</thinking>", 'type': 'text'}, {'id': 'toolu_01UqpLYyueky5GtYMidS9oLF', 'input': {'reasoning': 'To get a perfectly random sample of size k from an infinite stream:\n\n1. Store the first k elements in an array (reservoir). \n2. For each ith element after the kth element (i > k):\n - Generate a random integer j between 0 and i (inclusive)\n - If j < k, replace the jth element of the reservoir with the ith element\n3. At the end, the reservoir contains the random sample.\n\nThis works because for any element, when we process the nth element, the probability that it is in the reservoir is:\n- k/n when n <= k (first k elements always selected)\n- k/n * k/(n-1) * k/(n-2) * ... * k/(k+1) = k/n when n > k\n\nSo any element has k/n probability of being in final reservoir, giving a perfectly random sample.', 'pseudocode': '\`\`\`\nfunction selectKItems(stream, k):\n reservoir = [0..k-1] # store first k elements\n\n i = k\n while stream has next item:\n item = stream.next()\n j = random(0, i) # generate random index between 0 and i\n if j < k:\n reservoir[j] = item # replace element at random index with new item\n i += 1\n\n return reservoir\n\`\`\`', 'code': 'import random\n\ndef reservoir_sampling(stream, k):\n reservoir = []\n \n # Store first k elements in reservoir\n for i in range(k):\n reservoir.append(next(stream))\n\n i = k\n for item in stream:\n # Generate random index between 0 and i\n j = random.randint(0, i) \n \n # Replace element at random index with new item\n if j < k:\n reservoir[j] = item\n i += 1\n\n return reservoir'}, 'name': 'writePython', 'type': 'tool_use'}]
节点 2:评估¶
现在定义“evaluate
”节点。该节点接收 solver
提交的代码,并对照我们的 State
中的 test_cases
执行该代码。这里使用了我们在上面设置中定义的不安全的 check_correctness
工具。
API 参考:AIMessage | HumanMessage | ToolMessage
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
# This is the node we will add to the graph.
# Most tool-calling APIs require that the `ToolMessage` contain the ID
# of the
def format_tool_message(response: str, ai_message: AIMessage):
return ToolMessage(
content=response + "\nMake all fixes using the writePython tool.",
tool_call_id=ai_message.tool_calls[0]["id"],
)
def evaluate(state: State):
test_cases = state["test_cases"]
ai_message: AIMessage = state["messages"][-1]
if not ai_message.tool_calls:
return {
"messages": [
HumanMessage(
content="No code submitted. Please try again using the correct python code."
)
]
}
try:
code = ai_message.tool_calls[0]["args"]["code"]
except Exception as e:
return {"messages": [format_tool_message(repr(e), ai_message)]}
num_test_cases = len(test_cases)
succeeded = 0
test_results = []
# TODO: Multiprocess
for test_case in test_cases:
input_data = test_case["inputs"]
expected_output = test_case["outputs"]
test_result = check_correctness(code, input_data, expected_output, timeout)
test_results.append(test_result)
if test_result == "passed":
succeeded += 1
pass_rate = succeeded / num_test_cases if num_test_cases else "N/A"
if pass_rate == 1:
return {"status": "success"}
responses = "\n".join(
[f"<test id={i}>\n{r}\n</test>" for i, r in enumerate(test_results)]
)
response = f"Incorrect submission. Please respond with updated code.\nPass rate: {succeeded}/{num_test_cases}\nResults:\n{responses}"
formatted_message = format_tool_message(response, ai_message)
return {"messages": [formatted_message]}
创建图¶
现在,将它们整合起来!一旦你定义了每个节点,定义连接性/状态转换就相当容易了。
我们的零样本图定义了一个循环。如果我们可视化数据流,我们希望逻辑是:1. 首先转到 solver
节点,它尝试提出第一个解决方案。2. 接下来转到 evaluate
节点,它测试该解决方案。3. 如果解决方案通过,则结束;否则,返回到 solver
节点再次尝试。
在 LangGraph 中,我们使用 conditional_edges
定义包含条件逻辑的状态转换。在下方,定义图,添加一个 control_edge
来处理上面的步骤 (3)。
API 参考:END | StateGraph | START
from langgraph.graph import END, StateGraph, START
builder = StateGraph(State)
builder.add_node("solver", solver)
builder.add_edge(START, "solver")
builder.add_node("evaluate", evaluate)
builder.add_edge("solver", "evaluate")
def control_edge(state: State):
if state.get("status") == "success":
return END
return "solver"
builder.add_conditional_edges("evaluate", control_edge, {END: END, "solver": "solver"})
graph = builder.compile()
from IPython.display import Image, display
try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass
现在我们已经创建了图,让我们看看它需要解决什么类型的问题。
input_state = input_states[0].copy()
# We will reduce the test cases to speed this notebook up
input_state["test_cases"] = input_state["test_cases"][:3]
print(input_state["messages"][0][1])
Farmer John has $N$ ($1 \leq N \leq 2 \cdot 10^5$) farms, numbered from $1$ to
$N$. It is known that FJ closes farm $i$ at time $c_i$. Bessie wakes up at time
$S$, and wants to maximize the productivity of her day by visiting as many farms
as possible before they close. She plans to visit farm $i$ on time $t_i + S$.
Bessie must arrive at a farm strictly before Farmer John closes it to actually visit it.
Bessie has $Q$ $(1 \leq Q \leq 2 \cdot 10^5)$ queries. For each query, she gives
you two integers $S$ and $V$. For each query, output whether Bessie can visit at
least $V$ farms if she wakes up at time $S$.
INPUT FORMAT (input arrives from the terminal / stdin):
The first line consists of $N$ and $Q$.
The second line consists of $c_1, c_2, c_3 \dots c_N$ ($1 \leq c_i \leq 10^6$).
The third line consists of $t_1, t_2, t_3 \dots t_N$ ($1 \leq t_i \leq 10^6$).
The next $Q$ lines each consist of two integers $V$ ($1 \leq V \leq N$) and $S$
($1 \leq S \leq 10^6$).
OUTPUT FORMAT (print output to the terminal / stdout):
For each of the $Q$ queries, output YES or NO on a new line.
SAMPLE INPUT:
5 5
3 5 7 9 12
4 2 3 3 8
1 5
1 6
3 3
4 2
5 1
SAMPLE OUTPUT:
YES
NO
YES
YES
NO
For the first query, Bessie will visit the farms at time $t = [9, 7, 8, 8, 13]$,
so she will only get to visit farm $4$ on time before FJ closes the farm.
For the second query, Bessie will not be able to visit any of the farms on time.
For the third query, Bessie will visit farms $3, 4, 5$ on time.
For the fourth and fifth queries, Bessie will be able to visit all but the first
farm on time.
SCORING:
Inputs 2-4: $N,Q\le 10^3$Inputs 5-9: $c_i, t_i \le 20$Inputs 10-17: No additional constraints.
Problem credits: Chongtian Ma
hide_inputs
”并过滤掉 test_cases。所有这些都是可选的,但对开发有用。
注意:我们期望在这里看到 GraphRecursionError,因为它无法在分配的步数内正确回答问题。
API 参考:tracing_v2_enabled
from langchain_core.tracers.context import tracing_v2_enabled
from langsmith import Client
# We don't need to include all the test cases in our traces.
def _hide_test_cases(inputs):
copied = inputs.copy()
# These are tens of MB in size. No need to send them up
copied["test_cases"] = "..."
return copied
client = Client(hide_inputs=_hide_test_cases, hide_outputs=_hide_test_cases)
with tracing_v2_enabled(client=client):
events = graph.stream(input_state)
for event in events:
for value in event.values():
messages = value.get("messages")
if messages:
if isinstance(messages, list):
messages = value["messages"][-1]
print(
"Assistant:",
str(messages.content).replace("\n", "\\n")[:50],
)
Assistant: [{'text': '<thinking>\nThe key steps to solve this
Assistant: KeyError('code')\nMake all fixes using the writePy
Assistant: [{'id': 'toolu_01KimhKt8aqQjGZJmrHVnAtE', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_01CMZTqAd7BZQ2nSgtk9djRW', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_01Kbaq9gX4BnHvps6TMfVGHL', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_01MiSnpiGK5Yy4Cpp6GGbjmT', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_01GWuvJezXLMVurUBG84odDP', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_01W8DGmhcpFVctySmx58scf9', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_018bhYtCKDK6S4MHiAxUZCrb', 'input':
Assistant: KeyError('code')\nMake all fixes using the writePy
Assistant: [{'id': 'toolu_01LCwaCjX9uZBV3jt9eAkmAa', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_01WqJvdE2WDeTZXoKp2V7PWb', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_01DGevkunt9zWx7SVDCHdBuv', 'input':
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'id': 'toolu_013comYKVxNSzTM4ZbH3L3FP', 'input':
Assistant: Incorrect submission. Please respond with updated
---------------------------------------------------------------------------
``````output
GraphRecursionError Traceback (most recent call last)
``````output
Cell In[25], line 17
15 with tracing_v2_enabled(client=client):
16 events = graph.stream(input_state)
---> 17 for event in events:
18 for value in event.values():
19 messages = value.get("messages")
``````output
File ~/.pyenv/versions/3.11.2/lib/python3.11/site-packages/langgraph/pregel/__init__.py:645, in Pregel.stream(self, input, config, stream_mode, output_keys, input_keys, interrupt_before_nodes, interrupt_after_nodes, debug)
643 break
644 elif step == config["recursion_limit"]:
--> 645 raise GraphRecursionError(
646 f"Recursion limit of {config['recursion_limit']} reached"
647 "without hitting a stop condition. You can increase the "
648 "limit by setting the `recursion_limit` config key."
649 )
651 # before execution, check if we should interrupt
652 if _should_interrupt(
653 checkpoint,
654 interrupt_before_nodes,
655 self.stream_channels_list,
656 next_tasks,
657 ):
``````output
GraphRecursionError: Recursion limit of 25 reachedwithout hitting a stop condition. You can increase the limit by setting the `recursion_limit` config key.
它没能及时解决 但这没关系!如果这很容易,这篇论文就会短很多 :)
你可以在提供的链接查看智能体的完整 LangSmith 追踪记录。
在下一节中,我们将添加论文中称为“情景记忆”(episodic memory)的改进,这实际上是少样本检索。
第2部分:少样本检索¶
即使使用了反思性工具调用,我们第1部分的基线智能体在应对这项困难任务时仍然力不从心。一种“教导”LLM 如何更好地执行任务的方法是通过演示,也称为“少样本示例”。
USACO 论文作者所称的“情景记忆”实际上只是在相似示例上进行少样本提示。
在本例中,每个示例都是数据集中不同的问题+解决方案。如果你假设你的智能体已经“解决”了这些问题并正在回忆其解决方案,那么“情景记忆”这个术语就说得通了。
本节添加了下图“Part 2”中的“情景记忆”组件。
注意,这个记忆步骤仅执行一次,在第1部分的零样本循环逻辑之前。步骤如下:
- 提示 LLM 生成一个候选解决方案。
- 使用候选解决方案的文本检索 N 个最相似的(问题,解决方案)对。
- 将此结果格式化到零样本智能体的提示中。
下面,我们将情景记忆实现为一个检索器。我们将遵循论文的检索器选择,使用 BM25。
状态¶
状态大部分沿用了第1部分的内容。增加了额外的“candidate”和“examples”字段,用于存储记忆步骤的信息。
API 参考:add_messages
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
class TestCase(TypedDict):
inputs: str
outputs: str
class State(TypedDict):
# NEW! Candidate for retrieval + formatted fetched examples as "memory"
candidate: AIMessage
examples: str
# Repeated from Part 1
messages: Annotated[list[AnyMessage], add_messages]
test_cases: list[TestCase]
runtime_limit: int
status: str
节点 1 和 3:草稿与求解器¶
让我们创建我们的“智能体”。我们将修改第1部分中的 Solver
,将其重用于智能体节点和候选程序生成节点(“草稿”)。
API 参考:ChatAnthropic
from langchain import hub
from langchain_anthropic import ChatAnthropic
class Solver:
def __init__(self, llm: BaseChatModel, prompt: ChatPromptTemplate):
self.runnable = prompt | llm.bind_tools([writePython])
def __call__(self, state: State) -> dict:
# Our agent only can see the "messages" and will ignore the test info
inputs = {"messages": state["messages"]}
has_examples = bool(state.get("examples"))
output_key = "candidate" # Used in the draft node
if has_examples:
output_key = "messages"
# Used in the solve node
inputs["examples"] = state["examples"]
response = self.runnable.invoke(inputs)
if not response.content:
return {
output_key: AIMessage(
content="I'll need to think about this step by step."
)
}
return {output_key: response}
prompt = hub.pull("wfh/usaco-draft-solver")
llm = ChatAnthropic(model="claude-3-opus-20240229")
draft_solver = Solver(llm, prompt.partial(examples=""))
solver = Solver(llm, prompt)
节点 2:检索¶
检索节点接收一个候选解决方案(由“求解器”节点生成),使用这个来搜索相似示例,然后将这些示例格式化到消息中。
# We will test our agent on index 0 (the same as above).
# Later, we will test on index 2 (the first 'silver difficulty' question)
test_indices = [0, 2]
train_ds = [row for i, row in enumerate(ds) if i not in test_indices]
test_ds = [row for i, row in enumerate(ds) if i in test_indices]
API 参考:BM25Retriever
from langchain_community.retrievers import BM25Retriever
def format_example(row):
question = row["description"]
answer = row["solution"]
return f"""<problem>
{question}
</problem>
<solution>
{answer}
</solution>"""
# Skip our 'test examples' to avoid cheating
# This is "simulating" having seen other in-context examples
retriever = BM25Retriever.from_texts([format_example(row) for row in train_ds])
现在定义节点。任何节点都可以选择接受第二个位置参数 config
。它包含在调用图时可以调整的 configurable
参数。例如,我们可以调整为我们的智能体检索的顶部 k
个示例。
API 参考:RunnableConfig
from langchain_core.runnables import RunnableConfig
def retrieve_examples(state: State, config: RunnableConfig):
top_k = config["configurable"].get("k") or 2
ai_message: AIMessage = state["candidate"]
if not ai_message.tool_calls:
# We err here. To make more robust, you could loop back
raise ValueError("Draft agent did not produce a valid code block")
code = ai_message.tool_calls[0]["args"]["code"]
examples_str = "\n".join(
[doc.page_content for doc in retriever.invoke(code)[:top_k]]
)
examples_str = f"""
You previously solved the following problems in this competition:
<Examples>
{examples_str}
<Examples>
Approach this new question with similar sophistication."""
return {"examples": examples_str}
图¶
现在让我们把它们整合起来。这个图比第1部分稍微复杂一些,因为我们需要在智能体循环中添加初始的“草稿”和“检索”节点。
API 参考:MemorySaver | END | StateGraph | START
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
builder = StateGraph(State)
builder.add_node("draft", draft_solver)
builder.add_edge(START, "draft")
builder.add_node("retrieve", retrieve_examples)
builder.add_node("solve", solver)
builder.add_node("evaluate", evaluate)
# Add connectivity
builder.add_edge("draft", "retrieve")
builder.add_edge("retrieve", "solve")
builder.add_edge("solve", "evaluate")
def control_edge(state: State):
if state.get("status") == "success":
return END
return "solve"
builder.add_conditional_edges("evaluate", control_edge, {END: END, "solve": "solve"})
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
from IPython.display import Image, display
try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass
让我们再次尝试这个问题
config = {"configurable": {"thread_id": "question-recall", "k": 3}}
with tracing_v2_enabled(client=client):
events = graph.stream(input_state, config)
for event in events:
for value in event.values():
messages = value.get("messages")
if messages:
if isinstance(messages, list):
messages = value["messages"][-1]
print(
"Assistant:",
str(messages.content).replace("\n", "\\n")[:50],
)
elif value.get("examples"):
print("Retrieved examples:\n\n", value["examples"][:100] + "...")
elif value.get("candidate"):
print(str(value["candidate"].content)[:200])
[{'text': "<thinking>\nThis problem essentially asks to find the number of farms Bessie can visit before they close at each query. The key insights are:\n\n1. Bessie's arrival time at each farm is S +
Retrieved examples:
You previously solved the following problems in this competition:
<Examples>
<problem>
Farmer John...
Assistant: [{'text': "<thinking>\nThe key information given i
恭喜!你为你的智能体添加了“情景记忆”,以获取少样本示例并解决了这个青铜级别的计算机奥林匹克竞赛问题!
然而,我们的智能体仍然有限。让我们在一个更具挑战性的 🪙🏆白银✨ 级别问题上测试一下它
silver_input = {
"messages": [("user", silver_row["description"])],
"test_cases": silver_row["test_cases"],
"runtime_limit": silver_row["runtime_limit"],
"status": "in_progress",
}
config = {"configurable": {"thread_id": "silver-question-1", "k": 2}}
with tracing_v2_enabled(client=client):
events = graph.stream(silver_input, config)
for event in events:
for value in event.values():
messages = value.get("messages")
if messages:
if isinstance(messages, list):
messages = value["messages"][-1]
print(
"Assistant:",
str(messages.content).replace("\n", "\\n")[:50],
)
elif value.get("examples"):
print("Retrieved examples:\n\n", value["examples"][:100] + "...")
elif value.get("candidate"):
print(str(value["candidate"].content)[:200])
[{'text': "<thinking>\nThe relevant tool for this problem is writePython. It requires the following parameters:\n- reasoning: To solve this problem, we need to simulate the cruise by following the seq
Retrieved examples:
You previously solved the following problems in this competition:
<Examples>
<problem>
Farmer John...
Assistant: [{'text': "<thinking>\nTo solve this problem, we n
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': "<thinking>\nAfter reviewing the failed
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': "<thinking>\nAfter reviewing the latest
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': "<thinking>\nOops, looks like I made a s
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': "<thinking>\nHmm, some of the test cases
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': '<thinking>\nOops, looks like I accident
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': "<thinking>\nLooks like the code is now
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': '<thinking>\nOops, looks like I accident
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': "<thinking>\nHmm, the optimization to si
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': "<thinking>\nOops, I did it again - acci
Assistant: Incorrect submission. Please respond with updated
Assistant: [{'text': "<thinking>\nHmm, the latest code is sti
Assistant: Incorrect submission. Please respond with updated
---------------------------------------------------------------------------
``````output
GraphRecursionError Traceback (most recent call last)
``````output
Cell In[37], line 12
10 with tracing_v2_enabled(client=client):
11 events = graph.stream(silver_input, config)
---> 12 for event in events:
13 for value in event.values():
14 messages = value.get("messages")
``````output
File ~/.pyenv/versions/3.11.2/lib/python3.11/site-packages/langgraph/pregel/__init__.py:645, in Pregel.stream(self, input, config, stream_mode, output_keys, input_keys, interrupt_before_nodes, interrupt_after_nodes, debug)
643 break
644 elif step == config["recursion_limit"]:
--> 645 raise GraphRecursionError(
646 f"Recursion limit of {config['recursion_limit']} reached"
647 "without hitting a stop condition. You can increase the "
648 "limit by setting the `recursion_limit` config key."
649 )
651 # before execution, check if we should interrupt
652 if _should_interrupt(
653 checkpoint,
654 interrupt_before_nodes,
655 self.stream_channels_list,
656 next_tasks,
657 ):
``````output
GraphRecursionError: Recursion limit of 25 reachedwithout hitting a stop condition. You can increase the limit by setting the `recursion_limit` config key.
仍然太难了!尚未实现通用人工智能(AGI)。要详细调查我们智能体的轨迹,请查看完整的 LangSmith 追踪记录。
我们的智能体还没有足够强大到可以自主。LangGraph 的优点在于你无需在“自主智能体”和“简单 DAG”之间做出选择:你可以在任何对你的应用有益的地方注入控制和用户界面。
第3部分:人在回路¶
我们的检索增强智能体能够解决 bronze
级别的问题,但对于更具挑战性的 silver 难度问题仍然失败。
回想一下,论文提出了3种互补的技术,它们提高了性能
- 反思:明确提示 LLM “反思”其错误有助于它
- 少样本提示:检索相关的、高质量的示例作为“记忆”
- 人在回路协作:在不提供正确答案的情况下,允许人类帮助智能体反思其方法并将其指向更好的方向。
在本节中,我们将添加“人类”节点(如下图中标记为“part 3”),完成我们的智能体图
从机器学习的角度来看,这有点像 聪明的汉斯,但从应用设计师的角度来看,其主要目标是实现更高的综合成功率,让人类插入思考和见解是很自然的。
无论如何,向 LangGraph 实例添加人工检查不需要额外的代码行。我们可以通过指示图在“evaluate
”节点之后使用 interrupt_after
来实现,从而让用户有机会修改轨迹。
在下方开始组装你的图。以下部分与我们在第2部分的应用相同
API 参考:MemorySaver | END | StateGraph | START
# This is all the same as before
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
builder = StateGraph(State)
prompt = hub.pull("wfh/usaco-draft-solver")
llm = ChatAnthropic(model="claude-3-opus-20240229", max_tokens_to_sample=4000)
draft_solver = Solver(llm, prompt.partial(examples=""))
builder.add_node("draft", draft_solver)
builder.add_edge(START, "draft")
builder.add_node("retrieve", retrieve_examples)
solver = Solver(llm, prompt)
builder.add_node("solve", solver)
builder.add_node("evaluate", evaluate)
builder.add_edge("draft", "retrieve")
builder.add_edge("retrieve", "solve")
builder.add_edge("solve", "evaluate")
def control_edge(state: State):
if state.get("status") == "success":
return END
return "solve"
builder.add_conditional_edges("evaluate", control_edge, {END: END, "solve": "solve"})
checkpointer = MemorySaver()
现在通过编译图来完成。设置interrupt_after=["evaluate"]
,指示智能体在继续执行之前等待人类输入。
graph = builder.compile(
checkpointer=checkpointer,
# New: this tells the graph to break any time it goes to the "human" node
interrupt_after=["evaluate"],
)
from IPython.display import Image, display
try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass
正如你在上图中看到的,结构与第2部分相同,只是我们在“evaluate
”和“solve
”节点之间插入了一个“human
”断点。
让我们再次尝试这个问题!
config = {"configurable": {"thread_id": "silver-hl-1", "k": 2}}
with tracing_v2_enabled(client=client):
events = graph.stream(silver_input, config)
for event in events:
for value in event.values():
messages = value.get("messages")
if messages:
if isinstance(messages, list):
messages = value["messages"][-1]
print(
"Assistant:",
str(messages.content).replace("\n", "\\n")[:50],
)
elif value.get("examples"):
print("Retrieved examples:\n\n", value["examples"][:100] + "...")
elif value.get("candidate"):
print(str(value["candidate"].content)[:200])
[{'text': "<thinking>\nTo solve this problem, we need to:\n1. Read in the input data - number of ports N, length of direction sequence M, number of repetitions K, the port connections, and the directi
Retrieved examples:
You previously solved the following problems in this competition:
<Examples>
<problem>
Farmer John ...
Assistant: [{'text': '<thinking>\nTo determine where Bessie e
Assistant: Incorrect submission. Please respond with updated
回顾最初的问题
Problem 3: Luxury River Cruise [Josh Alman and Nathan Pinsker, 2013]
Farmer John is taking Bessie and the cows on a cruise! They are sailing on a
network of rivers with N ports (1 <= N <= 1,000) labeled 1..N, and Bessie
starts at port 1. Each port has exactly two rivers leading out of it which
lead directly to other ports, and rivers can only be sailed one way.
At each port, the tour guides choose either the "left" river or the "right"
river to sail down next, but they keep repeating the same choices over and
over. More specifically, the tour guides have chosen a short sequence of M
directions (1 <= M <= 500), each either "left" or "right", and have
repeated it K times (1 <= K <= 1,000,000,000). Bessie thinks she is going
in circles -- help her figure out where she ends up!
PROBLEM NAME: cruise
INPUT FORMAT:
* Line 1: Three space-separated integers N, M, and K.
* Lines 2..N+1: Line i+1 has two space-separated integers,
representing the number of the ports that port i's left and
right rivers lead to, respectively.
* Line N+2: M space-separated characters, either 'L' or 'R'. 'L'
represents a choice of 'left' and 'R' represents a choice of
'right'.
SAMPLE INPUT:
4 3 3
2 4
3 1
4 2
1 3
L L R
INPUT DETAILS:
The port numbers are arranged clockwise in a circle, with 'L' being a
clockwise rotation and 'R' being a counterclockwise rotation. The sequence
taken is LLRLLRLLR.
OUTPUT FORMAT:
* Line 1: A single integer giving the number of the port where
Bessie's cruise ends.
SAMPLE OUTPUT:
4
OUTPUT DETAILS:
After the first iteration of the sequence of directions, Bessie is at port
2 (1 -> 2 -> 3 -> 2); after the second, she is at port 3 (2 -> 3 -> 4 ->
3), and at the end she is at port 4 (3 -> 4 -> 1 -> 4).
snapshot = graph.get_state(config)
print(snapshot.values["messages"][-2].content[0]["text"])
print("\n\nCode:\n\n")
print(snapshot.values["messages"][-2].tool_calls[0]["args"]["code"])
<thinking>
To determine where Bessie ends up, we need to:
1. Simulate the cruise by following the sequence of left/right directions
2. Repeat this sequence K times to find the final destination port
The problem provides:
- The number of ports N
- The connections between ports (left and right rivers for each port)
- The sequence of M directions (L or R) to follow
- The number of times K to repeat the sequence
With this information, we have everything needed to simulate the cruise and find the ending port. The key steps will be:
1. Read in the input data to initialize the river connections and direction sequence
2. Iterate K times:
- For each direction in the M-length sequence:
- Move to the next port based on the current port and direction
3. Output the final port number after K iterations
The solution will require loops to repeat the sequence K times and follow the M directions. Since K can be up to 1 billion, simulating all K iterations directly would be too slow. Instead, we can find a pattern in how the port changes after each M-length sequence, and then "fast-forward" by calculating which port we reach after K repetitions of the pattern.
</thinking>
Code:
N, M, K = map(int, input().split())
ports = []
for _ in range(N):
left, right = map(int, input().split())
ports.append((left, right))
directions = input().split()
cur = 1
pattern = []
seen = set()
steps = 0
while cur not in seen:
seen.add(cur)
for d in directions:
steps += 1
if d == 'L':
cur = ports[cur-1][0]
else:
cur = ports[cur-1][1]
pattern.append((cur, steps))
K %= steps
for port, step in pattern:
if step > K:
cur = port
break
K -= step
print(cur)
Incorrect submission. Please respond with updated code.
Pass rate: 4/10
Results:
<test id=0>
wrong answer. Expected '4
', got '3
'
</test>
<test id=1>
wrong answer. Expected '50
', got '2
'
</test>
<t
智能体需要记住,模拟应该包括循环 + 导致示例的任何步骤。它可以使用“龟兔赛跑”算法进行循环检测,使用模拟路径并在检测到重复时中断,然后
让我们通过更新图状态来告知智能体这一点。
updated_config = graph.update_state(
config,
values={
"messages": [
(
"user",
"""Consider breaking down the algorithm into separate parts: reading inputs, detecting cycles using the tortoise and hare algorithm, and determining Bessie's final position by skipping ahead K steps.
Read the inputs into three arrays:
- Two arrays L and R for the ports (adjust for 0-based indexing)
- A third array S for the direction sequence
Optimize by multiplying K by M before the main loop to convert the number of repetitions into the total number of steps.
Use the tortoise and hare algorithm to detect the cycle:
- Define a helper function get_next(v) that returns the next position and direction index
- Initialize two pointers s0 and s1 to (0, 0)
- In each iteration:
- Move s0 by 1 step and s1 by 2 steps using get_next()
- If s0 equals s1, decrement K by 1 and break out of the loop
- Otherwise, decrement K by 1
- After the loop, if K is not 0, there is a cycle
To find the cycle length:
- Initialize a counter variable rho to 1
- Move s0 by 1 step using get_next()
- Enter a loop:
- Move s0 by 1 step using get_next()
- Increment rho
- If s0 equals s1, break out of the loop
Skip ahead by reducing K modulo rho.
Simulate the remaining steps:
- While K > 0, move s0 to the next position using get_next() and decrement K
Print the final position (converted to 1-based indexing).
Pay close attention to the initialization and movement of pointers during cycle detection and length calculation. Ensure that the logic is correct and handles all cases accurately.""",
)
]
},
)
现在图的状态包含了我们的新消息。
HumanMessage(content="Consider breaking down the algorithm into separate parts: reading inputs, detecting cycles using the tortoise and hare algorithm, and determining Bessie's final position by skipping ahead K steps.\n\nRead the inputs into three arrays:\n- Two arrays L and R for the ports (adjust for 0-based indexing)\n- A third array S for the direction sequence\n\nOptimize by multiplying K by M before the main loop to convert the number of repetitions into the total number of steps.\n\nUse the tortoise and hare algorithm to detect the cycle:\n- Define a helper function get_next(v) that returns the next position and direction index\n- Initialize two pointers s0 and s1 to (0, 0)\n- In each iteration:\n - Move s0 by 1 step and s1 by 2 steps using get_next()\n - If s0 equals s1, decrement K by 1 and break out of the loop\n - Otherwise, decrement K by 1\n- After the loop, if K is not 0, there is a cycle\n\nTo find the cycle length:\n- Initialize a counter variable rho to 1\n- Move s0 by 1 step using get_next()\n- Enter a loop:\n - Move s0 by 1 step using get_next()\n - Increment rho\n - If s0 equals s1, break out of the loop\n\nSkip ahead by reducing K modulo rho.\n\nSimulate the remaining steps:\n- While K > 0, move s0 to the next position using get_next() and decrement K\n\nPrint the final position (converted to 1-based indexing).\n\nPay close attention to the initialization and movement of pointers during cycle detection and length calculation. Ensure that the logic is correct and handles all cases accurately.", id='98888982-a469-4c5a-ab65-743d2f2608dc')
让我们让智能体再次尝试。调用 stream
并传入 None
,只使用从内存加载的输入。我们将跳过接下来的几次人工审查,看看它能否自行纠正。
num_trials = 1
with tracing_v2_enabled(client=client):
for _ in range(num_trials):
events = graph.stream(None, updated_config)
for event in events:
for value in event.values():
messages = value.get("messages")
if messages:
if isinstance(messages, list):
messages = value["messages"][-1]
print(
"Assistant:",
str(messages.content).replace("\n", "\\n")[:50],
)
elif value.get("examples"):
print("Retrieved examples:\n\n", value["examples"][:100] + "...")
elif value.get("candidate"):
print(str(value["candidate"].content)[:200])
if graph.get_state(config).values["status"] == "success":
break
print("Continuing...")
Assistant: [{'text': '<thinking>\nThank you for the detailed
Assistant: Incorrect submission. Please respond with updated
Continuing...
好的,智能体又尝试了一次。查看此步骤的 LangSmith 追踪记录,查看其更新。
snapshot = graph.get_state(most_recent_state.config)
ai_message = snapshot.values["messages"][-2]
if ai_message.content:
print(ai_message.content)
print("\n\nCode:\n\n")
print(ai_message.tool_calls[0]["args"]["code"] if ai_message.tool_calls else "N/A")
[{'text': '<thinking>\nThank you for the detailed algorithm breakdown! Let me go through each step to make sure I understand and have the necessary information to implement the solution.\n\nReading inputs:\n- Read N, M, K and store in separate variables\n- Create arrays L and R to store the left and right port connections (adjust for 0-based indexing)\n- Create array S to store the M-length direction sequence \n- Multiply K by M upfront to get the total number of steps\n\nDetecting cycles with tortoise and hare:\n- Define get_next(v) to return the next position and direction index\n - It will use the current position and direction to look up the next port in L/R\n- Initialize two pointers s0 and s1 to (0, 0) \n- Loop until s0 equals s1 or all K steps are taken:\n - Move s0 by 1 step and s1 by 2 steps using get_next()\n - Decrement K\n- After the loop, check if K is 0 to determine if a cycle was found\n\nFinding cycle length:\n- If a cycle was found, initialize rho to 1\n- Move s0 by 1 step \n- Loop until s0 equals s1 again:\n - Move s0 by 1 step and increment rho\n- rho will equal the cycle length\n\nSkipping ahead:\n- Reduce K by taking it modulo rho\n\nSimulating remaining steps:\n- While K is greater than 0:\n - Move s0 using get_next()\n - Decrement K\n- s0 will hold the final position\n\nPrinting result:\n- Add 1 to the final position to convert back to 1-based indexing before printing\n\nThe key aspects are:\n- Handling the input format and 0-based indexing \n- Defining get_next() to handle moving to the next port based on direction\n- Correctly implementing the tortoise and hare cycle detection\n- Finding the cycle length after detection\n- Skipping ahead with modulo and simulating any remaining steps\n- Adjusting the output back to 1-based indexing\n\nI believe I have all the necessary pieces to implement this solution now. Let me code it up using the writePython tool.\n</thinking>', 'type': 'text'}, {'id': 'toolu_01EDrYeHJU7GxApRb1QfMA1b', 'input': {'reasoning': "Here's the problem-solving approach:\n\n1. Read in the input data:\n - N ports, M-length direction sequence, K repetitions\n - L and R arrays for left/right port connections\n - S array for direction sequence\n - Multiply K by M to get total steps\n\n2. Define get_next(v) helper function:\n - Takes current position and direction index\n - Returns next position and incremented direction index\n - Looks up next port in L/R arrays based on current direction\n\n3. Detect cycle using tortoise and hare algorithm:\n - Initialize s0 and s1 pointers to (0, 0)\n - Loop until match or all steps taken:\n - Move s0 by 1 step, s1 by 2 steps\n - Decrement K\n - Check if K is 0 after loop\n\n4. If cycle found, find cycle length:\n - Initialize rho to 1\n - Move s0 by 1 step\n - Loop until s0 equals s1 again:\n - Move s0 and increment rho\n - rho is the cycle length\n\n5. Skip ahead by K % rho steps\n\n6. Simulate remaining steps:\n - While K > 0:\n - Move s0 with get_next()\n - Decrement K\n \n7. Print final position (+1 for 1-based indexing)\n\nKey points:\n- Multiplying K*M avoids nested loop\n- get_next() handles port transitions \n- Tortoise and hare finds cycles\n- Modulo skips ahead in cycle\n- Adjust 0-based indexing for input/output", 'pseudocode': "1. Read input:\n N, M, K = read_ints()\n L = [0] * N\n R = [0] * N\n for i in 0..N-1:\n L[i], R[i] = read_ints()\n S = read_direction_sequence()\n K *= M\n\n2. Define get_next(v):\n def get_next(pos, dir_idx):\n if S[dir_idx] == 'L':\n next_pos = L[pos]\n else:\n next_pos = R[pos]\n next_dir_idx = (dir_idx + 1) % M\n return (next_pos, next_dir_idx)\n\n3. Find cycle:\n s0 = (0, 0)\n s1 = (0, 0) \n while K:\n s0 = get_next(s0[0], s0[1])\n s1 = get_next(s1[0], get_next(s1[0], s1[1])[1])\n K -= 1\n if s0 == s1: break\n if K != 0: no cycle, print s0[0] + 1\n\n4. Find cycle length:\n rho = 1\n s0 = get_next(s0[0], s0[1])\n while s0 != s1:\n s0 = get_next(s0[0], s0[1]) \n rho += 1\n\n5. Skip steps:\n K %= rho\n\n6. Remaining steps: \n while K:\n s0 = get_next(s0[0], s0[1])\n K -= 1\n \n7. Print result:\n print(s0[0] + 1)", 'code': "def read_ints():\n return map(int, input().split())\n\nN, M, K = read_ints()\n\nL = [0] * N\nR = [0] * N\nfor i in range(N):\n L[i], R[i] = read_ints()\n L[i] -= 1\n R[i] -= 1\n\nS = input().split()\n\nK *= M\n\ndef get_next(pos, dir_idx):\n if S[dir_idx] == 'L':\n next_pos = L[pos] \n else:\n next_pos = R[pos]\n next_dir_idx = (dir_idx + 1) % M\n return (next_pos, next_dir_idx)\n\ns0 = (0, 0) \ns1 = (0, 0)\n\nwhile K:\n if s0 == s1: break\n \n s0 = get_next(s0[0], s0[1])\n s1 = get_next(s1[0], get_next(s1[0], s1[1])[1])\n \n K -= 1\n \nif K:\n rho = 1\n s0 = get_next(s0[0], s0[1])\n while s0 != s1:\n s0 = get_next(s0[0], s0[1])\n rho += 1\n \n K %= rho\n \nwhile K: \n s0 = get_next(s0[0], s0[1])\n K -= 1\n \nprint(s0[0] + 1)"}, 'name': 'writePython', 'type': 'tool_use'}]
Code:
def read_ints():
return map(int, input().split())
N, M, K = read_ints()
L = [0] * N
R = [0] * N
for i in range(N):
L[i], R[i] = read_ints()
L[i] -= 1
R[i] -= 1
S = input().split()
K *= M
def get_next(pos, dir_idx):
if S[dir_idx] == 'L':
next_pos = L[pos]
else:
next_pos = R[pos]
next_dir_idx = (dir_idx + 1) % M
return (next_pos, next_dir_idx)
s0 = (0, 0)
s1 = (0, 0)
while K:
if s0 == s1: break
s0 = get_next(s0[0], s0[1])
s1 = get_next(s1[0], get_next(s1[0], s1[1])[1])
K -= 1
if K:
rho = 1
s0 = get_next(s0[0], s0[1])
while s0 != s1:
s0 = get_next(s0[0], s0[1])
rho += 1
K %= rho
while K:
s0 = get_next(s0[0], s0[1])
K -= 1
print(s0[0] + 1)
Incorrect submission. Please respond with updated code.
Pass rate: 3/10
Results:
<test id=0>
passed
</test>
<test id=1>
timed out
</test>
<test id=2>
timed out
</test>
<test id=3>
timed out
</test>
<t
让我们提供更多反馈。
updated_config = graph.update_state(
updated_config,
values={
"messages": [
(
"user",
"""That's better, but you're still getting some errors. Let's double check some things:
1. When calculating the cycle length, make sure the initialization and movement of the pointers is correct. Double-check the logic there and see if you can spot any discrepancies.
2. Check the condition for whether there's a cycle after the main loop to ensure it covers all cases, like if K becomes 0 in the last iteration.
Think step by step through youur implementation and update using the writePython tool.""",
)
]
},
)
现在我们已经提供了这些反馈,在我们再次介入之前,让智能体多尝试几次解决它。
num_trials = 2
with tracing_v2_enabled(client=client):
for _ in range(num_trials):
events = graph.stream(None, updated_config)
for event in events:
for value in event.values():
messages = value.get("messages")
if messages:
if isinstance(messages, list):
messages = value["messages"][-1]
print(
"Assistant:",
str(messages.content).replace("\n", "\\n")[:50],
)
elif value.get("examples"):
print("Retrieved examples:\n\n", value["examples"][:100] + "...")
elif value.get("candidate"):
print(str(value["candidate"].content)[:200])
if graph.get_state(config).values["status"] == "success":
break
print("Continuing...")
结论¶
恭喜你完成了本教程!在本教程中,你使用 LangGraph 实现了一个能够解决具有挑战性编程问题的智能体。你通过利用一些常见技术来提高性能,包括:
- 反思:虽然我们没有实现明确的反思步骤,但我们的提示和工具调用旨在鼓励对之前输出进行批判。你在第1部分中添加了这一点。
- 检索:智能体的“情景记忆”从我们的编程问题语料库中检索高质量的少样本示例,以帮助解决青铜级别的问题。在第2部分中,你实现了一个检索记忆作为初始步骤。
- 人在回路:由 LLM 驱动的智能体目前还不足以自主回答所有这些问题,但在某些时候,它们可以完成大部分工作,并在人类反馈下找到正确答案。在第3部分中,你对
evaluate
节点使用了interrupt_after
,然后通过在图上使用update_state
包含了你的反馈。
LLMs 尚无法自主解决所有这些问题,但通过更好的提示和巧妙的工程设计,你可以创建一个能够更可靠地得出正确解决方案的系统。