Avatar

分布式训练:Data Parallel

Mar 6, 2024 · 14min · comments

分布式训练DPDDP
分布式训练的介绍。

什么是数据并行? 每个GPU加载完整模型,训练时每个GPU训练不同的数据

训练流程

数据并行的流程如下所示:

  • Step1 GPU0加载模型和Batch的数据
  • Step2 将Batch数据从GPU0分发至各卡
  • Step3 将模型从GPU0复制到各卡
  • Step4 各卡进行前向传播
  • Step5 GPU0收集各卡输出,并统一计算Loss
  • Step6 将Loss分发,在各卡反向传播并计算梯度
  • Step7 GPU0收集梯度并进行汇总
  • Step8 在GPU0上更新模型

pytorch的源码部分展示了这一过程,以下是nn.parallel.data_parallelDataParallel类的forward方法:

def forward(self, *inputs: Any, **kwargs: Any) -> Any:
    with torch.autograd.profiler.record_function("DataParallel.forward"):
        if not self.device_ids:
            return self.module(*inputs, **kwargs)

        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError("module must have its parameters and buffers "
                                   f"on device {self.src_device_obj} (device_ids[0]) but found one of "
                                   f"them on device: {t.device}")

        inputs, module_kwargs = self.scatter(inputs, kwargs, self.device_ids) # 将数据分发到不同GPU
        # for forward function without any inputs, empty list and dict will be created
        # so the module can be executed on one device which is the first one in device_ids
        if not inputs and not module_kwargs:
            inputs = ((),)
            module_kwargs = ({},)

        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **module_kwargs[0])
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)]) # 对模型进行复制
        outputs = self.parallel_apply(replicas, inputs, module_kwargs) # 并行调用模型
        return self.gather(outputs, self.output_device) # 拿到结果

parallel_apply方法中,使用了多线程的方法调用不同训练任务。

使用Pytorch进行数据并行

from transformers import BertTokenizer, BertForSequenceClassification

import pandas as pd

data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
data = data.dropna()

from torch.utils.data import Dataset

# 创建Dataset
class MyDataset(Dataset):

    def __init__(self) -> None:
        super().__init__()
        self.data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
        self.data = self.data.dropna()

    def __getitem__(self, index):
        return self.data.iloc[index]["review"], self.data.iloc[index]["label"]
    
    def __len__(self):
        return len(self.data)

dataset = MyDataset()
for i in range(5):
    print(dataset[i])
    
# 划分数据集

from torch.utils.data import random_split

trainset, validset = random_split(dataset, lengths=[0.9, 0.1])
len(trainset), len(validset)

# 创建DataLoader
import torch

tokenizer = BertTokenizer.from_pretrained("./model")

def collate_func(batch):
    texts, labels = [], []
    for item in batch:
        texts.append(item[0])
        labels.append(item[1])
    inputs = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
    inputs["labels"] = torch.tensor(labels)
    return inputs
    
from torch.utils.data import DataLoader

trainloader = DataLoader(trainset, batch_size=32, shuffle=True, collate_fn=collate_func)
validloader = DataLoader(validset, batch_size=64, shuffle=False, collate_fn=collate_func)

前面准备工作完成之后,就到了模型创建的部分。这里只需要使用torch.nn.DataParallel来包装一下模型。这时如果需要查看模型本身的结构,就需要使用model.module进行查看。

# 创建模型及优化器
from torch.optim import Adam

model = BertForSequenceClassification.from_pretrained("./model")

if torch.cuda.is_available():
    model = model.cuda()
    
model = torch.nn.DataParallel(model, device_ids=None) # 加载模型(单卡时则无需这一步)
optimizer = Adam(model.parameters(), lr=2e-5)

模型加载完后,就需要修改一下训练时的损失计算,因为每个卡都有一个损失,因此需要使用mean()方法来求一下均值。

# 训练与验证
import time

def evaluate():
    model.eval()
    acc_num = 0
    with torch.inference_mode():
        for batch in validloader:
            if torch.cuda.is_available():
                batch = {k: v.cuda() for k, v in batch.items()}
            output = model(**batch)
            pred = torch.argmax(output.logits, dim=-1)
            acc_num += (pred.long() == batch["labels"].long()).float().sum()
    return acc_num / len(validset)

def train(epoch=3, log_step=100):
    global_step = 0
    for ep in range(epoch):
        model.train()
        start = time.time()
        for batch in trainloader:
            if torch.cuda.is_available():
                batch = {k: v.cuda() for k, v in batch.items()}
            optimizer.zero_grad()
            output = model(**batch)
            loss = output.loss.mean() # 多卡时需要计算损失的均值
            # loss = output.loss 单卡时损失只有一个
            loss.backward()
            optimizer.step()
            if global_step % log_step == 0:
                print(f"ep: {ep}, global_step: {global_step}, loss: {loss.item()}")
            global_step += 1
        acc = evaluate()
        print(f"ep: {ep}, acc: {acc}, time: {time.time() - start}")
        
train()

完成训练后就可以进行推理预测了。

# 模型预测

sen = "我觉得这家酒店不错,饭很好吃!"
id2_label = {0: "差评!", 1: "好评!"}
model.eval()
with torch.inference_mode():
    inputs = tokenizer(sen, return_tensors="pt")
    inputs = {k: v.cuda() for k, v in inputs.items()}
    logits = model(**inputs).logits
    pred = torch.argmax(logits, dim=-1)
    print(f"输入:{sen}\n模型预测结果:{id2_label.get(pred.item())}")

from transformers import pipeline

model.config.id2label = id2_label
pipe = pipeline("text-classification", model=model, tokenizer=tokenizer, device=0)

pipe(sen)

使用这种方法,在Batch Size比较小的时候(32),无法和单卡运行拉开差距,当提高BS时,才能体现多卡的优势。

使用Trainer进行数据并行

在HuggingFace的Trainer类中,提供了较为便捷的数据并行的方法。在TrainingArguments类中,包含了一个私有属性_n_gpu,会自动识别主机的显卡数量。

train_args = TrainingArguments(output_dir="./checkpoints",      # 输出文件夹
                               per_device_train_batch_size=64,  # 训练时的batch_size
                               per_device_eval_batch_size=128,  # 验证时的batch_size
                               logging_steps=10,                # log 打印的频率
                               evaluation_strategy="epoch",     # 评估策略
                               save_strategy="epoch",           # 保存策略
                               save_total_limit=3,              # 最大保存数
                               learning_rate=2e-5,              # 学习率
                               weight_decay=0.01,               # weight_decay
                               metric_for_best_model="f1",      # 设定评估指标
                               load_best_model_at_end=True)     # 训练完成后加载最优模型
train_args

# 创建Trainner
from transformers import DataCollatorWithPadding
trainer = Trainer(model=model, 
                  args=train_args, 
                  train_dataset=tokenized_datasets["train"], 
                  eval_dataset=tokenized_datasets["test"], 
                  data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
                  compute_metrics=eval_metric)
                  
# 训练
trainer.train()

DP的优点

  • 并行化多个GPU上的训练,因此与累积梯度相比,它减少了训练时间。
  • 代码简单。

DP的不足

nn.DataParallel由于自身的不足,目前已经被淘汰了。淘汰的原因主要如下:

  • Data Parallel实际上是单进程,多线程的工作模式。然而由于GIL锁,无法发挥多线程在多卡的优势。
  • 训练策略问题导致主节点占用更高,GPU利用率不高。
  • 效率低,每次训练开始都需要同步模型,对于大模型,通信时间是一个问题
  • 只适用于单机训练,而非多机训练。

2024/3/6 于苏州

Comments
CC BY-NC-SA 4.0 2020-PRESENT © zerolovesea