【分布式通信】NPKit,NCCL的Profiling工具

embedded/2024/9/23 6:24:12/

NPKit介绍

NPKit (Networking Profiling Kit) is a profiling framework designed for popular collective communication libraries (CCLs), including Microsoft MSCCL, NVIDIA NCCL and AMD RCCL.
It enables users to insert customized profiling events into different CCL components, especially into giant GPU kernels.
These events are then automatically placed onto a unified timeline in Google Trace Event Format, which users can then leverage trace viewer to understand CCLs’ workflow and performance.

以NCCL为例,如何使用?

Usage

  1. NCCL 2.17.1-1版本,将文件夹下的 npkit-for-nccl-2.17.1-1.diff 添加到你的nccl源文件中。
    实测nccl2.17和2.18都可以用。使用方法:
    git apply ../NPKit/nccl_sample/npkit-for-nccl-2.17.1-1.diff
    如果是对应的RCCL和MSCCL版本,方法类似。但如果是自己维护的其他分支,最好还是自己看看。

  2. NPKit只有在CPU和GPU没以后overlap的时候使用,所以 NPKIT_FLAGS 也要遵从这个规则。同时 npkit_launcher.sh里面的参数也要对应正确。
    我发现这个仓库提供的bash脚本有一点问题,编译的时候容易失败。所以我把bash脚本中的编译部分注释掉了,在外面编译。编译时记得检查对应的NPKIT_FLAGS
    这个仓库通过编译宏来控制profiling监控的对象,所以如果你这次做了all_reduce,下次想测alltoall,就得换一个NPKIT_FLAGS重新编译。

  3. nccl_testnpkit_runner.sh对应参数正确. 仅支持每个线程有1个GPU, 因此nccl_test运行参数记得是 -g 1

  4. 运行bash npkit_launcher.sh. 这个脚本会调用npkit_runner.

  5. 生成文件 npkit_event_trace.json ,可以用谷歌浏览器打开看。在浏览器那一栏输入chrome://tracing, 然后打开对应文件即可。

在这里插入图片描述

以下是nccl_sample中解析dump为trace的代码。

import argparse
import os
import jsonfrom queue import Queuedef parse_npkit_event_header(npkit_event_header_path):npkit_event_def = {'id_to_type': {}, 'type_to_id': {}}with open(npkit_event_header_path, 'r') as f:lines = [x.strip() for x in f.readlines() if len(x.strip()) != 0]line_idx = 0while line_idx < len(lines):if lines[line_idx].startswith('#define NPKIT_EVENT_'):fields = lines[line_idx].split()if len(fields) == 3:event_type = fields[1]event_id = int(fields[2], 0)npkit_event_def['type_to_id'][event_type] = event_idnpkit_event_def['id_to_type'][event_id] = event_typeline_idx += 1return npkit_event_defdef parse_gpu_clock_scale(gpu_clock_file_path):with open(gpu_clock_file_path, 'r') as f:freq_in_khz = f.read()return float(freq_in_khz) * 1e3 / 1e6def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path):with open(cpu_clock_num_file_path, 'r') as f:num = float(f.read())with open(cpu_clock_den_file_path, 'r') as f:den = float(f.read())return den / num / 1e6def parse_gpu_event(event_bytes):return {'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False),'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False),'rsvd': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False),'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False)}def parse_cpu_event(event_bytes):return {'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False),'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False),'slot': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False),'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False)}def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale):gpu_event_file_path = os.path.join(npkit_dump_dir, 'gpu_events_rank_%d_buf_%d' % (rank, buf_idx))raw_event_size = 16curr_cpu_base_time = Nonecurr_gpu_base_time = Nonegpu_events = []event_type_to_seq = {}with open(gpu_event_file_path, 'rb') as f:raw_content = f.read()raw_content_size = len(raw_content)raw_content_idx = 0while raw_content_idx < raw_content_size:parsed_gpu_event = parse_gpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])if npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_CPU':curr_cpu_base_time = parsed_gpu_event['timestamp'] / cpu_clock_scalecurr_gpu_base_time = Noneelif npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_GPU':if curr_gpu_base_time is None:curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scaleelse:if curr_gpu_base_time is None:curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scaleevent_type = npkit_event_def['id_to_type'][parsed_gpu_event['id']]phase = 'B' if event_type.endswith('_ENTRY') else 'E'gpu_events.append({'ph': phase,'ts': curr_cpu_base_time + parsed_gpu_event['timestamp'] / gpu_clock_scale - curr_gpu_base_time,'pid': rank,'tid': buf_idx + 1})if phase == 'B':if event_type not in event_type_to_seq:event_type_to_seq[event_type] = 0gpu_events[-1].update({'name': event_type,'cat': 'GPU','args': {'rank': rank,'buf_idx': buf_idx,'seq': event_type_to_seq[event_type],'rsvd_0': parsed_gpu_event['rsvd'],'size_0': parsed_gpu_event['size']}})event_type_to_seq[event_type] += 1else:gpu_events[-1]['args'] = {'size': parsed_gpu_event['size'], 'rsvd': parsed_gpu_event['rsvd']}delta_time = gpu_events[-1]['ts'] - gpu_events[-2]['ts']gpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else gpu_events[-1]['args']['size'] / delta_time / 1e3raw_content_idx += raw_event_sizereturn gpu_eventsdef parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale):cpu_event_file_path = os.path.join(npkit_dump_dir, 'cpu_events_rank_%d_channel_%d' % (rank, channel))raw_event_size = 16cpu_events = []event_type_to_seq = {}fiber_is_usable = []fiber_open_ts = []slot_to_fiber_id = {}channel_shift = 1000with open(cpu_event_file_path, 'rb') as f:raw_content = f.read()raw_content_size = len(raw_content)raw_content_idx = 0while raw_content_idx < raw_content_size:parsed_cpu_event = parse_cpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])event_type = npkit_event_def['id_to_type'][parsed_cpu_event['id']]phase = 'B' if event_type.endswith('_ENTRY') else 'E'cpu_events.append({'ph': phase,'ts': parsed_cpu_event['timestamp'] / cpu_clock_scale,'pid': rank})slot = parsed_cpu_event['slot']if phase == 'B':# Open fiber eventfiber_id = 0while fiber_id < len(fiber_is_usable):if fiber_is_usable[fiber_id]:breakfiber_id += 1if fiber_id == len(fiber_is_usable):fiber_is_usable.append(True)fiber_open_ts.append(0.0)slot_to_fiber_id[slot] = fiber_idfiber_open_ts[fiber_id] = cpu_events[-1]['ts']fiber_is_usable[fiber_id] = Falseif event_type not in event_type_to_seq:event_type_to_seq[event_type] = 0cpu_events[-1].update({'name': event_type,'cat': 'CPU','args': {'rank': rank,'channel': channel,'slot': parsed_cpu_event['slot'],'seq': event_type_to_seq[event_type],'size_0': parsed_cpu_event['size']}})event_type_to_seq[event_type] += 1else:# Close fiber eventfiber_id = slot_to_fiber_id[slot]slot_to_fiber_id.pop(slot)last_ts = fiber_open_ts[fiber_id]fiber_is_usable[fiber_id] = Truedelta_time = max(0.001, cpu_events[-1]['ts'] - last_ts)cpu_events[-1]['args'] = {'size': parsed_cpu_event['size']}cpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else cpu_events[-1]['args']['size'] / delta_time / 1e3cpu_events[-1]['tid'] = fiber_id + (channel + 1) * channel_shiftraw_content_idx += raw_event_sizereturn cpu_eventsdef convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def):files_in_dump_dir = next(os.walk(npkit_dump_dir))[2]gpu_event_files = [x for x in files_in_dump_dir if x.startswith('gpu_events_rank_')]cpu_event_files = [x for x in files_in_dump_dir if x.startswith('cpu_events_rank_')]ranks = list(set([int(x.split('_rank_')[1].split('_')[0]) for x in gpu_event_files]))buf_indices = list(set([int(x.split('_buf_')[1].split('_')[0]) for x in gpu_event_files]))channels = list(set([int(x.split('_channel_')[1].split('_')[0]) for x in cpu_event_files]))trace = {'traceEvents': []}for rank in ranks:cpu_clock_den_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_den_rank_%d' % rank)cpu_clock_num_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_num_rank_%d' % rank)cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path)gpu_clock_file_path = os.path.join(npkit_dump_dir, 'gpu_clock_rate_rank_%d' % rank)gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path)for buf_idx in buf_indices:gpu_events = parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale)trace['traceEvents'].extend(gpu_events)for channel in channels:cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale)trace['traceEvents'].extend(cpu_events)trace['traceEvents'].sort(key=lambda x : x['ts'])trace['displayTimeUnit'] = 'ns'os.makedirs(output_dir, exist_ok=True)with open(os.path.join(output_dir, 'npkit_event_trace.json'), 'w') as f:json.dump(trace, f)if __name__ == '__main__':parser = argparse.ArgumentParser()parser.add_argument('--npkit_dump_dir', type=str, required=True, help='NPKit dump directory.')parser.add_argument('--npkit_event_header_path', type=str, required=True, help='Path to npkit_event.h.')parser.add_argument('--output_dir', type=str, required=True, help='Path to output directory.')args = parser.parse_args()npkit_event_def = parse_npkit_event_header(args.npkit_event_header_path)convert_npkit_dump_to_trace(args.npkit_dump_dir, args.output_dir, npkit_event_def)

测试效果

我在8卡A100上跑了一下ring。
也不知道对不对。
在这里插入图片描述
在这里插入图片描述


http://www.ppmy.cn/embedded/22873.html

相关文章

无人机+集群组网+单兵图传:空地一体化组网技术详解

空地一体化组网技术是一种结合了无人机、集群自组网和单兵图传等多种技术的先进通信解决方案。这种技术方案的主要目的是在前线事故现场和后方指挥中心之间建立一个高效、稳定的通信链路&#xff0c;以确保信息的实时传输和指挥的顺畅进行。 首先&#xff0c;前端视频采集部分&…

MOS(My Oracle Support)怎么用?Oracle DBA必备技能!

MOS简介 老Oracle DBAer都知道MOS的前身是Metalink&#xff0c;2012年MOS替换掉原有的Metalink平台&#xff1b;MOS是Oracle提供的新一代在线支持平台&#xff0c;MOS平台整合了以前的Metalink功能&#xff0c;并提供了更强大的功能和用户体验。它允许客户提交和跟踪技术支持…

如何替代传统的方式,提高能源企业敏感文件传输的安全性?

能源行业是一个关键的基础设施领域&#xff0c;它涉及能源的勘探、开采、生产、转换、分配和消费。随着全球经济的发展和人口的增长&#xff0c;能源需求持续上升&#xff0c;这对能源行业的可持续发展提出了挑战。能源行业的传输场景多种多样&#xff0c;需要重点关注能源企业…

Elasticsearch索引别名:管理与优化数据访问

索引别名是Elasticsearch提供的一项强大功能&#xff0c;它允许将一个或多个索引映射到一个易于记忆且可复用的名称&#xff0c;从而简化索引管理、支持数据迁移、优化查询性能以及实现数据过滤与路由。本文将详细阐述索引别名的创建与删除、配合数据过滤与数据路由的应用场景及…

CSS基础:4类组合选择器以及5个注意事项

你好&#xff0c;我是云桃桃。 一个希望帮助更多朋友快速入门 WEB 前端的程序媛。 云桃桃-大专生&#xff0c;一枚程序媛&#xff0c;感谢关注。回复 “前端基础题”&#xff0c;可免费获得前端基础 100 题汇总&#xff0c;回复 “前端工具”&#xff0c;可获取 Web 开发工具合…

am62x linux sdk环境搭建

文章目录 3 SDK单机环境搭建1 下载SDK2 安装SDK3 SDK目录4 SDK编译5 编译变量定义工具安装编译目标编译示例6 编译kernel7 编译uboot8 工具链9 测试验证参考3 SDK单机环境搭建 1 下载SDK 入口1:

WPF —— MVVM command如何传递参数

点击按钮把窗体关闭 把页面的控件传递到自定义指令的函数中 FindAncestor 找到该组件的祖先元素 AncestorType{x:Type Window} 祖先元素类型为window CommandParameter 自定义指令传递参数 自定义指令 public class MyCommand : ICommand {public event Ev…

spring cloud eureka 初始化报错(A bean with that name has already been defined)

报错内容 The bean ‘eurekaRegistration’, defined in class path resource [org/springframework/cloud/netflix/eureka/EurekaClientAutoConfiguration E u r e k a C l i e n t C o n f i g u r a t i o n . c l a s s ] , c o u l d n o t b e r e g i s t e r e d . A …