分布式训练:Data Parallel

什么是数据并行?
每个GPU加载完整模型,训练时每个GPU训练不同的数据

训练流程

数据并行的流程如下所示:

  • Step1 GPU0加载模型和Batch的数据
  • Step2 将Batch数据从GPU0分发至各卡
  • Step3 将模型从GPU0复制到各卡
  • Step4 各卡进行前向传播
  • Step5 GPU0收集各卡输出,并统一计算Loss
  • Step6 将Loss分发,在各卡反向传播并计算梯度
  • Step7 GPU0收集梯度并进行汇总
  • Step8 在GPU0上更新模型

pytorch的源码部分展示了这一过程,以下是nn.parallel.data_parallelDataParallel类的forward方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def forward(self, *inputs: Any, **kwargs: Any) -> Any:
with torch.autograd.profiler.record_function("DataParallel.forward"):
if not self.device_ids:
return self.module(*inputs, **kwargs)

for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
f"on device {self.src_device_obj} (device_ids[0]) but found one of "
f"them on device: {t.device}")

inputs, module_kwargs = self.scatter(inputs, kwargs, self.device_ids) # 将数据分发到不同GPU
# for forward function without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not module_kwargs:
inputs = ((),)
module_kwargs = ({},)

if len(self.device_ids) == 1:
return self.module(*inputs[0], **module_kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)]) # 对模型进行复制
outputs = self.parallel_apply(replicas, inputs, module_kwargs) # 并行调用模型
return self.gather(outputs, self.output_device) # 拿到结果

parallel_apply方法中,使用了多线程的方法调用不同训练任务。

使用Pytorch进行数据并行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from transformers import BertTokenizer, BertForSequenceClassification

import pandas as pd

data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
data = data.dropna()

from torch.utils.data import Dataset

# 创建Dataset
class MyDataset(Dataset):

def __init__(self) -> None:
super().__init__()
self.data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
self.data = self.data.dropna()

def __getitem__(self, index):
return self.data.iloc[index]["review"], self.data.iloc[index]["label"]

def __len__(self):
return len(self.data)

dataset = MyDataset()
for i in range(5):
print(dataset[i])

# 划分数据集

from torch.utils.data import random_split

trainset, validset = random_split(dataset, lengths=[0.9, 0.1])
len(trainset), len(validset)

# 创建DataLoader
import torch

tokenizer = BertTokenizer.from_pretrained("./model")

def collate_func(batch):
texts, labels = [], []
for item in batch:
texts.append(item[0])
labels.append(item[1])
inputs = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
inputs["labels"] = torch.tensor(labels)
return inputs

from torch.utils.data import DataLoader

trainloader = DataLoader(trainset, batch_size=32, shuffle=True, collate_fn=collate_func)
validloader = DataLoader(validset, batch_size=64, shuffle=False, collate_fn=collate_func)

前面准备工作完成之后,就到了模型创建的部分。这里只需要使用torch.nn.DataParallel来包装一下模型。这时如果需要查看模型本身的结构,就需要使用model.module进行查看。

1
2
3
4
5
6
7
8
9
10
# 创建模型及优化器
from torch.optim import Adam

model = BertForSequenceClassification.from_pretrained("./model")

if torch.cuda.is_available():
model = model.cuda()

model = torch.nn.DataParallel(model, device_ids=None) # 加载模型(单卡时则无需这一步)
optimizer = Adam(model.parameters(), lr=2e-5)

模型加载完后,就需要修改一下训练时的损失计算,因为每个卡都有一个损失,因此需要使用mean()方法来求一下均值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 训练与验证
import time

def evaluate():
model.eval()
acc_num = 0
with torch.inference_mode():
for batch in validloader:
if torch.cuda.is_available():
batch = {k: v.cuda() for k, v in batch.items()}
output = model(**batch)
pred = torch.argmax(output.logits, dim=-1)
acc_num += (pred.long() == batch["labels"].long()).float().sum()
return acc_num / len(validset)

def train(epoch=3, log_step=100):
global_step = 0
for ep in range(epoch):
model.train()
start = time.time()
for batch in trainloader:
if torch.cuda.is_available():
batch = {k: v.cuda() for k, v in batch.items()}
optimizer.zero_grad()
output = model(**batch)
loss = output.loss.mean() # 多卡时需要计算损失的均值
# loss = output.loss 单卡时损失只有一个
loss.backward()
optimizer.step()
if global_step % log_step == 0:
print(f"ep: {ep}, global_step: {global_step}, loss: {loss.item()}")
global_step += 1
acc = evaluate()
print(f"ep: {ep}, acc: {acc}, time: {time.time() - start}")

train()

完成训练后就可以进行推理预测了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 模型预测

sen = "我觉得这家酒店不错,饭很好吃!"
id2_label = {0: "差评!", 1: "好评!"}
model.eval()
with torch.inference_mode():
inputs = tokenizer(sen, return_tensors="pt")
inputs = {k: v.cuda() for k, v in inputs.items()}
logits = model(**inputs).logits
pred = torch.argmax(logits, dim=-1)
print(f"输入:{sen}\n模型预测结果:{id2_label.get(pred.item())}")

from transformers import pipeline

model.config.id2label = id2_label
pipe = pipeline("text-classification", model=model, tokenizer=tokenizer, device=0)

pipe(sen)

使用这种方法,在Batch Size比较小的时候(32),无法和单卡运行拉开差距,当提高BS时,才能体现多卡的优势。

使用Trainer进行数据并行

在HuggingFace的Trainer类中,提供了较为便捷的数据并行的方法。在TrainingArguments类中,包含了一个私有属性_n_gpu,会自动识别主机的显卡数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
train_args = TrainingArguments(output_dir="./checkpoints",      # 输出文件夹
per_device_train_batch_size=64, # 训练时的batch_size
per_device_eval_batch_size=128, # 验证时的batch_size
logging_steps=10, # log 打印的频率
evaluation_strategy="epoch", # 评估策略
save_strategy="epoch", # 保存策略
save_total_limit=3, # 最大保存数
learning_rate=2e-5, # 学习率
weight_decay=0.01, # weight_decay
metric_for_best_model="f1", # 设定评估指标
load_best_model_at_end=True) # 训练完成后加载最优模型
train_args

# 创建Trainner
from transformers import DataCollatorWithPadding
trainer = Trainer(model=model,
args=train_args,
train_dataset=tokenized_datasets["train"],
eval_dataset=tokenized_datasets["test"],
data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
compute_metrics=eval_metric)

# 训练
trainer.train()

DP的优点

  • 并行化多个GPU上的训练,因此与累积梯度相比,它减少了训练时间。
  • 代码简单。

DP的不足

nn.DataParallel由于自身的不足,目前已经被淘汰了。淘汰的原因主要如下:

  • Data Parallel实际上是单进程,多线程的工作模式。然而由于GIL锁,无法发挥多线程在多卡的优势。
  • 训练策略问题导致主节点占用更高,GPU利用率不高。
  • 效率低,每次训练开始都需要同步模型,对于大模型,通信时间是一个问题
  • 只适用于单机训练,而非多机训练。

2024/3/6 于苏州