首页IT科技pytorch多核cpu(PyTorch使用多GPU并行训练及其原理和注意事项)

pytorch多核cpu(PyTorch使用多GPU并行训练及其原理和注意事项)

时间2025-06-20 07:12:12分类IT科技浏览4354
导读:1. 常见的多GPU使用方法 模型并行(model parallel) -> 大型网络(对速度没有提升)...

1. 常见的多GPU使用方法

模型并行(model parallel) -> 大型网络(对速度没有提升)

当模型需要的显卡很大            ,一张GPU的显存放不下时                    ,使用这种方式就可以训练一个大型的网络 数据并行(data parallel)-> 加速训练速度

可以将整个模型放到一张GPU时      ,我们可以将每一个模型放到每一个GPU上         ,让它们同时进行训练(正向传播+反向传播)

2. 训练速度与GPU数量之间的关系

性能实测:数据来源霹雳巴拉WZ

PyTorch 1.7 CUDA:10.1 Model:ResNet-34 Dataset:flower_photos(非常小的一个数据集) BatchSize:16 Optimizer:SGD GPU:Tesla V100(上一代卡皇)

从上图我们可以看到                    ,训练速度和GPU数量并不是简单的倍乘关系             。随着GPU数量的增加         ,提速效果越来越差      ,这时因为不同GPU之间需要进行通信                    ,会有性能的损耗                  。

3. 重点

数据集如何在不同设备间分配 误差梯度如何在不同设备之间通信 Batch Normalization如何在不同设备之间同步

3.1 数据集分配

3.2 误差通信

在使用多GPU训练时            ,每张GPU在进行完一个step后会产生梯度   ,我们需要将所有GPU的梯度求平均(并不是每张GPU各自学各自的                    ,那样就没有意义了)                ,这样才能将每张GPU的训练结果结合在一起       。

3.3 BN同步

假设BS=2,feature1和feature2是网络中的某一层经过卷积操作后得到特征图                ,因为BS=2                    ,所以有两个feature          。在正向传播过程中   ,BN会求特征矩阵每一个channel的均值和方差            ,再对每个通道上的数据进行“减均值除标准差            ”的操作                    ,这样就得到经过BN的特征图了                  。

当我们使用多GPU训练时      ,每个GPU都会计算各自的均值和方差          。这里我们同样假设每个GPU上数据的BS=2         ,那么每个BN层求的均值

μ

i

\mu_i

μi和方差

σ

i

2

\sigma^2_i

σi2
都是针对两个特征图而言的       。

之前我们说过                    ,BS越大         ,BN求的均值和方差越接近全体样本      ,准确率越高                  。

所以如果我们使用多GPU训练                    ,我们就应该考虑“我们是不是应该去求每一个BN层在所有设备上的均值和方差                    ”             。这样我们所求得的均值和方差是更加有意义的    。

如果我们不考虑多GPU之间BN的参数关系            ,那么我们所求得的BN层的均值和方差都是针对输入的两个样本(BS=2)求解的                   。

如果我们考虑到另外的设备呢?

GPU1的BN层得到两个特征图1和2   ,GPU2的BN层得到两个特征图3和4                。如果我们求的BN参数是特征图12+34                    ,那么我们的BN就变相等于在BS=4的情况下求得的均值和方差。这样对我们最终训练的结果是有一定帮助的                。

霹雳巴拉WZ说                ,如果不使用同步的BN(即普通的nn.BatchNorm),那么得到的结果和使用单GPU的结果基本上是一致的                   。

当然                ,使用不同BN的多GPU也对模型的训练速度有很大的帮助

如果使用了同步的BN后                    ,最终结果一般会有将近一个点的提升

所有同步的BN确实的有一定的作用

如果你的GPU显存很大   ,本来在一张GPU上就可以很大的BS            ,那么使用同步的BN也不会有很大的作用    。

同步BN主要用在:网络比较大                    ,一张GPU它的BS不能设置很大的情况下      ,同步的BN对准确率的帮助比较大             。

注意:使用了具有同步BN的方法         ,多GPU的并行速度会下降                  。可能会降低30%的速度       。

想要更快的速度 -> 不使用同步的BN 更高的精度 -> 使用同步的BN

4. PyTorch实现多卡并行计算的方式

分为两种:

DataParallel

PyTorch官方很久之前给出的一种方案 DistributedDataParallel

更新一代的多卡训练方法

DDP不仅仅局限于单机多卡的情况                    ,还适用于多级多卡的场景          。

PyTorch关于DP的文档 [PyTorch关于DDP的文档](DistributedDataParallel — PyTorch 1.11.0 documentation) DP是一个单进程            、多线程并且仅仅只能工作在单一的设备中(单节点         ,不适用于多机的情况) DDP是一个多进程的      ,可以工作在单机或者多机的场景中 DP通常要慢于DDP(即便在单一的设备上)

这里所说的单机和多机并不是单GPU和多GPU                    ,而是单个服务器和多个服务器的意思

DP和DDP都可以用在单机的情况下            ,单DDP可以用在多机的情况下   ,而且即便在单机下(一台机器有多个GPU)                    ,DDP的速度也要比DP的速度快                  。

[PyTorch关于单机多卡和多级多卡的训练教程](PyTorch Distributed Overview — PyTorch Tutorials 1.11.0+cu102 documentation)

5. PyTorch中使用多GPU训练的常用启动方式

torch.distributed.launch:代码量少                ,启动速度快

python -m torch.distributed.launch # -m: run library module as a script # --help: 可以通过`torch.distributed.launch --help`这样真的方式去查看使用方法

在PyTorch官方实现的Faster R-CNN源码中,多GPU训练就是使用distributed.launch进行启动的                ,因此后面将的主要基于distributed.launch来启动

torch.multiprocessing:代码量更多                    ,但拥有更好的控制和灵活性

注意事项:

在使用torch.distributed.launch的方法进行训练          。一旦训练开始后   ,手动强制终止训练程序(ctrl + c)            ,会有小概率出现进程没有杀掉的情况       。

此时程序还会占用GPU的显存以及资源                  。所以需要我们将这些进程kill -9掉

5.1 单机单卡的训练脚本

5.1.1 main

""" 单机单卡的训练脚本 —— 训练ResNet34/101 """ import os import math import argparse import torch import torch.optim as optim from torch.utils.tensorboard import SummaryWriter from torchvision import transforms import torch.optim.lr_scheduler as lr_scheduler from model import resnet34, resnet101 from my_dataset import MyDataSet from utils import read_split_data from multi_train_utils.train_eval_utils import train_one_epoch, evaluate def main(args): # 检查机器的配置(是否有GPU                    ,没有GPU则为CPU) device = torch.device(args.device if torch.cuda.is_available() else "cpu") print(args) # 打印传入的参数 print(Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/) tb_writer = SummaryWriter() # 创建Tensorborad对象 if os.path.exists("./weights") is False: # 检查保存权值文件的文件夹是否存在      ,不存在则创建该文件夹 os.makedirs("./weights") train_info, val_info, num_classes = read_split_data(args.data_path) train_images_path, train_images_label = train_info val_images_path, val_images_label = val_info # check num_classes assert args.num_classes == num_classes, "dataset num_classes: {}, input {}".format(args.num_classes, num_classes) data_transform = { "train": transforms.Compose([transforms.RandomResizedCrop(224), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]), "val": transforms.Compose([transforms.Resize(256), transforms.CenterCrop(224), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])} # 实例化训练数据集 train_data_set = MyDataSet(images_path=train_images_path, images_class=train_images_label, transform=data_transform["train"]) # 实例化验证数据集 val_data_set = MyDataSet(images_path=val_images_path, images_class=val_images_label, transform=data_transform["val"]) batch_size = args.batch_size # 根据BS的数量和训练设备CPU核心数来定义num_worker的大小 nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8]) # number of workers print(Using {} dataloader workers every process.format(nw)) # 读取训练数据 train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=nw, collate_fn=train_data_set.collate_fn) # 读取验证数据 val_loader = torch.utils.data.DataLoader(val_data_set, batch_size=batch_size, shuffle=False, pin_memory=True, num_workers=nw, collate_fn=val_data_set.collate_fn) # 定义模型对象并添加到所属设备中 model = resnet34(num_classes=args.num_classes).to(device) # 如果存在预训练权重则载入 if args.weights != "": if os.path.exists(args.weights): # 先使用torch.load加载指定文件中的权重 weights_dict = torch.load(args.weights, map_location=device) # 只加载key和value元素相等的键值对 load_weights_dict = {k: v for k, v in weights_dict.items() if model.state_dict()[k].numel() == v.numel()} # 1. 模型加载字典(不严格加载);2. 打印 print(model.load_state_dict(load_weights_dict, strict=False)) else: raise FileNotFoundError("not found weights file: {}".format(args.weights)) # 是否冻结权重 if args.freeze_layers: for name, para in model.named_parameters(): # name: 层的名字; para: 对应的参数 # 除最后的全连接层外         ,其他权重全部冻结 if "fc" not in name: # 除了fc层外                    ,所有层的参数都没有梯度(不进行反向传播         ,即不进行参数更新) para.requires_grad_(False) # 将带有梯度的参数传入pg这个list中 pg = [p for p in model.parameters() if p.requires_grad] # 定义参数优化器      ,第一个参数即为需要更新的参数                    ,也就是上一行pg列表中的结果 optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005) # 定义学习率变化函数            ,参考:Scheduler https://arxiv.org/pdf/1812.01187.pdf -> 其实就是一个余弦函数[0, pi] lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf # cosine # scheduler即为调整学习率变化的对象   ,将优化器和学习率变化曲线传给它就可以实现学习率的规律性变化 scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf) # 开始迭代训练 for epoch in range(args.epochs): # train """ 可以看到                    ,这里的train阶段只是返回平均loss                ,没有预测概率 """ mean_loss = train_one_epoch(model=model, optimizer=optimizer, data_loader=train_loader, device=device, epoch=epoch) # epoch进行了一个,学习率变化器需要更新一下(使得optimizer中的学习率进行变化) scheduler.step() # validate sum_num = evaluate(model=model, data_loader=val_loader, device=device) acc = sum_num / len(val_data_set) # top-1准确率 = 预测正确数 / 验证样本数 # 打印该epoch下的准确率 print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3))) # 将train和validation得到的数据添加到tensorboard中 # optimizer.param_groups[0]["lr"]即为对应epoch的学习率 tags = ["loss", "accuracy", "learning_rate"] tb_writer.add_scalar(tags[0], mean_loss, epoch) tb_writer.add_scalar(tags[1], acc, epoch) tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch) # 保存对应epoch的模型(这里没有选择最佳模型                ,可以通过acc达到只保留准确率高的权值文件) torch.save(model.state_dict(), "./weights/model-{}.pth".format(epoch)) if __name__ == __main__: parser = argparse.ArgumentParser() parser.add_argument(--num_classes, type=int, default=5) parser.add_argument(--epochs, type=int, default=30) parser.add_argument(--batch-size, type=int, default=16) parser.add_argument(--lr, type=float, default=0.001) # lrf为倍率因子                    ,即学习率最终降低到初始学习率lr的多少倍             。 # 最终学习率为lr * lrf parser.add_argument(--lrf, type=float, default=0.1) # 数据集所在根目录 # http://download.tensorflow.org/example_images/flower_photos.tgz parser.add_argument(--data-path, type=str, default="/home/w180662/my_project/my_github/data_set/flower_data/flower_photos") # resnet34 官方权重下载地址 # https://download.pytorch.org/models/resnet34-333f7ec4.pth parser.add_argument(--weights, type=str, default=resNet34.pth, help=initial weights path) # 为""表示不使用预训练模型 parser.add_argument(--freeze-layers, type=bool, default=False) parser.add_argument(--device, default=cuda, help=device id (i.e. 0 or 0,1 or cpu)) opt = parser.parse_args() main(opt)

5.1.2 train_eval_utils

import sys from tqdm import tqdm import torch from multi_train_utils.distributed_utils import reduce_value, is_main_process def train_one_epoch(model, optimizer, data_loader, device, epoch): model.train() # 声明模型的状态 loss_function = torch.nn.CrossEntropyLoss() # 定义损失函数 mean_loss = torch.zeros(1).to(device) # 生成shape为[1, ]的全零矩阵   ,并添加到对应的设备中 -> 用于存储后续计算得到mean_loss # 清空优化器中的梯度残留 optimizer.zero_grad() # 在进程0中打印训练进度 if is_main_process(): # 判断是否为主进程 # 使用tqdm库包装一下data_loader这个变量            ,一会儿在遍历data_loader时就会打印一个进度条 data_loader = tqdm(data_loader, file=sys.stdout) # 迭代data_loader                    ,获取step和对应的数据 """ 在data_loader中就定义了BS的大小, 所以这里step就是$step = 所有图片的数量 / BS$ step即为BS的数量, 每走过一次step即为处理了BS张图片 第1个step对应第1个bs 第2个step对应第2个bs 第3个step对应第3个bs ... """ for step, data in enumerate(data_loader): images, labels = data # data包含两部分数据:1. 预处理后的图片;2. 对应的真实标签 # 通过正向传播获取图片的预测结果 pred = model(images.to(device)) # 根据预测结果与真实标签计算loss loss = loss_function(pred, labels.to(device)) # 对loss进行反向传播 loss.backward() # 对loss进行求平均处理 """ reduce_value: def reduce_value(value, average=True): world_size = get_world_size() if world_size < 2: # 单GPU的情况 return value # 原值返回 with torch.no_grad(): dist.all_reduce(value) if average: value /= world_size # 需要除以GPU数量后再返回 return value """ loss = reduce_value(loss, average=True) # 对历史损失求平均 mean_loss = (mean_loss * step + loss.detach()) / (step + 1) # update mean losses # 在进程0中打印平均loss if is_main_process(): # 判断是否为主进程 data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3)) # 判断loss是否为有限数据(不能是infty) if not torch.isfinite(loss): print(WARNING: non-finite loss, ending training , loss) sys.exit(1) # 如果loss为无穷      ,则退出训练 optimizer.step() # 参数优化器进行参数更新 optimizer.zero_grad() # 参数优化器更新完参数后         ,需要将梯度清空 # 等待所有进程计算完毕 if device != torch.device("cpu"): torch.cuda.synchronize(device) # 等待CUDA设备上所有流中的所有内核完成    。 # 返回计算求得的平均loss return mean_loss.item() """ 这里使用了@torch.no_grad()这个装饰器对该方法进行修改                    ,也可以使用 with torch.no_grad: 这个上下文管理器 """ @torch.no_grad() def evaluate(model, data_loader, device): model.eval() # 声明模型状态 -> 1. 关闭BN; 2. Dropout # 用于存储预测正确的样本个数 sum_num = torch.zeros(1).to(device) # 在进程0中打印验证进度 if is_main_process(): data_loader = tqdm(data_loader, file=sys.stdout) # 在主进程中包装dataloader for step, data in enumerate(data_loader): images, labels = data pred = model(images.to(device)) # 获取预测概率 pred = torch.max(pred, dim=1)[1] # 获取预测概率最的max """ torch.eq(tensor, tensor/value) 对两个张量Tensor进行逐元素的比较         ,若相同位置的两个元素相同      ,则返回True;若不同                    ,返回False                   。 """ sum_num += torch.eq(pred, labels.to(device)).sum() # 计算所有预测正确的数量 # 等待所有进程计算完毕 if device != torch.device("cpu"): torch.cuda.synchronize(device) sum_num = reduce_value(sum_num, average=False) # 统计所有预测正确的数量 return sum_num.item()

5.2 分布式训练

5.2.1 main

from cgi import test import os import math import tempfile import argparse import torch import torch.optim as optim import torch.optim.lr_scheduler as lr_scheduler from torch.utils.tensorboard import SummaryWriter from torchvision import transforms from model import resnet34 from my_dataset import MyDataSet from utils import read_split_data, plot_data_loader_image from multi_train_utils.distributed_utils import init_distributed_mode, dist, cleanup from multi_train_utils.train_eval_utils import train_one_epoch, evaluate def main(args): if torch.cuda.is_available() is False: # 没有GPU设备会直接报错 raise EnvironmentError("not find GPU device for training.") # 初始化各进程环境 -> args容器中多了几个参数:1. args.rank; 2. args.world_size; 3. args.gpu init_distributed_mode(args=args) # 将args中新增的DDP参数赋值到全局变量中 rank = args.rank device = torch.device(args.device) batch_size = args.batch_size weights_path = args.weights """ 当我们在使用多GPU并行训练时            ,梯度一般是将多块GPU的梯度求平均                。 在原本的单卡上   ,我们的每学习一个step                    ,梯度前进1m(这里是为了方便理解)                ,如果学习两部,则梯度前进2m 假设我们的GPU数量为2。那么我们看起来是学习了一步                ,但因为有2块GPU                    ,所以是两块一起运算   ,那么就是一次性 算了两个step            ,但更新时我们对梯度进行了平均                    ,所以这2个step的值只更新了依次      ,意味着梯度只前进了1m 学习率变相地降低了         ,所以我们需要扩大学习率 """ args.lr *= args.world_size # 学习率要根据并行GPU的数量进行倍增 -> 这里是简单粗暴的增大学习率 """ 使用DDP时                    ,一般的写入操作                    、打印操作都是放在第一个进程(主进程)中操作的(没有必要在每一个进程中执行相同的操作) """ if rank == 0: # 在第一个进程中打印信息         ,并实例化tensorboard(只在第一个进程中打印参数) print(args) print(Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/) tb_writer = SummaryWriter() if os.path.exists("./weights") is False: os.makedirs("./weights") train_info, val_info, num_classes = read_split_data(args.data_path) train_images_path, train_images_label = train_info val_images_path, val_images_label = val_info # check num_classes assert args.num_classes == num_classes, "dataset num_classes: {}, input {}".format(args.num_classes, num_classes) data_transform = { "train": transforms.Compose([transforms.RandomResizedCrop(224), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]), "val": transforms.Compose([transforms.Resize(256), transforms.CenterCrop(224), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])} # 实例化训练数据集 train_data_set = MyDataSet(images_path=train_images_path, images_class=train_images_label, transform=data_transform["train"]) # 实例化验证数据集 val_data_set = MyDataSet(images_path=val_images_path, images_class=val_images_label, transform=data_transform["val"]) # 给每个rank对应的进程分配训练的样本索引 train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set) val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set) # 将样本索引每batch_size个元素组成一个list # BatchSampler是对train_sampler做进一步的处理 train_batch_sampler = torch.utils.data.BatchSampler(train_sampler, batch_size, drop_last=True) nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8]) # number of workers if rank == 0: # 打印也是在第一个进程中 print(Using {} dataloader workers every process.format(nw)) train_loader = torch.utils.data.DataLoader(train_data_set, # 传入的仍然是实例化的训练集(不是DistributedSampler) batch_sampler=train_batch_sampler, # 这里传入的是BatchSampler      ,而不是简单的batch_size pin_memory=True, # 直接将数据加载到GPU中                    ,从而达到加速的效果 num_workers=nw, # num workers collate_fn=train_data_set.collate_fn) val_loader = torch.utils.data.DataLoader(val_data_set, # 传入的仍然是实例化的验证集(不是DistributedSampler) batch_size=batch_size, # 这里传入的还是batch_size            ,而不是BatchSampler sampler=val_sampler, # 因为我们刚才没有使用BatchSampler对DistributedSampler进行处理   ,所以这里 # 直接传入DistributedSampler                    ,将验证集进行随机打乱后均匀地分配给不同设备 # 这里重点不是随机打乱                ,而是均匀地分配给不同的设备                。 pin_memory=True, num_workers=nw, collate_fn=val_data_set.collate_fn) # 实例化模型并指认到指定的设备中                   。 # Note:刚才在定义args时,device默认的是cuda                ,而在初始化DDP环境时有 torch.cuda.set_device(args.gpu) # 将对应的GPU指认到args.gpu                    ,后面还有device = torch.device(args.device) # 所以这里直接使用device就可以了 -> 会帮助我们自动分配到对应的GPU上 model = resnet34(num_classes=num_classes).to(device) # 如果存在预训练权重则载入 if os.path.exists(weights_path): weights_dict = torch.load(weights_path, map_location=device) load_weights_dict = {k: v for k, v in weights_dict.items() if model.state_dict()[k].numel() == v.numel()} model.load_state_dict(load_weights_dict, strict=False) """ 如果我们使用多GPU训练   ,我们必须保证每个设备上初始的权重的是一模一样的!这样我们使用多GPU训练 才是正确的;如果我们初始化的权重都不一样的化            ,那么在训练过程中所求得的梯度其实就不是针对同一组 参数而言的    。 那我们应该怎么做呢?这里使用的是以下的语句: checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt") 其中                    ,tempfile.gettempdir()用于返回保存临时文件的文件夹路径             。所以我们可以得到一个保存临时文件的路径: xxxxtempfolder/initial_weights.pt 之后我们再将主进程模型的权值文件保存到这个临时文件中 if rank == 0: # 在主进程中保存模块的初始化权重 torch.save(model.state_dict(), checkpoint_path) 最终让所有模型都读入该权重(不管是主进程还是其他进程 <=> 不管是生成pt文件的GPU还是其他GPU): model.load_state_dict(torch.load(checkpoint_path, map_location=device)) """ else: # 如果不存在权重 checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt") # 如果不存在预训练权重      ,需要将第一个进程中的权重保存         ,然后其他进程载入                    ,保持初始化权重一致 if rank == 0: # 在主进程中保存模块的初始化权重 torch.save(model.state_dict(), checkpoint_path) dist.barrier() # 等待所有GPU都执行到这步 # 这里注意         ,一定要指定map_location参数      ,否则会导致第一块GPU占用更多资源 # map_location用于重定向                    ,参考:https://blog.csdn.net/qq_43219379/article/details/123675375 model.load_state_dict(torch.load(checkpoint_path, map_location=device)) # 是否冻结权重 if args.freeze_layers: for name, para in model.named_parameters(): # 除最后的全连接层外            ,其他权重全部冻结(FC层中是没有BN的   ,所以使用同步功能的BN是没有意义的) if "fc" not in name: para.requires_grad_(False) else: # 不冻结权重 # 只有训练带有BN结构的网络时使用SyncBatchNorm才有意义(不带BN的话就不用了) if args.syncBN: # 使用带有同步功能的BN # 使用SyncBatchNorm后训练会更耗时 # 这个将BN换为sync_BN是不用管BN是2d还是3d的 model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device) # 转为DDP模型:使用torch.nn.parallel.DistributedDataParallel包装我们的模型                    ,再指认对应的设备ID model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) # optimizer pg = [p for p in model.parameters() if p.requires_grad] optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005) # Scheduler https://arxiv.org/pdf/1812.01187.pdf lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf # cosine scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf) for epoch in range(args.epochs): """ 前面我们提到了                ,使用DistributedSampler会打乱数据并将数据均匀的分配给每个设备                  。 但是设备的拥有的数据是不变的       。设置该方法后,每次DistributedSampler打乱的顺序就不同了                ,这就导致 每个设备所拥有的数据不同 -> 让每个设备都能接触到所有的样本 """ train_sampler.set_epoch(epoch) mean_loss = train_one_epoch(model=model, optimizer=optimizer, data_loader=train_loader, device=device, epoch=epoch) scheduler.step() sum_num = evaluate(model=model, data_loader=val_loader, device=device) # 这里的val_sampler.total_size为整个验证集样本的总数(包括补充的数据) acc = sum_num / val_sampler.total_size if rank == 0: # 在主进程中给Tensorboard添加数据 print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3))) tags = ["loss", "accuracy", "learning_rate"] tb_writer.add_scalar(tags[0], mean_loss, epoch) tb_writer.add_scalar(tags[1], acc, epoch) tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch) # 在主进程中保存权重参数 torch.save(model.module.state_dict(), "./weights/model-{}.pth".format(epoch)) # 删除临时缓存文件 """ 如果不使用预训练权重                    ,那么会生成一个temp file用来保证所有设备模型的初始化一致 所以我们要将其删除 """ if rank == 0: if os.path.exists(checkpoint_path) is True: os.remove(checkpoint_path) # 当训练完毕后   ,我们需要调用 cleanup 这个方法去摧毁进程组 -> 释放资源 cleanup() if __name__ == __main__: parser = argparse.ArgumentParser() parser.add_argument(--num_classes, type=int, default=5) parser.add_argument(--epochs, type=int, default=30) parser.add_argument(--batch-size, type=int, default=16) parser.add_argument(--lr, type=float, default=0.001) parser.add_argument(--lrf, type=float, default=0.1) # 是否启用SyncBatchNorm parser.add_argument(--syncBN, type=bool, default=True) # 数据集所在根目录 # http://download.tensorflow.org/example_images/flower_photos.tgz parser.add_argument(--data-path, type=str, default="/home/wz/data_set/flower_data/flower_photos") # resnet34 官方权重下载地址 # https://download.pytorch.org/models/resnet34-333f7ec4.pth parser.add_argument(--weights, type=str, default=resNet34.pth, help=initial weights path) parser.add_argument(--freeze-layers, type=bool, default=False) # 以下三个参数不要进行修改            ,系统会自动分配 parser.add_argument(--device, default=cuda, help=device id (i.e. 0 or 0,1 or cpu)) # 开启的进程数(注意不是线程),不用设置该参数                    ,会根据nproc_per_node自动设置 parser.add_argument(--world-size, default=4, type=int, help=number of distributed processes) parser.add_argument(--dist-url, default=env://, help=url used to set up distributed training) opt = parser.parse_args() main(opt)

5.2.2 distributed_utils

import os import torch import torch.distributed as dist def init_distributed_mode(args): """ DDP可以使用在: 1. 多机多卡 2. 单机多卡 + 如果是多机多卡的场景下      ,WORLD_SIZE对应所有机器中使用的进程数量(一个进程对应一块GPU)          。 + RANK代表所有进程中的第几个进程 + LOCAL_RANK对应当前机器中第几个进程 因为我们讲的是单机多卡的场景         ,所以这里的: + WORLD_SIZE就是有几块GPU + RANK代表哪块GPU + LOCAL_RANK和RANK是一样的 因为我们的脚本使用的是 torch.distributed.launch 这个方法                    ,所以需要使用 --use_env 这个方法(args.use_env==True) """ if RANK in os.environ and WORLD_SIZE in os.environ: # 我们使用的是这个 args.rank = int(os.environ["RANK"]) # 将环境中的RANK强转为int后传入args.rank变量 args.world_size = int(os.environ[WORLD_SIZE]) args.gpu = int(os.environ[LOCAL_RANK]) elif SLURM_PROCID in os.environ: args.rank = int(os.environ[SLURM_PROCID]) args.gpu = args.rank % torch.cuda.device_count() else: print(Not using distributed mode) args.distributed = False return args.distributed = True # 将是否使用分布式训练的flag设置为True # 指定当前进程使用的GPU                  。在使用单机多卡的时候         ,其实是针对每一个GPU起了一个进程 torch.cuda.set_device(args.gpu) args.dist_backend = nccl # 通信后端      ,nvidia GPU推荐使用NCCL # 打印当前所使用GPU的RANK及其distributed url信息 print(| distributed init (rank {}): {}.format(args.rank, args.dist_url), flush=True) # 通过 init_process_group方法创建进程组 dist.init_process_group(backend=args.dist_backend, # 通信后端(N卡推荐使用nccl) init_method=args.dist_url, # 初始化方法(直接使用默认的方法 -> "env://") world_size=args.world_size, # 对于不同的进程而言(一块GPU分配一个进程)                    ,WORLD_SIZE是一样的 rank=args.rank) # 但不同的进程中            ,RANK是不一样的(如果有两张GPU并行   ,第一张GPU的RANK=0                    ,第二张GPU的RANK=1) # 调用barrier方法等待所有的GPU都运行到这个地方之后再往下走 dist.barrier() def cleanup(): dist.destroy_process_group() def is_dist_avail_and_initialized(): """检查是否支持分布式环境""" if not dist.is_available(): return False if not dist.is_initialized(): return False return True def get_world_size(): if not is_dist_avail_and_initialized(): return 1 return dist.get_world_size() def get_rank(): if not is_dist_avail_and_initialized(): return 0 return dist.get_rank() def is_main_process(): return get_rank() == 0 def reduce_value(value, average=True): world_size = get_world_size() if world_size < 2: # 单GPU的情况 return value with torch.no_grad(): # 多GPU的情况下 dist.all_reduce(value) # 通过 all_reduce 方法对不同设备的value进行求和操作 # 通过 all_reduce 操作之后                ,value就成了所有设备value的之和 if average: # 如果要进行求平均 value /= world_size # world_size为GPU数量 return value

5.2.3 train_eval_utils

import sys from tqdm import tqdm import torch from multi_train_utils.distributed_utils import reduce_value, is_main_process def train_one_epoch(model, optimizer, data_loader, device, epoch): model.train() # 声明模型的状态 loss_function = torch.nn.CrossEntropyLoss() # 定义损失函数 mean_loss = torch.zeros(1).to(device) # 生成shape为[1, ]的全零矩阵,并添加到对应的设备中 -> 用于存储后续计算得到mean_loss # 清空优化器中的梯度残留 optimizer.zero_grad() # 在进程0中打印训练进度 if is_main_process(): # 判断是否为主进程 # 使用tqdm库包装一下data_loader这个变量                ,一会儿在遍历data_loader时就会打印一个进度条 data_loader = tqdm(data_loader, file=sys.stdout) # 迭代data_loader                    ,获取step和对应的数据 """ 在data_loader中就定义了BS的大小, 所以这里step就是$step = 所有图片的数量 / BS$ step即为BS的数量, 每走过一次step即为处理了BS张图片 第1个step对应第1个bs 第2个step对应第2个bs 第3个step对应第3个bs ... """ for step, data in enumerate(data_loader): images, labels = data # data包含两部分数据:1. 预处理后的图片;2. 对应的真实标签 # 通过正向传播获取图片的预测结果 pred = model(images.to(device)) # 根据预测结果与真实标签计算loss loss = loss_function(pred, labels.to(device)) # 对loss进行反向传播 """ 这里计算的loss是针对当前GPU当前batch求得的损失          。但是在单机多卡的环境中   ,我们想要求得所有GPU的平均损失       。 所有我们应该想办法求得不同设备之间求得的loss的均值                  。这里是通过 reduce_value这个方法实现的 """ loss.backward() # 对loss进行求平均处理 """ reduce_value: def reduce_value(value, average=True): world_size = get_world_size() if world_size < 2: # 单GPU的情况 return value # 原值返回 with torch.no_grad(): dist.all_reduce(value) if average: value /= world_size # 需要除以GPU数量后再返回 return value """ loss = reduce_value(loss, average=True) # 这行代码在单机单卡环境是不起作用的 # 对历史损失求平均 mean_loss = (mean_loss * step + loss.detach()) / (step + 1) # update mean losses # 在进程0中打印平均loss if is_main_process(): # 判断是否为主进程 data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3)) # 判断loss是否为有限数据(不能是infty) if not torch.isfinite(loss): print(WARNING: non-finite loss, ending training , loss) sys.exit(1) # 如果loss为无穷            ,则退出训练 optimizer.step() # 参数优化器进行参数更新 optimizer.zero_grad() # 参数优化器更新完参数后                    ,需要将梯度清空 # 等待所有进程计算完毕 if device != torch.device("cpu"): torch.cuda.synchronize(device) # 等待CUDA设备上所有流中的所有内核完成             。 # 返回计算求得的平均loss return mean_loss.item() """ 这里使用了@torch.no_grad()这个装饰器对该方法进行修改      ,也可以使用 with torch.no_grad: 这个上下文管理器 """ @torch.no_grad() def evaluate(model, data_loader, device): model.eval() # 声明模型状态 -> 1. 关闭BN; 2. Dropout # 用于存储预测正确的样本个数 sum_num = torch.zeros(1).to(device) # 在进程0中打印验证进度 if is_main_process(): data_loader = tqdm(data_loader, file=sys.stdout) # 在主进程中包装dataloader for step, data in enumerate(data_loader): images, labels = data pred = model(images.to(device)) # 获取预测概率 pred = torch.max(pred, dim=1)[1] # 获取预测概率最的max """ torch.eq(tensor, tensor/value) 对两个张量Tensor进行逐元素的比较         ,若相同位置的两个元素相同                    ,则返回True;若不同         ,返回False    。 """ sum_num += torch.eq(pred, labels.to(device)).sum() # 计算所有预测正确的数量 # 等待所有进程计算完毕 if device != torch.device("cpu"): torch.cuda.synchronize(device) # 统计所有预测正确的数量 # 这里使用 reduce_value 方法实现所有GPU的sum_num变量之和 sum_num = reduce_value(sum_num, average=False) return sum_num.item()

6. 补充知识

6.1 Cosine学习率变化曲线

lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - arg.lrf) + args.lrf

l

r

(

e

p

o

c

h

)

=

1

2

[

1

+

cos

π

e

p

o

c

h

e

p

o

c

h

s

]

×

(

1

α

)

+

α

\mathrm{lr(epoch)} = \frac{1}{2}[1 + \cos\frac{\pi \cdot \mathrm{epoch}}{\mathrm{epochs}}] \times (1 - \alpha) + \alpha

lr(epoch)=21[1+cosepochsπepoch]×(1α)+α

其中:

l

f

\mathrm{lf}

lf
为当前学习率

e

p

o

c

h

\mathrm{epoch}

epoch
为当前迭代的epoch数

e

p

o

c

h

s

\mathrm{epochs}

epochs
为总的epoch数

α

\alpha

α
为倍率因子

其函数曲线为:

6.2 DistributedSampler的讲解

# 实例化训练数据集 train_data_set = MyDataSet(images_path=train_images_path, images_class=train_images_label, transform=data_transform["train"]) # 实例化验证数据集 val_data_set = MyDataSet(images_path=val_images_path, images_class=val_images_label, transform=data_transform["val"]) # 给每个rank对应的进程分配训练的样本索引 train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set) val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)

现在对torch.utils.data.distributed.DistributedSampler进行讲解                   。

假设当前的数据集一共有11个样本 -> [0, 1, 2, 3, 4, …, 10]                。

首先DistributedSampler会对数据进行Shuffle处理      ,得到[6, 1, 9, 3, …, 7]这样随机的数据顺序。然后根据GPU的数量进行计算                。假设我们使用2块GPU进行并行                   。那么DistributedSampler:

先会对总的样本数/2(向上取整)=6    。 再对得到的数乘上GPU的个数                    ,即6*2=12 这个12就是2块GPU需要的数据总的样本个数             。但是我们的样本总数是11            ,那么就会对其进行补充                  。怎么补充?—— 对最开头的数据进行复制   ,再放到最后       。这里我们缺了1个数据                    ,所有就将6补充到最后          。(如果我们差3个数据                ,就将[6, 1, 9]进行复制,补充到后面)

这样我们就有12个数据了                ,就可以均衡的分配到每一个GPU设备中了                  。

最后对数据进行分配          。分配的方式也非常简单                    ,就是间隔地将数据分配到不同的设备中       。

通过DistributedSampler   ,我们就将所有的数据平分到各个设备上了                  。设备只能使用分配的数据            ,不能使用其他设备拥有的数据             。

6.3 BatchSampler

# 给每个rank对应的进程分配训练的样本索引 train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set) val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set) # ----------------------------------------------------------- # 将样本索引每batch_size个元素组成一个list # BatchSampler是对train_sampler做进一步的处理 train_batch_sampler = torch.utils.data.BatchSampler(train_sampler, batch_size, drop_last=True)

假设我们有2块GPU                    ,通过DistributedSampler后      ,数据被均匀地分配到两个设备中    。我们拿第一块GPU为例         ,它分配到的数据索引为[6, 9, 10, 1, 8, 7]                   。假设我们的Batch Size=2                    ,那么BatchSampler会将不同设备的数据以2个为一组组合在一起                。

在BatchSampler中还有一个参数drop_last。这个参数的意思是:

如果我们的BS=4         ,那么我们发现      ,只有前4个数据可以打包为一组                    ,后2个数据凑不够了:

如果Droplast==True            ,那么剩下的2个数据就被丢弃了                。 如果Droplast==False   ,那么剩下的2个数据打包为一个batch                   。

Note:

一般BatchSampler只处理训练集的DistributedSampler                    ,验证集的不需要处理    。

7. 总结

打印      、保存模型         、计算时间等操作要放在主线程中执行 (rank==0) 别忘记给模型用DDP进行包装 加载数据集后别忘记使用DistributedSampler进行处理;还有BatchSampler对前者进行处理 记得对loss队形reduce_value的计算 对需要累加的东西都别忘了使用reduce_value进行计算 loss.backward()直接进行梯度反向传播就行                ,不需要先进行reduce_value再传播             。因为每张卡的loss是不一样的,是针对自己的数据求出来的值                ,mean loss只是展示用                    ,并不是实际反向传播的loss to(device)的方法很好用   ,以后就可以不用.cuda()了 😂 tensorboard里面东西排序是看英文顺序            ,不是语句顺序 如果追求速度                    ,关闭同步的BN(SyncBN)

知识来源

https://www.bilibili.com/video/BV1yt4y1e7sZ?share_source=copy_pc

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
wordpress模板兔(破解WordPress模版兔插件ErphpDown会员系统付费下载插件11.7完美破解版-电脑学习网破解)