PyTorch并⾏与分布式(三)DataParallel原理、源码解析、举例实战
⽂章⽬录
简要概览
pytorch官⽅提供的数据并⾏类为:
当给定model时,主要实现功能是将input数据依据batch的这个维度,将数据划分到指定的设备上。其他的对象(objects)复制到每个设备上。在前向传播的过程中,module被复制到每个设备上,每个复制的副本处理⼀部分输⼊数据。在反向传播过程中,每个副本module的梯度被汇聚到原始的module上计算(⼀般为第0块GPU)。
并且这⾥要注意的⼀点是,这⾥官⽅推荐是⽤DistributedDataParallel,因为DistributedDataParallel使⽤的是多进程⽅式,
⽽DataParallel使⽤的是多线程的⽅式。如果使⽤的是DistributedDataParallel,你需要使⽤torch.distributed.launch去launch程序,参考。
batch size的⼤⼩⼀定要⼤于GPU的数量,我在实践过程中batch size的⼤⼩⼀般设置为GPU块数的倍数。在数据分配到不同的机器上的时候,传⼊module的数据同样都可以传⼊DataParallel(并⾏之后的module类型)中,但是tensor默认按照dim=0分配到不同的机器
上,tuple, list,dict类型的数据被浅拷贝到不同的GPU上,其它类型的数据将会被分配到不同的进程中。
在调⽤DataParallel之前,module必须要具有他⾃⼰的参数(能获取到模型的参数),还需要在指定的GPU上具有buffer(不然会报内存出错)。
火锅底料炒锅在前向传播的过程中,module被复制到每个设备上,因此在前线传播过程中的任何更新都会丢失。举例来说,如果module有⼀个counter属性,在每次前线传播过程中都会加1,它将会保留在初始值状态,因为更新在副本上,但是副本前线传播完就被销毁了。
然⽽在DataParallel中,device[0]上的副本将其参数和内存数据与并⾏的module共享,因此在device[0]上更新数据将会被记录。
返回的结果是来⾃各个device上的数据的汇总。默认是dim 0维度上的汇总。因此在处理RNN时序数据时就需要注意这⼀点。
DataParallel()函数的参数主要有module,device_ids,output_device这三个。
1. module为需要并⾏的module。
2. device_ids为⼀个list,默认为所有可操作的devices。
3. output_device为需要输出汇总的指定GPU,默认为device_ids[0]号。
简单的举例为:
>>> net = DataParallel(model, device_ids=[0,1,2])
>>> output = net(input_var)# input_var can be on any device, including CPU
源码解析
data_parallel.py的源码地址为:
源码注释
import operator
import torch
import warnings
from itertools import chain
from.scatter_gather import scatter_kwargs, gather
from.parallel_apply import parallel_apply
from torch._utils import(
_get_all_device_indices,
锦鲤混养最佳搭配
_get_available_device_type,
_get_device_index,
_get_devices_properties
)
def_check_balance(device_ids):
imbalance_warn ="""
进博会放假There is an imbalance between your GPUs. You may want to exclude GPU {} which
has less than 75% of the memory or cores of GPU {}. You can do so by tting
the device_ids argument to DataParallel, or by tting the CUDA_VISIBLE_DEVICES
environment variable."""
device_ids =[_get_device_index(x,True)for x in device_ids]
dev_props = _get_devices_properties(device_ids)
def warn_imbalance(get_prop):
values =[get_prop(props)for props in dev_props]
min_pos, min_val =min(enumerate(values), key=operator.itemgetter(1))
max_pos, max_val =max(enumerate(values), key=operator.itemgetter(1)) if min_val / max_val <0.75:
warnings.warn(imbalance_warn.format(device_ids[min_pos], device_ids[max_pos])) return True
return Fal
if warn_imbalance(lambda props: al_memory):
return
if warn_imbalance(lambda props: props.multi_processor_count):
return
DataParallel类初始化:
class DataParallel(Module):
# TODO: update notes/cuda.rst when this class handles 8+ GPUs well
def__init__(lf, module, device_ids=None, output_device=None, dim=0): super(DataParallel, lf).__init__()
# 通过调⽤torch.cuda.is_available()判断是返回“cuda”还是None。
device_type = _get_available_device_type()
if device_type is None:# 检查是否有GPU
# 如果没有GPU的话,module就不能够并⾏,直接赋值,设备id置空
lf.device_ids =[]
return
军嶂古道
if device_ids is None:# 如果没有指定GPU,则默认使⽤所有可⽤的GPU # 获取所有可⽤的设备ID,为⼀个list。
device_ids = _get_all_device_indices()
if output_device is None:# 判断输出设备是否指定
output_device = device_ids[0]# 默认为指定设备的第⼀个
lf.dim = dim
lf.device_ids =[_get_device_index(x,True)for x in device_ids]
lf.output_device = _get_device_index(output_device,True)
lf.src_device_obj = torch.device(device_type, lf.device_ids[0])
_check_balance(lf.device_ids)
if len(lf.device_ids)==1:
<(lf.src_device_obj)
前向传播
def forward(lf,*inputs,**kwargs):
# 如果没有可⽤的GPU则使⽤原来的module来计算乐趣园
if not lf.device_ids:
dule(*inputs,**kwargs)
# 这⾥应该是判断模型的参数和buffer都要有。
for t in dule.parameters(), lf.module.buffers()):
if t.device != lf.src_device_obj:
rai RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(lf.src_device_obj, t.device))
# ⽤scatter函数将input平均分配到每个GPU上
inputs, kwargs = lf.scatter(inputs, kwargs, lf.device_ids)黑桃7
句加鸟念什么# for forward function without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not kwargs:
inputs =((),)
kwargs =({},)
if len(lf.device_ids)==1:# 只有⼀个给定的GPU的话,就直接调⽤未并⾏的module,否者进⼊下⼀步
dule(*inputs[0],**kwargs[0])
replicas = lf.dule, lf.device_ids[:len(inputs)])# replicate函数主要讲模型复制到多个GPU上
outputs = lf.parallel_apply(replicas, inputs, kwargs)# 并⾏地在多个GPU上计算模型。
return lf.gather(outputs, lf.output_device)# 将数据聚合到⼀起,传送到output_device上,默认也是dim 0维度聚合。
def replicate(lf, module, device_ids):
return replicate(module, device_ids,not torch.is_grad_enabled())
def scatter(lf, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=lf.dim)
def parallel_apply(lf, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, lf.device_ids[:len(replicas)])
def gather(lf, outputs, output_device):
return gather(outputs, output_device, dim=lf.dim)
scatter函数:
def scatter(inputs, target_gpus, dim=0):
r"""
Slices tensors into approximately equal chunks and
distributes them across given GPUs. Duplicates
references to objects that are not tensors.
"""
def scatter_map(obj):
if isinstance(obj, torch.Tensor):
return Scatter.apply(target_gpus,None, dim, obj)
if isinstance(obj,tuple)and len(obj)>0:
return list(zip(*map(scatter_map, obj)))
晒晒我们班的牛人if isinstance(obj,list)and len(obj)>0:
return list(map(list,zip(*map(scatter_map, obj))))
if isinstance(obj,dict)and len(obj)>0:
return list(map(type(obj),zip(*map(scatter_map, obj.items()))))
return[obj for targets in target_gpus]
# After scatter_map is called, a scatter_map cell will exist. This cell
# has a reference to the actual function scatter_map, which has references
# to a closure that has a reference to the scatter_map cell (becau the
# fn is recursive). To avoid this reference cycle, we t the function to
# None, clearing the cell
try:
res = scatter_map(inputs)
finally:
scatter_map =None
return res
在前向传播中,数据需要通过scatter函数分配到每个GPU上,代码在scatter_gather.py⽂件下,如果输⼊的类型不是tensor的话,会依据数据类型处理⼀下变成tensor,再递归调⽤scatter_map,最后调⽤Scatter.apply⽅法将数据依据给定的GPU给划分好返回。
replicate函数:
replicate函数需要将模型给复制到每个GPU上。如果你定义的模型是ScriptModule的话,也就是在编写⾃⼰model的时候不是继承
的nn.Module,⽽是继承的nn.ScriptModule,就不能复制,会报错。
这个函数主要就是将模型参数、buffer等需要共享的信息,复制到每个GPU上,感兴趣的⾃⼰看吧。
data_parallel