多模态检索增强生成应用于现实案例  带代码

多模态检索增强生成应用于现实案例 带代码

Barry Lv6

构建一个与文本、图像、表格和音频交互的RAG系统的完整指南。

介绍

想象一下,贵公司的核心专长是评估新兴市场中的ESG(环境、社会和治理)因素,以便进行战略投资决策。作为该公司的金融分析师,您负责分析大量多样化的数据,以为这些关键选择提供信息。

如果您有一个智能系统,可以:

  • 自动处理各种性质的数据
  • 回答关于不同市场ESG因素的特定问题
  • 提供准确的见解,而不必担心因AI幻觉而导致的昂贵错误,这不是很好吗?

在本文中,您将了解到多模态检索增强生成(RAG)如何创建这样的系统,使您能够:

  • 同时分析多种数据类型,包括PDF、图像和音频
  • 利用大型语言模型(LLMs)的优势,同时减轻其局限性
  • 在新兴市场中做出更明智和可靠的投资决策

多模态和检索增强生成用于ESG分析

检索增强生成和多模态学习是两个不同的基本组成部分,结合在一起会形成一个更强大的工具。让我们清晰地理解它们各自的含义。

什么是多模态学习?

多模态学习是指人工智能系统同时处理和理解多种感官信息的能力,例如文本、音频、图像、视频,与传统的单模态人工智能工具不同,后者仅训练用于特定任务的单一类型数据。

多模态学习模拟人类感知,整合来自各种感官的信息,以创建对世界的全面理解。在ESG的背景下,多模态学习可以结合多种数据源,例如:

  • 文本:报告、新闻文章、政策文件作为文本数据
  • 图像:卫星图像、环境影响视觉作为图像数据
  • 音频:讨论ESG战略和表现的会议录音
  • 表格:结构化财务数据、ESG评级和关键绩效指标

为什么在ESG分析中使用多模态学习?

多模态学习为ESG分析师提供了许多好处:

  • 通过整合多种数据类型,分析师可以做出更明智的投资决策
  • 多模态模型能够很好地处理分布转变,使其能够在ESG分析中跨不同数据类型和来源进行泛化。
  • ESG分析师可以开发出更有意义和更好的复杂ESG因素的表示,捕捉到单一模态方法可能遗漏的细微差别。
  • 多模态学习使得跨越不同数据类型的任务成为可能,例如生成环境影响视觉的文本描述或根据其他文档(如PDF)中的文本和图像回答关于ESG政策的问题。

多模态学习的挑战

像任何人工智能实施方法一样,多模态学习也有其自身的挑战,并且有必要意识到这些挑战。主要的挑战如下所示:

  • 多模态表示可能会很棘手,因为每种数据类型都有其独特之处,以及我们不想丢失的相关细节。
  • 有时,在进行多模态翻译时,我们需要将给定的信息转换为另一种类型,例如获取卫星图像的文本描述。这种翻译可能无法完全正确,导致信息丢失。
  • 在事实检查场景中,多模态对齐至关重要,如果处理不当可能导致错误的决策。例如,在比较公司在特定市场中的定位时,我们可能希望检查这些公司的可持续发展报告是否与其环境实践一致。
  • 在ESG的背景下,多模态融合对于进行全面的ESG评估非常重要。专家可以合并财务报表、新闻文章、社交媒体情绪和环境监测数据,以获得全面的ESG评分,用于投资建议。
  • 我们主要提到多模态共同学习时,当我们在一个领域的信息不足时,我们尝试利用其他领域的知识来填补空白。这具有挑战性,因为不同类型的信息并不总是能够轻松地从一个上下文转移到另一个上下文。
  • 音频数据主要包含多个说话者,识别在何时谁在说话可以导致更稳健的答案,这使得模型能够捕捉到正在讲话的人。这个过程称为语音分离,是语音识别任务中的一个常见挑战。

多模态检索增强生成 (RAG)

在ESG分析中,准确性至关重要。虽然传统的大型语言模型功能强大,但有时会产生过时或无来源的信息以及不透明的推理。

使用此类工具可能增加低效投资的风险,而在做出关键投资决策时,没有任何金融分析师能够承担这种风险。

这就是检索增强生成 (RAG) 的作用所在。

基于多模态学习的基础,多模态RAG通过整合来自外部来源的相关、最新的ESG数据,提高大型语言模型的准确性。

在ESG分析的背景下,多模态RAG提供以下优势:

  • 检索:系统搜索多种数据类型(文本、图像、音频等),以查找与给定ESG查询相关的信息。
  • 增强:来自多个模态的检索信息被结合并进行上下文化处理。
  • 生成:大型语言模型使用增强的信息生成准确、信息丰富的响应。

这种方法通过以下方式解决了传统大型语言模型的局限性:

  • 最小化错误信息和过时见解
  • 提供针对ESG相关查询的上下文特定答案
  • 用最新的市场数据丰富模型的知识库

通过在ESG分析中利用多模态RAG,金融分析师可以做出更明智和可靠的投资决策。

让我们全面了解以下针对我们用例的多模态检索增强生成工作流程,特别关注Retriever模块和Augmented Generation

完整源代码可在我的GitHub 上获取,并可以下载以便跟随本文。

检索组件

该部分是分析师的查询用于从向量数据库中检索最相似的块,过程如下:

  • 首先,分析师提交一个查询以寻找答案。在此示例中,用户的查询是:

2024年第一季度全球可持续基金的净流入总额是多少?

  • 然后,查询通过嵌入模型进行嵌入,并将其发送到向量数据库,以提取与分析师查询相关的前N个相似块/文档。

增强生成组件

这是链条的最后一个组件,负责生成用户的最终答案。过程如下所示:

  • 与分析师查询最相似的块/文档与实际查询结合,以创建用户的响应。

使用 Weaviate 构建多模态 RAG 以支持 ESG

本节重点介绍上述每个架构组件的技术实现。在深入实现之前,了解使数据对生成模型可消化的组件是很重要的:数据建模

如果您更喜欢观看视频演示,请查看以下链接。

数据建模

想象一下在两个系统之间进行选择,这两个系统都能够提供准确的响应。然而,第二个系统更进一步,还提供额外的信息,例如 页码段落编号源文档网址源音频,甚至答案的示例 图像

分析师更可能选择这两个系统中的哪个?

即使没有 ESG 方面的专业知识,我也可以自信地说,选择将更倾向于第二个系统。这主要是由于建模部分,它为模型准备数据,以提供如此详细的信息。

选择第二个系统可以带来以下好处:

  • 投资验证:这使分析师能够验证投资建议背后的事实,这对于新兴市场中的高风险决策至关重要。
  • 多模态上下文:引用特定的图像、音频片段或 PDF 的特定部分,为 ESG 因素提供更丰富的上下文,这些因素可能无法仅通过文本完全捕捉。
  • 审计轨迹:在系统的响应与来源之间保持清晰的参考,为内部审查和外部审计创建了有价值的审计轨迹。配备这些功能的系统对分析师更透明且更有用。

本文涵盖的主要文档类型是包含 图像表格原始文本PDF。除了这些类型的 PDF 文件外,音频 文件也被考虑在内。

上述系统 1 和 2 的示例适用于响应生成来自文本数据的场景,例如原始文本数据。

让我们全面了解在处理其余类型数据时的期望。

  1. 图像数据

对于图像数据,我们有以下细节:

  • 页码:图像所在的页码。
  • 源文档:图像所在的文档。
  • 图像路径:图像保存的绝对路径。这在将图像嵌入响应中以快速可视化时非常有用。

2. 表格数据

表格与图像有类似的结构:

  • 页码:表格所在的页码。
  • 源文档:表格所在的文档。

3. 音频数据

如果答案来自音频源,我们可能还想知道响应的来源,在这种情况下是原始 YouTube 页面讨论的链接。

很好,我们对建模过程有了全面的了解。现在的问题是如何做到这一点!

这将在接下来的部分中讨论。

数据收集

为了成功实施我们的用例,我们利用以下数据。这两个YouTube视频是免费提供的,PDF文件在使用电子邮件注册后可以免费使用。

先决条件

为了正确实施代码并避免软件包冲突和安装问题,建议创建一个虚拟环境,如下所示:

  • 创建一个名为weaviate_venv的虚拟环境。
1
python3 -m venv weaviate_venv
  • 激活虚拟环境
1
source weaviate_venv/bin/activate

此命令激活虚拟环境。在终端提示符中应显示(weaviate_venv),表示虚拟环境已激活。

  • 在安装ipykernel后,将环境链接到jupyter notebook
1
2
pip install ipykernel
python -m ipykernel install --user --name=weaviate_venv

最后一条命令将虚拟环境weaviate_venv链接到Jupyter Notebook。

现在,在启动Jupyter Notebook后,我们可以选择weaviate_venv作为内核,以使用此环境的Python解释器和已安装的软件包运行我们的笔记本,如下所示:

1. 音频数据

这些数据的原始格式是视频,因此在下载后需要经过中间步骤将其转换为音频。

此过程是通过辅助类YouTubeAudioDownloader实现的,该类依赖于用于从YouTube下载视频的pytube Python库。

在成功安装pytube包后,可以如下使用它,此外还需要osre库。

1
pip install pytube

现在我们可以导入它们

1
2
3
from pytube import YouTube
import os
import re

最后,YouTubeAudioDownloader类的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class YouTubeAudioDownloader:
def __init__(self, output_folder):
self.output_folder = os.path.abspath(output_folder)
self.audio_files_dict = {}

def get_safe_filename(self, filename):
safe_filename = re.sub(r'[^\w\-.]', '_', filename)
safe_filename = re.sub(r'_+', '_', safe_filename)
safe_filename = safe_filename[:50].strip('_')
return safe_filename
def download_audio(self, video_url):
try:
yt = YouTube(video_url)
video = yt.streams.filter(only_audio=True).first()
safe_title = self.get_safe_filename(yt.title)
safe_title = safe_title.replace(' ', '_')
out_file = video.download(output_path=self.output_folder, filename=safe_title)
base, ext = os.path.splitext(out_file)
new_file = base + '.mp3'
os.rename(out_file, new_file)
print(f"音频文件已下载:{new_file}")
self.audio_files_dict[video_url] = new_file
return new_file
except Exception as e:
print(f"从{video_url}下载音频时出错:{str(e)}")
return None
def download_multiple_audios(self, video_urls):
for url in video_urls:
print(f"正在处理视频:{url}")
audio_file = self.download_audio(url)
if audio_file is None:
print(f"从视频下载音频失败:{url}")
return self.audio_files_dict

让我们理解这里发生了什么:

  • YouTubeAudioDownloader类的输入是一个用于保存下载音频文件的文件夹。它有以下三个主要功能:
  • get_safe_filename清理文件名,以避免在保存音频文件时出现空格和特殊字符的问题。这确保了后续分析的顺利加载。
  • download_audio从单个YouTube视频收集音频并将其保存为MP3格式。
  • download_multiple_audios处理从一组YouTube链接下载音频。

通过指定输出目录data和两个YouTube URL,我们可以成功执行下载。

1
2
3
4
5
6
7
8
downloader = YouTubeAudioDownloader(output_folder="../data")

video_urls = ["https://www.youtube.com/watch?v=qP1JKWBBy80",
"https://www.youtube.com/watch?v=_p58cZIHDG4"]
audio_files = downloader.download_multiple_audios(video_urls)
print("已下载音频文件:")
for audio_file in audio_files:
print(audio_file)

下载后,我们在音频文件夹中获得以下音频文件:

2. PDFs、图片和表格

PDF文件是通过在注册个人信息(如电子邮件、名字和姓氏)后简单下载过程获得的。

PDF文件看起来是这样的:

数据处理

数据处理的目标是将所有数据类型规范化为相同的格式,通过将其转换为文本,然后再转换为向量。

这可以通过以下方式实现:

  • .mp3 文件转录为其文本表示形式。
  • 从 PDF 文件中提取每个图像和表格的文本摘要。
  • 保持原始文本数据的文本格式。
  1. 音频转录

OpenAI 的 Whisper 是执行此任务的理想选择。除了转录外,它还提供多种语言的语音识别、翻译和语言识别功能。

Whisper 需要安装 ffmpeg 才能正常工作,可以按照 官方页面 上的说明在任何系统上安装。

1
pip install openai-whisper

接下来,使用 AudioTranscriber 类转录每个音频文件,生成一个包含三个主要键的字典:

  • 原始视频的 URL
  • 音频文件的路径
  • 音频的转录内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class AudioTranscriber:
def __init__(self, input_folder):
self.input_folder = os.path.abspath(os.path.join(os.getcwd(), input_folder))
self.whisper_model = None
self.transcriptions_dict = {}


def transcribe_audio(self, audio_file):
try:
if not os.path.exists(audio_file):
print(f"音频文件未找到: {audio_file}")
return None

file_size = os.path.getsize(audio_file)
if file_size == 0:
print(f"音频文件为空: {audio_file}")
return None

transcription = self.whisper_model.transcribe(audio_file)
return transcription["text"]
except Exception as e:
print(f"transcribe_audio 中出错: {str(e)}")
return None
def transcribe_all_audios(self, audio_files_dict):
for url, audio_path in audio_files_dict.items():
if not audio_path.endswith('.mp3'):
print(f"跳过非 mp3 文件: {audio_path}")
continue
transcription = self.transcribe_audio(audio_path)

if transcription is not None:
# 添加到转录字典
self.transcriptions_dict[url] = {
'url': url,
'audio_path': audio_path,
'transcription': transcription
}
else:
print(f"音频转录失败: {audio_path}")
return self.transcriptions_dict

使用两个主要函数来执行转录任务:

  • transcribe_audio 为单个音频文件生成转录。
  • transcribe_all_audio 则利用 transcribe_audio 为所有音频文件生成转录。

Whisper 提供五种不同的模型大小。模型越大,性能越好,但需要更多内存,因此加载时间也更长。

对于我们的用例,我们使用 Whisper 的中型版本,以在性能和速度之间取得平衡。

在使用模型之前,我们需要导入 torchWhisper,然后设置设备以使用 CPU 或 CUDA GPU。

1
2
3
4
5
# 设置设备
device = "cuda" if torch.cuda.is_available() else "cpu"

# 加载模型
whisper_model = whisper.load_model("medium", device=device)

现在我们按如下方式触发转录过程:

1
2
3
4
5
6
7
8
9
transcriber = AudioTranscriber(input_folder=r"../data")

transcriber.whisper_model = whisper_model
transcriptions_dict = transcriber.transcribe_all_audios(audio_files)
for url, data in transcriptions_dict.items():
print(f"URL: {url}")
print(f"音频文件: {data['audio_path']}")
print(f"转录: {data['transcription'][:100]}...") # 打印前 100 个字符
print("---")

成功执行上述代码将生成以下结果,显示每个转录的前一百个字符。

现在,我们创建完整的音频数据,形成一个字典列表,其中每个字典是每个音频文件的转录及附加元数据。

1
2
3
4
5
6
7
8
9
10
11
12
import json

audio_data = [
{
"url": value["url"],
"audio_path": value["audio_path"],
"transcription": value["transcription"]
}
for value in transcriptions_dict.values()
]
# 打印结果
print(json.dumps(audio_data, indent=2))

截断结果如下所示:

2. 图像、表格和文本

通过利用 unstructured 库,我们可以从给定的 PDF 文件中提取所有表格、图像和原始文本数据。

该库需要安装 pillowpdf-minermatplotlibunstructured-inferenceunstructured-pytesseracttesseract-ocr,可以通过以下方式在笔记本中安装:

1
2
3
4
5
6
7
%%bash
pip install pdfminer.six
pip install pillow-heif==0.3.2
pip install matplotlib
pip install unstructured-inference
pip install unstructured-pytesseract
pip install tesseract-ocr

现在我们导入 partition_pdf 函数,该函数用于将给定的 PDF 文件分割成不同的组件,例如图像、表格和原始文本。

1
from unstructured.partition.pdf import partition_pdf
  • 原始数据提取

接下来,加载目标 ESG 报告,同时将 extract_images_in_pdf 参数设置为 True,这样可以将图像以高分辨率保存到 images 文件夹中。

1
esg_report_path = "../data/Global_ESG_Q1_2024_Flows_Report.pdf”
1
2
3
4
5
6
7
esg_report_raw_data =partition_pdf(
filename=esg_report_path,
strategy="hi_res",
extract_images_in_pdf=True,
extract_image_block_to_payload=False,
extract_image_block_output_dir="../data/images/"
)

从上述 esg_report_raw_data 中,我们可以提取文本、表格和图像。

  • 图像提取

辅助函数 extract_image_metadata 用于创建图像及其元数据的列表,如数据建模部分所述。

1
from unstructured.documents.elements import Image
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def extract_image_metadata(esg_report, source_document):
image_data = []

for element in esg_report:
if isinstance(element, Image):
page_number = element.metadata.page_number
image_path = element.metadata.image_path if hasattr(element.metadata, 'image_path') else None

image_data.append({
"source_document": source_document,
"page_number": page_number,
"image_path": image_path
})

return image_data

通过将 extract_image_metadata 函数应用于报告和原始数据,我们获得每个图像的基础元数据。

1
2
extracted_image_data = extract_image_metadata(esg_report_raw_data, 
esg_report_path)

使用 display_images_from_metadata 函数,我们可以显示每个图像及其提取的页码。这对于可视化图像及其相应的页面来源非常有用。

1
2
3
import matplotlib.pyplot as plt
from PIL import Image
import math

我们首先导入相关库,如 matplotlibImagemath

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def display_images_from_metadata(extracted_image_data, images_per_row=4):
valid_images = [img for img in extracted_image_data if img['image_path']]
if not valid_images:
print("没有有效的图像数据可用。")
return
num_images = len(valid_images)
num_rows = math.ceil(num_images / images_per_row)

fig, axes = plt.subplots(num_rows, images_per_row, figsize=(20, 5*num_rows))
axes = axes.flatten() if num_rows > 1 else [axes]
for ax, img_data in zip(axes, valid_images):
try:
img = Image.open(img_data['image_path'])
ax.imshow(img)
ax.axis('off')
ax.set_title(f"第 {img_data['page_number']} 页", fontsize=10)
except Exception as e:
print(f"加载图像 {img_data['image_path']} 时出错: {str(e)}")
ax.text(0.5, 0.5, f"加载图像时出错\n{str(e)}", ha='center', va='center')
ax.axis('off')
for ax in axes[num_images:]:
fig.delaxes(ax)
plt.tight_layout()
plt.show()

使用辅助函数,最多可以每行显示四个图像。

所有 37 张图像已显示,并附上其各自的页码。例如:

  • 第一页只有一张图像,即晨星的标志。
  • 第二页没有图像。
  • 第三页有一张图像,第四页有两张图像。
  • 文本提取

PDF 中的每条原始文本信息由 NarrativeText 组件表示,可以用来定位和提取所有文本数据。此外,每个 NarrativeText 具有一个 page_number 属性,可用于标识段落编号。

1
from unstructured.documents.elements import NarrativeText

extract_text_with_metadata 函数用于提取所有这些属性,包括实际文本和源文档。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def extract_text_with_metadata(esg_report, source_document):
text_data = []
paragraph_counters = {}

for element in esg_report:
if isinstance(element, NarrativeText):
page_number = element.metadata.page_number

if page_number not in paragraph_counters:
paragraph_counters[page_number] = 1
else:
paragraph_counters[page_number] += 1

paragraph_number = paragraph_counters[page_number]

text_content = element.text
text_data.append({
"source_document": source_document,
"page_number": page_number,
"paragraph_number": paragraph_number,
"text": text_content
})
return text_data

运行该函数后,结果保存在 extracted_data 属性中:

1
2
extracted_data = extract_text_with_metadata(esg_report_raw_data, 
esg_report_path)
  • 表格提取

最后一步与之前的步骤相似,专注于提取表格内容。每个表格元素由 Table 组件表示。

1
from unstructured.documents.elements import Table

辅助函数 extract_table_metadata 用于获取表格数据和元数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def extract_table_metadata(esg_report, source_document):
table_data = []

for element in esg_report:
if isinstance(element, Table):
page_number = element.metadata.page_number

# 将表格内容提取为字符串
table_content = str(element)

table_data.append({
"source_document": source_document,
"page_number": page_number,
"table_content": table_content
})

return table_data

最终结果保存在 extracted_table_data 属性中,如下所示:

1
2
extracted_table_data = extract_table_metadata(esg_report_raw_data, 
esg_report_path)

图像和表格内容摘要

在这个阶段,所有数据类型都已收集,最后一步是将每个图像和表格转换为其文本描述,以便进行简洁准确的表示。这是通过提示工程实现的。

  • 每个表格使用 tables_summarizer_prompt 进行摘要。
  • 每个图像使用 images_summarizer_prompt 进行摘要。

为了确保模型有效描述占位符 {} 中指定的表格和图像,需要提供清晰的指示。提示定义如下:

1
2
3
4
5
6
7
tables_summarizer_prompt = """
作为新兴市场投资的ESG分析师,请提供表格内容的简洁而准确的摘要。
重点关注关键的ESG指标(环境、社会、治理)及其与新兴市场的相关性。
突出数据中的重要趋势、比较或异常值。 识别对投资策略或风险评估的潜在影响。
避免使用项目符号;相反,提供一个连贯、事实性的摘要,捕捉表格的本质,以便进行ESG投资决策。表格: {table_content}
将您的摘要限制为3-4句,确保其对新兴市场的ESG分析精确且信息丰富。"""

1
2
3
4
5
6
images_summarizer_prompt = """
作为新兴市场投资的ESG分析师,请提供图像的简洁而准确的描述。
重点关注与ESG相关的内容(环境、社会、治理)及任何新兴市场背景。描述视觉类型(例如,图表、照片、信息图)及其关键元素。
突出与投资分析相关的重要数据点或趋势。避免使用项目符号;相反,提供一个连贯、事实性的摘要,捕捉图像的本质,以便进行ESG投资决策。图像: {image_element}
将您的描述限制为3-4句,确保其对ESG分析精确且信息丰富。"""

一旦提示定义完成,我们利用OpenAI的 GPT-4O 模型生成摘要。这需要拥有OpenAI凭证。

在继续之前,我们需要安装 langchain-corelangchain-openai 库,然后导入 ChatPromptTemplateChatOpenAI 模块。

1
2
3
%%bash
pip install langchain-core
pip install langchain-openai

现在,我们设置环境以使用该模型。

1
2
3
OPENAI_API_TOKEN="YOUR KEY"
model_ID = "gpt-4o"
os.environ["OPENAI_API_KEY"] = OPENAI_API_TOKEN

最后,辅助函数 extract_table_metadata_with_summaryextract_image_metadata_with_summary 被用来生成给定表格和图像的摘要/描述,以及初始元数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def extract_table_metadata_with_summary(esg_report, 
source_document,
tables_summarizer_prompt):
table_data = []
prompt = ChatPromptTemplate.from_template(tables_summarizer_prompt)
for element in esg_report:
if isinstance(element, Table):
page_number = element.metadata.page_number

table_content = str(element)

# 使用OpenAI模型生成摘要
messages = prompt.format_messages(table_content=table_content)
description = description_model.predict_messages(messages).content

table_data.append({
"source_document": source_document,
"page_number": page_number,
"table_content": table_content,
"description": description
})

return table_data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def extract_image_metadata_with_summary(esg_report_raw_data, 
esg_report_path,
images_summarizer_prompt):

image_data = []

# 创建ChatPromptTemplate实例
prompt = ChatPromptTemplate.from_template(images_summarizer_prompt)

# 创建ChatOpenAI实例
description_model = ChatOpenAI(model=model_ID)

for element in esg_report_raw_data:
if "Image" in str(type(element)):
page_number = element.metadata.page_number if hasattr(element.metadata, 'page_number') else None
image_path = element.metadata.image_path if hasattr(element.metadata, 'image_path') else None

if image_path and os.path.exists(image_path):
# 使用OpenAI模型生成描述
messages = prompt.format_messages(image_element=image_path)
description = description_model.predict_messages(messages).content

# 读取图像文件并将其编码为base64
with open(image_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')

image_data.append({
"source_document": esg_report_path,
"page_number": page_number,
"image_path": image_path,
"description": description,
"base64_encoding": encoded_string
})
else:
print(f"警告:第 {page_number} 页的图像文件未找到或路径不可用")

return image_data

对于每个图像,创建了一个 base64 编码,这在显示图像时可以用来替代物理的 .png 文件。

现在我们提取两种数据类型的结果,如下所示:

1
2
3
extracted_table_data_with_summary = extract_table_metadata_with_summary(esg_report_raw_data, 
esg_report_path,
tables_summarizer_prompt)

以下是 print 语句的前几个表格的截断结果。

1
2
3
4
5
for table in extracted_table_data_with_summary:
print(f"第 {table['page_number']} 页的表格:")
print(f"表格来源: {table['source_document']}")
print(f"描述: {table['description']}")
print("---")

通过对图像应用类似的方法,我们得到了以下结果:

1
2
3
extracted_image_data = extract_image_metadata_with_summary(esg_report_raw_data,
esg_report_path,
images_summarizer_prompt)
1
2
3
4
5
6
for image in extracted_image_data:
print(f"第 {image['page_number']} 页的图像:")
print(f"路径: {image['image_path']}")
print(f"描述: {image['description']}")
print(f"Base 64: {image['base64_encoding']}")
print("---")

并且,上述 print 语句的截断结果如下所示:

完美!所有数据已准备好进行摄取。但在此之前,我们需要将它们转换为相同的嵌入空间。

数据摄取

本节的术语是向量数据库,目前有多个提供商,包括开源和付费。然而,我们的用例主要集中在 Weaviate

但是,为什么选择 Weaviate,而不是其他向量数据库呢?

Weaviate 是一个开源的向量数据库,旨在存储对象及其对应的向量。它提供高效的向量搜索和结构化过滤功能,使数据检索更加有效。

此外,它支持多种数据类型,包括文本和图像,并且是模块化的、云原生的和实时的,便于可扩展的机器学习模型。

它与流行的 AI 服务和框架无缝集成,为构建 AI 原生应用程序提供了强大的基础。

所有这些原因使其成为我们用例的完美选择。本节涵盖了从创建 Weaviate 账户、设置向量数据库实例到摄取所有数据的所有步骤。

例如,原始表格、图像、文本和音频已被转换为一个共同的嵌入空间。

这允许不同类型的文档相互比较。这是将数据存储在向量数据库中的关键过渡步骤。

图形插图展示了与气候变化和空气污染相关的文档是如何紧密相连的,与劳动实践和人权相关的文档也是如此。相反,不同组别的文档则明显分开。

这种接近性是通过向量相似性搜索自动确定的,这是后续章节将要讨论的主题。

  1. 创建 Weaviate 云账户

成功完成数据摄取过程需要以下信息:

  • 拥有一个 Weaviate 账户。这可以通过云账户页面使用电子邮件和密码完成。
  • 已经拥有 OpenAI 凭证。

在开始数据摄取之前,必须拥有到目前为止处理的所有数据的向量表示。此过程从创建向量数据库实例开始,登录云账户后按照以下四个步骤进行:

  • 选择 Create cluster 以启动创建用于托管向量数据库实例的集群。
  • 选择 Free sandbox 选项。
  • 为集群提供一个有意义的名称;我们的名称是 esg-rag-vector-instance
  • 最后点击 Create 完成集群创建。

点击 Create 按钮后,创建实例可能需要几分钟。所有实例都显示在 Weaviate Clusters 部分,我们可以看到我们的实例已成功创建,名称为 esg-rag-vector-instance-aufn6coj,其中 aufn6coj 是该实例在集群中的唯一标识符。

我们还注意到向量数据库当前为空,这是正常的,因为我们尚未摄取任何数据。下一节将涵盖定义数据库模式和摄取数据的步骤。

2. 连接到 Weaviate 向量数据库

摄取数据的第一步是创建一个 Weaviate 客户端,以便我们可以:

  • 连接到向量数据库。
  • 为要摄取到数据库中的数据创建一个集合。

让我们开始安装 Weaviate 客户端,如下所示:

1
pip install weaviate-client

接下来,我们导入模块,使用集群的 URLAPIKEY 设置环境变量。

1
2
3
4
import weaviate

URL = os.getenv("WCS_URL")
APIKEY = os.getenv("WCS_API_KEY")

通过 connect_to_wcs 函数,我们可以通过指定上述变量和初始 OpenAI 令牌来连接到向量数据库。

1
2
3
4
5
6
7
client = weaviate.connect_to_wcs(
cluster_url=URL,
auth_credentials=weaviate.auth.AuthApiKey(APIKEY),
headers = {
"X-OpenAI-Api-Key": OPENAI_API_TOKEN
}
)

等等,为什么我们在 Weaviate 中使用 OpenAI?

与 OpenAI 的集成使我们能够:

  • 将对象直接导入 Weaviate,而无需手动指定嵌入。
  • 使用来自其他提供商的生成模型构建我们的 RAG 管道。例如,我们可以使用 CohereAWSGoogleHugging FaceAzure OpenAIMistral更多 的模型。

3. 创建多模态 ESG 集合

通过指定以下参数来创建集合:

  • 集合的 name,对于我们的用例是 ESGDocument
  • properties 为集合的所有属性列表。
  • vectorizer_config 提供要使用的嵌入模型的详细信息;我们使用的是来自 OpenAI 的 text-embedding-3-large 模型。

我们的集合定义如下,属性包括:

  • TEXT 字段,如 source_documentdescriptionaudio_pathtexttable_contenttranscriptioncontent_typeurl
  • Numeric 字段,如 page_numberparagraph number
  • BLOB 用于 base64 编码。
1
import weaviate.classes.config as wc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
properties = [
wc.Property(name="source_document", data_type=wc.DataType.TEXT, skip_vectorization=True),
wc.Property(name="page_number", data_type=wc.DataType.INT, skip_vectorization=True),
wc.Property(name="paragraph_number", data_type=wc.DataType.INT, skip_vectorization=True),
wc.Property(name="text", data_type=wc.DataType.TEXT),
wc.Property(name="image_path", data_type=wc.DataType.TEXT, skip_vectorization=True),
wc.Property(name="description", data_type=wc.DataType.TEXT),
wc.Property(name="base64_encoding", data_type=wc.DataType.BLOB, skip_vectorization=True),
wc.Property(name="table_content", data_type=wc.DataType.TEXT),
wc.Property(name="url", data_type=wc.DataType.TEXT, skip_vectorization=True),
wc.Property(name="audio_path", data_type=wc.DataType.TEXT, skip_vectorization=True),
wc.Property(name="transcription", data_type=wc.DataType.TEXT),
wc.Property(name="content_type", data_type=wc.DataType.TEXT, skip_vectorization=True),
]

我们将 skip_vectorization 参数设置为 True,以便对于不需要向量化的属性。只有文本数据、图像描述、音频转录和表格描述等属性需要向量化以进行搜索。

现在,我们可以使用 create 函数创建集合,同时将 vectorizer_config 设置为 None。这告诉 Weaviate,我们将在将数据上传到向量数据库时指定自己的向量化器。

1
2
3
4
5
client.collections.create(
name="ESGDocuments",
properties=properties,
vectorizer_config=None
)

Collections 选项卡中,我们可以观察到所有十二个属性已被创建,以及正在使用的嵌入模型。

4. 摄取数据

集合已正确设置以进行数据摄取,以下辅助函数被用于将数据摄取到向量数据库中:每种特定数据类型一个函数,最后一个函数利用这些独特的函数来摄取所有数据。

我们首先导入相关库,如下所示:

1
2
3
4
from weaviate.util import generate_uuid5
from tqdm import tqdm
from openai import OpenAI
openai_client = OpenAI()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# 获取嵌入的函数
def get_embedding(text):
response = openai_client.embeddings.create(
input=text,
model="text-embedding-3-large"
)
return response.data[0].embedding


# 摄取音频数据的函数
def ingest_audio_data(collection, audio_data):
with collection.batch.dynamic() as batch:
for audio in tqdm(audio_data, desc="摄取音频数据"):
vector = get_embedding(audio['transcription'])
audio_obj = {
"url": audio['url'],
"audio_path": audio['audio_path'],
"transcription": audio['transcription'],
"content_type": "audio"
}
batch.add_object(
properties=audio_obj,
uuid=generate_uuid5(audio['url']),
vector=vector
)
def ingest_text_data(collection, text_data):
with collection.batch.dynamic() as batch:
for text in tqdm(text_data, desc="摄取文本数据"):
vector = get_embedding(text['text'])
text_obj = {
"source_document": text['source_document'],
"page_number": text['page_number'],
"paragraph_number": text['paragraph_number'],
"text": text['text'],
"content_type": "text"
}
batch.add_object(
properties=text_obj,
uuid=generate_uuid5(f"{text['source_document']}_{text['page_number']}_{text['paragraph_number']}"),
vector=vector
)
def ingest_image_data(collection, image_data):
with collection.batch.dynamic() as batch:
for image in tqdm(image_data, desc="摄取图像数据"):
vector = get_embedding(image['description'])
image_obj = {
"source_document": image['source_document'],
"page_number": image['page_number'],
"image_path": image['image_path'],
"description": image['description'],
"base64_encoding": image['base64_encoding'],
"content_type": "image"
}
batch.add_object(
properties=image_obj,
uuid=generate_uuid5(f"{image['source_document']}_{image['page_number']}_{image['image_path']}"),
vector=vector
)
def ingest_table_data(collection, table_data):
with collection.batch.dynamic() as batch:
for table in tqdm(table_data, desc="摄取表格数据"):
vector = get_embedding(table['description'])
table_obj = {
"source_document": table['source_document'],
"page_number": table['page_number'],
"table_content": table['table_content'],
"description": table['description'],
"content_type": "table"
}
batch.add_object(
properties=table_obj,
uuid=generate_uuid5(f"{table['source_document']}_{table['page_number']}"),
vector=vector
)
def ingest_all_data(collection_name, audio_data, text_data, image_data, table_data):
collection = client.collections.get(collection_name)
ingest_audio_data(collection, audio_data)
ingest_text_data(collection, text_data)
ingest_image_data(collection, image_data)
ingest_table_data(collection, table_data)
if len(collection.batch.failed_objects) > 0:
print(f"导入 {len(collection.batch.failed_objects)} 个对象失败")
else:
print("所有对象成功导入")

最后,使用 ingest_all_data 函数在 ESGDocument 集合中摄取数据。

1
2
3
4
5
6
ingest_all_data(collection_name="ESGDocument", 
audio_data=audio_data,
text_data=extracted_data,
image_data=extracted_image_data,
table_data=extracted_table_data_with_summary
)

数据导入后,我们统计了252个对象,这对应于所有对象的总数,包括文本、图像、表格和音频。

构建多模态 RAG 用于 ESG

本节包含实现多模态 RAG 搜索的所有步骤,从最近搜索到实现提示以增强大型语言模型的响应。

  1. 最近搜索

最近搜索逻辑在 search_multimodal 中实现,默认生成前三个结果。

该函数允许根据查询的含义进行文本、音频、图像和表格数据的语义搜索,而不是精确的关键词匹配。

匹配的对象将返回其所有属性。这对于捕捉特定于每种数据类型的属性非常有用。

1
import weaviate.classes.query as wq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def search_multimodal(query: str, limit: int = 3):
query_vector = get_embedding(query)

esg_documents = client.collections.get("ESGDocument")

response = esg_documents.query.near_vector(
near_vector=query_vector,
limit=limit,
return_metadata=wq.MetadataQuery(distance=True),
return_properties=[
"content_type", "url", "audio_path", "transcription",
"source_document", "page_number", "paragraph_number", "text",
"image_path", "description", "table_content"
]
)

return response.objects

search_and_print_results 函数通过适当地格式化输出显示搜索结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def search_and_print_results(query, limit=5):

search_results = search_multimodal(query, limit)
print(f"Search Results for query: '{query}'")
for item in search_results:
print(f"Type: {item.properties['content_type']}")
if item.properties['content_type'] == 'audio':
print(f"URL: {item.properties['url']}")
print(f"Transcription: {item.properties['transcription'][:100]}...")
elif item.properties['content_type'] == 'text':
print(f"Source: {item.properties['source_document']}, Page: {item.properties['page_number']}")
print(f"Text: {item.properties['text'][:100]}...")
elif item.properties['content_type'] == 'image':
print(f"Source: {item.properties['source_document']}, Page: {item.properties['page_number']}")
print(f"Description: {item.properties['description']}")
elif item.properties['content_type'] == 'table':
print(f"Source: {item.properties['source_document']}, Page: {item.properties['page_number']}")
print(f"Description: {item.properties['description']}")
print(f"Distance to query: {item.metadata.distance:.3f}")
print("---")
return search_results

现在,让我们找到与以下查询最相似的前三个条目:

1
2
query = "What are the main environmental challenges in renewable energy?"
search_and_print_results(query)

搜索的截断结果如下所示:

2. 设置生成提示

向分析师提供上述结果可能会造成困惑,并且可能不会给他们的体验带来任何价值。这就是增强生成部分发挥作用的地方。

为此,我们需要在 generate_response 函数中定义生成模型使用的提示。它接受用户的问题和相关上下文,然后使用 AI(GPT-4)为新兴市场创建专家 ESG 分析答案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def generate_response(query: str, context: str) -> str:
prompt = f"""
You are an AI assistant specializing in ESG (Environmental, Social, and Governance) analysis for emerging markets.
Use the following pieces of information to answer the user's question.
If you cannot answer the question based on the provided information, say that you don't have enough information to answer accurately.
Context:
{context}
User Question: {query}
Please provide a detailed and accurate answer based on the given context:
"""
response = openai_client.chat.completions.create(
model="gpt-4-1106-preview",
messages=[
{"role": "system", "content": "You are an expert ESG analyst for emerging markets."},
{"role": "user", "content": prompt}
],
temperature=0
)
return response.choices[0].message.content

esg_analysis 函数利用 search_multimodalgenerate_response 函数为用户生成最终响应。

该函数由于最后的数据格式化而较长。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def esg_analysis(user_query: str):
# Step 1: Retrieve relevant information
search_results = search_multimodal(user_query)
# Step 2: Prepare context for RAG
context = ""
for item in search_results:
if item.properties['content_type'] == 'audio':
context += f"Audio Transcription from {item.properties['url']}: {item.properties['transcription']}\n\n"
elif item.properties['content_type'] == 'text':
context += f"Text from {item.properties['source_document']} (Page {item.properties['page_number']}, Paragraph {item.properties['paragraph_number']}): {item.properties['text']}\n\n"
elif item.properties['content_type'] == 'image':
context += f"Image Description from {item.properties['source_document']} (Page {item.properties['page_number']}, Path: {item.properties['image_path']}): {item.properties['description']}\n\n"
elif item.properties['content_type'] == 'table':
context += f"Table Description from {item.properties['source_document']} (Page {item.properties['page_number']}): {item.properties['description']}\n\n"
# Step 3: Generate response using RAG
response = generate_response(user_query, context)
# Step 4: Format and return the final output
sources = []
for item in search_results:
source = {
"type": item.properties["content_type"],
"distance": item.metadata.distance
}
if item.properties["content_type"] == 'text':
source.update({
"document": item.properties["source_document"],
"page": item.properties["page_number"],
"paragraph": item.properties["paragraph_number"]
})
elif item.properties["content_type"] == 'image':
source.update({
"document": item.properties["source_document"],
"page": item.properties["page_number"],
"image_path": item.properties["image_path"]
})
elif item.properties["content_type"] == 'table':
source.update({
"document": item.properties["source_document"],
"page": item.properties["page_number"]
})
elif item.properties["content_type"] == 'audio':
source.update({
"url": item.properties["url"]
})
sources.append(source)
# Sort sources by distance (ascending order)
sources.sort(key=lambda x: x['distance'])
final_output = {
"user_query": user_query,
"ai_response": response,
"sources": sources
}
return final_output

为了避免如前面插图所示的截断结果,我们可以使用辅助函数 wrap_text 来格式化输出,每行最多一百个字符。

首先,按如下方式安装库:

1
!pip install textwrap3

fill 函数用于指定最大字符数,默认为 120。

1
import textwrap
1
2
3
4
def wrap_text(text, width=120):
wrapped_text = textwrap.fill(text, width=width)

return wrapped_text

最后,通过下面的 analyze_and_print_esg_results 函数提供整体问答结果:

1
2
def analyze_and_print_esg_results(user_question):
result = esg_analysis(user_question)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
print("User Query:", result["user_query"])
print("\nAI Response:", wrap_text(result["ai_response"]))
print("\nSources (sorted by relevance):")
for source in result["sources"]:
print(f"- Type: {source['type']}, Distance: {source['distance']:.3f}")
if source['type'] == 'text':
print(f" Document: {source['document']}, Page: {source['page']}, Paragraph: {source['paragraph']}")
elif source['type'] == 'image':
print(f" Document: {source['document']}, Page: {source['page']}, Image Path: {source['image_path']}")
elif source['type'] == 'table':
print(f" Document: {source['document']}, Page: {source['page']}")
elif source['type'] == 'audio':
print(f" URL: {source['url']}")
print("---")

3. 问答

现在是测试一些查询并查看我们的 AI 驱动的 ESG 系统如何响应的时候了。

1
2
user_question = "Is ESG investment a fraud?"
analyze_and_print_esg_results(user_question)

结果:

系统对 ESG 投资是否欺诈的回答如下,按相关性提供:

  • 音频:一个 YouTube 视频 (最相关,距离 0.408)
  • 表格:来自“Global_ESG_Q1_2024_Flows_Report.pdf”,第 7 页(距离 0.455)
  • 文本:来自同一 PDF,第 8 页,第 3 段(距离 0.468)

对于第二个查询,我们有:

1
2
user_question = "What was the total net inflow for global sustainable funds in Q1 2024?"
analyze_and_print_esg_results(user_question)

结果:

系统提供了来自同一 PDF 报告的三个相关文本来源。

  • 最相关 (距离: 0.220): 第 2 页,第 6 段
  • 第二 (距离: 0.227): 第 6 页,第 2 段
  • 第三 (距离: 0.230): 第 2 页,第 4 段 (与答案中的引用匹配)

我们可以看到系统在提供正确响应和页码方面做得相当不错。

然而,当页面中存在表格时,段落编号并不总是准确,因为处理模块可能会对在处理表格时哪个部分可以被视为段落感到困惑。

总体而言,系统表现得很好!

让我们来看最后一个例子:

1
2
user_question = "What is the net flows for Parnassus Mid Cap Fund?"
analyze_and_print_esg_results(user_question)

结果:

来自全球 ESG 2024 年第一季度流动报告的三条相关文本来源:

  • 最相关 (距离: 0.320): 第 20 页,第 4 段
  • 第二 (距离: 0.344): 第 17 页,第 5 段
  • 第三 (距离: 0.414): 第 9 页,第 7 段

系统提供了有关基金表现的背景信息,但明确说明了可用信息的局限性。它解释了回答问题所需的额外数据,展示了对信息缺口的透明度。

每当知识库中没有信息时,模型会明确说明这一事实,而不是进行推测。

现在,让我们检查响应的正确性。

集成到业务工作流程

构建检索增强生成用例后,最大的挑战之一是自己进行测试,并从用户那里获得相关反馈以便于更快的迭代。

因此,我们可能希望构建一个快速的用户界面用于测试,并自动化该测试过程。可以使用 Gradio、MESOP 等工具来构建这样的界面,但这个用例利用了 Streamlit,下面是插图。

结论

本文提供了多模态检索增强生成(RAG)及其在环境、社会和治理(ESG)投资分析中的应用的综合概述。首先解释了多模态学习、其与多模态 RAG 的相关性以及相关的局限性。

接下来,文章指导读者通过 Weaviate 实现多模态 RAG 的过程。这包括创建 Weaviate 云账户、设置向量数据库实例、建模用例数据以及实现整体聊天系统。

最后,解释了如何通过构建 Streamlit 界面将最终结果集成到业务工作流程中。

那么,我们接下来该如何做?如何改善当前系统?

当前系统虽然提供了良好的结果,但并不完美。增强它的方法有很多,其中一些包括使用 生成反馈循环 ,该方法基于用户对先前生成结果的反馈来改善系统的响应。

另一种方法是不断更新知识库,以帮助系统提供准确且最新的响应,从而改善整体用户体验。

虽然多模态 RAG 在 ESG 场景中前景广阔,但持续改进对其整体成功至关重要。

  • 标题: 多模态检索增强生成应用于现实案例 带代码
  • 作者: Barry
  • 创建于 : 2024-07-19 21:51:00
  • 更新于 : 2024-08-31 06:59:45
  • 链接: https://wx.role.fun/2024/07/19/b5ac99d5b22c493cb3156f9701fb6f53/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。