TNT-LLM:规模化文本挖掘¶
Wan 等人的 TNT-LLM 描述了一种由 Microsoft 为其 Bing Copilot 应用开发的分类法生成和分类系统。
它能从原始对话日志中生成丰富、可解释的用户意图(或其他类别)分类法。然后,LLM 可以利用这个分类法对日志进行标注,这些标注过的日志反过来可以作为训练数据,用于适配一个可以在你的应用中部署的廉价分类器(例如基于嵌入的逻辑回归分类器)。
TNT-LLM 主要包含三个阶段
- 生成分类法
- 标注训练数据
- 微调分类器 + 部署
在本 notebook 中应用 LangGraph 时,我们将专注于第一个阶段:分类法生成(下图中蓝色部分)。然后,我们将在下面的后续步骤中展示如何标注和拟合分类器。
为了生成分类法,TNT-LLM 提出了 5 个步骤
- 使用成本较低的 LLM 总结聊天日志(对样本中的所有日志进行批量处理)
- 批量将日志分割成随机小批量
- 从第一个小批量中生成初始分类法
- 通过批判和修订提示,在每个后续小批量上更新分类法
- 审查最终分类法,评估其质量并使用最终样本生成最终值。
设置¶
首先,让我们安装所需的包并设置 API 密钥
pip install -U langgraph langchain_anthropic langsmith langchain-community
pip install -U sklearn langchain_openai
import getpass
import os
def _set_env(var: str):
if os.environ.get(var):
return
os.environ[var] = getpass.getpass(var + ":")
_set_env("ANTHROPIC_API_KEY")
为 LangGraph 开发设置 LangSmith
注册 LangSmith 可以快速发现问题并提高 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用 — 在此处阅读更多关于如何开始的信息。
定义图¶
图状态¶
由于 StateGraph 的每个节点都接受状态(并返回更新后的状态),我们将在开始时定义它。
我们的流程接收一个文档列表,对它们进行批量处理,然后生成并优化作为可解释“聚类”的候选分类法。
import logging
import operator
from typing import Annotated, List, Optional
from typing_extensions import TypedDict
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("tnt-llm")
class Doc(TypedDict):
id: str
content: str
summary: Optional[str]
explanation: Optional[str]
category: Optional[str]
class TaxonomyGenerationState(TypedDict):
# The raw docs; we inject summaries within them in the first step
documents: List[Doc]
# Indices to be concise
minibatches: List[List[int]]
# Candidate Taxonomies (full trajectory)
clusters: Annotated[List[List[dict]], operator.add]
定义节点¶
1. 总结文档¶
聊天日志可能会很长。我们的分类法生成步骤需要处理大量且多样化的小批量数据,才能充分捕捉类别的分布。为了确保它们都能高效地适应上下文窗口,我们首先会总结每个聊天日志。下游步骤将使用这些总结而不是原始文档内容。
API 参考: ChatAnthropic | StrOutputParser | RunnableConfig | RunnableLambda | RunnablePassthrough
import re
from langchain import hub
from langchain_anthropic import ChatAnthropic
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableConfig, RunnableLambda, RunnablePassthrough
summary_prompt = hub.pull("wfh/tnt-llm-summary-generation").partial(
summary_length=20, explanation_length=30
)
def parse_summary(xml_string: str) -> dict:
summary_pattern = r"<summary>(.*?)</summary>"
explanation_pattern = r"<explanation>(.*?)</explanation>"
summary_match = re.search(summary_pattern, xml_string, re.DOTALL)
explanation_match = re.search(explanation_pattern, xml_string, re.DOTALL)
summary = summary_match.group(1).strip() if summary_match else ""
explanation = explanation_match.group(1).strip() if explanation_match else ""
return {"summary": summary, "explanation": explanation}
summary_llm_chain = (
summary_prompt | ChatAnthropic(model="claude-3-haiku-20240307") | StrOutputParser()
# Customize the tracing name for easier organization
).with_config(run_name="GenerateSummary")
summary_chain = summary_llm_chain | parse_summary
# Now combine as a "map" operation in a map-reduce chain
# Input: state
# Output: state U summaries
# Processes docs in parallel
def get_content(state: TaxonomyGenerationState):
docs = state["documents"]
return [{"content": doc["content"]} for doc in docs]
map_step = RunnablePassthrough.assign(
summaries=get_content
# This effectively creates a "map" operation
# Note you can make this more robust by handling individual errors
| RunnableLambda(func=summary_chain.batch, afunc=summary_chain.abatch)
)
def reduce_summaries(combined: dict) -> TaxonomyGenerationState:
summaries = combined["summaries"]
documents = combined["documents"]
return {
"documents": [
{
"id": doc["id"],
"content": doc["content"],
"summary": summ_info["summary"],
"explanation": summ_info["explanation"],
}
for doc, summ_info in zip(documents, summaries)
]
}
# This is actually the node itself!
map_reduce_chain = map_step | reduce_summaries
2. 分割成小批量¶
每个小批量包含文档的随机样本。这使得流程可以使用新数据识别当前分类法中的不足之处。
import random
def get_minibatches(state: TaxonomyGenerationState, config: RunnableConfig):
batch_size = config["configurable"].get("batch_size", 200)
original = state["documents"]
indices = list(range(len(original)))
random.shuffle(indices)
if len(indices) < batch_size:
# Don't pad needlessly if we can't fill a single batch
return [indices]
num_full_batches = len(indices) // batch_size
batches = [
indices[i * batch_size : (i + 1) * batch_size] for i in range(num_full_batches)
]
leftovers = len(indices) % batch_size
if leftovers:
last_batch = indices[num_full_batches * batch_size :]
elements_to_add = batch_size - leftovers
last_batch += random.sample(indices, elements_to_add)
batches.append(last_batch)
return {
"minibatches": batches,
}
3.a 分类法生成工具函数¶
图的这部分是一个生成 -> 更新 🔄 -> 审查的循环。每个节点共享大量逻辑,我们已将其提取到下面的共享函数中。
API 参考: Runnable
from typing import Dict
from langchain_core.runnables import Runnable
def parse_taxa(output_text: str) -> Dict:
"""Extract the taxonomy from the generated output."""
cluster_matches = re.findall(
r"\s*<id>(.*?)</id>\s*<name>(.*?)</name>\s*<description>(.*?)</description>\s*",
output_text,
re.DOTALL,
)
clusters = [
{"id": id.strip(), "name": name.strip(), "description": description.strip()}
for id, name, description in cluster_matches
]
# We don't parse the explanation since it isn't used downstream
return {"clusters": clusters}
def format_docs(docs: List[Doc]) -> str:
xml_table = "<conversations>\n"
for doc in docs:
xml_table += f'<conv_summ id={doc["id"]}>{doc["summary"]}</conv_summ>\n'
xml_table += "</conversations>"
return xml_table
def format_taxonomy(clusters):
xml = "<cluster_table>\n"
for label in clusters:
xml += " <cluster>\n"
xml += f' <id>{label["id"]}</id>\n'
xml += f' <name>{label["name"]}</name>\n'
xml += f' <description>{label["description"]}</description>\n'
xml += " </cluster>\n"
xml += "</cluster_table>"
return xml
def invoke_taxonomy_chain(
chain: Runnable,
state: TaxonomyGenerationState,
config: RunnableConfig,
mb_indices: List[int],
) -> TaxonomyGenerationState:
configurable = config["configurable"]
docs = state["documents"]
minibatch = [docs[idx] for idx in mb_indices]
data_table_xml = format_docs(minibatch)
previous_taxonomy = state["clusters"][-1] if state["clusters"] else []
cluster_table_xml = format_taxonomy(previous_taxonomy)
updated_taxonomy = chain.invoke(
{
"data_xml": data_table_xml,
"use_case": configurable["use_case"],
"cluster_table_xml": cluster_table_xml,
"suggestion_length": configurable.get("suggestion_length", 30),
"cluster_name_length": configurable.get("cluster_name_length", 10),
"cluster_description_length": configurable.get(
"cluster_description_length", 30
),
"explanation_length": configurable.get("explanation_length", 20),
"max_num_clusters": configurable.get("max_num_clusters", 25),
}
)
return {
"clusters": [updated_taxonomy["clusters"]],
}
3. 生成初始分类法¶
# We will share an LLM for each step of the generate -> update -> review cycle
# You may want to consider using Opus or another more powerful model for this
taxonomy_generation_llm = ChatAnthropic(
model="claude-3-haiku-20240307", max_tokens_to_sample=2000
)
## Initial generation
taxonomy_generation_prompt = hub.pull("wfh/tnt-llm-taxonomy-generation").partial(
use_case="Generate the taxonomy that can be used to label the user intent in the conversation.",
)
taxa_gen_llm_chain = (
taxonomy_generation_prompt | taxonomy_generation_llm | StrOutputParser()
).with_config(run_name="GenerateTaxonomy")
generate_taxonomy_chain = taxa_gen_llm_chain | parse_taxa
def generate_taxonomy(
state: TaxonomyGenerationState, config: RunnableConfig
) -> TaxonomyGenerationState:
return invoke_taxonomy_chain(
generate_taxonomy_chain, state, config, state["minibatches"][0]
)
4. 更新分类法¶
这是一个重复 N 次的“批判 -> 修订”步骤。
taxonomy_update_prompt = hub.pull("wfh/tnt-llm-taxonomy-update")
taxa_update_llm_chain = (
taxonomy_update_prompt | taxonomy_generation_llm | StrOutputParser()
).with_config(run_name="UpdateTaxonomy")
update_taxonomy_chain = taxa_update_llm_chain | parse_taxa
def update_taxonomy(
state: TaxonomyGenerationState, config: RunnableConfig
) -> TaxonomyGenerationState:
which_mb = len(state["clusters"]) % len(state["minibatches"])
return invoke_taxonomy_chain(
update_taxonomy_chain, state, config, state["minibatches"][which_mb]
)
5. 审查分类法¶
这在我们处理完所有小批量后运行一次。
taxonomy_review_prompt = hub.pull("wfh/tnt-llm-taxonomy-review")
taxa_review_llm_chain = (
taxonomy_review_prompt | taxonomy_generation_llm | StrOutputParser()
).with_config(run_name="ReviewTaxonomy")
review_taxonomy_chain = taxa_review_llm_chain | parse_taxa
def review_taxonomy(
state: TaxonomyGenerationState, config: RunnableConfig
) -> TaxonomyGenerationState:
batch_size = config["configurable"].get("batch_size", 200)
original = state["documents"]
indices = list(range(len(original)))
random.shuffle(indices)
return invoke_taxonomy_chain(
review_taxonomy_chain, state, config, indices[:batch_size]
)
编译图¶
定义好所有功能后,我们就可以构建图了!
API 参考: StateGraph | START | END
from langgraph.graph import StateGraph, START, END
graph = StateGraph(TaxonomyGenerationState)
graph.add_node("summarize", map_reduce_chain)
graph.add_node("get_minibatches", get_minibatches)
graph.add_node("generate_taxonomy", generate_taxonomy)
graph.add_node("update_taxonomy", update_taxonomy)
graph.add_node("review_taxonomy", review_taxonomy)
graph.add_edge("summarize", "get_minibatches")
graph.add_edge("get_minibatches", "generate_taxonomy")
graph.add_edge("generate_taxonomy", "update_taxonomy")
def should_review(state: TaxonomyGenerationState) -> str:
num_minibatches = len(state["minibatches"])
num_revisions = len(state["clusters"])
if num_revisions < num_minibatches:
return "update_taxonomy"
return "review_taxonomy"
graph.add_conditional_edges(
"update_taxonomy",
should_review,
# Optional (but required for the diagram to be drawn correctly below)
{"update_taxonomy": "update_taxonomy", "review_taxonomy": "review_taxonomy"},
)
graph.add_edge("review_taxonomy", END)
graph.add_edge(START, "summarize")
app = graph.compile()
from IPython.display import Image, display
try:
display(Image(app.get_graph().draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass
使用图¶
文档可以包含任何内容,但我们发现它在聊天机器人日志上效果非常好,例如由 LangSmith 捕获的日志。
下面我们将使用它作为一个示例。请将 project_name
更新为您自己的 LangSmith 项目。
您可能需要自定义下面的 run_to_doc
函数,因为您期望的键可能与本 notebook 作者的不同。
from datetime import datetime, timedelta
from langsmith import Client
project_name = "YOUR PROJECT NAME" # Update to your own project
client = Client()
past_week = datetime.now() - timedelta(days=7)
runs = list(
client.list_runs(
project_name=project_name,
filter="eq(is_root, true)",
start_time=past_week,
# We only need to return the inputs + outputs
select=["inputs", "outputs"],
)
)
# Convert the langsmith traces to our graph's Doc object.
def run_to_doc(run) -> Doc:
turns = []
idx = 0
for turn in run.inputs.get("chat_history") or []:
key, value = next(iter(turn.items()))
turns.append(f"<{key} idx={idx}>\n{value}\n</{key}>")
idx += 1
turns.append(
f"""
<human idx={idx}>
{run.inputs['question']}
</human>"""
)
if run.outputs and run.outputs["output"]:
turns.append(
f"""<ai idx={idx+1}>
{run.outputs['output']}
</ai>"""
)
return {
"id": str(run.id),
"content": ("\n".join(turns)),
}
调用¶
现在将运行结果转换为文档并启动您的图流程。这将需要一些时间!总结步骤耗时最长。如果您想加快速度,可以尝试将负载分散到不同的模型提供商。
API 参考: InMemoryCache | set_llm_cache
from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache
# Optional. If you are running into errors or rate limits and want to avoid repeated computation,
# you can set this while debugging
set_llm_cache(InMemoryCache())
# We will randomly sample down to 1K docs to speed things up
docs = [run_to_doc(run) for run in runs if run.inputs]
docs = random.sample(docs, min(len(docs), 1000))
use_case = (
"Generate the taxonomy that can be used both to label the user intent"
" as well as to identify any required documentation (references, how-tos, etc.)"
" that would benefit the user."
)
stream = app.stream(
{"documents": docs},
{
"configurable": {
"use_case": use_case,
# Optional:
"batch_size": 400,
"suggestion_length": 30,
"cluster_name_length": 10,
"cluster_description_length": 30,
"explanation_length": 20,
"max_num_clusters": 25,
},
# We batch summarize the docs. To avoid getting errors, we will limit the
# degree of parallelism to permit.
"max_concurrency": 2,
},
)
for step in stream:
node, state = next(iter(step.items()))
print(node, str(state)[:20] + " ...")
最终结果¶
下面,将最终结果渲染为 markdown
from IPython.display import Markdown
def format_taxonomy_md(clusters):
md = "## Final Taxonomy\n\n"
md += "| ID | Name | Description |\n"
md += "|----|------|-------------|\n"
# Fill the table with cluster data
for label in clusters:
id = label["id"]
name = label["name"].replace(
"|", "\\|"
) # Escape any pipe characters within the content
description = label["description"].replace(
"|", "\\|"
) # Escape any pipe characters
md += f"| {id} | {name} | {description} |\n"
return md
Markdown(format_taxonomy_md(step["__end__"]["clusters"][-1]))
最终分类法¶
ID | 名称 | 描述 |
---|---|---|
1 | 网络连接问题故障排除 | 解决 DNS、网络连接和 GitHub 扩展激活问题。 |
2 | 提取和分析数据 | 从文本文件、数据库和 API 等各种来源检索和处理数据。 |
3 | 提供医疗健康洞见 | 生成医学诊断、症状检查、药物信息和皮肤状况分析。 |
4 | 配置和优化模型 | 调整模型参数和超参数以提高给定任务的性能。 |
5 | 生成创意诗歌 | 使用语言模型和人工智能工具创作诗歌。 |
6 | 与数据库交互 | 查询数据库、提取数据并在数据处理期间管理错误。 |
7 | 查询向量数据库 | 与 Milvus 等向量数据库交互,以存储和检索高维数据。 |
8 | 生成合成数据 | 使用语言模型和机器学习技术创建合成数据。 |
9 | 集成工具和工作流 | 将各种工具和库整合到内聚的工作流中,用于不同的任务。 |
10 | 改进信息检索 | 为每个文档存储和查询多个向量,以获得更好的语义理解。 |
11 | 处理文档和提取文本 | 解析和提取 PDF、DOCX 和 HTML 等各种文档格式中的文本。 |
12 | 构建本地知识库 | 从文本文件创建知识库,处理文本分割、嵌入和存储。 |
13 | 优化对话检索 | 排除故障并改进 LangChain 中 ConversationalRetrievalChain 的性能。 |
14 | 连接数据库和使用代理 | 连接到数据库,使用代理,并理解不同代理类型之间的差异。 |
15 | 自省 LangChain 工具 | 访问和检索 LangChain 工具的功能和源代码详情。 |
16 | 使用检索增强生成风格化答案 | 创建一个问答系统,以特定风格生成有良好引用的答案。 |
17 | 使用 ZERO_SHOT_REACT_DESCRIPTION 代理 | 在 LangChain 中将 ZERO_SHOT_REACT_DESCRIPTION 代理类型应用于聊天模型。 |
18 | 自动化微课创建 | 根据主题、数量和学习风格等输入参数生成微课。 |
19 | 与 Chroma 向量存储集成 | 在 Chroma 向量数据库中存储和检索数据,包括处理文档嵌入。 |
20 | 管理 LangChain 回调令牌 | 理解和利用 LCEL 链中的回调令牌特性。 |
21 | FastAPI 部署故障排除 | 解决使用 FastAPI 后端部署 React 应用的问题。 |
22 | 使用 LangChain 代理分析数据 | 使用 LangChain 代理与 Pandas 和 Spark DataFrames 交互进行数据探索。 |
23 | 实现 OpenAI Chat API | 实现 OpenAI 聊天完成 API 并理解所需的输入和输出。 |
24 | 比较 LangChain 和 LLMIndex | 评估 LangChain 和 LLMIndex 之间的差异,包括它们对 Markdown 的 UI 支持。 |
25 | 在 AgentExecutor 中禁用工具 | 在 AgentExecutor 中临时禁用工具,禁用次数固定。 |
阶段 2:标注¶
现在我们有了分类法,是时候标注我们数据的一个子集来训练分类器了。
输入分类在很多方面都有用,从行内提示优化(为每个分类的意图定制提示),到系统改进(识别系统无法产生良好响应的类别),再到产品分析(了解哪些意图类别可以改进以提高利润)。
问题在于基于 LLM 的标注可能成本高昂。
计算嵌入的成本大约低 100 倍,而在此基础上添加一个简单的逻辑回归分类器只会增加微不足道的成本。
让我们进行标注并训练一个分类器!
标注训练数据¶
使用 LLM 以全自动方式标注数据。为了获得更好的准确性,您还可以手动抽样部分结果进行标注以验证质量。
labeling_prompt = hub.pull("wfh/tnt-llm-classify")
labeling_llm = ChatAnthropic(model="claude-3-haiku-20240307", max_tokens_to_sample=2000)
labeling_llm_chain = (labeling_prompt | labeling_llm | StrOutputParser()).with_config(
run_name="ClassifyDocs"
)
def parse_labels(output_text: str) -> Dict:
"""Parse the generated labels from the predictions."""
category_matches = re.findall(
r"\s*<category>(.*?)</category>.*",
output_text,
re.DOTALL,
)
categories = [{"category": category.strip()} for category in category_matches]
if len(categories) > 1:
logger.warning(f"Multiple selected categories: {categories}")
label = categories[0]
stripped = re.sub(r"^\d+\.\s*", "", label["category"]).strip()
return {"category": stripped}
labeling_chain = labeling_llm_chain | parse_labels
final_taxonomy = step["__end__"]["clusters"][-1]
xml_taxonomy = format_taxonomy(final_taxonomy)
results = labeling_chain.batch(
[
{
"content": doc["content"],
"taxonomy": xml_taxonomy,
}
for doc in docs
],
{"max_concurrency": 5},
return_exceptions=True,
)
# Update the docs to include the categories
updated_docs = [{**doc, **category} for doc, category in zip(docs, results)]
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("Enter your OPENAI_API_KEY: ")
API 参考: OpenAIEmbeddings
from langchain_openai import OpenAIEmbeddings
# Consider using other embedding models here too!
encoder = OpenAIEmbeddings(model="text-embedding-3-large")
vectors = encoder.embed_documents([doc["content"] for doc in docs])
embedded_docs = [{**doc, "embedding": v} for doc, v in zip(updated_docs, vectors)]
训练分类器¶
现在我们已经从文本中提取了特征,我们可以在这些特征上生成分类器。
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
# Create a dictionary mapping category names to their indices in the taxonomy
category_to_index = {d["name"]: i for i, d in enumerate(final_taxonomy)}
category_to_index["Other"] = len(category_to_index)
# Convert category strings to numeric labels
labels = [
category_to_index.get(d["category"], category_to_index["Other"])
for d in embedded_docs
]
label_vectors = [d["embedding"] for d in embedded_docs]
X_train, X_test, y_train, y_test = train_test_split(
label_vectors, labels, test_size=0.2, random_state=42
)
# Calculate class weights
class_weights = class_weight.compute_class_weight(
class_weight="balanced", classes=np.unique(y_train), y=y_train
)
class_weight_dict = dict(enumerate(class_weights))
# Weight the classes to partially handle imbalanced data
model = LogisticRegression(class_weight=class_weight_dict)
model.fit(X_train, y_train)
train_preds = model.predict(X_train)
test_preds = model.predict(X_test)
train_acc = accuracy_score(y_train, train_preds)
test_acc = accuracy_score(y_test, test_preds)
train_f1 = f1_score(y_train, train_preds, average="weighted")
test_f1 = f1_score(y_test, test_preds, average="weighted")
print(f"Train Accuracy: {train_acc:.3f}")
print(f"Test Accuracy: {test_acc:.3f}")
print(f"Train F1 Score: {train_f1:.3f}")
print(f"Test F1 Score: {test_f1:.3f}")
阶段 3:部署¶
现在您有了分类器,可以轻松地部署它并应用于未来的运行!您只需嵌入输入并应用您的 LogisticRegression 分类器。让我们试试。我们将使用 Python 的 joblib 库来序列化我们的 sklearn 分类器。下面是一个示例
from joblib import dump as jl_dump
categories = list(category_to_index)
# Save the model and categories to a file
with open("model.joblib", "wb") as file:
jl_dump((model, categories), file)
部署¶
部署时,您可以加载分类器并初始化您的嵌入编码器。它们使用 LCEL 可以轻松地组合在一起。
API 参考: OpenAIEmbeddings
from joblib import load as jl_load
from langchain_openai import OpenAIEmbeddings
loaded_model, loaded_categories = jl_load("model.joblib")
encoder = OpenAIEmbeddings(model="text-embedding-3-large")
def get_category_name(predictions):
return [loaded_categories[pred] for pred in predictions]
classifier = (
RunnableLambda(encoder.embed_documents, encoder.aembed_documents)
| loaded_model.predict
| get_category_name
)
示例:¶
假设您有了更多数据输入,您可以在下面获取并应用它
client = Client()
past_5_min = datetime.now() - timedelta(minutes=5)
runs = list(
client.list_runs(
project_name=project_name,
filter="eq(is_root, true)",
start_time=past_5_min,
# We only need to return the inputs + outputs
select=["inputs", "outputs"],
limit=100,
)
)
docs = [run_to_doc(r) for r in runs]
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
``````output
['Interacting with Databases', 'Optimizing Conversational Retrieval']
结论¶
恭喜您成功实现了 TNT-LLM!虽然大多数人使用基于聚类的方法,如 LDA、k-means 等,但通常很难真正解释每个聚类代表什么。TNT-LLM 生成人类可解释的标签,您可以在下游使用它们来监控和改进您的应用程序。
该技术也适用于分层子分类:一旦您拥有上述分类法,就可以用它来标注您的数据,然后在每个子类别上,使用与上述类似的技术生成一个新的分类法!