SQL 生成器 2.0我如何为企业级构建 AI 查询向导支持 500 表

SQL 生成器 2.0我如何为企业级构建 AI 查询向导支持 500 表

Barry Lv6

迄今为止的旅程:Confluence Agent的回顾

在我们深入探讨SQL Agent之前,让我们简要回顾一下我们开发的Confluence Agent:

  1. Metadata Ingestion: 捕捉我们知识库的结构。
  2. Content Extraction: 提取我们文档的核心内容。
  3. Format Handling: 分离HTML和PDF内容以实现最佳处理。
  4. Image Analysis: 利用LLM解析提取和理解图像内容。
  5. Performance Boost: 实现异步和多线程处理,实现10倍的速度提升。

这些增强功能为强大的信息检索系统奠定了基础。现在,我们正在扩展我们的工具包,以解决数据驱动组织中最常见的挑战之一:SQL查询生成。

我为什么要构建这个?

想象一下这个场景:你是一名新的数据分析师,你的老板急匆匆地走到你的桌前,提出一个紧急请求:

“我需要对昨天的游戏指标与去年的数据进行比较分析,重点关注速度和收入。请在今天结束前放到我的桌子上。”

当你脸色苍白时,你意识到你面临几个挑战:

  1. 你是新来的,不知道在哪里找到相关数据。
  2. 你不确定哪些表包含你需要的信息。
  3. 编写复杂的 SQL 查询还不是你的强项(还不是)。
  4. 你的经理整天都在开会,你不想用基本问题打扰他们。

这个场景突显了许多组织面临的三个关键挑战:

  1. 数据量:在传统和现代数据仓库中,有超过 500 个表,找到正确的数据就像在大海捞针。
  2. 查询复杂性:作为唯一的真实数据来源,需要应对来自各个部门的复杂查询。
  3. 知识差距:新团队成员往往在没有广泛的机构知识的情况下,难以导航庞大的数据库。

我思考这个问题已经有一段时间,但直到一个完全正常的早晨,我和我的总经理进行了一次随意的对话,我才开始考虑着手解决这个问题。

“我们现在可以从你的聊天机器人查询数据吗?”

我没想到,这句随口而出的评论会让我踏上改革我们与数据互动方式的征程。但在我讲述我是如何构建这个 AI 查询向导之前,让我先谈谈为什么大多数现有解决方案根本无法解决企业规模的问题。

为什么“只需谷歌一下”对企业 SQL 挑战无效

在动手之前,我做了任何一个自尊心强的开发者都会做的事情:我在互联网上寻找解决方案。我发现的内容是……嗯,姑且说它还差得远。

文本到 SQL 的“你好,世界”

我发现的大多数文章和教程就像是用儿童游泳池来训练奥运会:

  • 🐣 小表:使用 2–3 个表的示例。可爱,但我们最小的模式会把这些当早餐吃掉。
  • 📜 无限提示卷轴:解决方案建议我将整个表模式粘贴到提示中。祝你在无限提示中好运。
  • 🏝️ 孤立岛查询:对单个表的简单查询,忽略了真实数据库中的复杂关系。
  • 🐌 性能?什么性能?:大多数示例忽视了在大数据集中的查询性能这一关键方面。

我的数据现实

在这些教程还在小型游泳池里玩耍时,我却在凝视着数据等同于太平洋的浩瀚:

  • 🏙️ 表格混乱:超过300个表格,有些表格的列数超过150
  • 🕸️ 复杂关系:一个具有复杂相互依赖关系的数据模型,即数据保险库建模。
  • 🏎️ 速度需求:需要为速度和效率优化的查询。
  • 🔄 演变中的模式:一个表结构频繁变化的动态环境。

很明显:我们需要一个与我们的数据环境一样强大和复杂的解决方案。

实现 SQL Agent

让我们不浪费时间,直接进入主题。现在,这里有一个限制。

我显然可以赋予 SQL Agent 执行 SQL 的能力。实际上,我应该这样做,但是,由于这是一个概念验证(PoC),与数据库交互的 API 密钥在我名下,任何人都可以轻易地进行提示注入,比如说“删除表”。此外,我不保证从代理生成的 SQL 是 100% 正确和优化的。随着训练数据的不断增加,我们最终会很快达到那个阶段。目前,用户需要自行验证生成的 SQL。

当前的高级架构

我们目前只有一个单一的代理。Confluence Agent。工作流程保持不变。用户发送请求 -> 请求发送到主代理进行推理 -> 发送到 Confluence Agent 以通过 Confluence 知识库找到答案 -> 发送到验证代理以验证响应 -> 发送到人工响应代理以标记 PII 数据(如果存在)并回复用户,否则,调用主代理并请求重新评估,因为数据检索不够好。

数据处理:成功的基础

如果你只是将纯文本直接放入向量数据库,并期望LLM能做好,那么你不会得到好的结果。我试过并失败了,所以你不必自己尝试。任何成功的机器学习系统或LLM/生成式AI系统都依赖于数据处理。这是一个关键步骤,通常占据超过80%的时间。如果你的数据处理得当,那么后续的提示工程可以稍后进行。不要试图在数据准备阶段进行提示工程。

1. 数据摄取:绘制数据全景

首先,我需要掌握我们庞大的数据全景。我们的数据平台使用AWS Glue作为主要数据目录,因此Glue始终包含表/数据库/模式的最新更新。这一步至关重要,因为它为后续的一切奠定了基础。

我编写了一个Python脚本,通过AWS Glue API提取我们所有数据库和表的信息。这不仅仅是简单的数据转储——我必须仔细构建提取的信息,以便在项目的后续阶段使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import boto3

session = boto3.Session(profile_name='<your_profile_name>')
glue_client = session.client('glue')
bedrock_runtime = session.client(service_name='bedrock-runtime')

def list_glue_tables(glue_client):
raw_all_tables = []
filtered_databases = ['<your_db1>','<your_db_2>','<your_db_3>']

paginator = client.get_paginator('get_databases')
for page in paginator.paginate():
for database in page['DatabaseList']:
if database['Name'] not in filtered_databases:
continue

table_paginator = client.get_paginator('get_tables')
for table_page in table_paginator.paginate(DatabaseName=database['Name']):
raw_all_tables.extend(table_page['TableList'])

return raw_all_tables

该脚本遍历每个数据库,然后遍历这些数据库中的每个表,提取关键信息,如表名、列名和类型、存储位置以及最后更新时间戳。这种全面的方法确保没有表被遗漏,从而为我提供了完整的数据地图。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def extract_schema(table):
return {
"DatabaseName": table['DatabaseName'],
"TableName": table['Name'],
"TableDescription": table.get('Description', ''),
"Partition": table.get('PartitionKeys', None),
"TableSchema": [
{
"Name": col['Name'],
"Type": col['Type'],
"Comment": col.get('Comment', '')
} for col in table['StorageDescriptor']['Columns']
],
"CreateTime": table.get('CreateTime', None),
"UpdateTime": table.get('UpdateTime', None),
"SourceSQL": table.get('ViewExpandedText', '')
}

def process_table(table):
print(f"Processing table {table['Name']}")
schema = extract_schema(table)
documentation = generate_documentation(schema) # find this function below
table_name = f"{table['DatabaseName']}.{table['Name']}"
save_documentation(table_name, documentation)
print(f"===Documentation generated for {table['Name']}")

2. 上下文丰富:为原始数据增添风味

原始表模式就像未调味的食物——功能性强,但并不令人兴奋。我需要为这些数据添加一些上下文,一些调味。这就是事情变得有趣的地方,也是我开始利用大型语言模型(LLMs)力量的地方。

我开发了一个基于LLM的丰富过程,它将提取每个表的原始元数据并生成有意义的上下文。这不仅仅是重述元数据中已有的内容——我希望LLM能对表的目的、与其他表的关系以及它可能如何使用做出有根据的猜测。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def generate_documentation(schema):
system_prompt = """
You are an expert database and business developer specializing in <place holder for your purpose>
Your task is to review database schemas and generate comprehensive documentation in JSON format.
Focus on providing insights relevant to the betting industry, including table purposes, column descriptions,
and potential use cases. Be concise yet informative, and ensure all output is in valid JSON format.
"""

initial_user_prompt = f"""
Please generate comprehensive documentation for the following database schema in JSON format only.
The documentation should include:
1. A brief overview of the table's purpose and its role in <purpose>
2. Detailed descriptions of each column, including its data type, purpose, and any relevant notes specific to the <your data platform>
3. Any additional insights, best practices, or potential use cases for this table in the context of <your context>
4. Comments on the creation and last update times of the table, if relevant to its usage or data freshness
5. Generate at least 10 common queries that could be run against this table in the context <your context>

Here's the schema:
{json.dumps(schema, indent=2, cls=DateTimeEncoder)}

Please provide the output in the following format:
```json
{{
"DatabaseName": "Name of the database",
"TableName": "Name of the table",
"TableDescription": "Brief overview of the table",
"CreateTime": "Raw creation time of table",
"UpdateTime": "Raw updated time of table",
"Columns": [
{{
"name": "column_name",
"type": "data_type",
"description": "Detailed description and purpose of the column"
}},
// ... all other columns
],
"AdditionalInsights": [
"Insight 1",
"Insight 2",
// ... other insights
],
"CommonQueries": [
{
"natural_language": "Nature english query",
"sql_query": "Detail of SQL query",
}
]
}}
If you need more space to complete the documentation, end your response with "[CONTINUE]" and I will prompt you to continue.
"""

full_response = ""
conversation_history = f"{system_prompt}\n\nuser: {initial_user_prompt}\n\nassistant: "

while True:
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "messages": [{"role": "user", "content": conversation_history}],
        "max_tokens": 8192,
        "temperature": 0,
    })

    response = bedrock_runtime.invoke_model(body=body, modelId=model_id)
    response_body = json.loads(response.get('body').read())
    current_response = response_body['content'][0]['text']
    full_response += current_response

    if response_body['stop_reason'] != 'max_tokens':
        break

    conversation_history += current_response
    conversation_history += "\n\nuser: Please continue the JSON documentation where you left off, maintaining the perspective of an expert in sports and racing betting platforms.\n\nassistant: "

return full_response
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
这一阶段的提示工程至关重要。我必须设计提示,以引导LLM提供:

1. 表在我们业务上下文中可能代表的详细描述。
2. 表的潜在用例,考虑不同部门可能如何查询它。
3. 与其他表的可能关系,帮助绘制我们的数据模型。
4. 任何数据质量考虑,例如潜在的空值或数据类型不一致。
5. 对于大型表的建议分区键,考虑查询优化。
6. 潜在的唯一标识符,这对后续的连接操作至关重要。

**关键在于:**


> 在处理此内容时,Claude模型通过Bedrock仅支持最多4096个输出令牌。这对于大多数用例来说是足够的,然而,对于某些包含超过100列的特殊表,它可能会导致错误。为了处理这个限制,我们首先查看第一个输出的响应是否包含**max\_token**作为**stop\_reason**。如果没有,则继续该过程,但如果存在**max\_token**,则需要将现有响应发送给LLM,并调整提示以请求从上一步继续生成。**记住:上下文令牌为200k,输出令牌仅为4096k。**

最终输出可能由于某些故障而无法解析为JSON。因此,我们需要将此错误写入txt文件以便后续审查。


```python
def save_documentation(table_name, documentation):
try:
json_content = documentation.split("```json")[1].split("```")[0].strip()
parsed_json = json.loads(json_content)

with open(f"{folder}/table_json/{table_name}.json", "w") as f:
json.dump(parsed_json, f, indent=2)
print(f"===Documentation saved for {table_name}")
except Exception as e:
print(f"===Error parsing documentation for {table_name}: {str(e)}")
with open(f"{folder}/{table_name}_doc_raw.txt", "w") as f:
f.write(documentation)
print(f"===Raw documentation saved for {table_name}")

这是存储模式的函数

1
2
3
4
5
6
7
def process_table(table):
print(f"Processing table {table['Name']}")
schema = extract_schema(table)
documentation = generate_documentation(schema)
table_name = f"{table['DatabaseName']}.{table['Name']}"
save_documentation(table_name, documentation)
print(f"===Documentation generated for {table['Name']}")

3. 双重索引:创建完美的结合

我开始使用普通的向量索引和基本的固定切片,但结果很糟糕,SQL生成器经常获取错误的表名和列名。由于切片将其切割掉,因此没有完整的上下文。

为了解决这个问题,我放弃了普通的切片算法,转而采用层次切片方法,并稍微改变了提示。这产生了更好的响应,并解决了第一种方法的所有问题。

但随后,向量索引忽略了一个关键方面,即表之间的关系。为此,我使用了知识图谱,结果非常出色。

总之,我们有两种索引方法:

向量索引

向量索引主要关注速度。我使用 OpenSearch 无服务器作为向量数据库,并使用 分层切分 作为切分算法。

知识图谱

虽然向量索引为我们提供了速度,但知识图谱则提供了深度。我使用 NetworkX 创建了我们整个数据模型的图形表示。每个表都成为一个节点,边缘表示表之间的关系。

定义这些关系的难点在于。有些关系很明显,比如外键关系,但其他关系则需要对我们的数据模型有更细致的理解。我实现了基于命名约定、公共前缀和第 2 步中丰富的元数据推断关系的逻辑。

这个知识图谱成为我们系统理解复杂多表查询的基础。它使 SQL Agent 能够以关系和路径的方式“思考”数据,类似于人类数据分析师的方式。

4. 魔法饮品:将问题翻译为 SQL

现在,进入最重要的部分,SQL Agent 本身。正如你所知道的,LlamaIndex 始终是我开发 AI Agent 时的首选。我使用过 LangChain 和其他开源工具,虽然 LangChain 发展成熟且拥有更大的社区支持,但有时它会增加不必要的复杂性。

以下是 SQL Agent 的实现

提示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def sql_agent_promt():
return """
You are an advanced AI assistant specialized in data analytics for <your domain database> with expert proficiency in Databricks Delta SQL.
Your primary role is to translate natural language queries into precise, executable SQL queries.
Follow these instructions meticulously:

Core Responsibilities:
- Always respond with an executable SQL query.
- Do NOT execute SQL queries; only formulate them.
- Utilize the vector database to access accurate schema information for all tables.

Process:
1. Understand User Input:
- Interpret the user's natural language query to comprehend their data requirements and objectives.
2. Retrieve Relevant Tables:
- Identify and retrieve the most relevant tables from the vector database that align with the user's query.
- Continue this step until you find all necessary tables for the query.
3. Verify Schema:
- For each relevant table, retrieve and confirm the exact schema.
- IMPORTANT: Pay special attention to column names, data types, and relationships between tables.
4. Formulate SQL Query:
- Construct a Databricks Delta SQL query using the confirmed schema information.
- Ensure all table and column names used in the query exactly match the schema.
5. Provide Professional Response
- Draft the SQL query as a seasoned senior business analyst would, ensuring clarity, accuracy, and adherence to best practices.
6. (Optional) Explanation
- If requested, provide a detailed explanation of the SQL query and its logic.

Response Format:
1. Begin with the SQL query enclosed in triple backticks (```).
2. Follow with a brief explanation of the query's purpose and how it addresses the user's request.
3. Include a schema confirmation section, listing the tables and columns used.
Guidelines:
- Prioritize query accuracy and performance optimization.
- Use clear and professional language in all responses.
- Offer additional insights to enhance user understanding when appropriate.
Error Handling:
If you lack information or encounter ambiguity, use the following format:
<clarification_request>
I need additional information to formulate an accurate query. Could you please:
- Provide more details about [specific aspect]?
- Confirm if the following tables and columns are relevant: [list potential tables/columns]?
- Clarify any specific time ranges, filters, or conditions for the data?
</clarification_request>

Schema Confirmation
Before providing the final query, always confirm the schema:
<schema_confirmation>
I'll be using the following schema for this query:
Table: [table_name1]
Columns: [column1], [column2], ...
Table: [table_name2]
Columns: [column1], [column2], ...
Are these the correct tables and columns for your query?
</schema_confirmation>

Example Response
<give your example here>
Remember to maintain a professional, clear, and helpful tone while engaging with users and formulating queries.
"""

该过程如下:

  1. 当用户提交查询时,我们首先使用向量索引快速识别潜在相关的表和列。
  2. 然后我们咨询知识图谱,以了解这些表之间的关系,并确定可能需要的其他表,以回答查询。
  3. 利用这些信息,我们为LLM构建一个提示,其中包括:
  • 用户的原始查询
  • 相关表和列的信息
  • 来自我们的知识图谱和向量数据库的上下文,关于这些表之间的关系
  • 我们编码的任何特定业务规则或常见实践

然后,LLM根据这些信息生成SQL查询。最后,我们通过验证步骤运行查询,以捕捉任何明显的错误或低效。

这是代理的代码:

记住我们上面创建的知识库,我们将其作为此代理中的工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
response_synthesizer = get_response_synthesizer(llm=llm)
query_engine = RetrieverQueryEngine(
retriever=sql_knowledgebase,
response_synthesizer=response_synthesizer,
)
query_engine_tools = [
QueryEngineTool(
query_engine=query_engine,
metadata=ToolMetadata(
name="database_retriever",
description="Have access to data catalog, that have all details about databases, schemas, tables, table columns and its attribute along with description."
),
),
...... <second tool for knownledge graph>
]
agent_worker = FunctionCallingAgentWorker.from_tools(
query_engine_tools,
llm=llm,
verbose=True,
allow_parallel_tool_calls=True,
system_prompt=sql_agent_promt()
)
agent = agent_worker.as_agent(memory=chat_memory)

此步骤的提示工程是所有步骤中最具挑战性的。我必须创建一个提示,以指导LLM编写正确、高效的SQL,同时解释其推理。这一解释组件对于建立用户的信任和帮助他们理解生成的查询至关重要。

结果:一个永不休眠的数据向导

我的SQL代理最终成为了我从未知道自己需要的MVP。就像拥有一个24/7的数据信使,永远不会搞错你的订单。

高级多代理架构现在

如您所见,我们在矩形框中有另一个代理。我们现在有两个代理,它们不断相互交流,以解决用户查询,只要查询与生成SQL或基于Confluence文档的一般问题有关。

学到的经验

在这段旅程中,我学到了一些宝贵的经验,我认为这些经验对任何从事类似项目的人都有益:

  1. 上下文为王:理解表的上下文和关系是生成准确查询的关键。
  2. 基于图的方式:知识图谱方法比普通索引更好地处理复杂查询。它使系统能够以一种接近人类推理的方式“思考”数据关系。
  3. 持续学习:我不断将新查询反馈到我们的系统中,以提高其性能。
  4. 可解释性至关重要:让系统解释其推理可以建立用户的信任,并帮助他们了解数据模型。这就像厨师解释复杂菜肴中的成分一样。
  5. 边缘案例是生活的调味料:处理边缘案例和不寻常的查询往往是获得最有趣见解的地方。这迫使我深入思考我们的数据模型以及用户如何与之互动。

结论

我从一个随意的评论到一个复杂的SQL代理的旅程表明,凭借正确的技术、创造力和一点咖啡因激发的灵感,我们可以解决甚至最复杂的数据挑战。

我创造的不仅仅是一个工具;我创造了一种全新的与数据互动的方式。它正在使每个团队成员成为潜在的数据美食家,从而实现洞察的民主化。

❤ 如果您觉得这篇文章对您有帮助,我非常感谢您给予我掌声。这对我意义重大,并证明了我的工作的价值。此外,您可以 订阅我的Substack ,因为我将在该频道中涵盖更深入的LLM开发。如果您有任何问题,请随时留言。我会尽快回答。

1
2
3
4
想要联系?
如果您需要联系,请随时通过我的
Twitter或LinkedIn给我发消息,并订阅我的Substack,因为我将在我的Substack频道中覆盖
更多学习实践,尤其是深入开发LLM的路径。

参考文献

  • 标题: SQL 生成器 2.0我如何为企业级构建 AI 查询向导支持 500 表
  • 作者: Barry
  • 创建于 : 2024-08-19 00:50:04
  • 更新于 : 2024-08-31 06:59:45
  • 链接: https://wx.role.fun/2024/08/19/d9f4eacd9c814728bb0e92038effb818/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。