从零开始使用PyTorch构建自己的大型语言模型LLM

从零开始使用PyTorch构建自己的大型语言模型LLM

Barry Lv6

一步步指南,构建并训练一个名为MalayGPT的LLM。该模型的任务是将文本从英语翻译成马来语。

通过本文,你将实现什么? 你将能够自己构建并训练一个大型语言模型(LLM),同时跟随我的代码进行实践。虽然我们构建的是一个将任意英语文本翻译成马来语的LLM,但你可以轻松修改此LLM架构以适应其他语言翻译任务。

LLM是大多数流行AI聊天机器人的核心基础,如ChatGPT、Gemini、MetaAI、Mistral AI等。在每个LLM的核心,都有一个名为Transformer的架构。因此,我们将首先根据著名的论文“Attention is all you need”(https://arxiv.org/abs/1706.03762 )构建基于Transformer架构的模型。

首先,我们将逐个构建Transformer模型的所有组件。然后,我们将所有组件组装在一起构建我们的模型。之后,我们将使用从Hugging Face数据集中获取的数据来训练和验证我们的模型。最后,我们将通过在新翻译文本数据上进行翻译来测试我们的模型。

重要提示:我将一步步编写Transformer架构中的所有组件代码,并提供必要的概念解释,包括为什么、是什么以及如何做。我还会在需要解释的代码行中提供注释。这样,我相信你可以在自己编写代码的同时理解整个工作流程。

让我们一起编写代码吧!

第一步:加载数据集

为了让llm模型能够完成从英语到马来语的翻译任务,我们需要使用包含源语言(英语)和目标语言(马来语)对的数据集。因此,我们将使用Huggingface上的一个数据集,名为“Helsinki-NLP/opus-100 ”。该数据集包含100万对英语-马来语的训练数据,足以获得良好的准确性,验证集和测试集各有2000条数据。该数据集已经预先分割好,因此我们无需再次进行数据集分割。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 导入必要的库
# 如果尚未安装datasets和tokenizers库,请先安装(!pip install datasets, tokenizers)。
import os
import math
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
from datasets import load_dataset
from tqdm import tqdm

# 将设备值设为"cuda"以在GPU上训练,如果GPU不可用,则默认使用"cpu"。
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 从huggingface路径加载训练、验证和测试数据集。
raw_train_dataset = load_dataset("Helsinki-NLP/opus-100", "en-ms", split='train')
raw_validation_dataset = load_dataset("Helsinki-NLP/opus-100", "en-ms", split='validation')
raw_test_dataset = load_dataset("Helsinki-NLP/opus-100", "en-ms", split='test')

# 存储数据集文件的目录。
os.mkdir("./dataset-en")
os.mkdir("./dataset-my")

# 在每个EPOCHS后保存模型的目录(在第10步)。
os.mkdir("./malaygpt")

# 存储源语言和目标语言分词器的目录。
os.mkdir("./tokenizer_en")
os.mkdir("./tokenizer_my")

dataset_en = []
dataset_my = []
file_count = 1

# 为了训练分词器(在第2步),我们将训练数据集分成英语和马来语。
# 创建多个大小为50k数据的小文件,并存储到dataset-en和dataset-my目录中。
for data in tqdm(raw_train_dataset["translation"]):
dataset_en.append(data["en"].replace('\n', " "))
dataset_my.append(data["ms"].replace('\n', " "))
if len(dataset_en) == 50000:
with open(f'./dataset-en/file{file_count}.txt', 'w', encoding='utf-8') as fp:
fp.write('\n'.join(dataset_en))
dataset_en = []

with open(f'./dataset-my/file{file_count}.txt', 'w', encoding='utf-8') as fp:
fp.write('\n'.join(dataset_my))
dataset_my = []
file_count += 1

第2步:创建分词器

Transformer模型不处理原始文本,只处理数字。因此,我们需要将原始文本转换为数字。为此,我们将使用一种流行的分词器——BPE分词器,这是一种子词分词器,被用于GPT3等模型中。我们首先在第1步准备好的语料数据(即我们的训练数据集)上训练BPE分词器。流程如下图所示。

训练完成后,分词器会为英语和马来语生成词汇表。词汇表是从语料数据中提取的唯一标记集合。由于我们正在进行翻译任务,因此需要为两种语言都准备分词器。BPE分词器接收原始文本,将其与词汇表中的标记映射,并为输入的每个单词返回标记。这些标记可以是单个单词或子词。这是子词分词器相对于其他分词器的优势之一,因为它可以克服OOV(词汇表外)问题。然后,分词器返回词汇表中标记的唯一索引或位置ID,这些索引将用于创建嵌入,如上图所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 导入分词器库的类和模块
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

# 训练数据集文件的路径,用于训练分词器
path_en = [str(file) for file in Path('./dataset-en').glob("**/*.txt")]
path_my = [str(file) for file in Path('./dataset-my').glob("**/*.txt")]

# [创建源语言分词器 - 英语]
# 创建额外的特殊标记,如 [UNK] - 表示未知词,[PAD] - 填充标记以保持模型中序列长度一致
# [CLS] - 表示句子开始,[SEP] - 表示句子结束
tokenizer_en = Tokenizer(BPE(unk_token="[UNK]"))
trainer_en = BpeTrainer(min_frequency=2, special_tokens=["[PAD]","[UNK]","[CLS]", "[SEP]", "[MASK]"])

# 基于空格分割标记
tokenizer_en.pre_tokenizer = Whitespace()

# 分词器训练第1步创建的数据集文件
tokenizer_en.train(files=path_en, trainer=trainer_en)

# 保存分词器以备将来使用
tokenizer_en.save("./tokenizer_en/tokenizer_en.json")

# [创建目标语言分词器 - 马来语]
tokenizer_my = Tokenizer(BPE(unk_token="[UNK]"))
trainer_my = BpeTrainer(min_frequency=2, special_tokens=["[PAD]","[UNK]","[CLS]", "[SEP]", "[MASK]"])
tokenizer_my.pre_tokenizer = Whitespace()
tokenizer_my.train(files=path_my, trainer=trainer_my)
tokenizer_my.save("./tokenizer_my/tokenizer_my.json")

tokenizer_en = Tokenizer.from_file("./tokenizer_en/tokenizer_en.json")
tokenizer_my = Tokenizer.from_file("./tokenizer_my/tokenizer_my.json")

# 获取两个分词器的词汇表大小
source_vocab_size = tokenizer_en.get_vocab_size()
target_vocab_size = tokenizer_my.get_vocab_size()

# 定义标记ID变量,训练模型时需要这些
CLS_ID = torch.tensor([tokenizer_my.token_to_id("[CLS]")], dtype=torch.int64).to(device)
SEP_ID = torch.tensor([tokenizer_my.token_to_id("[SEP]")], dtype=torch.int64).to(device)
PAD_ID = torch.tensor([tokenizer_my.token_to_id("[PAD]")], dtype=torch.int64).to(device)

第三步:准备数据集和数据加载器

在这一步中,我们将为源语言和目标语言准备数据集,这些数据集将用于后续训练和验证我们构建的模型。我们将创建一个类,该类接收原始数据集,并定义函数分别使用源(tokenizer_en)和目标(tokenizer_my)分词器对源文本和目标文本进行编码。最后,我们将为训练和验证数据集创建DataLoader,以批量迭代数据集(在我们的示例中,批量大小设置为10)。批量大小可以根据数据大小和可用处理能力进行调整。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# 该类接收原始数据集和max_seq_len(整个数据集中序列的最大长度)。
class EncodeDataset(Dataset):
def __init__(self, raw_dataset, max_seq_len):
super().__init__()
self.raw_dataset = raw_dataset
self.max_seq_len = max_seq_len

def __len__(self):
return len(self.raw_dataset)

def __getitem__(self, index):

# 获取给定索引的原始文本,包含源和目标文本对。
raw_text = self.raw_dataset[index]

# 将文本分离为源文本和目标文本,稍后用于编码。
source_text = raw_text["en"]
target_text = raw_text["ms"]

# 使用源分词器(tokenizer_en)对源文本进行编码,使用目标分词器(tokenizer_my)对目标文本进行编码。
source_text_encoded = torch.tensor(tokenizer_en.encode(source_text).ids, dtype = torch.int64).to(device)
target_text_encoded = torch.tensor(tokenizer_my.encode(target_text).ids, dtype = torch.int64).to(device)

# 为了训练模型,每个输入序列的长度应等于最大序列长度。
# 因此,如果长度小于max_seq_len,则会在输入序列中添加额外的填充数量。
num_source_padding = self.max_seq_len - len(source_text_encoded) - 2
num_target_padding = self.max_seq_len - len(target_text_encoded) - 1

encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype = torch.int64).to(device)
decoder_padding = torch.tensor([PAD_ID] * num_target_padding, dtype = torch.int64).to(device)

# encoder_input 以句首标记 CLS_ID 开始,接着是源编码,然后是句尾标记 SEP。
# 为了达到所需的最大序列长度,会在末尾添加额外的 PAD 标记。
encoder_input = torch.cat([CLS_ID, source_text_encoded, SEP_ID, encoder_padding]).to(device)

# decoder_input 以句首标记 CLS_ID 开始,接着是目标编码。
# 为了达到所需的最大序列长度,会在末尾添加额外的 PAD 标记。解码器输入中没有句尾标记 SEP。
decoder_input = torch.cat([CLS_ID, target_text_encoded, decoder_padding ]).to(device)

# target_label 以目标编码开始,接着是句尾标记 SEP。目标标签中没有句首标记 CLS。
# 为了达到所需的最大序列长度,会在末尾添加额外的 PAD 标记。
target_label = torch.cat([target_text_encoded,SEP_ID,decoder_padding]).to(device)

# 由于我们在输入编码中添加了额外的填充标记,在训练过程中,我们不希望模型训练这些标记,因为这些标记没有可学习的内容。
# 因此,我们将使用编码器掩码在计算自注意力输出之前使填充标记值无效。
encoder_mask = (encoder_input != PAD_ID).unsqueeze(0).unsqueeze(0).int().to(device)

# 在解码阶段,我们也不希望任何标记受到未来标记的影响。因此,在掩码多头注意力中实施因果掩码来处理这个问题。
decoder_mask = (decoder_input != PAD_ID).unsqueeze(0).unsqueeze(0).int() & causal_mask(decoder_input.size(0)).to(device)

return {
'encoder_input': encoder_input,
'decoder_input': decoder_input,
'target_label': target_label,
'encoder_mask': encoder_mask,
'decoder_mask': decoder_mask,
'source_text': source_text,
'target_text': target_text
}

# 因果掩码确保当前标记之后的任何标记都被掩码,即值被替换为负无穷大,在softmax函数后转换为零或接近零。
# 因此,模型将忽略这些值或无法从这些值中学习任何内容。
def causal_mask(size):
# 因果掩码的维度(batch_size, seq_len, seq_len)
mask = torch.triu(torch.ones(1, size, size), diagonal = 1).type(torch.int)
return mask == 0

# 计算整个训练数据集中源和目标数据集的最大序列长度。
max_seq_len_source = 0
max_seq_len_target = 0

for data in raw_train_dataset["translation"]:
enc_ids = tokenizer_en.encode(data["en"]).ids
dec_ids = tokenizer_my.encode(data["ms"]).ids
max_seq_len_source = max(max_seq_len_source, len(enc_ids))
max_seq_len_target = max(max_seq_len_target, len(dec_ids))

print(f'max_seqlen_source: {max_seq_len_source}') #530
print(f'max_seqlen_target: {max_seq_len_target}') #526

# 为了简化训练过程,我们将只取一个最大序列长度,并增加20以覆盖序列中额外的标记长度,如PAD、CLS、SEP。
max_seq_len = 550

# 实例化EncodeRawDataset类,并创建编码后的训练和验证数据集。
train_dataset = EncodeDataset(raw_train_dataset["translation"], max_seq_len)
val_dataset = EncodeDataset(raw_validation_dataset["translation"], max_seq_len)

# 为训练和验证数据集创建DataLoader包装器。这个数据加载器将在后续的LLM模型训练和验证阶段使用。
train_dataloader = DataLoader(train_dataset, batch_size = 10, shuffle = True, generator=torch.Generator(device='cuda'))
val_dataloader = DataLoader(val_dataset, batch_size = 1, shuffle = True, generator=torch.Generator(device='cuda'))

第四步:输入嵌入和位置编码

输入嵌入:从第二步的标记器生成的标记ID序列将被送入嵌入层。嵌入层将标记ID映射到词汇表,并为每个标记生成一个512维的嵌入向量。[512维来自注意力机制论文]。嵌入向量能够根据其训练数据集捕捉标记的语义含义。嵌入向量中的每个维度值代表与该标记相关的一些特征。例如,如果标记是“狗”,某些维度值可能代表眼睛、嘴巴、腿、高度等。如果在n维空间中绘制向量,外观相似的对象如狗、猫会彼此靠近,而外观不相似的对象如学校、家的嵌入向量则会相距较远。

位置编码:Transformer架构的优势之一是它可以并行处理任意数量的输入序列,这大大减少了训练时间并使预测更快。然而,一个缺点是,在并行处理多个标记序列时,标记在句子中的位置将不会按顺序排列。这可能会导致依赖标记位置的句子产生不同的含义或上下文。因此,为了解决这个问题,注意力机制论文实现了位置编码方法。论文建议对每个标记的512维应用两个数学函数(一个是正弦函数,另一个是余弦函数)。以下是简单的正弦和余弦数学函数。

正弦函数应用于每个偶数维度值,而余弦函数应用于嵌入向量的奇数维度值。最后,得到的位置编码向量将被添加到嵌入向量中。现在,我们有了一个既能捕捉标记语义含义又能捕捉标记位置的嵌入向量。请注意,位置编码的值在每个序列中保持不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 输入嵌入和位置编码
class EmbeddingLayer(nn.Module):
def __init__(self, vocab_size: int, d_model: int):
super().__init__()
self.d_model = d_model

# 使用PyTorch的嵌入层模块将标记ID映射到词汇表,然后转换为嵌入向量。
# vocab_size是第二步中标记器在训练语料库数据集时创建的训练数据集的词汇表大小。
self.embedding = nn.Embedding(vocab_size, d_model)

def forward(self, input):
# 除了将输入序列送入嵌入层外,还通过乘以d_model的平方根来规范化嵌入层输出
embedding_output = self.embedding(input) * math.sqrt(self.d_model)
return embedding_output


class PositionalEncoding(nn.Module):
def __init__(self, max_seq_len: int, d_model: int, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)

# 创建一个与嵌入向量形状相同的矩阵。
pe = torch.zeros(max_seq_len, d_model)

# 计算位置编码函数的位置部分。
pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)

# 计算位置编码函数的除法部分。注意,除法部分的表达式与论文中的表达式略有不同,因为这种指数函数似乎效果更好。
div_term = torch.exp(torch.arange(0, d_model, 2).float()) * (-math.log(10000)/d_model)

# 用正弦和余弦数学函数的结果填充奇数和偶数矩阵值。
pe[:, 0::2] = torch.sin(pos * div_term)
pe[:, 1::2] = torch.cos(pos * div_term)

# 由于我们期望输入序列以批次形式出现,因此在第0位置添加了额外的batch_size维度。
pe = pe.unsqueeze(0)

def forward(self, input_embdding):
# 将位置编码与输入嵌入向量相加。
input_embdding = input_embdding + (self.pe[:, :input_embdding.shape[1], :]).requires_grad_(False)

# 执行dropout以防止过拟合。
return self.dropout(input_embdding)

第5步:多头注意力块

正如Transformer是LLM的核心,自注意力机制则是Transformer架构的核心。

为什么需要自注意力机制? 让我们通过下面的简单例子来回答这个问题。

句子1句子2中,单词“bank”显然有两个不同的含义。然而,这两个句子中单词“bank”的嵌入值是相同的。这并不合理。我们希望嵌入值能根据句子的上下文进行变化。因此,我们需要一种机制,使得嵌入值能够动态地根据句子的整体含义进行更新。自注意力机制能够动态地更新嵌入值,使其能够代表基于句子上下文的含义。

如果自注意力机制已经如此出色,为什么我们还需要多头自注意力? 让我们通过下面的另一个例子来找出答案。

在这个例子中,如果我们使用仅关注句子某一方面的自注意力机制,比如只关注“what”方面,可能只能捕捉到“John做了什么?”。然而,其他方面,如“when”或“where”,对于模型更好地执行同样重要。因此,我们需要找到一种方法,使自注意力机制能够同时学习句子中的多种关系。这就是多头自注意力(多头注意力可以互换使用)发挥作用的地方。在多头注意力中,单头嵌入将被分成多个头,每个头将关注句子的不同方面并相应地学习。这正是我们想要的。

现在,我们知道为什么需要多头注意力。让我们看看它是如何工作的。

如果你对矩阵乘法感到舒适,理解这个机制就相当容易。首先看一下整个流程图,然后我将从输入到多头注意力的输出,按点进行描述。

  1. 首先,让我们复制3份编码器输入(输入嵌入和位置编码的组合,我们在第4步中已经完成)。分别命名为Q、K和V。它们只是编码器输入的副本。编码器输入形状:(seq_len, d_model),seq_len:最大序列长度,d_model:嵌入向量维度,这里为512。

  2. 接下来,我们将对Q、K和V分别与权重W_q、W_k和W_v进行矩阵乘法。每个权重矩阵的形状为(d_model, d_model)。得到的新查询嵌入向量的形状为(seq_len, d_model)。权重参数将由模型随机初始化,并在训练开始后更新。为什么我们需要权重矩阵乘法?因为这些是可学习的参数,对于查询、键和值嵌入向量来说,需要这些参数来提供更好的表示。

  3. 根据注意力论文,头的数量为8。每个新的查询嵌入向量将被分成8个较小的查询、键和值嵌入向量。新的嵌入向量形状为(seq_len, d_model/num_heads)或(seq_len, d_k)。[ d_k = d_model/num_heads ]。

  4. 每个查询嵌入向量将与自身和其他序列中所有嵌入向量的键嵌入向量的转置进行点积运算。这个点积给出注意力分数。注意力分数显示给定标记与输入序列中所有其他标记的相似度。分数越高,相似度越高。

  • 注意力分数随后将被除以d_k的平方根,这是为了跨矩阵归一化分数值。但为什么必须除以d_k来归一化,可以是其他任何数字。主要原因是,随着嵌入向量维度的增加,注意力矩阵的总方差也相应增加。这就是为什么除以d_k会平衡方差的增加。如果我们不除以d_k,对于任何较高的注意力分数,softmax函数将给出非常高的概率值,而对于任何较低的注意力分数值,softmax函数将给出非常低的概率值。这将导致模型只关注学习那些高概率值的特征,而忽略低概率值的特征,这会导致梯度消失。因此,归一化注意力分数矩阵非常必要。
  • 在执行softmax函数之前,如果编码器掩码不是None,注意力分数将与编码器掩码进行矩阵乘法。如果掩码是因果掩码,那么输入序列中出现在其后的那些嵌入标记的注意力分数值将被替换为负无穷。softmax函数会将负无穷转换为接近零的值。因此,模型不会学习那些出现在当前标记之后的特征。这就是我们如何防止未来标记影响模型学习的方法。
  1. 然后对注意力分数矩阵应用softmax函数,输出形状为(seq_len, seq_len)的权重矩阵。

  2. 这些权重矩阵随后将与相应的嵌入向量进行矩阵乘法。这将得到形状为(seq_len, d_v)的8个注意力头。[ d_v = d_model/num_heads ]。

  3. 最后,所有头将被连接成一个新形状为(seq_len, d_model)的单头。这个新单头将与输出权重矩阵W_o (d_model, d_model)进行矩阵乘法。多头注意力的最终输出代表了单词的上下文含义,并能够学习输入句子的多个方面。

接下来,让我们开始编写多头注意力块的代码,这会简单得多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class MultiHeadAttention(nn.Module):
def __init__(self, d_model: int, num_heads: int, dropout_rate: float):
super().__init__()
# 定义dropout以防止过拟合。
self.dropout = nn.Dropout(dropout_rate)

# 引入权重矩阵,它们都是可学习的参数。
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)

self.num_heads = num_heads
assert d_model % num_heads == 0, "d_model必须能被头的数量整除"

# d_k是每个分割后的自注意力头的新的维度
self.d_k = d_model // num_heads

def forward(self, q, k, v, encoder_mask=None):

# 我们将使用多个序列批次并行训练模型,因此需要在形状中包含batch_size。
# 通过输入嵌入与相应权重的矩阵乘法计算查询、键和值。
# 形状变化:q(batch_size, seq_len, d_model) @ W_q(d_model, d_model) => query(batch_size, seq_len, d_model) [键和值同理]。
query = self.W_q(q)
key = self.W_k(k)
value = self.W_v(v)

# 将查询、键和值分割成多个头。d_model在8个头中被分割成d_k。
# 形状变化:query(batch_size, seq_len, d_model) => query(batch_size, seq_len, num_heads, d_k) -> query(batch_size,num_heads, seq_len,d_k) [键和值同理]。
query = query.view(query.shape[0], query.shape[1], self.num_heads ,self.d_k).transpose(1,2)
key = key.view(key.shape[0], key.shape[1], self.num_heads ,self.d_k).transpose(1,2)
value = value.view(value.shape[0], value.shape[1], self.num_heads ,self.d_k).transpose(1,2)

# :: 自注意力块开始 ::

# 计算注意力分数,以找出查询与自身和序列中所有其他嵌入的键之间的相似度或关系。
# 形状变化:query(batch_size,num_heads, seq_len,d_k) @ key(batch_size,num_heads, seq_len,d_k) => attention_score(batch_size,num_heads, seq_len,seq_len)。
attention_score = (query @ key.transpose(-2,-1))/math.sqrt(self.d_k)

# 如果提供了掩码,注意力分数需要根据掩码值进行修改。详情参见第4点。
if encoder_mask is not None:
attention_score = attention_score.masked_fill(encoder_mask==0, -1e9)

# softmax函数计算所有注意力分数的概率分布。它为较高的注意力分数分配较高的概率值。意味着更相似的标记获得更高的概率值。
# 形状变化:与attention_score相同
attention_weight = torch.softmax(attention_score, dim=-1)

if self.dropout is not None:
attention_weight = self.dropout(attention_weight)

# 自注意力块的最后一步是,注意力权重与值嵌入向量进行矩阵乘法。
# 形状变化:attention_score(batch_size,num_heads, seq_len,seq_len) @ value(batch_size,num_heads, seq_len,d_k) => attention_output(batch_size,num_heads, seq_len,d_k)
attention_output = attention_score @ value

# :: 自注意力块结束 ::

# 现在,所有头将被合并回一个单头
# 形状变化:attention_output(batch_size,num_heads, seq_len,d_k) => attention_output(batch_size,seq_len,num_heads,d_k) => attention_output(batch_size,seq_len,d_model)
attention_output = attention_output.transpose(1,2).contiguous().view(attention_output.shape[0], -1, self.num_heads * self.d_k)

# 最后,attention_output与输出权重矩阵进行矩阵乘法,得到最终的多头注意力输出。
# 多头输出的形状与嵌入输入相同
# 形状变化:attention_output(batch_size,seq_len,d_model) @ W_o(d_model, d_model) => multihead_output(batch_size, seq_len, d_model)
multihead_output = self.W_o(attention_output)

return multihead_output

第6步:前馈网络、层归一化和AddAndNorm

前馈网络:前馈网络使用深度神经网络来学习嵌入向量的所有特征,通过两个线性层(第一层有d_model个节点,第二层有d_ff个节点,值根据注意力论文分配),并在第一层线性层的输出上应用ReLU激活函数,从而为嵌入值提供非线性,同时应用dropout以进一步避免过拟合。

层归一化(LayerNorm):我们对嵌入值应用层归一化,以确保网络中嵌入向量值的分布保持一致。这保证了平滑的学习过程。我们将使用额外的学习参数gamma和beta来根据网络需要缩放和偏移嵌入值。

AddAndNorm:这包括一个跳跃连接和一个层归一化(前面解释过)。在前向传播过程中,跳跃连接确保早期层的特征在后期仍能被记住,以便在计算输出时做出必要的贡献。同样,在反向传播过程中,跳跃连接通过减少每个阶段的反向传播次数来防止梯度消失。AddAndNorm在编码器(2次)和解码器块(3次)中都有使用。它从前一层获取输入,先进行归一化,然后将其添加到前一层的输出中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 前馈网络、层归一化和AddAndNorm模块
class FeedForward(nn.Module):
def __init__(self, d_model: int, d_ff: int, dropout_rate: float):
super().__init__()

self.layer_1 = nn.Linear(d_model, d_ff)
self.activation_1 = nn.ReLU()
self.dropout = nn.Dropout(dropout_rate)
self.layer_2 = nn.Linear(d_ff, d_model)

def forward(self, input):
return self.layer_2(self.dropout(self.activation_1(self.layer_1(input))))

class LayerNorm(nn.Module):
def __init__(self, eps: float = 1e-5):
super().__init__()
# Epsilon是一个非常小的值,它在防止潜在的除以零问题中起着重要作用。
self.eps = eps

# 引入额外的学习参数gamma和beta,根据网络需要来缩放和偏移嵌入值。
self.gamma = nn.Parameter(torch.ones(1))
self.beta = nn.Parameter(torch.zeros(1))

def forward(self, input):
mean = input.mean(dim=-1, keepdim=True)
std = input.std(dim=-1, keepdim=True)

return self.gamma * ((input - mean)/(std + self.eps)) + self.beta


class AddAndNorm(nn.Module):
def __init__(self, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
self.layer_norm = LayerNorm()

def forward(self, input, sub_layer):
return input + self.dropout(sub_layer(self.layer_norm(input)))

第7步:编码器块与编码器

编码器块:编码器块内部主要包含两个组件:多头注意力(Multi-Head Attention)和前馈网络(Feedforward)。此外,还有两个Add & Norm单元。我们将按照《Attention》论文中的流程,在EncoderBlock类中组装这些组件。根据论文描述,这个编码器块被重复了6次。

编码器:接下来,我们将创建一个名为Encoder的附加类,该类接收一系列EncoderBlock并将其堆叠,最终输出编码器的输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class EncoderBlock(nn.Module):
def __init__(self, multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float):
super().__init__()
self.multihead_attention = multihead_attention
self.feed_forward = feed_forward
self.add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range(2)])

def forward(self, encoder_input, encoder_mask):
# 第一个AddAndNorm单元接收来自跳跃连接的编码器输入,并与多头注意力块的输出相加。
encoder_input = self.add_and_norm_list[0](encoder_input, lambda encoder_input: self.multihead_attention(encoder_input, encoder_input, encoder_input, encoder_mask))

# 第二个AddAndNorm单元接收来自跳跃连接的多头注意力块输出,并与前馈层的输出相加。
encoder_input = self.add_and_norm_list[1](encoder_input, self.feed_forward)

return encoder_input

class Encoder(nn.Module):
def __init__(self, encoderblocklist: nn.ModuleList):
super().__init__()

# 编码器类通过接收编码器块列表进行初始化。
self.encoderblocklist = encoderblocklist
self.layer_norm = LayerNorm()

def forward(self, encoder_input, encoder_mask):
# 遍历所有编码器块 - 共6次。
for encoderblock in self.encoderblocklist:
encoder_input = encoderblock(encoder_input, encoder_mask)

# 对最终的编码器块输出进行归一化并返回。此编码器输出将作为解码器块中交叉注意力的键和值使用。
encoder_output = self.layer_norm(encoder_input)
return encoder_output

第8步:解码器块、解码器和投影层

解码器块: 解码器块主要包含三个组件:掩码多头注意力(Masked Multi-Head Attention)、多头注意力(Multi-Head Attention)和前馈网络(Feedforward)。解码器块还包含3个“加法与归一化”(Add & Norm)单元。我们将按照《Attention》论文中的流程,在DecoderBlock类中组装这些组件。根据论文,这个解码器块被重复了6次。

解码器: 我们将创建一个名为Decoder的附加类,它接收一系列DecoderBlock,将其堆叠并输出最终的解码器输出。

解码器块中包含两种多头注意力机制。第一种是掩码多头注意力,它以解码器输入作为查询(query)、键(key)和值(value),并使用一个解码器掩码(也称为因果掩码)。因果掩码防止模型查看序列顺序中靠后的嵌入。关于其工作原理的详细解释,请参见第3步和第5步。

投影层: 最终的解码器输出将被传递到投影层。在这个层中,解码器输出首先被送入一个线性层,其嵌入形状会根据下面的代码部分进行改变。随后,softmax函数将解码器输出转换为词汇表上的概率分布,并选择概率最高的词元作为预测输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class DecoderBlock(nn.Module):
def __init__(self, masked_multihead_attention: MultiHeadAttention,multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float):
super().__init__()
self.masked_multihead_attention = masked_multihead_attention
self.multihead_attention = multihead_attention
self.feed_forward = feed_forward
self.add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range(3)])

def forward(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
# 第一个AddAndNorm单元接收来自跳跃连接的解码器输入,并与掩码多头注意力块的输出相加。
decoder_input = self.add_and_norm_list[0](decoder_input, lambda decoder_input: self.masked_multihead_attention(decoder_input,decoder_input, decoder_input, decoder_mask))
# 第二个AddAndNorm单元接收来自跳跃连接的掩码多头注意力块的输出,并与多头注意力块的输出相加。
decoder_input = self.add_and_norm_list[1](decoder_input, lambda decoder_input: self.multihead_attention(decoder_input,encoder_output, encoder_output, encoder_mask)) # 交叉注意力
# 第三个AddAndNorm单元接收来自跳跃连接的多头注意力块的输出,并与前馈层的输出相加。
decoder_input = self.add_and_norm_list[2](decoder_input, self.feed_forward)
return decoder_input

class Decoder(nn.Module):
def __init__(self,decoderblocklist: nn.ModuleList):
super().__init__()
self.decoderblocklist = decoderblocklist
self.layer_norm = LayerNorm()

def forward(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
for decoderblock in self.decoderblocklist:
decoder_input = decoderblock(decoder_input, decoder_mask, encoder_output, encoder_mask)

decoder_output = self.layer_norm(decoder_input)
return decoder_output

class ProjectionLayer(nn.Module):
def __init__(self, vocab_size: int, d_model: int):
super().__init__()
self.projection_layer = nn.Linear(d_model, vocab_size)

def forward(self, decoder_output):
# 投影层首先接收解码器输出,并将其传递到形状为(d_model, vocab_size)的线性层中。
# 形状变化:decoder_output(batch_size, seq_len, d_model) @ linear_layer(d_model, vocab_size) => output(batch_size, seq_len, vocab_size)
output = self.projection_layer(decoder_output)

# softmax函数输出词汇表上的概率分布
return torch.log_softmax(output, dim=-1)

第9步:创建并构建Transformer

最终,我们已经完成了Transformer架构中所有组件块的构建。唯一剩下的任务就是将它们全部组装起来。

我们将首先创建一个Transformer类,该类将初始化所有组件类的实例。在Transformer类内部,我们首先定义编码函数,该函数完成Transformer编码器部分的所有任务并生成编码器输出。其次,我们定义解码函数,该函数完成Transformer解码器部分的所有任务并生成解码器输出。最后,我们定义一个投影函数,该函数接收解码器输出并将其映射到词汇表以进行预测。

现在,Transformer架构已经准备就绪。我们可以通过定义一个函数来构建我们的翻译LLM模型,该函数接收如下代码中给出的所有必要参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class Transformer(nn.Module):
def __init__(self, source_embed: EmbeddingLayer, target_embed: EmbeddingLayer, positional_encoding: PositionalEncoding, multihead_attention: MultiHeadAttention, masked_multihead_attention: MultiHeadAttention, feed_forward: FeedForward, encoder: Encoder, decoder: Decoder, projection_layer: ProjectionLayer, dropout_rate: float):
super().__init__()

# 初始化Transformer架构中所有组件类的实例
self.source_embed = source_embed
self.target_embed = target_embed
self.positional_encoding = positional_encoding
self.multihead_attention = multihead_attention
self.masked_multihead_attention = masked_multihead_attention
self.feed_forward = feed_forward
self.encoder = encoder
self.decoder = decoder
self.projection_layer = projection_layer
self.dropout = nn.Dropout(dropout_rate)

# 编码函数接收编码器输入,在所有编码器块内进行必要的处理并给出编码器输出
def encode(self, encoder_input, encoder_mask):
encoder_input = self.source_embed(encoder_input)
encoder_input = self.positional_encoding(encoder_input)
encoder_output = self.encoder(encoder_input, encoder_mask)
return encoder_output

# 解码函数接收解码器输入,在所有解码器块内进行必要的处理并给出解码器输出
def decode(self, decoder_input, decoder_mask, encoder_output, encoder_mask):
decoder_input = self.target_embed(decoder_input)
decoder_input = self.positional_encoding(decoder_input)
decoder_output = self.decoder(decoder_input, decoder_mask, encoder_output, encoder_mask)
return decoder_output

# 投影函数接收解码器输出并将其通过投影层映射到词汇表以进行预测
def project(self, decoder_output):
return self.projection_layer(decoder_output)

def build_model(source_vocab_size, target_vocab_size, max_seq_len=1135, d_model=512, d_ff=2048, num_heads=8, num_blocks=6, dropout_rate=0.1):

# 定义并赋值Transformer架构所需的所有参数
source_embed = EmbeddingLayer(source_vocab_size, d_model)
target_embed = EmbeddingLayer(target_vocab_size, d_model)
positional_encoding = PositionalEncoding(max_seq_len, d_model, dropout_rate)
multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
masked_multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
feed_forward = FeedForward(d_model, d_ff, dropout_rate)
projection_layer = ProjectionLayer(target_vocab_size, d_model)
encoder_block = EncoderBlock(multihead_attention, feed_forward, dropout_rate)
decoder_block = DecoderBlock(masked_multihead_attention,multihead_attention, feed_forward, dropout_rate)

encoderblocklist = []
decoderblocklist = []

for _ in range(num_blocks):
encoderblocklist.append(encoder_block)

for _ in range(num_blocks):
decoderblocklist.append(decoder_block)

encoderblocklist = nn.ModuleList(encoderblocklist)
decoderblocklist = nn.ModuleList(decoderblocklist)

encoder = Encoder(encoderblocklist)
decoder = Decoder(decoderblocklist)

# 通过提供所有参数值实例化Transformer类
model = Transformer(source_embed, target_embed, positional_encoding, multihead_attention, masked_multihead_attention,feed_forward, encoder, decoder, projection_layer, dropout_rate)

for param in model.parameters():
if param.dim() > 1:
nn.init.xavier_uniform_(param)

return model

# 最后,调用build_model并将其赋值给model变量。
# 该模型现已完全准备好训练和验证我们的数据集。
# 训练和验证后,我们可以使用此模型执行新的翻译任务

model = build_model(source_vocab_size, target_vocab_size)

第十步:训练和验证我们的构建LLM模型

现在是时候训练我们的模型了。训练过程相当直接。我们将使用在第三步中创建的训练DataLoader。由于总训练数据集数量为100万,我强烈建议在GPU设备上训练我们的模型。我大约花了5小时完成了20个epoch。每个epoch结束后,我们将保存模型权重以及优化器状态,这样便于从停止点继续训练,而不是从头开始。

每个epoch结束后,我们将使用验证DataLoader进行验证。验证数据集的大小为2000,相当合理。在验证过程中,我们只需计算一次编码器输出,直到解码器输出结束符[SEP],这是因为直到解码器接收到[SEP]标记,我们不得不反复发送相同的编码器输出,这没有意义。

解码器输入首先以句子开始标记[CLS]开始。每次预测后,解码器输入将追加下一个生成的标记,直到达到句子结束标记[SEP]。最后,投影层将输出映射到相应的文本表示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
def training_model(preload_epoch=None):   

# 整个训练、验证周期将运行20次。
EPOCHS = 20
initial_epoch = 0
global_step = 0

# Adam是最常用的优化算法之一,它持有当前状态,并根据计算的梯度更新参数。
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# 如果preload_epoch不为空,意味着训练将从上次保存的权重和优化器开始。新的epoch编号将是preload_epoch + 1。
if preload_epoch is not None:
model_filename = f"./malaygpt/model_{preload_epoch}.pt"
state = torch.load(model_filename)
initial_epoch = state['epoch'] + 1
optimizer.load_state_dict(state['optimizer_state_dict'])
global_step = state['global_step']

# CrossEntropyLoss损失函数计算投影输出与目标标签之间的差异。
loss_fn = nn.CrossEntropyLoss(ignore_index = tokenizer_en.token_to_id("[PAD]"), label_smoothing=0.1).to(device)

for epoch in range(initial_epoch, EPOCHS):

# ::: 训练块开始 :::
model.train()

# 使用第三步中准备的训练dataloder进行训练。
for batch in tqdm(train_dataloader):
encoder_input = batch['encoder_input'].to(device) # (batch_size, seq_len)
decoder_input = batch['decoder_input'].to(device) # (batch_size, seq_len)
target_label = batch['target_label'].to(device) # (batch_size, seq_len)
encoder_mask = batch['encoder_mask'].to(device)
decoder_mask = batch['decoder_mask'].to(device)

encoder_output = model.encode(encoder_input, encoder_mask)
decoder_output = model.decode(decoder_input, decoder_mask, encoder_output, encoder_mask)
projection_output = model.project(decoder_output)

# projection_output(batch_size, seq_len, vocab_size)
loss = loss_fn(projection_output.view(-1, projection_output.shape[-1]), target_label.view(-1))

# 反向传播
optimizer.zero_grad()
loss.backward()

# 更新权重
optimizer.step()
global_step += 1

print(f'Epoch [{epoch+1}/{EPOCHS}]: Train Loss: {loss.item():.2f}')

# 每个epoch结束后保存模型状态
model_filename = f"./malaygpt/model_{epoch}.pt"
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'global_step': global_step
}, model_filename)
# ::: 训练块结束 :::

# ::: 验证块开始 :::
model.eval()
with torch.inference_mode():
for batch in tqdm(val_dataloader):
encoder_input = batch['encoder_input'].to(device) # (batch_size, seq_len)
encoder_mask = batch['encoder_mask'].to(device)
source_text = batch['source_text']
target_text = batch['target_text']

# 计算源序列的编码器输出。
encoder_output = model.encode(encoder_input, encoder_mask)

# 对于预测任务,解码器输入的第一个标记是[CLS]标记
decoder_input = torch.empty(1,1).fill_(tokenizer_my.token_to_id('[CLS]')).type_as(encoder_input).to(device)

# 因为我们需要不断将输出添加回输入,直到接收到[SEP] - 结束标记。
while True:
# 检查是否达到了最大长度,如果是,则停止。
if decoder_input.size(1) == max_seq_len:
break

# 每次新输出添加到解码器输入以进行下一个标记预测时重新创建掩码
decoder_mask = causal_mask(decoder_input.size(1)).type_as(encoder_mask).to(device)

decoder_output = model.decode(decoder_input,decoder_mask,encoder_output,encoder_mask)

# 仅对下一个标记应用投影。
projection = model.project(decoder_output[:, -1])

# 选择概率最高的标记,这是一种称为贪心搜索的实现。
_, new_token = torch.max(projection, dim=1)
new_token = torch.empty(1,1).type_as(encoder_input).fill_(new_token.item()).to(device)

# 将新标记添加回解码器输入。
decoder_input = torch.cat([decoder_input, new_token], dim=1)

# 检查新标记是否为结束标记,如果是,则停止。
if new_token == tokenizer_my.token_to_id('[SEP]'):
break

# 将解码器输出分配为完全追加的解码器输入。
decoder_output = decoder_input.squeeze(0)
model_predicted_text = tokenizer_my.decode(decoder_output.detach().cpu().numpy())

print(f'SOURCE TEXT": {source_text}')
print(f'TARGET TEXT": {target_text}')
print(f'PREDICTED TEXT": {model_predicted_text}')
# ::: 验证块结束 :::

# 此函数运行20个epoch的训练和验证
training_model(preload_epoch=None)

第11步:创建一个函数,用我们构建的模型测试新的翻译任务

我们将给我们的翻译函数一个通用的名称,称为malaygpt。该函数接收用户输入的英文原始文本,并输出翻译成马来语的文本。让我们运行这个函数并尝试一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def malaygpt(user_input_text):
model.eval()
with torch.inference_mode():
user_input_text = user_input_text.strip()
user_input_text_encoded = torch.tensor(tokenizer_en.encode(user_input_text).ids, dtype = torch.int64).to(device)

num_source_padding = max_seq_len - len(user_input_text_encoded) - 2
encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype = torch.int64).to(device)
encoder_input = torch.cat([CLS_ID, user_input_text_encoded, SEP_ID, encoder_padding]).to(device)
encoder_mask = (encoder_input != PAD_ID).unsqueeze(0).unsqueeze(0).int().to(device)

# 计算源序列的编码器输出
encoder_output = model.encode(encoder_input, encoder_mask)
# 对于预测任务,解码器输入的第一个token是[CLS] token
decoder_input = torch.empty(1,1).fill_(tokenizer_my.token_to_id('[CLS]')).type_as(encoder_input).to(device)

# 由于我们需要不断将输出添加回输入,直到接收到[SEP] - 结束token。
while True:
# 检查是否达到了最大长度
if decoder_input.size(1) == max_seq_len:
break
# 每次将新输出添加到解码器输入以进行下一个token预测时,重新创建掩码
decoder_mask = causal_mask(decoder_input.size(1)).type_as(encoder_mask).to(device)
decoder_output = model.decode(decoder_input,decoder_mask,encoder_output,encoder_mask)

# 仅对下一个token应用投影
projection = model.project(decoder_output[:, -1])

# 选择概率最高的token,这是贪婪搜索的实现
_, new_token = torch.max(projection, dim=1)
new_token = torch.empty(1,1).type_as(encoder_input).fill_(new_token.item()).to(device)

# 将新token添加回解码器输入
decoder_input = torch.cat([decoder_input, new_token], dim=1)

# 检查新token是否是结束token
if new_token == tokenizer_my.token_to_id('[SEP]'):
break
# 最终的解码器输出是直到结束token的解码器输入的连接
decoder_output = decoder_input.squeeze(0)
model_predicted_text = tokenizer_my.decode(decoder_output.detach().cpu().numpy())

return model_predicted_text

测试时间!让我们进行一些翻译测试。

“翻译似乎运行得相当好。”

就是这样! 我非常有信心,你现在能够使用PyTorch从头开始构建自己的大型语言模型。你也可以在其他语言数据集上训练这个模型,并在该语言中执行翻译任务。现在,你已经学会了如何从头开始构建原始的Transformer,我可以保证你现在能够学习和实现市场上大多数大型语言模型的应用。

下一步是什么? 我将通过微调Llama 3模型来构建一个完全功能的应用程序,这是目前市场上最流行的开源大型语言模型之一。我还将分享完整的源代码。

所以,请继续关注并非常感谢你的阅读!

Google Colab笔记本链接

参考文献

  • Attention Is All You Need — 论文,Ashish Vaswani, Noam Shazeer 和团队
  • Attention in transformers, 视觉解释,3Blue1Brown — YouTube
  • Let’s build GPT, Andrej Karpathy, YouTube
  • https://github.com/hkproj/pytorch-transformer — Umar Jamil
  • 标题: 从零开始使用PyTorch构建自己的大型语言模型LLM
  • 作者: Barry
  • 创建于 : 2024-06-05 20:58:44
  • 更新于 : 2024-08-31 06:59:45
  • 链接: https://wx.role.fun/2024/06/05/03995bdf9e8a4f6487478d2370e1a2d9/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。