从零开始使用PyTorch构建自己的大型语言模型LLM
一步步指南,构建并训练一个名为MalayGPT 的LLM。该模型的任务是将文本从英语翻译成马来语。
通过本文,你将实现什么? 你将能够自己构建并训练一个大型语言模型(LLM),同时跟随我的代码进行实践。虽然我们构建的是一个将任意英语文本翻译成马来语的LLM,但你可以轻松修改此LLM架构以适应其他语言翻译任务。
LLM是大多数流行AI聊天机器人的核心基础,如ChatGPT、Gemini、MetaAI、Mistral AI等。在每个LLM的核心,都有一个名为Transformer 的架构。因此,我们将首先根据著名的论文“Attention is all you need”(https://arxiv.org/abs/1706.03762 )构建基于Transformer架构的模型。
首先,我们将逐个构建Transformer模型的所有组件。然后,我们将所有组件组装在一起构建我们的模型。之后,我们将使用从Hugging Face数据集中获取的数据来训练和验证我们的模型。最后,我们将通过在新翻译文本数据上进行翻译来测试我们的模型。
重要提示 :我将一步步编写Transformer架构中的所有组件代码,并提供必要的概念解释,包括为什么、是什么以及如何做。我还会在需要解释的代码行中提供注释。这样,我相信你可以在自己编写代码的同时理解整个工作流程。
让我们一起编写代码吧!
第一步:加载数据集 为了让llm模型能够完成从英语到马来语的翻译任务,我们需要使用包含源语言(英语)和目标语言(马来语)对的数据集。因此,我们将使用Huggingface上的一个数据集,名为“Helsinki-NLP/opus-100 ”。该数据集包含100万对英语-马来语的训练数据,足以获得良好的准确性,验证集和测试集各有2000条数据。该数据集已经预先分割好,因此我们无需再次进行数据集分割。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import osimport mathimport torchimport torch.nn as nnfrom torch.utils.data import Dataset, DataLoaderfrom pathlib import Pathfrom datasets import load_datasetfrom tqdm import tqdmdevice = torch.device("cuda" if torch.cuda.is_available() else "cpu" ) raw_train_dataset = load_dataset("Helsinki-NLP/opus-100" , "en-ms" , split='train' ) raw_validation_dataset = load_dataset("Helsinki-NLP/opus-100" , "en-ms" , split='validation' ) raw_test_dataset = load_dataset("Helsinki-NLP/opus-100" , "en-ms" , split='test' ) os.mkdir("./dataset-en" ) os.mkdir("./dataset-my" ) os.mkdir("./malaygpt" ) os.mkdir("./tokenizer_en" ) os.mkdir("./tokenizer_my" ) dataset_en = [] dataset_my = [] file_count = 1 for data in tqdm(raw_train_dataset["translation" ]): dataset_en.append(data["en" ].replace('\n' , " " )) dataset_my.append(data["ms" ].replace('\n' , " " )) if len (dataset_en) == 50000 : with open (f'./dataset-en/file{file_count} .txt' , 'w' , encoding='utf-8' ) as fp: fp.write('\n' .join(dataset_en)) dataset_en = [] with open (f'./dataset-my/file{file_count} .txt' , 'w' , encoding='utf-8' ) as fp: fp.write('\n' .join(dataset_my)) dataset_my = [] file_count += 1
第2步:创建分词器 Transformer模型不处理原始文本,只处理数字。因此,我们需要将原始文本转换为数字。为此,我们将使用一种流行的分词器——BPE分词器,这是一种子词分词器,被用于GPT3等模型中。我们首先在第1步准备好的语料数据(即我们的训练数据集)上训练BPE分词器。流程如下图所示。
训练完成后,分词器会为英语和马来语生成词汇表。词汇表是从语料数据中提取的唯一标记集合。由于我们正在进行翻译任务,因此需要为两种语言都准备分词器。BPE分词器接收原始文本,将其与词汇表中的标记映射,并为输入的每个单词返回标记。这些标记可以是单个单词或子词。这是子词分词器相对于其他分词器的优势之一,因为它可以克服OOV(词汇表外)问题。然后,分词器返回词汇表中标记的唯一索引或位置ID,这些索引将用于创建嵌入,如上图所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 from tokenizers import Tokenizerfrom tokenizers.models import BPEfrom tokenizers.trainers import BpeTrainerfrom tokenizers.pre_tokenizers import Whitespacepath_en = [str (file) for file in Path('./dataset-en' ).glob("**/*.txt" )] path_my = [str (file) for file in Path('./dataset-my' ).glob("**/*.txt" )] tokenizer_en = Tokenizer(BPE(unk_token="[UNK]" )) trainer_en = BpeTrainer(min_frequency=2 , special_tokens=["[PAD]" ,"[UNK]" ,"[CLS]" , "[SEP]" , "[MASK]" ]) tokenizer_en.pre_tokenizer = Whitespace() tokenizer_en.train(files=path_en, trainer=trainer_en) tokenizer_en.save("./tokenizer_en/tokenizer_en.json" ) tokenizer_my = Tokenizer(BPE(unk_token="[UNK]" )) trainer_my = BpeTrainer(min_frequency=2 , special_tokens=["[PAD]" ,"[UNK]" ,"[CLS]" , "[SEP]" , "[MASK]" ]) tokenizer_my.pre_tokenizer = Whitespace() tokenizer_my.train(files=path_my, trainer=trainer_my) tokenizer_my.save("./tokenizer_my/tokenizer_my.json" ) tokenizer_en = Tokenizer.from_file("./tokenizer_en/tokenizer_en.json" ) tokenizer_my = Tokenizer.from_file("./tokenizer_my/tokenizer_my.json" ) source_vocab_size = tokenizer_en.get_vocab_size() target_vocab_size = tokenizer_my.get_vocab_size() CLS_ID = torch.tensor([tokenizer_my.token_to_id("[CLS]" )], dtype=torch.int64).to(device) SEP_ID = torch.tensor([tokenizer_my.token_to_id("[SEP]" )], dtype=torch.int64).to(device) PAD_ID = torch.tensor([tokenizer_my.token_to_id("[PAD]" )], dtype=torch.int64).to(device)
第三步:准备数据集和数据加载器 在这一步中,我们将为源语言和目标语言准备数据集,这些数据集将用于后续训练和验证我们构建的模型。我们将创建一个类,该类接收原始数据集,并定义函数分别使用源(tokenizer_en)和目标(tokenizer_my)分词器对源文本和目标文本进行编码。最后,我们将为训练和验证数据集创建DataLoader,以批量迭代数据集(在我们的示例中,批量大小设置为10)。批量大小可以根据数据大小和可用处理能力进行调整。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 class EncodeDataset (Dataset ): def __init__ (self, raw_dataset, max_seq_len ): super ().__init__() self .raw_dataset = raw_dataset self .max_seq_len = max_seq_len def __len__ (self ): return len (self .raw_dataset) def __getitem__ (self, index ): raw_text = self .raw_dataset[index] source_text = raw_text["en" ] target_text = raw_text["ms" ] source_text_encoded = torch.tensor(tokenizer_en.encode(source_text).ids, dtype = torch.int64).to(device) target_text_encoded = torch.tensor(tokenizer_my.encode(target_text).ids, dtype = torch.int64).to(device) num_source_padding = self .max_seq_len - len (source_text_encoded) - 2 num_target_padding = self .max_seq_len - len (target_text_encoded) - 1 encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype = torch.int64).to(device) decoder_padding = torch.tensor([PAD_ID] * num_target_padding, dtype = torch.int64).to(device) encoder_input = torch.cat([CLS_ID, source_text_encoded, SEP_ID, encoder_padding]).to(device) decoder_input = torch.cat([CLS_ID, target_text_encoded, decoder_padding ]).to(device) target_label = torch.cat([target_text_encoded,SEP_ID,decoder_padding]).to(device) encoder_mask = (encoder_input != PAD_ID).unsqueeze(0 ).unsqueeze(0 ).int ().to(device) decoder_mask = (decoder_input != PAD_ID).unsqueeze(0 ).unsqueeze(0 ).int () & causal_mask(decoder_input.size(0 )).to(device) return { 'encoder_input' : encoder_input, 'decoder_input' : decoder_input, 'target_label' : target_label, 'encoder_mask' : encoder_mask, 'decoder_mask' : decoder_mask, 'source_text' : source_text, 'target_text' : target_text } def causal_mask (size ): mask = torch.triu(torch.ones(1 , size, size), diagonal = 1 ).type (torch.int ) return mask == 0 max_seq_len_source = 0 max_seq_len_target = 0 for data in raw_train_dataset["translation" ]: enc_ids = tokenizer_en.encode(data["en" ]).ids dec_ids = tokenizer_my.encode(data["ms" ]).ids max_seq_len_source = max (max_seq_len_source, len (enc_ids)) max_seq_len_target = max (max_seq_len_target, len (dec_ids)) print (f'max_seqlen_source: {max_seq_len_source} ' ) print (f'max_seqlen_target: {max_seq_len_target} ' ) max_seq_len = 550 train_dataset = EncodeDataset(raw_train_dataset["translation" ], max_seq_len) val_dataset = EncodeDataset(raw_validation_dataset["translation" ], max_seq_len) train_dataloader = DataLoader(train_dataset, batch_size = 10 , shuffle = True , generator=torch.Generator(device='cuda' )) val_dataloader = DataLoader(val_dataset, batch_size = 1 , shuffle = True , generator=torch.Generator(device='cuda' ))
第四步:输入嵌入和位置编码 输入嵌入 :从第二步的标记器生成的标记ID序列将被送入嵌入层。嵌入层将标记ID映射到词汇表,并为每个标记生成一个512维的嵌入向量。[512维来自注意力机制论文]。嵌入向量能够根据其训练数据集捕捉标记的语义含义。嵌入向量中的每个维度值代表与该标记相关的一些特征。例如,如果标记是“狗”,某些维度值可能代表眼睛、嘴巴、腿、高度等。如果在n维空间中绘制向量,外观相似的对象如狗、猫会彼此靠近,而外观不相似的对象如学校、家的嵌入向量则会相距较远。
位置编码 :Transformer架构的优势之一是它可以并行处理任意数量的输入序列,这大大减少了训练时间并使预测更快。然而,一个缺点是,在并行处理多个标记序列时,标记在句子中的位置将不会按顺序排列。这可能会导致依赖标记位置的句子产生不同的含义或上下文。因此,为了解决这个问题,注意力机制论文实现了位置编码方法。论文建议对每个标记的512维应用两个数学函数(一个是正弦函数,另一个是余弦函数)。以下是简单的正弦和余弦数学函数。
正弦 函数应用于每个偶数维度值,而余弦 函数应用于嵌入向量的奇数维度值。最后,得到的位置编码向量将被添加到嵌入向量中。现在,我们有了一个既能捕捉标记语义含义又能捕捉标记位置的嵌入向量。请注意,位置编码的值在每个序列中保持不变。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 class EmbeddingLayer (nn.Module): def __init__ (self, vocab_size: int , d_model: int ): super ().__init__() self .d_model = d_model self .embedding = nn.Embedding(vocab_size, d_model) def forward (self, input ): embedding_output = self .embedding(input ) * math.sqrt(self .d_model) return embedding_output class PositionalEncoding (nn.Module): def __init__ (self, max_seq_len: int , d_model: int , dropout_rate: float ): super ().__init__() self .dropout = nn.Dropout(dropout_rate) pe = torch.zeros(max_seq_len, d_model) pos = torch.arange(0 , max_seq_len, dtype=torch.float ).unsqueeze(1 ) div_term = torch.exp(torch.arange(0 , d_model, 2 ).float ()) * (-math.log(10000 )/d_model) pe[:, 0 ::2 ] = torch.sin(pos * div_term) pe[:, 1 ::2 ] = torch.cos(pos * div_term) pe = pe.unsqueeze(0 ) def forward (self, input_embdding ): input_embdding = input_embdding + (self .pe[:, :input_embdding.shape[1 ], :]).requires_grad_(False ) return self .dropout(input_embdding)
第5步:多头注意力块 正如Transformer是LLM的核心,自注意力机制则是Transformer架构的核心。
为什么需要自注意力机制? 让我们通过下面的简单例子来回答这个问题。
在句子1 和句子2 中,单词“bank ”显然有两个不同的含义。然而,这两个句子中单词“bank ”的嵌入值是相同的。这并不合理。我们希望嵌入值能根据句子的上下文进行变化。因此,我们需要一种机制,使得嵌入值能够动态地根据句子的整体含义进行更新。自注意力机制能够动态地更新嵌入值,使其能够代表基于句子上下文的含义。
如果自注意力机制已经如此出色,为什么我们还需要多头自注意力? 让我们通过下面的另一个例子来找出答案。
在这个例子中,如果我们使用仅关注句子某一方面的自注意力机制,比如只关注“what ”方面,可能只能捕捉到“John做了什么? ”。然而,其他方面,如“when ”或“where ”,对于模型更好地执行同样重要。因此,我们需要找到一种方法,使自注意力机制能够同时学习句子中的多种关系。这就是多头自注意力 (多头注意力可以互换使用)发挥作用的地方。在多头注意力中,单头嵌入将被分成多个头,每个头将关注句子的不同方面并相应地学习。这正是我们想要的。
现在,我们知道为什么需要多头注意力。让我们看看它是如何工作的。
如果你对矩阵乘法感到舒适,理解这个机制就相当容易。首先看一下整个流程图,然后我将从输入到多头注意力的输出,按点进行描述。
首先,让我们复制3份编码器输入(输入嵌入和位置编码的组合,我们在第4步中已经完成)。分别命名为Q、K和V。它们只是编码器输入的副本。编码器输入形状:(seq_len, d_model),seq_len:最大序列长度,d_model:嵌入向量维度,这里为512。
接下来,我们将对Q、K和V分别与权重W_q、W_k和W_v进行矩阵乘法。每个权重矩阵的形状为(d_model, d_model)。得到的新查询 、键 和值 嵌入向量的形状为(seq_len, d_model)。权重参数将由模型随机初始化,并在训练开始后更新。为什么我们需要权重矩阵乘法?因为这些是可学习的参数,对于查询、键和值嵌入向量来说,需要这些参数来提供更好的表示。
根据注意力论文,头的数量为8。每个新的查询 、键 和值 嵌入向量将被分成8个较小的查询、键和值嵌入向量。新的嵌入向量形状为(seq_len, d_model/num_heads)或(seq_len, d_k)。[ d_k = d_model/num_heads ]。
每个查询嵌入向量将与自身和其他序列中所有嵌入向量的键嵌入向量的转置进行点积运算。这个点积给出注意力分数。注意力分数显示给定标记与输入序列中所有其他标记的相似度。分数越高,相似度越高。
注意力分数随后将被除以d_k的平方根,这是为了跨矩阵归一化分数值。但为什么必须除以d_k来归一化,可以是其他任何数字。主要原因是,随着嵌入向量维度的增加,注意力矩阵的总方差也相应增加。这就是为什么除以d_k会平衡方差的增加。如果我们不除以d_k,对于任何较高的注意力分数,softmax函数将给出非常高的概率值,而对于任何较低的注意力分数值,softmax函数将给出非常低的概率值。这将导致模型只关注学习那些高概率值的特征,而忽略低概率值的特征,这会导致梯度消失。因此,归一化注意力分数矩阵非常必要。
在执行softmax函数之前,如果编码器掩码不是None,注意力分数将与编码器掩码进行矩阵乘法。如果掩码是因果掩码,那么输入序列中出现在其后的那些嵌入标记的注意力分数值将被替换为负无穷。softmax函数会将负无穷转换为接近零的值。因此,模型不会学习那些出现在当前标记之后的特征。这就是我们如何防止未来标记影响模型学习的方法。
然后对注意力分数矩阵应用softmax函数,输出形状为(seq_len, seq_len)的权重矩阵。
这些权重矩阵随后将与相应的值 嵌入向量进行矩阵乘法。这将得到形状为(seq_len, d_v)的8个注意力头。[ d_v = d_model/num_heads ]。
最后,所有头将被连接成一个新形状为(seq_len, d_model)的单头。这个新单头将与输出权重矩阵W_o (d_model, d_model)进行矩阵乘法。多头注意力的最终输出代表了单词的上下文含义,并能够学习输入句子的多个方面。
接下来,让我们开始编写多头注意力块的代码,这会简单得多。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 class MultiHeadAttention (nn.Module): def __init__ (self, d_model: int , num_heads: int , dropout_rate: float ): super ().__init__() self .dropout = nn.Dropout(dropout_rate) self .W_q = nn.Linear(d_model, d_model) self .W_k = nn.Linear(d_model, d_model) self .W_v = nn.Linear(d_model, d_model) self .W_o = nn.Linear(d_model, d_model) self .num_heads = num_heads assert d_model % num_heads == 0 , "d_model必须能被头的数量整除" self .d_k = d_model // num_heads def forward (self, q, k, v, encoder_mask=None ): query = self .W_q(q) key = self .W_k(k) value = self .W_v(v) query = query.view(query.shape[0 ], query.shape[1 ], self .num_heads ,self .d_k).transpose(1 ,2 ) key = key.view(key.shape[0 ], key.shape[1 ], self .num_heads ,self .d_k).transpose(1 ,2 ) value = value.view(value.shape[0 ], value.shape[1 ], self .num_heads ,self .d_k).transpose(1 ,2 ) attention_score = (query @ key.transpose(-2 ,-1 ))/math.sqrt(self .d_k) if encoder_mask is not None : attention_score = attention_score.masked_fill(encoder_mask==0 , -1e9 ) attention_weight = torch.softmax(attention_score, dim=-1 ) if self .dropout is not None : attention_weight = self .dropout(attention_weight) attention_output = attention_score @ value attention_output = attention_output.transpose(1 ,2 ).contiguous().view(attention_output.shape[0 ], -1 , self .num_heads * self .d_k) multihead_output = self .W_o(attention_output) return multihead_output
第6步:前馈网络、层归一化和AddAndNorm 前馈网络 :前馈网络使用深度神经网络来学习嵌入向量的所有特征,通过两个线性层(第一层有d_model个节点,第二层有d_ff个节点,值根据注意力论文分配),并在第一层线性层的输出上应用ReLU激活函数,从而为嵌入值提供非线性,同时应用dropout以进一步避免过拟合。
层归一化(LayerNorm) :我们对嵌入值应用层归一化,以确保网络中嵌入向量值的分布保持一致。这保证了平滑的学习过程。我们将使用额外的学习参数gamma和beta来根据网络需要缩放和偏移嵌入值。
AddAndNorm :这包括一个跳跃连接和一个层归一化(前面解释过)。在前向传播过程中,跳跃连接确保早期层的特征在后期仍能被记住,以便在计算输出时做出必要的贡献。同样,在反向传播过程中,跳跃连接通过减少每个阶段的反向传播次数来防止梯度消失。AddAndNorm在编码器(2次)和解码器块(3次)中都有使用。它从前一层获取输入,先进行归一化,然后将其添加到前一层的输出中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class FeedForward (nn.Module): def __init__ (self, d_model: int , d_ff: int , dropout_rate: float ): super ().__init__() self .layer_1 = nn.Linear(d_model, d_ff) self .activation_1 = nn.ReLU() self .dropout = nn.Dropout(dropout_rate) self .layer_2 = nn.Linear(d_ff, d_model) def forward (self, input ): return self .layer_2(self .dropout(self .activation_1(self .layer_1(input )))) class LayerNorm (nn.Module): def __init__ (self, eps: float = 1e-5 ): super ().__init__() self .eps = eps self .gamma = nn.Parameter(torch.ones(1 )) self .beta = nn.Parameter(torch.zeros(1 )) def forward (self, input ): mean = input .mean(dim=-1 , keepdim=True ) std = input .std(dim=-1 , keepdim=True ) return self .gamma * ((input - mean)/(std + self .eps)) + self .beta class AddAndNorm (nn.Module): def __init__ (self, dropout_rate: float ): super ().__init__() self .dropout = nn.Dropout(dropout_rate) self .layer_norm = LayerNorm() def forward (self, input , sub_layer ): return input + self .dropout(sub_layer(self .layer_norm(input )))
第7步:编码器块与编码器 编码器块 :编码器块内部主要包含两个组件:多头注意力(Multi-Head Attention)和前馈网络(Feedforward)。此外,还有两个Add & Norm单元。我们将按照《Attention》论文中的流程,在EncoderBlock类中组装这些组件。根据论文描述,这个编码器块被重复了6次。
编码器 :接下来,我们将创建一个名为Encoder的附加类,该类接收一系列EncoderBlock并将其堆叠,最终输出编码器的输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class EncoderBlock (nn.Module): def __init__ (self, multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float ): super ().__init__() self .multihead_attention = multihead_attention self .feed_forward = feed_forward self .add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range (2 )]) def forward (self, encoder_input, encoder_mask ): encoder_input = self .add_and_norm_list[0 ](encoder_input, lambda encoder_input: self .multihead_attention(encoder_input, encoder_input, encoder_input, encoder_mask)) encoder_input = self .add_and_norm_list[1 ](encoder_input, self .feed_forward) return encoder_input class Encoder (nn.Module): def __init__ (self, encoderblocklist: nn.ModuleList ): super ().__init__() self .encoderblocklist = encoderblocklist self .layer_norm = LayerNorm() def forward (self, encoder_input, encoder_mask ): for encoderblock in self .encoderblocklist: encoder_input = encoderblock(encoder_input, encoder_mask) encoder_output = self .layer_norm(encoder_input) return encoder_output
第8步:解码器块、解码器和投影层 解码器块: 解码器块主要包含三个组件:掩码多头注意力(Masked Multi-Head Attention)、多头注意力(Multi-Head Attention)和前馈网络(Feedforward)。解码器块还包含3个“加法与归一化”(Add & Norm)单元。我们将按照《Attention》论文中的流程,在DecoderBlock类中组装这些组件。根据论文,这个解码器块被重复了6次。
解码器: 我们将创建一个名为Decoder的附加类,它接收一系列DecoderBlock,将其堆叠并输出最终的解码器输出。
解码器块中包含两种多头注意力机制。第一种是掩码多头注意力,它以解码器输入作为查询(query)、键(key)和值(value),并使用一个解码器掩码 (也称为因果掩码 )。因果掩码防止模型查看序列顺序中靠后的嵌入。关于其工作原理的详细解释,请参见第3步和第5步。
投影层: 最终的解码器输出将被传递到投影层。在这个层中,解码器输出首先被送入一个线性层,其嵌入形状会根据下面的代码部分进行改变。随后,softmax函数将解码器输出转换为词汇表上的概率分布,并选择概率最高的词元作为预测输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class DecoderBlock (nn.Module): def __init__ (self, masked_multihead_attention: MultiHeadAttention,multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float ): super ().__init__() self .masked_multihead_attention = masked_multihead_attention self .multihead_attention = multihead_attention self .feed_forward = feed_forward self .add_and_norm_list = nn.ModuleList([AddAndNorm(dropout_rate) for _ in range (3 )]) def forward (self, decoder_input, decoder_mask, encoder_output, encoder_mask ): decoder_input = self .add_and_norm_list[0 ](decoder_input, lambda decoder_input: self .masked_multihead_attention(decoder_input,decoder_input, decoder_input, decoder_mask)) decoder_input = self .add_and_norm_list[1 ](decoder_input, lambda decoder_input: self .multihead_attention(decoder_input,encoder_output, encoder_output, encoder_mask)) decoder_input = self .add_and_norm_list[2 ](decoder_input, self .feed_forward) return decoder_input class Decoder (nn.Module): def __init__ (self,decoderblocklist: nn.ModuleList ): super ().__init__() self .decoderblocklist = decoderblocklist self .layer_norm = LayerNorm() def forward (self, decoder_input, decoder_mask, encoder_output, encoder_mask ): for decoderblock in self .decoderblocklist: decoder_input = decoderblock(decoder_input, decoder_mask, encoder_output, encoder_mask) decoder_output = self .layer_norm(decoder_input) return decoder_output class ProjectionLayer (nn.Module): def __init__ (self, vocab_size: int , d_model: int ): super ().__init__() self .projection_layer = nn.Linear(d_model, vocab_size) def forward (self, decoder_output ): output = self .projection_layer(decoder_output) return torch.log_softmax(output, dim=-1 )
最终,我们已经完成了Transformer架构中所有组件块的构建。唯一剩下的任务就是将它们全部组装起来。
我们将首先创建一个Transformer类 ,该类将初始化所有组件类的实例。在Transformer类内部,我们首先定义编码函数 ,该函数完成Transformer编码器部分的所有任务并生成编码器输出。其次,我们定义解码函数 ,该函数完成Transformer解码器部分的所有任务并生成解码器输出。最后,我们定义一个投影函数 ,该函数接收解码器输出并将其映射到词汇表以进行预测。
现在,Transformer架构已经准备就绪。我们可以通过定义一个函数来构建我们的翻译LLM模型,该函数接收如下代码中给出的所有必要参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 class Transformer (nn.Module): def __init__ (self, source_embed: EmbeddingLayer, target_embed: EmbeddingLayer, positional_encoding: PositionalEncoding, multihead_attention: MultiHeadAttention, masked_multihead_attention: MultiHeadAttention, feed_forward: FeedForward, encoder: Encoder, decoder: Decoder, projection_layer: ProjectionLayer, dropout_rate: float ): super ().__init__() self .source_embed = source_embed self .target_embed = target_embed self .positional_encoding = positional_encoding self .multihead_attention = multihead_attention self .masked_multihead_attention = masked_multihead_attention self .feed_forward = feed_forward self .encoder = encoder self .decoder = decoder self .projection_layer = projection_layer self .dropout = nn.Dropout(dropout_rate) def encode (self, encoder_input, encoder_mask ): encoder_input = self .source_embed(encoder_input) encoder_input = self .positional_encoding(encoder_input) encoder_output = self .encoder(encoder_input, encoder_mask) return encoder_output def decode (self, decoder_input, decoder_mask, encoder_output, encoder_mask ): decoder_input = self .target_embed(decoder_input) decoder_input = self .positional_encoding(decoder_input) decoder_output = self .decoder(decoder_input, decoder_mask, encoder_output, encoder_mask) return decoder_output def project (self, decoder_output ): return self .projection_layer(decoder_output) def build_model (source_vocab_size, target_vocab_size, max_seq_len=1135 , d_model=512 , d_ff=2048 , num_heads=8 , num_blocks=6 , dropout_rate=0.1 ): source_embed = EmbeddingLayer(source_vocab_size, d_model) target_embed = EmbeddingLayer(target_vocab_size, d_model) positional_encoding = PositionalEncoding(max_seq_len, d_model, dropout_rate) multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate) masked_multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate) feed_forward = FeedForward(d_model, d_ff, dropout_rate) projection_layer = ProjectionLayer(target_vocab_size, d_model) encoder_block = EncoderBlock(multihead_attention, feed_forward, dropout_rate) decoder_block = DecoderBlock(masked_multihead_attention,multihead_attention, feed_forward, dropout_rate) encoderblocklist = [] decoderblocklist = [] for _ in range (num_blocks): encoderblocklist.append(encoder_block) for _ in range (num_blocks): decoderblocklist.append(decoder_block) encoderblocklist = nn.ModuleList(encoderblocklist) decoderblocklist = nn.ModuleList(decoderblocklist) encoder = Encoder(encoderblocklist) decoder = Decoder(decoderblocklist) model = Transformer(source_embed, target_embed, positional_encoding, multihead_attention, masked_multihead_attention,feed_forward, encoder, decoder, projection_layer, dropout_rate) for param in model.parameters(): if param.dim() > 1 : nn.init.xavier_uniform_(param) return model model = build_model(source_vocab_size, target_vocab_size)
第十步:训练和验证我们的构建LLM模型 现在是时候训练我们的模型了。训练过程相当直接。我们将使用在第三步中创建的训练DataLoader。由于总训练数据集数量为100万,我强烈建议在GPU设备上训练我们的模型。我大约花了5小时完成了20个epoch。每个epoch结束后,我们将保存模型权重以及优化器状态,这样便于从停止点继续训练,而不是从头开始。
每个epoch结束后,我们将使用验证DataLoader进行验证。验证数据集的大小为2000,相当合理。在验证过程中,我们只需计算一次编码器输出,直到解码器输出结束符[SEP],这是因为直到解码器接收到[SEP]标记,我们不得不反复发送相同的编码器输出,这没有意义。
解码器输入首先以句子开始标记[CLS]开始。每次预测后,解码器输入将追加下一个生成的标记,直到达到句子结束标记[SEP]。最后,投影层将输出映射到相应的文本表示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 def training_model (preload_epoch=None ): EPOCHS = 20 initial_epoch = 0 global_step = 0 optimizer = torch.optim.Adam(model.parameters(), lr=1e-3 ) if preload_epoch is not None : model_filename = f"./malaygpt/model_{preload_epoch} .pt" state = torch.load(model_filename) initial_epoch = state['epoch' ] + 1 optimizer.load_state_dict(state['optimizer_state_dict' ]) global_step = state['global_step' ] loss_fn = nn.CrossEntropyLoss(ignore_index = tokenizer_en.token_to_id("[PAD]" ), label_smoothing=0.1 ).to(device) for epoch in range (initial_epoch, EPOCHS): model.train() for batch in tqdm(train_dataloader): encoder_input = batch['encoder_input' ].to(device) decoder_input = batch['decoder_input' ].to(device) target_label = batch['target_label' ].to(device) encoder_mask = batch['encoder_mask' ].to(device) decoder_mask = batch['decoder_mask' ].to(device) encoder_output = model.encode(encoder_input, encoder_mask) decoder_output = model.decode(decoder_input, decoder_mask, encoder_output, encoder_mask) projection_output = model.project(decoder_output) loss = loss_fn(projection_output.view(-1 , projection_output.shape[-1 ]), target_label.view(-1 )) optimizer.zero_grad() loss.backward() optimizer.step() global_step += 1 print (f'Epoch [{epoch+1 } /{EPOCHS} ]: Train Loss: {loss.item():.2 f} ' ) model_filename = f"./malaygpt/model_{epoch} .pt" torch.save({ 'epoch' : epoch, 'model_state_dict' : model.state_dict(), 'optimizer_state_dict' : optimizer.state_dict(), 'global_step' : global_step }, model_filename) model.eval () with torch.inference_mode(): for batch in tqdm(val_dataloader): encoder_input = batch['encoder_input' ].to(device) encoder_mask = batch['encoder_mask' ].to(device) source_text = batch['source_text' ] target_text = batch['target_text' ] encoder_output = model.encode(encoder_input, encoder_mask) decoder_input = torch.empty(1 ,1 ).fill_(tokenizer_my.token_to_id('[CLS]' )).type_as(encoder_input).to(device) while True : if decoder_input.size(1 ) == max_seq_len: break decoder_mask = causal_mask(decoder_input.size(1 )).type_as(encoder_mask).to(device) decoder_output = model.decode(decoder_input,decoder_mask,encoder_output,encoder_mask) projection = model.project(decoder_output[:, -1 ]) _, new_token = torch.max (projection, dim=1 ) new_token = torch.empty(1 ,1 ).type_as(encoder_input).fill_(new_token.item()).to(device) decoder_input = torch.cat([decoder_input, new_token], dim=1 ) if new_token == tokenizer_my.token_to_id('[SEP]' ): break decoder_output = decoder_input.squeeze(0 ) model_predicted_text = tokenizer_my.decode(decoder_output.detach().cpu().numpy()) print (f'SOURCE TEXT": {source_text} ' ) print (f'TARGET TEXT": {target_text} ' ) print (f'PREDICTED TEXT": {model_predicted_text} ' ) training_model(preload_epoch=None )
第11步:创建一个函数,用我们构建的模型测试新的翻译任务 我们将给我们的翻译函数一个通用的名称,称为malaygpt。该函数接收用户输入的英文原始文本,并输出翻译成马来语的文本。让我们运行这个函数并尝试一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 def malaygpt (user_input_text ): model.eval () with torch.inference_mode(): user_input_text = user_input_text.strip() user_input_text_encoded = torch.tensor(tokenizer_en.encode(user_input_text).ids, dtype = torch.int64).to(device) num_source_padding = max_seq_len - len (user_input_text_encoded) - 2 encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype = torch.int64).to(device) encoder_input = torch.cat([CLS_ID, user_input_text_encoded, SEP_ID, encoder_padding]).to(device) encoder_mask = (encoder_input != PAD_ID).unsqueeze(0 ).unsqueeze(0 ).int ().to(device) encoder_output = model.encode(encoder_input, encoder_mask) decoder_input = torch.empty(1 ,1 ).fill_(tokenizer_my.token_to_id('[CLS]' )).type_as(encoder_input).to(device) while True : if decoder_input.size(1 ) == max_seq_len: break decoder_mask = causal_mask(decoder_input.size(1 )).type_as(encoder_mask).to(device) decoder_output = model.decode(decoder_input,decoder_mask,encoder_output,encoder_mask) projection = model.project(decoder_output[:, -1 ]) _, new_token = torch.max (projection, dim=1 ) new_token = torch.empty(1 ,1 ).type_as(encoder_input).fill_(new_token.item()).to(device) decoder_input = torch.cat([decoder_input, new_token], dim=1 ) if new_token == tokenizer_my.token_to_id('[SEP]' ): break decoder_output = decoder_input.squeeze(0 ) model_predicted_text = tokenizer_my.decode(decoder_output.detach().cpu().numpy()) return model_predicted_text
测试时间!让我们进行一些翻译测试。
“翻译似乎运行得相当好。”
就是这样! 我非常有信心,你现在能够使用PyTorch从头开始构建自己的大型语言模型。你也可以在其他语言数据集上训练这个模型,并在该语言中执行翻译任务。现在,你已经学会了如何从头开始构建原始的Transformer,我可以保证你现在能够学习和实现市场上大多数大型语言模型的应用。
下一步是什么? 我将通过微调Llama 3模型来构建一个完全功能的应用程序,这是目前市场上最流行的开源大型语言模型之一。我还将分享完整的源代码。
所以,请继续关注并非常感谢你的阅读!
Google Colab笔记本链接
参考文献