在PyTorch分布式训练场景中,多个进程会并行执行数据加载、前向计算、反向传播、参数同步等任务,不同进程的执行速度、资源占用情况存在差异,很容易出现部分进程已经执行到后续步骤,而其他进程还在处理前序任务的情况,这种不同步会引发参数更新错误、数据读取异常等问题,dist.barrier就是用来解决这类进程同步问题的核心工具。

PyTorch分布式训练基础
PyTorch的分布式训练主要通过torch.distributed模块实现,使用前需要先完成分布式环境的初始化,常用的初始化方式是使用nccl后端(针对GPU场景)或者gloo后端(针对CPU场景)。初始化完成后,每个进程会被分配一个唯一的rank编号,用于标识进程身份,所有进程通过集合通信接口完成信息交互。
分布式环境初始化的基础代码示例如下:
import torch
import torch.distributed as dist
import os
def init_distributed():
# 从环境变量获取当前进程的rank和总进程数
rank = int(os.environ['RANK'])
world_size = int(os.environ['WORLD_SIZE'])
# 初始化进程组,使用nccl后端适配GPU场景
dist.init_process_group(backend='nccl', rank=rank, world_size=world_size)
return rank, world_size
为什么需要dist.barrier同步进程
多进程执行过程中,以下场景很容易出现同步问题:
- 数据预处理阶段,不同进程加载本地数据文件的速度不同,部分进程已经加载完成开始后续计算,其他进程还在读取文件,会导致后续批次数据对齐错误
- 模型保存阶段,如果多个进程同时执行模型保存操作,会出现文件写入冲突,导致保存的模型文件损坏
- 自定义集合通信操作前,需要确保所有进程都已经准备好对应的数据,否则会出现部分进程发送数据,其他进程还未准备好接收的情况
dist.barrier的作用就是让所有调用该接口的进程进入阻塞状态,直到所有参与分布式训练的进程都执行到dist.barrier调用处,才会同时解除阻塞继续执行后续代码,以此保证所有进程的执行节奏一致。
dist.barrier的使用方法
基本调用方式
dist.barrier的调用非常简单,不需要传入额外参数,只需要在需要同步的位置调用即可,但是调用前必须保证进程组已经初始化完成。基础调用示例如下:
import torch.distributed as dist
# 假设进程组已经初始化完成
def sync_process():
# 所有进程执行到这里都会阻塞,直到所有进程都到达该位置
dist.barrier()
print("所有进程都已完成同步,继续执行后续逻辑")
结合常见场景的使用示例
场景1:数据加载阶段同步
在数据加载完成后进行同步,确保所有进程都加载完当前批次的数据再开始前向计算:
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
def train_epoch(model, dataset, rank, world_size):
# 使用分布式采样器,保证每个进程加载不同的数据分片
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
for batch_data in dataloader:
# 数据加载完成后同步,确保所有进程都拿到当前批次数据
dist.barrier()
# 将数据放到对应设备上
batch_data = batch_data.to(rank)
# 后续前向计算、反向传播逻辑
output = model(batch_data)
# ... 其他训练逻辑
场景2:模型保存阶段同步
模型保存前先同步,避免多进程同时写入模型文件:
import torch
import torch.distributed as dist
def save_model(model, rank, save_path):
# 保存前先同步,确保所有进程都完成了当前轮的参数更新
dist.barrier()
# 只让rank为0的进程执行保存操作,避免重复写入
if rank == 0:
torch.save(model.state_dict(), save_path)
print("模型保存完成")
使用dist.barrier的注意事项
- 必须在进程组初始化之后调用dist.barrier,否则会抛出进程组未初始化的错误
- 所有参与分布式训练的进程都必须调用dist.barrier,如果有进程没有调用,会导致其他进程一直阻塞,最终程序卡死
- 不要在不需要同步的位置频繁调用dist.barrier,因为阻塞等待会浪费进程的执行时间,降低分布式训练的整体效率
- 如果使用了DistributedDataParallel包装模型,参数同步会自动由DDP完成,不需要额外使用dist.barrier做参数同步,避免重复操作
完整分布式训练示例
下面是一个包含dist.barrier同步逻辑的完整分布式训练代码片段:
import torch
import torch.nn as nn
import torch.distributed as dist
import os
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
def init_distributed():
rank = int(os.environ['RANK'])
world_size = int(os.environ['WORLD_SIZE'])
dist.init_process_group(backend='nccl', rank=rank, world_size=world_size)
return rank, world_size
def main():
rank, world_size = init_distributed()
torch.cuda.set_device(rank)
# 定义简单模型
model = nn.Linear(10, 2).to(rank)
model = nn.parallel.DistributedDataParallel(model, device_ids=[rank])
# 构造模拟数据集
data = torch.randn(1000, 10)
labels = torch.randint(0, 2, (1000,))
dataset = TensorDataset(data, labels)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
# 训练循环
for epoch in range(3):
sampler.set_epoch(epoch)
for batch_x, batch_y in dataloader:
# 数据加载后同步
dist.barrier()
batch_x = batch_x.to(rank)
batch_y = batch_y.to(rank)
output = model(batch_x)
loss = criterion(output, batch_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 每个epoch结束后同步,再保存模型
dist.barrier()
if rank == 0:
torch.save(model.module.state_dict(), f"model_epoch_{epoch}.pth")
dist.destroy_process_group()
if __name__ == "__main__":
main()
PyTorchdist_barrier分布式训练进程同步修改时间:2026-07-04 19:06:26