前一篇博客讲了已经被淘汰的数据并行,这次学习一下目前用的比较多的分布式数据并行(Distributed Data Parallel)。对比DP,DDP能够使用于单机多卡和多机多卡,并且对GPU的利用率更佳。
和DP有什么不同?
使用torch.distributed
,编写一份训练代码,torch会将代码分配给每个进程。此时就没有主GPU的区别,每个GPU都执行相同的工作。此外,每个GPU加载自己的数据,并且对比DP,反向传播这个过程是在每个GPU上实现的,而不是汇集到主GPU上执行。
训练流程 分布式数据并行的流程如下所示:
Step 1 使用多进程,每个进程加载模型和数据
Step2 各进程前向传播,得到输出
Step3 各进程计算Loss,反向传播并得到梯度
Step4 各进程通信,梯度在各卡进行同步
Step5 各进程更新模型
分布式训练中的基本概念 分布式训练中包含了几个基本的概念:
group
: 进程组。一个分布式任务对应一个进程组。一般所有的显卡都在一个组里。一个 任务即为一个组,也即一个 world。
world_size
:全局并行数,一般是总卡数。
node
:节点。一般是一台机器,或是一个容器。里面会包含多个GPU。
rank(global_rank)
:整个分布式训练任务内的进程序号,一般rank
为0指的是主进程。
local_rank
:区别于rank
,是每个节点内部的相对的进程的序号。可以理解为进程内的GPU 编号,例如 rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。
例如下图是一个示例:2机4卡的分布式训练。此时node
为2,world_size
为4。
分布式训练中的通信 分布式训练中,不同节点一般都需要进行信息交换,这就叫做通信。通信被分为两个大类:点对点通信就是将数据从一个进程传输到另一个进程。
集合通信则是指一个分组内所有进程的通信,也就是多卡之间的通信。包含了六种通信类型:
Scatter
: 分发。将主进程上Rank 0 上的数据平均分发给其他Rank。
Gather
: 与Scatter
相反,将子进程的数据汇集在主进程。
Reduce
: 将子进程上的数据合并后,进行某种计算(加减乘除,平均等等)后传到主进程。
All Reduce
: 将多个进程的信息先汇总并处理/计算后,在将结果发送回每个进程。
Broadcast
: 将Rank 0上的完整数据广播到各个Rank。
All Gather
: 将所有进程上的数据汇总(不计算)后,分发到每个进程。这样每个进程都会有一样的完整数据。
Pytorch单机多卡代码实现 使用分布式数据并行需要用python文件进行执行,运行文件也需要使用torchrun
的方式来执行。例如下面这个示例代码,需要在命令行运行torchrun --nproc_per_node=2 ddp.py
来运行,这代表着使用两个卡来进行训练。nproc_per_node
参数指定为当前主机创建的进程数。一般设定为当前主机的 GPU 数量。
还有一些需要注意的参数:
首先需要进行初始化和导入依赖项:
1 2 3 4 from transformers import BertTokenizer, BertForSequenceClassificationimport torch.distributed as dist dist.init_process_group(backend="nccl" )
参数backend="nccl"
指定了使用NCCL(Nvidia Collective Communications Library)作为后端来实现分布式处理。
接下来是导入数据和准备Dataset等前置工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import pandas as pd data = pd.read_csv("./ChnSentiCorp_htl_all.csv" ) data = data.dropna()from torch.utils.data import Datasetclass MyDataset (Dataset ): def __init__ (self ) -> None : super ().__init__() self.data = pd.read_csv("./ChnSentiCorp_htl_all.csv" ) self.data = self.data.dropna() def __getitem__ (self, index ): return self.data.iloc[index]["review" ], self.data.iloc[index]["label" ] def __len__ (self ): return len (self.data) dataset = MyDataset()import torchfrom torch.utils.data import random_split trainset, validset = random_split(dataset, lengths=[0.9 , 0.1 ], generator=torch.Generator().manual_seed(42 )) len (trainset), len (validset)
在准备一下Tokenizer和DataLoader。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 tokenizer = BertTokenizer.from_pretrained("/gemini/code/model" )def collate_func (batch ): texts, labels = [], [] for item in batch: texts.append(item[0 ]) labels.append(item[1 ]) inputs = tokenizer(texts, max_length=128 , padding="max_length" , truncation=True , return_tensors="pt" ) inputs["labels" ] = torch.tensor(labels) return inputsfrom torch.utils.data import DataLoaderfrom torch.utils.data.distributed import DistributedSampler trainloader = DataLoader(trainset, batch_size=32 , collate_fn=collate_func, sampler=DistributedSampler(trainset)) validloader = DataLoader(validset, batch_size=64 , collate_fn=collate_func, sampler=DistributedSampler(validset))
这里的DistributedSampler
能够将不同进程上的数据进行分配,并且不会出现重复。对比之前,DataLoader
里少了shuffle
这个参数,取而代之的是sampler
。
接下来是设置模型,我们需要将模型传到各自的GPU上。为了获取当前机器的GPU参数,我们需要从环境变量导入一下参数。
随后用DDP
包装一下模型。这样模型就准备好了。
1 2 3 4 5 6 7 8 9 10 11 from torch.optim import Adamimport osfrom torch.nn.parallel import DistributedDataParallel as DDP model = BertForSequenceClassification.from_pretrained("/gemini/code/model" )if torch.cuda.is_available(): model = model.to(int (os.environ["LOCAL_RANK" ])) model = DDP(model) optimizer = Adam(model.parameters(), lr=2e-5 )
模型准备完,就可以准备训练了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 def print_rank_0 (info ): if int (os.environ["RANK" ]) == 0 : print (info)def evaluate (): model.eval () acc_num = 0 with torch.inference_mode(): for batch in validloader: if torch.cuda.is_available(): batch = {k: v.to(int (os.environ["LOCAL_RANK" ])) for k, v in batch.items()} output = model(**batch) pred = torch.argmax(output.logits, dim=-1 ) acc_num += (pred.long() == batch["labels" ].long()).float ().sum () dist.all_reduce(acc_num, op=dist.ReduceOP.SUM) return acc_num / len (validset) def train (epoch=3 , log_step=100 ): global_step = 0 for ep in range (epoch): model.train() trainloader.sampler.set_epoch(ep) for batch in trainloader: if torch.cuda.is_available(): batch = {k: v.to(int (os.environ["LOCAL_RANK" ])) for k, v in batch.items()} optimizer.zero_grad() output = model(**batch) loss = output.loss loss.backward() optimizer.step() if global_step % log_step == 0 : dist.all_reduce(loss, op=dist.ReduceOp.AVG) print_rank_0(f"ep: {ep} , global_step: {global_step} , loss: {loss.item()} " ) global_step += 1 acc = evaluate() print_rank_0(f"ep: {ep} , acc: {acc} " ) train()
Trainer单机多卡代码实现 HuggingFace的Trainer代码中同样封装了分布式数据并行,下面是示例代码。运行时也是一样的使用torchrun --nproc_per_node=2 ddp_trainer.py
。可以看到其实并没有在代码层面进行修改太多,这是因为Trainer类本身已经内置了分布式训练的判断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, BertTokenizer, BertForSequenceClassificationfrom datasets import load_dataset dataset = load_dataset("csv" , data_files="./ChnSentiCorp_htl_all.csv" , split="train" ) dataset = dataset.filter (lambda x: x["review" ] is not None ) dataset datasets = dataset.train_test_split(test_size=0.1 , seed=42 ) datasetsimport torch tokenizer = BertTokenizer.from_pretrained("/gemini/code/model" )def process_function (examples ): tokenized_examples = tokenizer(examples["review" ], max_length=128 , truncation=True ) tokenized_examples["labels" ] = examples["label" ] return tokenized_examples tokenized_datasets = datasets.map (process_function, batched=True , remove_columns=datasets["train" ].column_names) tokenized_datasets model = BertForSequenceClassification.from_pretrained("/gemini/code/model" ) model.configimport evaluate acc_metric = evaluate.load("./metric_accuracy.py" ) f1_metirc = evaluate.load("./metric_f1.py" )def eval_metric (eval_predict ): predictions, labels = eval_predict predictions = predictions.argmax(axis=-1 ) acc = acc_metric.compute(predictions=predictions, references=labels) f1 = f1_metirc.compute(predictions=predictions, references=labels) acc.update(f1) return acc train_args = TrainingArguments(output_dir="./checkpoints" , per_device_train_batch_size=32 , per_device_eval_batch_size=128 , logging_steps=10 , evaluation_strategy="epoch" , save_strategy="epoch" , save_total_limit=3 , learning_rate=2e-5 , weight_decay=0.01 , metric_for_best_model="f1" , load_best_model_at_end=True ) from transformers import DataCollatorWithPadding trainer = Trainer(model=model, args=train_args, train_dataset=tokenized_datasets["train" ], eval_dataset=tokenized_datasets["test" ], data_collator=DataCollatorWithPadding(tokenizer=tokenizer), compute_metrics=eval_metric) trainer.train()
Pytorch多机多卡代码实现 上面的代码是在单机多卡的环境下实现并行训练,只需要在--nproc-per-node
这个参数设置任务的并行数量。在多机环境下就不一样了,我们需要解决多机的通信问题。
我们需要指定一台机器作为主节点,这个设置由参数MASTER_ADDR
决定。
这里我放上两段代码,一个是Pytorch的官方教程,使用了torchrun
执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import torchimport torch.distributed as distimport torch.nn as nnimport torch.optim as optimfrom torch.nn.parallel import DistributedDataParallel as DDPclass ToyModel (nn.Module): def __init__ (self ): super (ToyModel, self).__init__() self.net1 = nn.Linear(10 , 10 ) self.relu = nn.ReLU() self.net2 = nn.Linear(10 , 5 ) def forward (self, x ): return self.net2(self.relu(self.net1(x)))
接下来是重点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def demo_basic (): dist.init_process_group("nccl" ) rank = dist.get_rank() print (f"Start running basic DDP example on rank {rank} ." ) device_id = rank % torch.cuda.device_count() model = ToyModel().to(device_id) ddp_model = DDP(model, device_ids=[device_id]) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001 ) optimizer.zero_grad() outputs = ddp_model(torch.randn(20 , 10 )) labels = torch.randn(20 , 5 ).to(device_id) loss_fn(outputs, labels).backward() optimizer.step() dist.destroy_process_group()if __name__ == "__main__" : demo_basic()
在执行时,需要每个节点上都在命令行执行同样的命令,下述代码是一个例子,代表在两个机器(节点)上训练,每个机器各8个进程(GPU),共计16张GPU。
1 2 3 export MASTER_ADDR=localhost torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR :29400 elastic_ddp.py
执行完成后即可开始训练。
接下来是知乎上找的代码,也记录一下:
首先构建模型:
1 2 3 4 5 6 7 8 9 class ToyModel (nn.Module): def __init__ (self ): super (ToyModel, self).__init__() self.net1 = nn.Linear(10 , 10 ) self.relu = nn.ReLU() self.net2 = nn.Linear(10 , 5 ) def forward (self, x ): return self.net2(self.relu(self.net1(x)))
随后进入训练流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def train (): local_rank = int (os.environ["LOCAL_RANK" ]) rank = int (os.environ["RANK" ]) print (f"[{os.getpid()} ] (rank = {rank} , local_rank = {local_rank} ) training..." ) model = ToyModel().cuda(local_rank) ddp_model = DDP(model, [local_rank]) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001 ) optimizer.zero_grad() outputs = ddp_model(torch.randn(20 , 10 ).to(local_rank)) labels = torch.randn(20 , 5 ).to(local_rank) loss = loss_fn(outputs, labels) loss.backward() optimizer.step() print (f"[{os.getpid()} ] (rank = {rank} , local_rank = {local_rank} ) loss = {loss.item()} \n" )
接下来是调用部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 def run (): env_dict = { key: os.environ[key] for key in ("MASTER_ADDR" , "MASTER_PORT" , "WORLD_SIZE" , "LOCAL_WORLD_SIZE" ) } print (f"[{os.getpid()} ] Initializing process group with: {env_dict} " ) dist.init_process_group(backend="nccl" ) train() dist.destroy_process_group()if __name__ == "__main__" : run()
在主节点上执行如下脚本:
--nproc_per_node=4
: 表示在一个node上启动4个process
--nnodes=2
:表示一共有2个node进行分布式训练
--node_rank=0
:当前node的id为0
--master_addr="192.0.0.1
“:主节点的地址
--master_port=1234
:主节点的port
trian_multi_node.py
:训练代码
1 2 3 4 5 6 torchrun --nproc_per_node=4 \ --nnodes=2 \ --node_rank=0 \ --master_addr="192.0.0.1" \ --master_port=1234 \ trian_multi_node.py
在子节点上执行如下脚本,唯一的区别是–node_rank设置为1:
1 2 3 4 5 6 torchrun --nproc_per_node=4 \ --nnodes=2 \ --node_rank=1 \ --master_addr="192.0.0.1" \ --master_port=1234 \ trian_multi_node.py
运行结果如下:
主节点的执行结果:
2~4行:node0上四个进程的显示的全局信息
6~9行:node0上四个进程准备开始训练
10~13行:node0上四个进程完成训练,并输出loss信息
1 2 3 4 5 6 7 8 9 10 11 12 13 /workspace/DDP# sh run_node0.sh [594] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '1234', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'} [595] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '1234', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'} [593] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '1234', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'} [592] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '1234', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'} [593] (rank = 1, local_rank = 1) training... [595] (rank = 3, local_rank = 3) training... [592] (rank = 0, local_rank = 0) training... [594] (rank = 2, local_rank = 2) training... [595] (rank = 3, local_rank = 3) loss = 1.12112295627594 [592] (rank = 0, local_rank = 0) loss = 1.5381203889846802 [593] (rank = 1, local_rank = 1) loss = 1.1606591939926147 [594] (rank = 2, local_rank = 2) loss = 0.973732590675354
子节点的执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 /workspace/DDP# sh run_node1.sh [292] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '1234', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'} [294] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '1234', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'} [293] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '1234', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'} [295] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '1234', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'} [295] (rank = 7, local_rank = 3) training... [292] (rank = 4, local_rank = 0) training... [294] (rank = 6, local_rank = 2) training... [293] (rank = 5, local_rank = 1) training... [292] (rank = 4, local_rank = 0) loss = 1.3587342500686646 [294] (rank = 6, local_rank = 2) loss = 1.0895851850509644 [295] (rank = 7, local_rank = 3) loss = 1.1472846269607544 [293] (rank = 5, local_rank = 1) loss = 1.1993836164474487
实现细节 分布式数据并行在实现中需要注意几个细节:
数据内一定要注意数据划分的一致,不然其他进程会用验证集去训练。因此需要设置随机种子。
需要用分布式采样器。
查看全局信息时需要记住使用通信,否则只能看到某一个进程内的信息。
将数据放置在设备上时需要注意使用正确的device_id
,通常会使用local_rank
来指定。
2024/3/7 于苏州