什么是数据并行? 每个GPU加载完整模型,训练时每个GPU训练不同的数据
训练流程 #
数据并行的流程如下所示:
- Step1 GPU0加载模型和Batch的数据
- Step2 将Batch数据从GPU0分发至各卡
- Step3 将模型从GPU0复制到各卡
- Step4 各卡进行前向传播
- Step5 GPU0收集各卡输出,并统一计算Loss
- Step6 将Loss分发,在各卡反向传播并计算梯度
- Step7 GPU0收集梯度并进行汇总
- Step8 在GPU0上更新模型
pytorch的源码部分展示了这一过程,以下是nn.parallel.data_parallel
中DataParallel
类的forward
方法:
def forward(self, *inputs: Any, **kwargs: Any) -> Any:
with torch.autograd.profiler.record_function("DataParallel.forward"):
if not self.device_ids:
return self.module(*inputs, **kwargs)
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
f"on device {self.src_device_obj} (device_ids[0]) but found one of "
f"them on device: {t.device}")
inputs, module_kwargs = self.scatter(inputs, kwargs, self.device_ids) # 将数据分发到不同GPU
# for forward function without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not module_kwargs:
inputs = ((),)
module_kwargs = ({},)
if len(self.device_ids) == 1:
return self.module(*inputs[0], **module_kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)]) # 对模型进行复制
outputs = self.parallel_apply(replicas, inputs, module_kwargs) # 并行调用模型
return self.gather(outputs, self.output_device) # 拿到结果
在parallel_apply
方法中,使用了多线程的方法调用不同训练任务。
使用Pytorch进行数据并行 #
from transformers import BertTokenizer, BertForSequenceClassification
import pandas as pd
data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
data = data.dropna()
from torch.utils.data import Dataset
# 创建Dataset
class MyDataset(Dataset):
def __init__(self) -> None:
super().__init__()
self.data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
self.data = self.data.dropna()
def __getitem__(self, index):
return self.data.iloc[index]["review"], self.data.iloc[index]["label"]
def __len__(self):
return len(self.data)
dataset = MyDataset()
for i in range(5):
print(dataset[i])
# 划分数据集
from torch.utils.data import random_split
trainset, validset = random_split(dataset, lengths=[0.9, 0.1])
len(trainset), len(validset)
# 创建DataLoader
import torch
tokenizer = BertTokenizer.from_pretrained("./model")
def collate_func(batch):
texts, labels = [], []
for item in batch:
texts.append(item[0])
labels.append(item[1])
inputs = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
inputs["labels"] = torch.tensor(labels)
return inputs
from torch.utils.data import DataLoader
trainloader = DataLoader(trainset, batch_size=32, shuffle=True, collate_fn=collate_func)
validloader = DataLoader(validset, batch_size=64, shuffle=False, collate_fn=collate_func)
前面准备工作完成之后,就到了模型创建的部分。这里只需要使用torch.nn.DataParallel
来包装一下模型。这时如果需要查看模型本身的结构,就需要使用model.module
进行查看。
# 创建模型及优化器
from torch.optim import Adam
model = BertForSequenceClassification.from_pretrained("./model")
if torch.cuda.is_available():
model = model.cuda()
model = torch.nn.DataParallel(model, device_ids=None) # 加载模型(单卡时则无需这一步)
optimizer = Adam(model.parameters(), lr=2e-5)
模型加载完后,就需要修改一下训练时的损失计算,因为每个卡都有一个损失,因此需要使用mean()
方法来求一下均值。
# 训练与验证
import time
def evaluate():
model.eval()
acc_num = 0
with torch.inference_mode():
for batch in validloader:
if torch.cuda.is_available():
batch = {k: v.cuda() for k, v in batch.items()}
output = model(**batch)
pred = torch.argmax(output.logits, dim=-1)
acc_num += (pred.long() == batch["labels"].long()).float().sum()
return acc_num / len(validset)
def train(epoch=3, log_step=100):
global_step = 0
for ep in range(epoch):
model.train()
start = time.time()
for batch in trainloader:
if torch.cuda.is_available():
batch = {k: v.cuda() for k, v in batch.items()}
optimizer.zero_grad()
output = model(**batch)
loss = output.loss.mean() # 多卡时需要计算损失的均值
# loss = output.loss 单卡时损失只有一个
loss.backward()
optimizer.step()
if global_step % log_step == 0:
print(f"ep: {ep}, global_step: {global_step}, loss: {loss.item()}")
global_step += 1
acc = evaluate()
print(f"ep: {ep}, acc: {acc}, time: {time.time() - start}")
train()
完成训练后就可以进行推理预测了。
# 模型预测
sen = "我觉得这家酒店不错,饭很好吃!"
id2_label = {0: "差评!", 1: "好评!"}
model.eval()
with torch.inference_mode():
inputs = tokenizer(sen, return_tensors="pt")
inputs = {k: v.cuda() for k, v in inputs.items()}
logits = model(**inputs).logits
pred = torch.argmax(logits, dim=-1)
print(f"输入:{sen}\n模型预测结果:{id2_label.get(pred.item())}")
from transformers import pipeline
model.config.id2label = id2_label
pipe = pipeline("text-classification", model=model, tokenizer=tokenizer, device=0)
pipe(sen)
使用这种方法,在Batch Size比较小的时候(32),无法和单卡运行拉开差距,当提高BS时,才能体现多卡的优势。
使用Trainer进行数据并行 #
在HuggingFace的Trainer类中,提供了较为便捷的数据并行的方法。在TrainingArguments
类中,包含了一个私有属性_n_gpu
,会自动识别主机的显卡数量。
train_args = TrainingArguments(output_dir="./checkpoints", # 输出文件夹
per_device_train_batch_size=64, # 训练时的batch_size
per_device_eval_batch_size=128, # 验证时的batch_size
logging_steps=10, # log 打印的频率
evaluation_strategy="epoch", # 评估策略
save_strategy="epoch", # 保存策略
save_total_limit=3, # 最大保存数
learning_rate=2e-5, # 学习率
weight_decay=0.01, # weight_decay
metric_for_best_model="f1", # 设定评估指标
load_best_model_at_end=True) # 训练完成后加载最优模型
train_args
# 创建Trainner
from transformers import DataCollatorWithPadding
trainer = Trainer(model=model,
args=train_args,
train_dataset=tokenized_datasets["train"],
eval_dataset=tokenized_datasets["test"],
data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
compute_metrics=eval_metric)
# 训练
trainer.train()
DP的优点 #
- 并行化多个GPU上的训练,因此与累积梯度相比,它减少了训练时间。
- 代码简单。
DP的不足 #
nn.DataParallel
由于自身的不足,目前已经被淘汰了。淘汰的原因主要如下:
- Data Parallel实际上是单进程,多线程的工作模式。然而由于GIL锁,无法发挥多线程在多卡的优势。
- 训练策略问题导致主节点占用更高,GPU利用率不高。
- 效率低,每次训练开始都需要同步模型,对于大模型,通信时间是一个问题
- 只适用于单机训练,而非多机训练。
2024/3/6 于苏州