Transformer实践解析

简介

Transformer 的核心思想是利用自注意力机制(self-attention - 能注意到输入序列的不同位置以计算该序列的表示的能力。基于多头注意力机制,每个单词会被转化为 Token (词元)


核心机制

位置编码

Transformer 中不包含任何的循环或卷积,因此添加了位置编码,为模型提供一些关于单词在句子中相对位置的信息。

位置编码向量被加到嵌入(embedding)向量中。嵌入表示一个 d 维空间的标记,在 d 维空间中有着相似含义的标记会离彼此更近。但嵌入并没有对在一句话中的词的相对位置进行编码,因此当加上位置编码后,单词将基于他们含义的相似度以及他们在句子中的位置,在 d 维空间中离彼此更近。

点积注意力

Transformer 使用的注意力函数共有三个输入:Q 请求、K 主键、V 数值。

点积注意力被缩小深度的平方根倍,这样做是因为对于较大的深度值,点积的大小会成倍增大,从而推动 softmax 函数往仅有很小的梯度的方向靠拢,导致 hardsoftmax

多头注意力

多头注意力由四部分组成:

  • 线性层并拆分为多头。
  • 按比例缩放点积注意力。
  • 多头及联。
  • 最后一层线性层。

每个多头注意力块有三个输入:Q 请求、K 主键、V 数值。这些输入经过线行(Dense)层,并会被拆分为多头。

QKV 被拆分为多头,而不是单头,因为多头允许模型共同注意来自不同表示空间的不同位置的信息。在拆分后每个头部的维度减少,因此总的计算成本与有全部维度的单个注意力头相同。

掩码

掩码一批序列中所有的填充标记,其确保模型不会将填充物作为输入。该 mask 表示填充值 0 出现的位置:这些位置输出 1,否则输出 0

点式前馈网络

点式前馈网络由两个全连接层组成,两层之间插入一个 ReLU 激活函数。

编码器

编码器包含以下部分:

  • 输入嵌入(Input Embedding)。

  • 位置编码(Positional Encoding)。

  • N编码器层(encoder layers,每个编码器层包含以下子层:

    • 多头注意力(有填充掩码)。
    • 点式前馈网络。

    每个子层在其周围有一个残差连接,然后进行层归一化。归一化是在 d_model 最后一个维度完成的。残差连接是为了避免深度网络中的梯度消失问题。
    每个子层的输出是 LayerNorm(x + Sublayer(x))

输入通过嵌入(embedding)后,该嵌入与位置编码相加。该加法结果的输出会作为编码器层的输入。编码器的输出即解码器的输入。

解码器

解码器包含以下部分:

  • 输出嵌入(Output Embedding)。

  • 位置编码(Positionak Encoding)。

  • N编码器层(decoder layers,每个解码器层包含以下子层:

    • 掩码的多头注意力。
    • 多头注意力(用填充掩码),V 数值和 K 主键接收编码器输出作为输入,Q 接收掩码的多头注意力子层的输出。
    • 点式前馈网络。

    每个子层在其周围有一个残差连接,然后进行层归一化。每个子层的输出是 LayerNorm(x + Sublayer(x))。归一化是在 d_model 最后一个维度完成的。
    Q 接收到解码器的第一个注意力块的输出,并且 K 接收到编码器的输出时,注意力权重表示根据编码器的输出赋予解码器输入的重要性。换句话说,解码器通过查看编码器的输出和对其自身输出的自注意力,来预测下一个词。

目标经过一个嵌入后,该嵌入与位置编码相加,该加法结果就是解码器层的输入。解码器的输出是最后线性层的输入。


本文是基于《用Transformers处理自然语言》一书的简要学习记录,并没有很详细的方案,有兴趣的话可以阅读原文。


文本分类

文本分类是NLP中最常见的任务之一。另一种常见的文本分类是情感分析,其目的是识别特定文本的倾向性。

数据集标记化

1
2
print(tokenize(emotions["train"][:2]))
{'input_ids': [[101, 1045, 2134, 2102, 2514, 26608, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [101, 1045, 2064, 2175, 2013, 3110, 2061, 20625, 2000, 2061, 9636, 17772, 2074, 2013, 2108, 2105, 2619, 2040, 14977, 1998, 2003, 8300, 102]], 'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,,,,],[,,,,,,,,,,,,,,,,,,,, 1, 1, 1]]}

训练文本分类器

Tranformer 提取特征

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 使用预训练分类器
from transformers import AutoModel
model_ckpt = "distilbert-base-uncased"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModel.from_pretrained(model_ckpt).to(device)

# 提取最后一层隐藏状态
inputs = {k:v.to(device) for k,v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)

print(outputs)
# BaseModelOutput(last_hidden_state=tensor([[[-0.1565, -0.1862, 0.0528, ..., -0.1188, 0.0662, 0.5470], [-0.3575, -0.6484, -0.0618, ..., -0.3040, 0.3508, 0.5221], [-0.2772, -0.4459, 0.1818, ..., -0.0948, -0.0076, 0.9958], [-0.2841, -0.3917, 0.3753, ..., -0.2151, -0.1173, 1.0526], [ 0.2661, -0.5094, -0.3180, ..., -0.4203, 0.0144, -0.2149], [ 0.9441, 0.0112, -0.4714, ..., 0.1439, -0.7288, -0.1619]]], device='cuda:0'), hidden_states=None, attentions=None)
outputs.last_hidden_state.size()
# torch.Size([1, 6, 768]) 形状是 [batch_size, n_tokens, hidden_dim] 即六个输入标记中的每一个都会返回一个768 维的向量

特征矩阵

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def extract_hidden_states(batch):
# 将前面的步骤包裹在一个处理函数中
inputs = { k:v.to(device) for k,v in batch.items() if k in tokenizer.model_input_names } # Extract last hidden states
with torch.no_grad():
last_hidden_state = model(**inputs).last_hidden_state # Return vector for [CLS] token
return {"hidden_state": last_hidden_state[:,0].cpu().numpy()}

emotions_hidden = emotions_encoded.map(extract_hidden_states, batched=True) # 一次性提取所有分片的隐藏状态
emotions_hidden["train"].column_names
# ['attention_mask', 'hidden_state', 'input_ids', 'label', 'text']

import numpy as np
X_train = np.array(emotions_hidden["train"]["hidden_state"])
X_valid = np.array(emotions_hidden["validation"]["hidden_state"])
y_train = np.array(emotions_hidden["train"]["label"]
y_valid = np.array(emotions_hidden["validation"]["label"])
X_train.shape, X_valid.shape # ((16000, 768), (2000, 768))

可视化训练集 使用强大的 UMAP 算法将向量向下投射到二维。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from umap import UMAP
from sklearn.preprocessing import MinMaxScaler # Scale features to [0,1] range

X_scaled = MinMaxScaler().fit_transform(X_train)
# Initialize and fit UMAP
mapper = UMAP(n_components=2, metric="cosine").fit(X_scaled)
# Create a DataFrame of 2D embeddings
df_emb = pd.DataFrame(mapper.embedding_, columns=["X", "Y"])
df_emb["label"] = y_train
df_emb.head()

fig, axes = plt.subplots(2, 3, figsize=(7,5))
axes = axes.flatten()
cmaps = ["Greys", "Blues", "Oranges", "Reds", "Purples", "Greens"]
labels = emotions["train"].features["label"].names
for i, (label, cmap) in enumerate(zip(labels, cmaps)):
df_emb_sub = df_emb.query(f"label == {i}")
axes[i].hexbin(df_emb_sub["X"], df_emb_sub["Y"], cmap=cmap, gridsize=20, linewidths=(0,))
axes[i].set_title(label)
axes[i].set_xticks([]), axes[i].set_yticks([])
plt.tight_layout()
plt.show()

微调 Transformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 加载预训练模型
from transformers import AutoModelForSequenceClassification
num_labels = 6
model = (AutoModelForSequenceClassification .from_pretrained(model_ckpt,
num_labels=num_labels) .to(device))

# 定义训练指标
from sklearn.metrics import accuracy_score, f1_score
def compute_metrics(pred):
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
f1 = f1_score(labels, preds, average="weighted")
acc = accuracy_score(labels, preds)
return {"accuracy": acc, "f1": f1}

# 训练模型
from transformers import Trainer, TrainingArguments
batch_size = 64
logging_steps = len(emotions_encoded["train"]) # batch_size
model_name = f"{model_ckpt}-finetuned-emotion"
training_args = TrainingArguments(output_dir=model_name, num_train_epochs=2, learning_rate=2e-5, per_device_train_batch_size=batch_size, per_device_eval_batch_size=batch_size, weight_decay=0.01, evaluation_strategy="epoch", disable_tqdm=False, logging_steps=logging_steps, push_to_hub=True, log_level="error")

trainer = Trainer(model=model, args=training_args,
compute_metrics=compute_metrics, train_dataset=emotions_encoded["train"],
eval_dataset=emotions_encoded["validation"], tokenizer=tokenizer)
trainer.train()

# 回测
preds_output = trainer.predict(emotions_encoded["validation"])
preds_output.metrics
{'test_loss': 0.22047173976898193, 'test_accuracy': 0.9225, 'test_f1': 0.9225500751072866, 'test_runtime': 1.6357, 'test_samples_per_second': 1222.725, 'test_steps_per_second': 19.564}

y_preds = np.argmax(preds_output.predictions, axis=1)
plot_confusion_matrix(y_preds, y_valid, labels)

文本摘要

某一时刻可能需要对一份报告、研究文章、财务报告进行摘要,摘要是一个经典的序列到序列(seq2seq)任务,有一个输入文本和一个目标文本。这也是 Transformer 编码器-解码器的优势所在。

CNN/DailyMail 数据集大约有 30w 条新闻文章及对应的摘录组成,这些摘要由 CNNDailyMail 在其文章中附加的要点组成。

1
2
3
4
5
# 数据概况
from datasets import load_dataset
dataset = load_dataset("cnn_dailymail", version="3.0.0")
print(f"Features: {dataset['train'].column_names}")
Features: ['article', 'highlights', 'id'] # 文章,其中包含新闻文章,亮点与摘要,以及唯一标识每篇文章的ID

文本摘要流水线:

  • GPT-2
  • T5
  • BART
  • PEGASUS

摘要评比:

  • BLEU -> SacreBLEU
  • ROUGE

问答系统

在某个时刻你可能需要在海量数据中寻找所需要的信息,搜索引擎会首先检索文件,然欧进行一个额外的处理步骤,提取带有相应段落和网页的答案片段。
这种技术背后的方法被称为问题回答 QA。最常见的是提取式 QA,问题的答案被识别为文档中的一段文字,首先检索文档,然后从中提取答案。

基于评论的问答系统

SubjQA 由超过 1w 条英文的顾客评论组成。

1
2
3
from datasets import get_dataset_config_names
domains = get_dataset_config_names("subjqa")
domains ['books', 'electronics', 'grocery', 'movies', 'restaurants', 'tripadvisor']

文本中提取答案

从文本中提取答案的最常见的方法是把问题看作一个跨度分类任务,其中答案跨度的开始和结束符号作为模型需要预测的标签。

1
2
3
4
5
6
7
8
9
10
11
from transformers import AutoTokenizer
model_ckpt = "deepset/minilm-uncased-squad2"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

uestion = "How much music can this hold?" # 问题
context = """An MP3 is about 1 MB/minute, so about 6000 hours depending on file size.""" # 语境
inputs = tokenizer(question, context, return_tensors="pt")

print(tokenizer.decode(inputs["input_ids"][0]))
# [CLS] how much music can this hold? [SEP] an mp3 is about 1 mb / minute, so about 6000 hours depending on file size. [SEP]
# 输出格式 [CLS] question tokens [SEP] context tokens [SEP]

文本已经被标记化,现在需要用一个 QA 头来实例化这个模型,并通过前向传递来运行输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import torch
from transformers import AutoModelForQuestionAnswering
model = AutoModelForQuestionAnswering.from_pretrained(model_ckpt)
with torch.no_grad():
outputs = model(**inputs)
print(outputs)
# QuestionAnsweringModelOutput(loss=None, start_logits=tensor([[-0.9862, -4.7750, -5.4025, -5.2378, -5.2863, -5.5117, -4.9819, -6.1880, -0.9862, 0.2596, -0.2144, -1.7136, 3.7806, 4.8561, -1.0546, -3.9097, -1.7374, -4.5944, -1.4278, 3.9949, 5.0390, -0.2018, -3.0193, -4.8549, -2.3107, -3.5110, -3.5713, -0.9862]]), end_logits=tensor([[-0.9623, -5.4733, -5.0326, -5.1639, -5.4278, -5.5151, -5.1749, -4.6233, -0.9623, -3.7855, -0.8715, -3.7745, -3.0161, -1.1780, 0.1758, -2.7365, 4.8934, 0.3046, -3.1761, -3.2762, 0.8937, 5.6606, -0.3623, -4.9554, -3.2531, -0.0914, 1.6211, -0.9623]]), hidden_states=None, attentions=None)

# 将输出转换为答案跨度,首先需要得到开始和结束标记的对数
start_logits = outputs.start_logits
end_logits = outputs.end_logits
print(f"Input IDs shape: {inputs.input_ids.size()}")
print(f"Start logits shape: {start_logits.size()}")
print(f"End logits shape: {end_logits.size()}")
# Input IDs shape: torch.Size([1, 28])
# Start logits shape: torch.Size([1, 28])
# End logits shape: torch.Size([1, 28])

为了得到最终的答案,可以计算开始和结束标记对数的 argmax,然后从输入中切出跨度。

1
2
3
4
5
6
7
8
9
10
import torch
start_idx = torch.argmax(start_logits)
end_idx = torch.argmax(end_logits) + 1
answer_span = inputs["input_ids"][0][start_idx:end_idx]
answer = tokenizer.decode(answer_span)
print(f"Question: {question}")
print(f"Answer: {answer}")

# Question: How much music can this hold?
# Answer: 6000 hours

很好,它成功了!

处理长段落文本

tokenizer 中设置 return_overflowing_tokens=True 参数启用滑动窗口(Transformer 训练预提取后的短信特征)。

检索器类型

  • TF-IDF
  • BM25
  • Embedding
  • DPR

评估检索器

  • 使用检索器内置的 eval() 方法。
  • 构建一个自定义流水线,将检索器与 EvalRetriever 类结合起来。

评估阅读器

  • 完全匹配(EM
  • F1-score

性能优化

探讨四种互补的技术:知识蒸馏、量化、剪枝、开放网络神经交换(ONNX)格式及 ONNXRuntimeORT)进行图的优化。

知识蒸馏

知识蒸馏是一种通用的方法,用于训练一个较小的学生模型来模仿一个较慢的、较大的、但表现较好的教师模型的行为。

Optuna 超参数优化框架。

量化

量化采取了一种不同的方法;它不是减少计算的数量,而是通过用 8 位整数(INT8)而不是通常的 32 位浮点(FP32)这样的低精度数据类型来表示权重和激活,从而使它们更有效率。
减少比特数意味着所产生的模型需要更少的内存存储,而且像矩阵乘法这样的操作可以通过整数运算更快地进行。

量化的基本思想是,我们可以将每个张量中的浮点值 f “离散化”,将其范围[fmax, fmin]映射到一个更小的[qmax]的定点数字 q 中。定点数字 q 的范围[qmax, qmin],并对两者之间的所有数值进行线性分配。

对于深度神经网络,通常主要有以下三种量化方法:

  • 动态量化。当使用动态量化时,在训练过程中没有任何变化,只有在推理过程中才会进行调整。这种方法是动态的,量化是即时进行的,其意味着所有的矩阵乘法都可以用高度优化的 INT8 函数来计算。
  • 静态量化。静态量化是通过在推理时间之前观察数据的代表性样本的激活模式来实现的。
  • 量化训练。量化训练中不使用 INT8 值,而是对 FP32 值进行四舍五入,以模拟量化的效果。

剪枝

权重修剪模型使得更加稀疏 top-K

ONNX

ONNX 是一个开放的标准,它定义了一套通用的运算符和通用的文件格式,以表示各种框架的深度学习模型,包括 PyTorchTensorFlow。当一个模型被导出到 ONNX 格式时,这些运算符被用来构建一个计算图(通常称为中间表示法),表示数据在神经网络中的流动。


从头开始训练一个 Transformer

在这部分将决定哪种架构最适合这项任务,初始化一个没有预训练权重的新模型,设置一个自定义数据加载类,并创建一个可扩展的训练循环。

创建Transformer

加载 gpt2-xl 的配置,使用相同的超参数,只为新的标记器调整词汇量大小。

1
2
3
4
5
6
7
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
config = AutoConfig.from_pretrained("gpt2", vocab_size=len(tokenizer))
model = AutoModelForCausalLM.from_config(config)

print(f'GPT-2 size: {model_size(model)/1000**2:.1f}M parameters') # 模型大小
# GPT-2 size: 110.0M parameters

配置超参数

设置训练用的超参数,将其封装在一个命名空间中,便于访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from argparse import Namespace
# Commented parameters correspond to the small model
config = {
"train_batch_size": 2, # 12
"valid_batch_size": 2, # 12
"weight_decay": 0.1,
"shuffle_buffer": 1000,
"learning_rate": 2e-4, # 5e-4
"lr_scheduler_type": "cosine",
"num_warmup_steps": 750, # 2000
"gradient_accumulation_steps": 16, # 1
"max_train_steps": 50000, # 150000
"max_eval_steps": -1,
"seq_length": 1024,
"seed": 1,
"save_checkpoint_steps": 50000 # 15000
}
args = Namespace(**config)

优化器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
no_decay = ["bias", "LayerNorm.weight", "ln_1.weight", "ln_2.weight", "ln_f.weight"]
optimizer_grouped_parameters = [
{
"params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
"weight_decay": args.weight_decay,
},
{
"params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
"weight_decay": 0.0,
},
]

optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=args.learning_rate)

# 设置学习率调度器(Learning Rate Scheduler)
lr_scheduler = get_scheduler(
name=args.lr_scheduler_type,
optimizer=optimizer,
num_warmup_steps=args.num_warmup_steps,
num_training_steps=args.max_train_steps,
)

损失函数与指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def evaluate(model, eval_dataloader, max_eval_steps=-1):
"""评估函数:计算验证集上的平均损失和困惑度"""
model.eval()
losses = []

with torch.no_grad():
for step, batch in enumerate(eval_dataloader):
# 将数据移动到设备上 (假设 batch 是一个字典,包含 input_ids 和 labels)
inputs = {k: v.to(device) for k, v in batch.items()}

# GPT-2 如果输入了 labels,会自动在内部计算并返回 loss
outputs = model(**inputs)
loss = outputs.loss
losses.append(loss.repeat(inputs["input_ids"].size(0)))

if max_eval_steps > 0 and step >= max_eval_steps:
break
try:
mean_loss = torch.cat(losses).mean().item()
perplexity = math.exp(mean_loss)
except OverflowError:
perplexity = float("inf")

model.train()
return mean_loss, perplexity

训练与检查点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
model.train()

progress_bar = tqdm(range(args.max_train_steps))
completed_steps = 0
total_loss = 0.0

# 使用自动混合精度(AMP)来加速训练并节省显存
scaler = torch.amp.GradScaler("cuda") if torch.cuda.is_available() else None

# 用无限循环来模拟流式或多 epoch 数据读取,直到达到 max_train_steps
while completed_steps < args.max_train_steps:
for step, batch in enumerate(train_dataloader):
inputs = {k: v.to(device) for k, v in batch.items()}

# 1. 前向传播(混合精度)
if scaler is not None:
with torch.amp.autocast("cuda"):
outputs = model(**inputs)
loss = outputs.loss
# 损失缩放(应对梯度累积)
loss = loss / args.gradient_accumulation_steps
scaler.scale(loss).backward()
else:
outputs = model(**inputs)
loss = outputs.loss / args.gradient_accumulation_steps
loss.backward()

total_loss += loss.item()

# 2. 梯度累积步检查
if (step + 1) % args.gradient_accumulation_steps == 0 or (step + 1) == len(train_dataloader):
# 梯度裁剪,防止因长序列导致梯度爆炸
if scaler is not None:
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer)
scaler.update()
else:
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()

lr_scheduler.step()
optimizer.zero_grad()

# 更新全局训练步数
completed_steps += 1
progress_bar.update(1)
progress_bar.set_description(f"Steps: {completed_steps}/{args.max_train_steps} | Loss: {total_loss:.4f}")

# 3. 触发定时评估与检查点保存
if completed_steps % args.save_checkpoint_steps == 0:
# 评估
eval_loss, eval_ppl = evaluate(model, eval_dataloader, args.max_eval_steps)
print(f"\n[Step {completed_steps}] 验证集 Loss: {eval_loss:.4f} | 困惑度 (PPL): {eval_ppl:.2f}")

# 保存检查点(同时保存模型、权重和优化器状态,方便中断后恢复)
output_dir = f"checkpoint-{completed_steps}"
if not os.path.exists(output_dir):
os.makedirs(output_dir)

# Hugging Face 专属的安全保存方式
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

# 保存额外的训练状态
torch.save({
"optimizer_state_dict": optimizer.state_dict(),
"lr_scheduler_state_dict": lr_scheduler.state_dict(),
"args": args,
"completed_steps": completed_steps
}, os.path.join(output_dir, "training_state.pt"))

print(f"检查点已成功保存至 {output_dir}")

total_loss = 0.0 # 重置计数

if completed_steps >= args.max_train_steps:
break

有兴趣的话可以去研究下 minimind ,是一篇很好的入门学习教程。


总结

当你在实践中遇到问题时,不妨静下心来去看看底层原理,也许问题就自而然就解决方案了。


个人备注

此博客内容均为作者学习《Transformers》所做笔记,侵删!
若转作其他用途,请注明来源!