PyTorch Communication Operator Combination Tests


  • I. Background
  • II. Related Links
  • III. Issues Encountered
  • IV. Procedure
    • 1. Log in to the server
    • 2. Check the topology
    • 3. Prepare the test case
      • A. Create the directory
      • B. Test case code
    • 4. Create the Docker container
    • 5. Check the current PyTorch version
    • 6. Run the test program

I. Background

  • Test the functionality and performance of PyTorch communication operators under different configurations (a minimal sketch combining these dimensions follows the operator list below)
  • Test different group combinations
  • Test different tensors: data types, shapes (small to large, aligned or not), and different numbers of dimensions
  • Different reduce ops: SUM, PRODUCT, MAX, MIN, PREMUL_SUM, AVG
  • async_op: True / False
  • Single-node multi-GPU and multi-node multi-GPU (over IB or not), with shared memory enabled or disabled
  • Covered operators
    • barrier
    • all_gather
    • all_to_all
    • scatter
    • gather
    • broadcast
    • send/recv
    • reduce
    • all_reduce
    • reduce_scatter
    • all_to_all_single
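
To make these dimensions concrete, here is a minimal sketch (not the benchmark itself) of a single point in the test matrix: a two-rank sub-group, one dtype/shape choice, SUM as the reduce op, and the async_op=True path. It assumes a single node with at least two GPUs launched via torchrun; the shape is purely illustrative.

import os
import torch
import torch.distributed as dist

# One point in the test matrix described above (illustrative values only).
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
device = torch.device("cuda", int(os.environ["LOCAL_RANK"]))

group = dist.new_group([0, 1])                                   # a "group combination"
x = torch.ones(1024, 406, dtype=torch.float16, device=device)    # dtype / shape choice
if rank in (0, 1):
    work = dist.all_reduce(x, op=dist.ReduceOp.SUM, group=group, async_op=True)
    work.wait()                                                  # async_op=True path
dist.destroy_process_group()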

II. Related Links

  • NCCL environment variables
  • PyTorch communication operators
  • Writing Distributed Applications with PyTorch

III. Issues Encountered

  • Insufficient space under /dev/shm: add --shm-size=128g when starting the Docker container (see the quick check below)
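
A quick way to confirm the shared-memory size inside the container before running the benchmark, a small sketch using only the Python standard library:

import shutil

# Report how much space /dev/shm actually has inside the container.
total, used, free = shutil.disk_usage("/dev/shm")
print(f"/dev/shm total: {total / 2**30:.1f} GiB, free: {free / 2**30:.1f} GiB")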

IV. Procedure

1. Log in to the server


2. Check the topology

nvidia-smi topo -m

Output

        GPU0    GPU1    GPU2    GPU3    GPU4    GPU5    GPU6    GPU7    CPU Affinity    NUMA Affinity
GPU0     X      NODE    NODE    NODE    SYS     SYS     SYS     SYS     0-31,64-95      0
GPU1    NODE     X      NODE    NODE    SYS     SYS     SYS     SYS     0-31,64-95      0
GPU2    NODE    NODE     X      NODE    SYS     SYS     SYS     SYS     0-31,64-95      0
GPU3    NODE    NODE    NODE     X      SYS     SYS     SYS     SYS     0-31,64-95      0
GPU4    SYS     SYS     SYS     SYS      X      NODE    NODE    NODE    32-63,96-127    1
GPU5    SYS     SYS     SYS     SYS     NODE     X      NODE    NODE    32-63,96-127    1
GPU6    SYS     SYS     SYS     SYS     NODE    NODE     X      NODE    32-63,96-127    1
GPU7    SYS     SYS     SYS     SYS     NODE    NODE    NODE     X      32-63,96-127    1

Legend:

  X    = Self
  SYS  = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)
  NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA node
  PHB  = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
  PXB  = Connection traversing multiple PCIe bridges (without traversing the PCIe Host Bridge)
  PIX  = Connection traversing at most a single PCIe bridge
  NV#  = Connection traversing a bonded set of # NVLinks
  1. NODE: the connection stays within a single NUMA node and supports direct P2P access; this is usually the ideal case.
  2. PHB: the connection traverses a PCIe Host Bridge, which generally means the GPUs sit under the same CPU; P2P is often still supported.
  3. PIX: the connection traverses at most one PCIe bridge; P2P is usually supported as well, though performance may not be optimal.

In the output above, these three connection types (NODE and PHB in particular) are good indicators that the GPUs can communicate via P2P.
A NODE or PHB connection basically means P2P is supported, whereas SYS and PXB usually do not give effective P2P access, because the path between the devices is more complex and crosses multiple bridges or interconnects.
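
The topology report can also be cross-checked from PyTorch itself. The sketch below queries CUDA directly for peer-to-peer capability between every pair of visible GPUs via torch.cuda.can_device_access_peer:

import torch

# For each GPU, list the peers it can access directly (P2P).
n = torch.cuda.device_count()
for i in range(n):
    peers = [j for j in range(n) if j != i and torch.cuda.can_device_access_peer(i, j)]
    print(f"GPU{i} -> direct peer access to: {peers}")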

3. Prepare the test case

A. Create the directory

mkdir nccl_test
cd nccl_test
vim nccl_benchmark.py

B. Test case code

import warnings
warnings.filterwarnings("ignore", category=UserWarning)
import os
import torch
import argparse
import torch.distributed as dist
from torch.distributed import ReduceOp
import time
import numpy as np
from sklearn.metrics import mean_squared_error
from itertools import combinations

# Global configuration
backend = "nccl"
dev_type = "cuda"

# Mapping from numpy dtypes to torch dtypes
dtype_map = {
    np.float32: torch.float32,
    np.float64: torch.float64,
    np.float16: torch.float16,
    np.int32: torch.int32,
    np.int64: torch.int64,
    np.int16: torch.int16,
    np.int8: torch.int8,
    np.uint8: torch.uint8
}


def get_torch_dtype(np_dtype):
    torch_dtype = dtype_map.get(np_dtype)
    if torch_dtype is None:
        raise ValueError(f"unsupported dtype: {np_dtype}")
    return torch_dtype


def generate_partitions(elements):
    """Generate all partitions of `elements` in which every group has at least two members."""
    if len(elements) < 2:
        return []
    result = []
    first = elements[0]
    for i in range(2, len(elements) + 1):
        for subset in combinations(elements, i):
            if first not in subset:
                continue
            subset = set(subset)
            remaining = set(elements) - subset
            if len(remaining) == 0:
                result.append([subset])
            else:
                for rest in generate_partitions(list(remaining)):
                    partition = [subset] + rest
                    result.append(partition)
    return result


def generate_rank_lists(world_size):
    """Generate all possible per-rank group lists where every group has at least two ranks."""
    elements = list(range(world_size))
    partitions = generate_partitions(elements)
    all_rank_lists = []
    for partition in partitions:
        if all(len(group) >= 2 for group in partition):
            rank_lists = [None] * world_size
            for group in partition:
                for rank in group:
                    rank_lists[rank] = sorted(list(group))
            all_rank_lists.append(rank_lists)
    return all_rank_lists


def get_group(rank_list, world_size):
    """Return the communication group for `rank_list`."""
    group_size = len(rank_list)
    if world_size == group_size:
        group = dist.group.WORLD
    else:
        group = dist.new_group(rank_list, use_local_synchronization=True)
    return group


def validate_outputs(output_tensor, expected_output, input_dtype):
    """Check the output tensor against the expected output and return the MSE."""
    output_tensor_cpu = output_tensor.cpu().type(input_dtype).reshape(-1)
    expected_output = expected_output.cpu().type(input_dtype).reshape(-1)
    has_inf = torch.isinf(output_tensor_cpu).any()
    has_nan = torch.isnan(output_tensor_cpu).any()
    invalid = has_inf or has_nan
    if invalid:
        mse = 255
    else:
        mse = mean_squared_error(output_tensor_cpu, expected_output)
    return mse


def generate_random_tensor(seed, shape, dtype, device):
    """Generate a random tensor with the given shape and dtype."""
    np.random.seed(seed)
    random_tensor = np.random.uniform(-3, 3, shape).astype(dtype)
    tensor = torch.from_numpy(random_tensor).to(device)
    return tensor


def compute_expected_reduce_output(cpu_input_tensor, op, group_size):
    """Compute the expected output of a reduce-style operation."""
    if op == dist.ReduceOp.SUM or op == dist.ReduceOp.AVG:
        expected_output = cpu_input_tensor * group_size
        if op == dist.ReduceOp.AVG:
            expected_output = expected_output / group_size
    elif op == dist.ReduceOp.PRODUCT:
        expected_output = cpu_input_tensor ** group_size
    elif op == dist.ReduceOp.MAX or op == dist.ReduceOp.MIN:
        expected_output = cpu_input_tensor
    elif op == dist.ReduceOp.PREMUL_SUM:
        expected_output = cpu_input_tensor * group_size
    else:
        raise ValueError(f"unsupported ReduceOp: {op}")
    return expected_output


def test_reduce_scatter(dtype, shape, device, rank, all_rank_list, world_size, iters=5,
                        ops=[dist.ReduceOp.SUM, dist.ReduceOp.PRODUCT, dist.ReduceOp.MAX,
                             dist.ReduceOp.MIN, dist.ReduceOp.PREMUL_SUM, dist.ReduceOp.AVG]):
    """Test the reduce_scatter operation."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    assert shape[0] % group_size == 0, "the first dimension must be divisible by group_size"
    for op in ops:
        if op == dist.ReduceOp.PREMUL_SUM:
            continue  # not supported yet
        mse_arr, duration_arr = [], []
        for i in range(iters):
            input_tensor = generate_random_tensor(i, shape, dtype, device)
            cpu_input_tensor = input_tensor.cpu().float()
            # Split the input tensor into a list of group_size chunks
            input_list = list(input_tensor.chunk(group_size, dim=0))
            # Prepare the output tensor
            output_shape = [shape[0] // group_size] + list(shape[1:])
            output_tensor = torch.empty(output_shape, dtype=get_torch_dtype(dtype), device=device)
            # Record the communication start time
            start_time = time.time()
            if i % 2 == 0:
                dist.reduce_scatter(output_tensor, input_list, op=op, group=group, async_op=False)
            else:
                req = dist.reduce_scatter(output_tensor, input_list, op=op, group=group, async_op=True)
                req.wait()
            total_time = time.time() - start_time
            duration_arr.append(total_time)
            # Compute the expected output
            cpu_input_list = list(cpu_input_tensor.chunk(group_size, dim=0))
            if op == dist.ReduceOp.SUM or op == dist.ReduceOp.AVG:
                expected_chunks = [chunk * group_size for chunk in cpu_input_list]
                if op == dist.ReduceOp.AVG:
                    expected_chunks = [chunk / group_size for chunk in expected_chunks]
            elif op == dist.ReduceOp.PRODUCT:
                expected_chunks = [chunk ** group_size for chunk in cpu_input_list]
            elif op == dist.ReduceOp.MAX or op == dist.ReduceOp.MIN:
                expected_chunks = cpu_input_list
            elif op == dist.ReduceOp.PREMUL_SUM:
                expected_chunks = [chunk * group_size for chunk in cpu_input_list]
            else:
                raise ValueError(f"unsupported ReduceOp: {op}")
            expected_output = expected_chunks[group_rank]
            # Validate the output
            mse = validate_outputs(output_tensor, expected_output, input_tensor.dtype)
            mse_arr.append(mse)
        max_mse = np.max(mse_arr)
        max_duration = np.max(duration_arr)
        do_print = (max_mse > 1e-3) or (group_rank == 0)
        if do_print:
            print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
                  f"test_reduce_scatter op:{op} dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_all_reduce(dtype, shape, device, rank, all_rank_list, world_size, iters=5,
                    ops=[dist.ReduceOp.SUM, dist.ReduceOp.PRODUCT, dist.ReduceOp.MAX,
                         dist.ReduceOp.MIN, dist.ReduceOp.PREMUL_SUM, dist.ReduceOp.AVG]):
    """Test the all_reduce operation."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    for op in ops:
        if op == dist.ReduceOp.PREMUL_SUM:
            continue  # not supported yet
        mse_arr, duration_arr = [], []
        for i in range(iters):
            input_tensor = generate_random_tensor(i, shape, dtype, device)
            cpu_input_tensor = input_tensor.cpu().float()
            # Record the communication start time
            start_time = time.time()
            if i % 2 == 0:
                dist.all_reduce(input_tensor, op=op, group=group, async_op=False)
            else:
                req = dist.all_reduce(input_tensor, op=op, group=group, async_op=True)
                req.wait()
            total_time = time.time() - start_time
            duration_arr.append(total_time)
            # Compute the expected output
            expected_output = compute_expected_reduce_output(cpu_input_tensor, op, group_size)
            # Validate the output
            mse = validate_outputs(input_tensor, expected_output, input_tensor.dtype)
            mse_arr.append(mse)
        max_mse = np.max(mse_arr)
        max_duration = np.max(duration_arr)
        do_print = (max_mse > 1e-3) or (group_rank == 0)
        if do_print:
            print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
                  f"test_all_reduce op:{op} dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_reduce(dtype, shape, device, rank, all_rank_list, world_size, iters=1,
                ops=[dist.ReduceOp.SUM, dist.ReduceOp.PRODUCT, dist.ReduceOp.MAX,
                     dist.ReduceOp.MIN, dist.ReduceOp.PREMUL_SUM, dist.ReduceOp.AVG]):
    """Test the reduce operation."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    for op in ops:
        if op == dist.ReduceOp.PREMUL_SUM:
            continue  # not supported yet
        mse_arr, duration_arr = [], []
        for i in range(iters):
            input_tensor = generate_random_tensor(i, shape, dtype, device)
            cpu_input_tensor = input_tensor.cpu().float()
            # Record the communication start time
            start_time = time.time()
            if i % 2 == 0:
                dist.reduce(input_tensor, dst=rank_list[0], op=op, group=group, async_op=False)
            else:
                req = dist.reduce(input_tensor, dst=rank_list[0], op=op, group=group, async_op=True)
                req.wait()
            total_time = time.time() - start_time
            duration_arr.append(total_time)
            # Only the destination process computes the expected output
            if group_rank == 0:
                expected_output = compute_expected_reduce_output(cpu_input_tensor, op, group_size)
                mse = validate_outputs(input_tensor, expected_output, input_tensor.dtype)
            else:
                mse = 0
            mse_arr.append(mse)
        max_mse = np.max(mse_arr)
        max_duration = np.max(duration_arr)
        do_print = (max_mse > 1e-3) or (group_rank == 0)
        if do_print:
            print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
                  f"test_reduce op:{op} dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_all_gather(dtype, shape, device, rank, all_rank_list, world_size, iters=5):
    """Test the all_gather operation."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    mse_arr, duration_arr = [], []
    for i in range(iters):
        input_tensor = generate_random_tensor(i + rank * 1000, shape, dtype, device)
        # Prepare the list of output tensors
        output_tensor_list = [torch.empty_like(input_tensor) for _ in range(group_size)]
        # Record the communication start time
        start_time = time.time()
        if i % 2 == 0:
            dist.all_gather(output_tensor_list, input_tensor, group=group, async_op=False)
        else:
            req = dist.all_gather(output_tensor_list, input_tensor, group=group, async_op=True)
            req.wait()
        total_time = time.time() - start_time
        duration_arr.append(total_time)
        # Compute the expected output
        expected_tensors = []
        for p in rank_list:
            expected_tensor = generate_random_tensor(i + p * 1000, shape, dtype, device).cpu().float()
            expected_tensors.append(expected_tensor)
        expected_output = torch.stack(expected_tensors, dim=0)
        # Validate the output
        output_tensors_cpu = torch.stack([t.cpu().float() for t in output_tensor_list], dim=0)
        mse = validate_outputs(output_tensors_cpu, expected_output, input_tensor.dtype)
        mse_arr.append(mse)
    max_mse = np.max(mse_arr)
    max_duration = np.max(duration_arr)
    do_print = (max_mse > 1e-3) or (group_rank == 0)
    if do_print:
        print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
              f"test_all_gather dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_all_to_all(dtype, shape, device, rank, all_rank_list, world_size, iters=5):
    """Test the all_to_all operation."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    assert shape[0] % group_size == 0, "the first dimension must be divisible by group_size"
    mse_arr, duration_arr = [], []
    split_size = shape[0] // group_size
    for i in range(iters):
        input_tensor = generate_random_tensor(i + rank * 1000, shape, dtype, device)
        # Split the input tensor into the input list
        input_tensor_list = list(input_tensor.chunk(group_size, dim=0))
        # Prepare the list of output tensors
        output_tensor_list = [torch.empty_like(input_tensor_list[0]) for _ in range(group_size)]
        # Record the communication start time
        start_time = time.time()
        if i % 2 == 0:
            dist.all_to_all(output_tensor_list, input_tensor_list, group=group, async_op=False)
        else:
            req = dist.all_to_all(output_tensor_list, input_tensor_list, group=group, async_op=True)
            req.wait()
        total_time = time.time() - start_time
        duration_arr.append(total_time)
        # Compute the expected output
        expected_chunks = []
        for p in rank_list:
            expected_tensor = generate_random_tensor(i + p * 1000, shape, dtype, device).cpu().float()
            expected_chunk = list(expected_tensor.chunk(group_size, dim=0))[group_rank]
            expected_chunks.append(expected_chunk)
        expected_output = torch.cat(expected_chunks, dim=0)
        # Validate the output
        output_tensor_cpu = torch.cat([t.cpu().float() for t in output_tensor_list], dim=0)
        mse = validate_outputs(output_tensor_cpu, expected_output, input_tensor.dtype)
        mse_arr.append(mse)
    max_mse = np.max(mse_arr)
    max_duration = np.max(duration_arr)
    do_print = (max_mse > 1e-3) or (group_rank == 0)
    if do_print:
        print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
              f"test_all_to_all dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_scatter(dtype, shape, device, rank, all_rank_list, world_size, iters=5):
    """Test the scatter operation."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    torch_dtype = get_torch_dtype(dtype)
    assert shape[0] % group_size == 0, "the first dimension must be divisible by group_size"
    split_size = shape[0] // group_size
    mse_arr, duration_arr = [], []
    for i in range(iters):
        if group_rank == 0:
            input_tensor = generate_random_tensor(i, shape, dtype, device)
            input_tensor_list = list(input_tensor.chunk(group_size, dim=0))
        else:
            input_tensor_list = None
        output_tensor = torch.empty([split_size] + list(shape[1:]), dtype=torch_dtype, device=device)
        # Record the communication start time
        start_time = time.time()
        if i % 2 == 0:
            dist.scatter(output_tensor, scatter_list=input_tensor_list, src=rank_list[0], group=group)
        else:
            req = dist.scatter(output_tensor, scatter_list=input_tensor_list, src=rank_list[0], group=group, async_op=True)
            req.wait()
        total_time = time.time() - start_time
        duration_arr.append(total_time)
        # Compute the expected output
        expected_output = generate_random_tensor(i, shape, dtype, device).cpu().float()
        expected_output = list(expected_output.chunk(group_size, dim=0))[group_rank]
        # Validate the output
        mse = validate_outputs(output_tensor, expected_output, output_tensor.dtype)
        mse_arr.append(mse)
    max_mse = np.max(mse_arr)
    max_duration = np.max(duration_arr)
    do_print = (max_mse > 1e-3) or (group_rank == 0)
    if do_print:
        print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
              f"test_scatter dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_gather(dtype, shape, device, rank, all_rank_list, world_size, iters=5):
    """Test the gather operation."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    torch_dtype = get_torch_dtype(dtype)
    mse_arr, duration_arr = [], []
    for i in range(iters):
        input_tensor = generate_random_tensor(i + rank * 1000, shape, dtype, device)
        if group_rank == 0:
            gather_list = [torch.empty_like(input_tensor) for _ in range(group_size)]
        else:
            gather_list = None
        # Record the communication start time
        start_time = time.time()
        if i % 2 == 0:
            dist.gather(input_tensor, gather_list=gather_list, dst=rank_list[0], group=group)
        else:
            req = dist.gather(input_tensor, gather_list=gather_list, dst=rank_list[0], group=group, async_op=True)
            req.wait()
        total_time = time.time() - start_time
        duration_arr.append(total_time)
        # Only the destination process validates the output
        if group_rank == 0:
            expected_tensors = [generate_random_tensor(i + p * 1000, shape, dtype, device).cpu().float() for p in rank_list]
            gathered_tensors = [t.cpu().float() for t in gather_list]
            mse_list = [validate_outputs(gathered, expected, input_tensor.dtype)
                        for gathered, expected in zip(gathered_tensors, expected_tensors)]
            mse = max(mse_list)
        else:
            mse = 0
        mse_arr.append(mse)
    max_mse = np.max(mse_arr)
    max_duration = np.max(duration_arr)
    if (max_mse > 1e-3 and group_rank == 0) or group_rank == 0:
        print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
              f"test_gather dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_broadcast(dtype, shape, device, rank, all_rank_list, world_size, iters=5):
    """Test the broadcast operation."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    torch_dtype = get_torch_dtype(dtype)
    mse_arr, duration_arr = [], []
    for i in range(iters):
        expected_tensor = generate_random_tensor(i, shape, dtype, device)
        if group_rank == 0:
            input_tensor = expected_tensor.clone()
        else:
            input_tensor = torch.empty(shape, dtype=torch_dtype, device=device)
        # Record the communication start time
        start_time = time.time()
        if i % 2 == 0:
            dist.broadcast(input_tensor, src=rank_list[0], group=group)
        else:
            req = dist.broadcast(input_tensor, src=rank_list[0], group=group, async_op=True)
            req.wait()
        total_time = time.time() - start_time
        duration_arr.append(total_time)
        # Validate the output
        mse = validate_outputs(input_tensor, expected_tensor, input_tensor.dtype)
        mse_arr.append(mse)
    max_mse = np.max(mse_arr)
    max_duration = np.max(duration_arr)
    do_print = (max_mse > 1e-3)
    if do_print or group_rank == 0:
        print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
              f"test_broadcast dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_send_recv(dtype, shape, device, rank, all_rank_list, world_size, iters=5):
    """Test the send/recv operations."""
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    torch_dtype = get_torch_dtype(dtype)
    mse_arr = []
    duration_arr = []
    if rank not in rank_list[0:2]:
        return
    for i in range(iters):
        # Pair up processes; here adjacent processes are assumed to be paired
        tag = i  # tag used to match the send with the recv
        # Use a fixed seed to generate the data
        np.random.seed(i)
        if group_rank == 0:
            # The even rank acts as the sending process
            input_array = np.random.uniform(-10, 10, shape).astype(dtype)
            input_tensor = torch.from_numpy(input_array).to(device)
            #if i%3==0:
            #    input_tensor=make_non_contiguous(input_tensor)
            # Record the send start time
            start_time = time.time()
            if i % 2 == 0:
                dist.send(input_tensor, dst=rank_list[1], tag=tag)
            else:
                req = dist.isend(input_tensor, dst=rank_list[1], tag=tag)
                req.wait()
            total_time = time.time() - start_time
            duration_arr.append(total_time)
            # The sending process does not need to validate the result
            mse_arr.append(0)
        else:
            # The odd rank acts as the receiving process
            output_tensor = torch.empty(shape, dtype=torch_dtype, device=device)
            # Record the recv start time
            start_time = time.time()
            if i % 2 == 0:
                dist.recv(output_tensor, src=rank_list[0], tag=tag)
            else:
                req = dist.irecv(output_tensor, src=rank_list[0], tag=tag)
                req.wait()
            total_time = time.time() - start_time
            duration_arr.append(total_time)
            # Generate the expected tensor for comparison
            expected_array = np.random.uniform(-10, 10, shape).astype(dtype)
            expected_tensor = torch.from_numpy(expected_array).cpu().float()
            # Move the received tensor to the CPU as float32
            output_tensor_cpu = output_tensor.cpu().float()
            mse = validate_outputs(output_tensor_cpu, expected_tensor, torch_dtype)
            mse_arr.append(mse)
    # Print the result
    if rank == rank_list[1]:
        max_mse = np.max(mse_arr)
        max_duration = np.max(duration_arr)
        print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
              f"test_send_recv dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def test_all_to_all_single(dtype, shape, device, rank, all_rank_list, world_size, iters=5):
    """Unit test for the all_to_all_single communication operator.

    Args:
    - dtype: data type
    - shape: shape of the input tensor
    - device: device type (e.g. 'cpu' or 'cuda')
    - rank: rank of the current process
    - all_rank_list: rank lists of all processes
    - iters: number of test iterations
    """
    rank_list = all_rank_list[rank]
    group_size = len(rank_list)
    group = get_group(rank_list, world_size)
    group_rank = dist.get_rank(group)
    assert shape[0] % group_size == 0, "the first dimension must be divisible by group_size"
    mse_arr = []
    duration_arr = []
    # Size of each sub-tensor
    split_size = shape[0] // group_size
    # Prepare the input and output split-size lists
    input_split_sizes = [split_size for _ in range(group_size)]
    output_split_sizes = [split_size for _ in range(group_size)]
    for i in range(iters):
        # Use (i + rank * 1000) as the seed so each process gets a different input tensor
        np.random.seed(i + rank * 1000)
        # Generate a random tensor of the given shape
        input_tensor = torch.from_numpy(np.random.uniform(-10, 10, shape).astype(dtype)).to(device)
        cpu_input_tensor = input_tensor.cpu().float()
        #if i%3==0:
        #    input_tensor=make_non_contiguous(input_tensor)
        # Record the communication start time
        start_time = time.time()
        # Prepare the output tensor
        output_tensor = torch.empty_like(input_tensor)
        if i % 2 == 0:
            # Synchronous call
            dist.all_to_all_single(output_tensor, input_tensor,
                                   output_split_sizes=output_split_sizes,
                                   input_split_sizes=input_split_sizes,
                                   group=group, async_op=False)
        else:
            # Asynchronous call
            req = dist.all_to_all_single(output_tensor, input_tensor,
                                         output_split_sizes=output_split_sizes,
                                         input_split_sizes=input_split_sizes,
                                         group=group, async_op=True)
            req.wait()
        total_time = time.time() - start_time
        duration_arr.append(total_time)
        # Emulate all_to_all_single on the CPU to compute the expected output
        all_cpu_tensors = []
        for p in rank_list:
            np.random.seed(i + p * 1000)
            cpu_tensor_p = torch.from_numpy(np.random.uniform(-10, 10, shape).astype(dtype)).cpu().float()
            all_cpu_tensors.append(cpu_tensor_p)
        # Split each process's input tensor along the first dimension
        cpu_splits = [t.split(split_size, dim=0) for t in all_cpu_tensors]
        # Collect the sub-tensors this process should receive (one from every peer)
        expected_chunks = [cpu_splits[p][group_rank] for p in range(group_size)]
        expected_output = torch.cat(expected_chunks, dim=0).type(input_tensor.dtype)
        # Get the actual output and move it to the CPU
        output_tensor_cpu = output_tensor.cpu().type(input_tensor.dtype)
        # Check the output for inf/NaN and compute the MSE
        mse = validate_outputs(output_tensor_cpu, expected_output, input_tensor.dtype)
        mse_arr.append(mse)
    # Decide whether to print the result
    do_print = False
    max_mse = np.max(mse_arr)
    max_duration = np.max(duration_arr)
    if max_mse > 1e-3:
        do_print = True
    if do_print or group_rank == 0:
        print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "
              f"test_all_to_all_single dtype:{dtype.__name__} shape:{shape} Rank {rank}")


def main():
    local_rank = int(os.environ['LOCAL_RANK'])
    dist.init_process_group(backend=backend)
    if not dist.is_initialized():
        return
    parser = argparse.ArgumentParser(description='Test communication operations')
    parser.add_argument('--level', type=int, default=0,
                        help='test level; controls the tensor sizes and dimensionality')
    args = parser.parse_args()
    dtype_list = [np.float32, np.float16, np.int32, np.int8, np.int64]
    shape_format = [[i, [8]] for i in dtype_list]
    if args.level > 0:
        shape_format += [[i, [1024, 406]] for i in dtype_list]
    if args.level > 1:
        shape_format += [[i, [8, 256, 26]] for i in dtype_list]
    if args.level > 2:
        shape_format += [[i, [8, 64, 2, 2]] for i in dtype_list]
    if args.level > 3:
        shape_format += [[i, [8, 64, 2, 2]] for i in dtype_list]
    if args.level > 4:
        shape_format += [[i, [32, 33, 34, 3, 5]] for i in dtype_list]
    rank = dist.get_rank()
    device = torch.device(dev_type, local_rank)
    world_size = dist.get_world_size()
    for rank_list in generate_rank_lists(world_size):
        if rank == 0:
            print(f"Rank lists: {rank_list}")
        for val in shape_format:
            dtype, shape = val
            if rank == 0:
                print(f"Testing dtype: {dtype.__name__}, shape: {shape}")
            dist.barrier()
            test_reduce_scatter(dtype, shape, device, rank, rank_list, world_size)
            test_all_reduce(dtype, shape, device, rank, rank_list, world_size)
            test_reduce(dtype, shape, device, rank, rank_list, world_size)
            test_all_gather(dtype, shape, device, rank, rank_list, world_size)
            test_all_to_all(dtype, shape, device, rank, rank_list, world_size)
            test_scatter(dtype, shape, device, rank, rank_list, world_size)
            test_gather(dtype, shape, device, rank, rank_list, world_size)
            test_broadcast(dtype, shape, device, rank, rank_list, world_size)
            test_send_recv(dtype, shape, device, rank, rank_list, world_size)
            test_all_to_all_single(dtype, shape, device, rank, rank_list, world_size)
    print(f"finished: Rank {rank}")
    if dist.is_initialized():
        dist.destroy_process_group()


if __name__ == '__main__':
    main()
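
As a quick sanity check of the group enumeration, the snippet below (assuming the code above is saved as nccl_benchmark.py, as in step A) prints every way four ranks can be partitioned into groups of at least two; each test loops over exactly these partitions.

from nccl_benchmark import generate_rank_lists

# Print the per-rank group assignment for every partition of 4 ranks.
for rank_lists in generate_rank_lists(4):
    print(rank_lists)

# Expected output (order as produced by generate_partitions):
# [[0, 1], [0, 1], [2, 3], [2, 3]]
# [[0, 2], [1, 3], [0, 2], [1, 3]]
# [[0, 3], [1, 2], [1, 2], [0, 3]]
# [[0, 1, 2, 3], [0, 1, 2, 3], [0, 1, 2, 3], [0, 1, 2, 3]]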

4. Create the Docker container

docker stop nccl_test
docker rm nccl_test
docker run --gpus all --shm-size=128g -id -e NVIDIA_VISIBLE_DEVICES=all \
       --privileged --net=host -v $PWD:/home -w /home \
       --name=nccl_test nvcr.io/nvidia/pytorch:23.07-py3 /bin/bash
docker start nccl_test
docker exec -ti nccl_test bash
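
Inside the container, a minimal check that all GPUs are visible to PyTorch before launching the benchmark:

import torch

# Basic visibility check inside the container.
print("CUDA available:", torch.cuda.is_available())
print("Visible GPUs  :", torch.cuda.device_count())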

5. Check the current PyTorch version

pip list | grep -w torch

Output

torch  2.1.0a0+b5021ba
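
Besides the pip version, it can be useful to confirm that the NCCL backend is available and which NCCL version this build was compiled against; both queries below are standard PyTorch APIs, and the exact version depends on the NGC image:

import torch
import torch.distributed as dist

print("NCCL backend available:", dist.is_nccl_available())
print("NCCL version          :", torch.cuda.nccl.version())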

6. Run the test program

export NCCL_SOCKET_IFNAME=lo
export NCCL_IB_DISABLE=1
export NCCL_P2P_DISABLE=1
export NCCL_SHM_DISABLE=1
torchrun -m --nnodes=1 --nproc_per_node=4 --node_rank=0 \
         --master_addr=127.0.0.1 --master_port=12355 nccl_benchmark --level=0 2>&1 | tee log
cat log  | grep "mse" | awk '{print $1}' | sed "s/mse://g" | sort | uniq
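
The grep/awk pipeline above lists the distinct mse values in the log. The hypothetical helper below (not part of the benchmark) does the same from Python and reports the worst value, assuming the "mse:<value> ..." print format used by the test functions:

import re

# Collect every mse value from the benchmark log and report the largest one.
with open("log") as f:
    mses = [float(m.group(1)) for m in re.finditer(r"mse:\s*([0-9.eE+-]+)", f.read())]
if mses:
    print(f"{len(mses)} measurements, max mse = {max(mses):.2e}")
else:
    print("no mse lines found")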
