NVIDIA NCCL Source Code Study (14) - NVLink SHARP

ops/2024/9/19 0:40:43/ Tags: gpu, cuda, nccl

Background

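The previous post covered how IB SHARP works. Going one step further, NVIDIA introduced the third-generation NVSwitch in Hopper-based machines. Just as IB SHARP works across nodes, NVSwitch can perform NVLink SHARP (nvls for short) within a node. This post explains how NVLink SHARP works.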

For convenience, the examples below use nranks = 2, but note that with nranks = 2 nvls is not actually used.

Graph search

ncclResult_t ncclNvlsInit(struct ncclComm* comm) {
  ...
  if (comm->nvlsSupport == 1) comm->nvlsChannels = std::max(comm->config.minCTAs, std::min(comm->config.maxCTAs, (int)ncclParamNvlsChannels()));
  return ncclSuccess;
}

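Initialization mainly checks whether NVLS is supported. If it is, nvlsSupport is set to 1 and comm->nvlsChannels is set. Note that this value is the number of blocks the kernel actually launches, not the number of channels found by the search.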

The search then begins. The channels found for nvls differ from those found for other algorithms, so let's look at the details.

  nvlsGraph.pattern = NCCL_TOPO_PATTERN_NVLS;
  nvlsGraph.minChannels = 1;
  nvlsGraph.maxChannels = MAXCHANNELS;
  if (comm->nvlsSupport) {
    NCCLCHECKGOTO(ncclTopoCompute(comm->topo, &nvlsGraph), ret, fail);
    NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, &nvlsGraph), ret, fail);
  }

The pattern is set to NCCL_TOPO_PATTERN_NVLS and the search starts. For this pattern, backToNet and backToFirstRank are both -1.

ncclResult_t ncclTopoSearchRec(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int* time) {
  int backToNet, backToFirstRank;
  NCCLCHECK(ncclTopoSearchParams(system, graph->pattern, &backToNet, &backToFirstRank));
  if (system->nodes[NET].count) {
  } else {
    if (graph->pattern == NCCL_TOPO_PATTERN_NVLS) {
      NCCLCHECK(ncclTopoSearchTryGpu(system, graph, saveGraph, 0, backToNet, backToFirstRank, 0, time, -1, -1, graph->nChannels));
      return ncclSuccess;
    } ...
  }
  return ncclSuccess;
}

At this point graph->nChannels is 0.

ncclResult_t ncclTopoSearchTryGpu(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int step, int backToNet, int backToFirstRank, int forcedOrder, int *time, int type, int index, int g) {
  const uint64_t flag = 1ULL<<(graph->nChannels);
  struct ncclTopoNode* gpu;
  NCCLCHECK(ncclTopoFollowPath(system, graph, type, index, GPU, g, 1, &gpu));
  if (gpu) {
    gpu->used ^= flag;
    NCCLCHECK(ncclTopoSearchRecGpu(system, graph, saveGraph, gpu, step, backToNet, backToFirstRank, forcedOrder, time));
    gpu->used ^= flag;
    NCCLCHECK(ncclTopoFollowPath(system, graph, type, index, GPU, g, -1, &gpu));
  }
  return ncclSuccess;
}

Because type is -1, ncclTopoFollowPath returns gpu0 directly, and the search starts from gpu0.

ncclResult_t ncclTopoSearchRecGpu(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, struct ncclTopoNode* gpu, int step, int backToNet, int backToFirstRank, int forcedOrder, int *time) {
  if ((*time) <= 0) return ncclSuccess;
  (*time)--;

  int ngpus = system->nodes[GPU].count;
  if (step == ngpus) {
  }
  graph->intra[graph->nChannels*ngpus+step] = gpu->gpu.rank;
  int g = gpu - system->nodes[GPU].nodes;
  if (step == backToNet) {
  } else if (graph->pattern == NCCL_TOPO_PATTERN_NVLS) {
    NCCLCHECK(ncclTopoSearchTryNvls(system, graph, saveGraph, g, ngpus, time));
  } else if (step < system->nodes[GPU].count-1) {
  } else if (step == backToFirstRank) {
  } else {
  }
  return ncclSuccess;
}

GPU 0 is written into graph->intra. Since the pattern is NCCL_TOPO_PATTERN_NVLS, ncclTopoSearchTryNvls is executed directly.

ncclResult_t ncclTopoSearchTryNvls(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int g, int ngpus, int *time) {
  struct ncclTopoNode* nvs;
  struct ncclTopoNode* gpu;
  int d0=0; // See if there is enough bandwidth for NVS->GPU traffic
  do {
    NCCLCHECK(ncclTopoFollowPath(system, graph, NVS, 0, GPU, d0, d0 == g ? 2 : 1, &gpu));
    d0++;
  } while (gpu && d0 < system->nodes[GPU].count);
  if (gpu == NULL) {
    d0--;
  } else {
    int d1=0; // See if there is enough bandwidth for GPU->NVS traffic
    do {
      NCCLCHECK(ncclTopoFollowPath(system, graph, GPU, d1, NVS, 0, d1 == g ? 2 : 1, &nvs));
      d1++;
    } while (nvs && d1 < system->nodes[GPU].count);
    if (nvs == NULL) {
      d1--;
    } else { // Both directions worked. Move on to the next path.
      NCCLCHECK(ncclTopoSearchRecGpu(system, graph, saveGraph, NULL, ngpus, -1, -1, 0, time));
    }
    while (d1) {
      d1--;
      NCCLCHECK(ncclTopoFollowPath(system, graph, GPU, d1, NVS, 0, d1 == g ? -2 : -1, &nvs));
    }
  }
  while (d0) {
    d0--;
    NCCLCHECK(ncclTopoFollowPath(system, graph, NVS, 0, GPU, d0, d0 == g ? -2 : -1, &gpu));
  }
  return ncclSuccess;
}

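This function checks whether the bandwidth requirement is met. First, the topology of GPUs and NVSwitches in a real machine is shown in Figure 1.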

Figure 1

However, because NVSwitch is transparent to the user, the topology that NCCL actually builds is the one shown in Figure 2.

Figure 2

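Suppose the bandwidth in the current search condition is bw and g is GPU0. Searching one channel then means checking, for every GPU node, whether its link to the NVSwitch can still provide bw in both directions; if so, bw is subtracted from the link. GPU0 is special: its link must be able to provide 2 * bw (Figure 3). The reason is explained later.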

Figure 3
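The bandwidth check for one channel can be sketched on the CPU as follows. This is a simplified model under stated assumptions, not NCCL's implementation: each GPU is given a single remaining-bandwidth value per direction, and the names NvlsLinks, tryReserveChannel and countChannels are hypothetical. The real search walks topology links through ncclTopoFollowPath and also rolls the reservation back after recursing.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified model (an assumption, not NCCL's data structures): every GPU has
// one remaining-bandwidth value per direction on its link to the NVSwitch.
struct NvlsLinks {
  std::vector<double> gpuToNvs;  // remaining GPU -> NVS bandwidth per GPU
  std::vector<double> nvsToGpu;  // remaining NVS -> GPU bandwidth per GPU
};

// Try to reserve one channel whose head is `head`: the head needs 2 * bw in
// each direction, every other GPU needs bw. Nothing is reserved on failure.
bool tryReserveChannel(NvlsLinks& links, std::size_t head, double bw) {
  assert(head < links.gpuToNvs.size());
  const std::size_t n = links.gpuToNvs.size();
  for (std::size_t g = 0; g < n; ++g) {
    const double need = (g == head ? 2.0 : 1.0) * bw;
    if (links.gpuToNvs[g] < need || links.nvsToGpu[g] < need) return false;
  }
  for (std::size_t g = 0; g < n; ++g) {
    const double need = (g == head ? 2.0 : 1.0) * bw;
    links.gpuToNvs[g] -= need;
    links.nvsToGpu[g] -= need;
  }
  return true;
}

// Reserve channels with heads 0, 1, 2, ... and return how many fit.
int countChannels(NvlsLinks links, double bw) {
  const std::size_t n = links.gpuToNvs.size();
  int channels = 0;
  while (static_cast<std::size_t>(channels) < n &&
         tryReserveChannel(links, static_cast<std::size_t>(channels), bw)) {
    ++channels;
  }
  return channels;
}
```

With four GPUs that each have 100 units of bandwidth per direction, a channel bandwidth of 20 lets all four heads fit, because every GPU ends up using 2 * 20 as a head plus 3 * 20 as a non-head.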

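ncclTopoSearchRecGpu is then called again. Note that step is set to ngpus here, which completes the search for one channel, and nChannels becomes 1. The next call to ncclTopoSearchTryGpu therefore starts from GPU1, and the process repeats until ngpus channels have been found. With four GPUs, for example, the channels found are: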

0
1
2
3

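For other algorithms, a channel describes the transfer order within a node. The channels found for nvls are different. For example, the 0 in the first channel is what NCCL calls an nvlsHead: it indicates which rank is responsible for work such as the reduce of a given segment of memory, as we will see later.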

Channel connection

static ncclResult_t connectNvls(struct ncclComm* comm, int* nvlsHeads, struct ncclTopoGraph* nvlsGraph) {
  int nHeads = nvlsGraph->nChannels;
  int headRank = -1;
  for (int h=0; h<nHeads; h++) {
    if (nvlsGraph->intra[h*comm->localRanks] == comm->rank) headRank = h;
  }
  for (int c=0; c<comm->nvlsChannels; c++) {
    struct ncclChannel* channel = comm->channels+c;
    channel->nvls.nHeads = nHeads;
    for (int h=0; h<nHeads; h++) channel->nvls.up[h] = comm->nRanks+1+h;
    for (int h=nHeads; h<NCCL_MAX_NVLS_ARITY; h++) channel->nvls.up[h] = -1;
    channel->nvls.down = comm->nRanks+1+headRank;
    channel->nvls.out = -1;       // NVLS+SHARP not yet implemented.
    channel->nvls.headRank = headRank;
    channel->nvls.treeUp = channel->nvls.treeDown[0] = channel->nvls.treeDown[1] = channel->nvls.treeDown[2] = -1;
    channel->nvls.node = comm->node;
    channel->nvls.nNodes = comm->nNodes;
  }
  if (comm->nNodes == 1) return ncclSuccess;
  ...
}

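This computes headRank, i.e. the index of the channel whose node is this rank, and then sets up all nvls channels. up and down are used to index peers. Since the nvls connections start at nRanks+1, nRanks+1 is added to each index. up holds all the heads, and down is headRank, which is in fact this rank itself.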
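A host-only sketch of the index computation may help here. It mirrors the loops shown in connectNvls, but the struct NvlsPeers and the function computeNvlsPeers are hypothetical names introduced for illustration, and intra is modelled as a flat vector with localRanks entries per channel.

```cpp
#include <cassert>
#include <vector>

struct NvlsPeers {
  int headRank = -1;    // index of the channel this rank is head of
  std::vector<int> up;  // peer indices of all heads
  int down = -1;        // peer index of this rank's own head slot
};

// intra holds nHeads rows of localRanks entries; the first entry of row h is
// the rank acting as head h.
NvlsPeers computeNvlsPeers(const std::vector<int>& intra, int nHeads,
                           int localRanks, int nRanks, int rank) {
  assert(static_cast<int>(intra.size()) >= nHeads * localRanks);
  NvlsPeers p;
  for (int h = 0; h < nHeads; ++h) {
    if (intra[h * localRanks] == rank) p.headRank = h;
  }
  // NVLS peers are indexed after the regular peers, starting at nRanks + 1.
  for (int h = 0; h < nHeads; ++h) p.up.push_back(nRanks + 1 + h);
  p.down = nRanks + 1 + p.headRank;
  return p;
}
```

For four ranks where rank 2 is the head of channel 2, this gives up = {5, 6, 7, 8} and down = 7.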

Memory registration

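The overall memory registration flow is shown in Figure 4. First, cuMulticastCreate creates a multicast object, and the handle in the figure points to it. Each GPU then associates its device with this multicast object through cuMulticastAddDevice, allocates device memory, and finally associates that memory with the handle through cuMulticastBindAddr or cuMulticastBindMem.

Figure 4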

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
  ...
  size_t buffSize = comm->buffSizes[NCCL_PROTO_SIMPLE];
  size_t memSize = NVLS_MEM_ALIGN_SIZE;
  size_t nvlsPerRankSize = nChannels * 2 * (buffSize + memSize);
  size_t nvlsTotalSize = nvlsPerRankSize * nHeads;

  char* shareableHandle = resources->shareableHandle;
  NCCLCHECKGOTO(nvlsGetProperties(comm, resources, dev, comm->localRanks, nvlsTotalSize), res, cleanup);
  ...
}

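buffSize is the buffer size of the SIMPLE protocol, and memSize is used to store head and tail. The total amount of memory to allocate is then computed. nHeads is the number of channels found by the search, i.e. nRanks. We will see later why the memory size is computed this way. The total size, localRanks and related information are then saved into resources.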

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
  ...
  if (comm->localRank == 0) {
    NCCLCHECKGOTO(nvlsGroupCreate(comm, &resources->properties, comm->localRank, comm->localRanks, &resources->mcHandle, shareableHandle), res, cleanup);
    NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shareableHandle, NVLS_HANDLE_SIZE), res, cleanup);
  } else {
    NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shareableHandle, NVLS_HANDLE_SIZE), res, cleanup);
    NCCLCHECKGOTO(nvlsGroupConnect(comm, shareableHandle, comm->localRankToRank[0], &resources->mcHandle), res, cleanup);
  }
  ...
}

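rank0 (the rank with localRank 0) runs nvlsGroupCreate, which creates a multicast object through cuMulticastCreate and stores it in resources->mcHandle. Because the object must be shared across processes, it needs to be converted into a shareable handle; here this is done with a plain memcpy.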

ncclResult_t nvlsGroupCreate(struct ncclComm *comm, CUmulticastObjectProp *prop, int rank, unsigned int nranks, CUmemGenericAllocationHandle *mcHandle, char *shareableHandle) {
  size_t size = prop->size;
  CUCHECK(cuMulticastCreate(mcHandle, prop));
  memcpy(shareableHandle, mcHandle, sizeof(CUmemGenericAllocationHandle));
  return ncclSuccess;
}

All ranks then call bootstrapIntraNodeBroadcast: rank0 broadcasts the shareable handle of the multicast object into shareableHandle on every rank, and the other ranks convert the received shareableHandle into mcHandle through nvlsGroupConnect. Each rank then binds its own GPU to mcHandle through cuMulticastAddDevice, so every rank now holds the multicast object that mcHandle refers to.

ncclResult_t nvlsGroupBindMem(struct ncclComm *comm, struct ncclNvlsSharedRes* resources) {
  size_t size = resources->size;
  size_t granularity;
  CUdeviceptr ptr = 0;
  CUmemAllocationProp prop;

  memset(&prop, 0, sizeof(prop));
  prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
  prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
  prop.location.id = resources->dev;
  prop.requestedHandleTypes = NVLS_CU_MEM_HANDLE_TYPE;
  CUCHECK(cuMemGetAllocationGranularity(&granularity, &prop, CU_MEM_ALLOC_GRANULARITY_RECOMMENDED));
  resources->ucGran = granularity;

  // Map a VA for UC memory
  CUCHECK(cuMemAddressReserve(&ptr, size, granularity, 0U, 0));

  // Alloc local physical mem for this NVLS group
  CUCHECK(cuMemCreate(&resources->ucHandle, size, &prop, 0));
  CUCHECK(cuMemMap(ptr, size, 0, resources->ucHandle, 0));
  CUCHECK(cuMemSetAccess(ptr, size, &resources->accessDesc, 1));
  CUDACHECK(cudaMemset((void*)ptr, 0, size));
  resources->ucBuff = (char*)ptr;
  CUCHECK(cuMulticastBindMem(resources->mcHandle, 0/*mcOffset*/, resources->ucHandle, 0/*memOffset*/, size, 0/*flags*/));
  return ncclSuccess;
}

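Physical memory is then allocated and mapped into the virtual address space. A range of virtual addresses is first reserved at ptr, physical memory is allocated into ucHandle, the physical memory referred to by ucHandle is mapped to ptr, and ptr is assigned to ucBuff, as shown in Figure 5.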

Figure 5

Finally, cuMulticastBindMem binds the physical memory behind ucHandle to mcHandle.

nvlsGroupMapMem is then called to map mcHandle into the virtual address space.

ncclResult_t nvlsGroupMapMem(struct ncclComm *comm, struct ncclNvlsSharedRes* resources) {
  size_t size = resources->size;
  CUdeviceptr ptr = 0;

  // Create a VA for the NVLS
  CUCHECK(cuMemAddressReserve(&ptr, size, resources->granularity, 0U, 0));
  // Map the VA locally
  CUCHECK(cuMemMap(ptr, size, 0, resources->mcHandle, 0));
  resources->mcBuff = (char*)ptr;
  INFO(NCCL_NVLS, "NVLS Mapped MC buffer at %p size %zi", resources->mcBuff, size);

  // Having completed the BindMem we can now call SetAccess
  // NB: It will block until all ranks have bound to the Group
  CUCHECK(cuMemSetAccess((CUdeviceptr)resources->mcBuff, size, &resources->accessDesc, 1));
  return ncclSuccess;
}

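Likewise, a virtual address range is reserved at ptr, mcHandle is mapped to ptr, and the result is saved in mcBuff. The state at this point is shown in Figure 6.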

Figure 6

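This physical memory is now mapped to both ucBuff and mcBuff. ucBuff is the unicast buffer: accesses to it affect only the memory of the current device. mcBuff is the multicast buffer: accesses to it are broadcast by NVSwitch to all devices that have been added to mcHandle.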

The memory is then recorded in the buffs of each peer's connections.

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
  ...
  for (int h = 0; h < nHeads; h++) {
    int nvlsPeer = comm->nRanks + 1 + h;
    for (int c = 0; c < nChannels; c++) {
      struct ncclChannel* channel = comm->channels + c;
      char* mem = NULL;
      struct ncclChannelPeer* peer = channel->peers[nvlsPeer];

      // Reduce UC -> MC
      mem = resources->ucBuff + (h * 2 * nChannels + c) * (buffSize + memSize);
      peer->send[1].transportComm = &nvlsTransport.send;
      peer->send[1].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
      peer->send[1].conn.head = (uint64_t*)(mem + buffSize);
      peer->send[1].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
      mem = resources->mcBuff + (h * 2 * nChannels + c) * (buffSize + memSize);
      peer->recv[0].transportComm = &nvlsTransport.recv;
      peer->recv[0].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
      peer->recv[0].conn.head = (uint64_t*)(mem + buffSize);
      peer->recv[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
      peer->recv[0].conn.flags |= NCCL_NVLS_MIN_POLL;

      // Broadcast MC -> UC
      mem = resources->ucBuff + ((h * 2 + 1) * nChannels + c) * (buffSize + memSize);
      peer->recv[1].transportComm = &nvlsTransport.recv;
      peer->recv[1].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
      peer->recv[1].conn.head = (uint64_t*)(mem + buffSize);
      peer->recv[1].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
      mem = resources->mcBuff + ((h * 2 + 1) * nChannels + c) * (buffSize + memSize);
      peer->send[0].transportComm = &nvlsTransport.send;
      peer->send[0].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
      peer->send[0].conn.head = (uint64_t*)(mem + buffSize);
      peer->send[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
      peer->send[0].conn.flags |= NCCL_NVLS_MIN_POLL;

      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[0], &peer->send[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[0], &peer->recv[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[1], &peer->send[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[1], &peer->recv[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
    }
  }
  ...
}

After this step, the layout is as shown in Figure 7.

Figure 7
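The offset arithmetic used when filling the connections can be checked with a small host-side sketch. NvlsLayout and its members are hypothetical names, and the sizes used in the example (4 MiB for buffSize, 2 MiB for memSize) are assumed values chosen only to make the numbers concrete; the formulas themselves are the ones in the code above.

```cpp
#include <cassert>
#include <cstddef>

struct NvlsLayout {
  std::size_t nHeads, nChannels, buffSize, memSize;

  std::size_t slotSize() const { return buffSize + memSize; }
  std::size_t perRankSize() const { return nChannels * 2 * slotSize(); }
  std::size_t totalSize() const { return perRankSize() * nHeads; }

  // Offset of the "reduce" slot (send[1] on ucBuff, recv[0] on mcBuff).
  std::size_t reduceOffset(std::size_t h, std::size_t c) const {
    assert(h < nHeads && c < nChannels);
    return (h * 2 * nChannels + c) * slotSize();
  }
  // Offset of the "broadcast" slot (recv[1] on ucBuff, send[0] on mcBuff).
  std::size_t broadcastOffset(std::size_t h, std::size_t c) const {
    assert(h < nHeads && c < nChannels);
    return ((h * 2 + 1) * nChannels + c) * slotSize();
  }
  // head sits right after the data buffer, tail half a memSize later.
  std::size_t headOffset(std::size_t slot) const { return slot + buffSize; }
  std::size_t tailOffset(std::size_t slot) const { return slot + buffSize + memSize / 2; }
};
```

Each head owns 2 * nChannels slots (nChannels for reduce followed by nChannels for broadcast), which is why the total is nChannels * 2 * (buffSize + memSize) * nHeads and why the last broadcast slot ends exactly at the end of the allocation.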

In the figure, yellow is ucBuff and blue is mcBuff. mcBuff corresponds to multimem in PTX, whose operations are described as follows:

The multimem.* operations operate on multimem addresses and accesses all of the multiple memory locations which the multimem address points to.

Taking ld_reduce as an example:

multimem.ld_reduce{.ldsem}{.scope}{.ss}.op.type d, [a];

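Suppose GPU0 executes multimem.ld_reduce on the multicast address that backs peer[0]->send[1].buff (the same offset in mcBuff, which the setup code above stores as peer[0]->recv[0].buff). This loads the data at the corresponding location on GPU0 and GPU1, performs the reduce, and stores the result in d.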
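The difference between unicast and multicast access can be illustrated with a CPU stand-in. This is only a model of the semantics described above, not PTX or CUDA code: MulticastModel and its methods are hypothetical, each inner vector plays the role of the memory one device bound to the multicast object, and only the add reduction is modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// CPU stand-in for one multicast object: mem_[dev] plays the role of the
// physical memory that device `dev` bound to it.
class MulticastModel {
 public:
  MulticastModel(std::size_t nDevices, std::size_t nElems)
      : mem_(nDevices, std::vector<float>(nElems, 0.0f)) {
    assert(nDevices > 0);
  }

  // Unicast access (ucBuff): touches only one device's memory.
  void storeUnicast(std::size_t dev, std::size_t i, float v) { mem_.at(dev).at(i) = v; }
  float loadUnicast(std::size_t dev, std::size_t i) const { return mem_.at(dev).at(i); }

  // Multicast store (mcBuff): the value lands on every bound device.
  void storeMulticast(std::size_t i, float v) {
    for (auto& m : mem_) m.at(i) = v;
  }

  // Multicast load-reduce with op = add: sum of the element over all devices.
  float loadReduceAdd(std::size_t i) const {
    float sum = 0.0f;
    for (const auto& m : mem_) sum += m.at(i);
    return sum;
  }

 private:
  std::vector<std::vector<float>> mem_;
};
```

In this model, writing 1.5 on device 0 and 2.5 on device 1 through the unicast path and then calling loadReduceAdd on the same index yields 4.0, which is the behaviour the ld_reduce example describes.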

Kernel flow, using ReduceScatter as an example

Version 2.19 changed quite a lot, so before looking at the nvls kernel we first describe how the new kernel is executed.

Kernel dispatch

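First, let's see how a kernel is launched and how execution reaches the device function for the given proto, algo and so on.
Because there are many APIs, reduction types, data types, algorithms and protocols, and the number of kernels is the Cartesian product of these variables, NCCL uses generate.py to generate the kernel definitions. It mainly generates two arrays, ncclDevKernelForFunc and ncclDevFuncTable, which hold the global functions and the device functions respectively.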

static ncclResult_t scheduleCollTasksToPlan(...) {
  ...
  NCCLCHECK(computeColl(&info, &workFuncIndex, &workElem, &proxyOp));
  ...
  if (!plan->kernelSpecialized) {
    plan->kernelFn = ncclDevKernelForFunc[workFuncIndex];
    plan->kernelSpecialized = ncclDevKernelForFuncIsSpecialized[workFuncIndex];
  }
  ...
}

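During enqueue, computeColl computes workFuncIndex, and kernelFn is recorded as ncclDevKernelForFunc[workFuncIndex]. Taking ReduceScatter sum as an example, workFuncIndex is 485 and ncclDevKernelForFunc[485] is ncclDevKernel_ReduceScatter_Sum_f32_RING_LL, so this is the kernel that gets launched. Note that we are actually using the SIMPLE protocol while this kernel is the LL one; let's see how the call is dispatched further.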

DEFINE_ncclDevKernel(ReduceScatter_Sum_f32_RING_LL, ncclFuncReduceScatter, FuncSum, float, NCCL_ALGO_RING, NCCL_PROTO_LL, 483)

#define DEFINE_ncclDevKernel(suffix, coll, redop, ty, algo, proto, specializedFnId) \
  __global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \
    ncclKernelMain<specializedFnId, RunWork<coll, ty, redop<ty>, algo, proto>>(comm, channelMask, workHead); \
  }

ncclDevKernel_ReduceScatter_Sum_f32_RING_LL is defined as above, with specializedFnId equal to 483, and it directly calls ncclKernelMain.

Before looking at ncclKernelMain, let's see where the parameters are stored.

__shared__ ncclShmemData ncclShmem;

struct ncclShmemGroup {
  ncclConnInfo *recvConns[NCCL_MAX_NVLS_ARITY];
  ncclConnInfo *sendConns[NCCL_MAX_NVLS_ARITY];
  void* srcs[NCCL_MAX_NVLS_ARITY+1];
  void* dsts[NCCL_MAX_NVLS_ARITY+1];
  union {
    unpackGroupShmem unpack;
  } devicePlugin;
};

struct ncclShmemData {
  struct ncclShmemGroup groups[NCCL_MAX_GROUPS];
  uint64_t redOpArgs[NCCL_MAX_NVLS_ARITY+1];
  int channelId;
  int aborted;
  alignas(16) struct ncclDevComm comm;
  alignas(16) struct ncclDevChannel channel;
  alignas(16) struct ncclWork work;
  alignas(16) union {
    unpackShmem unpack;
  } devicePlugin;
};

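ncclShmem lives in shared memory and stores the parameters the kernel needs, such as channelId, comm and channel; all threads in a block use this information to send and receive data. In earlier versions all threads of a block shared the same peers, whereas in the new version different threads may correspond to different peers. With send/recv, for example, one block can send to and receive from 8 peers. With nvls, covered in this post, different warps in a block complete the overall flow as a pipeline. This is why the ncclShmemGroup groups structure was introduced: a group is a set of threads executing the same logic, and the conns, srcs, dsts and other information a group needs are stored in groups.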

template<int SpecializedFnId, typename SpecializedRunWork>
__device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
  int tid = threadIdx.x;
  if (tid < WARP_SIZE) {
    int x = tid;
    if (channelMask & (1ull<<x)) {
      int y = __popcll(channelMask & ((1ull<<x)-1));
      if (blockIdx.x == y) ncclShmem.channelId = x;
    }
    ...
  }
  __syncthreads(); // publish ncclShmem.channelId
  int channelId = ncclShmem.channelId;
  ...
}

This selects the mapping between blocks and channels, i.e. it computes which channel the current block should handle.
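The mapping can be reproduced on the host with a short sketch. channelForBlock and popcount64 are hypothetical helper names. The sketch loops over all 64 bits serially, whereas the kernel lets each thread of the first warp test one bit in parallel and handles the remaining bits in the part elided above.

```cpp
#include <cassert>
#include <cstdint>

// Number of set bits, written out by hand to stay portable.
int popcount64(std::uint64_t v) {
  int n = 0;
  for (; v != 0; v &= v - 1) ++n;
  return n;
}

// Returns the channel handled by block `blockIdx`, or -1 if the mask has
// fewer than blockIdx + 1 bits set.
int channelForBlock(std::uint64_t channelMask, int blockIdx) {
  assert(blockIdx >= 0);
  for (int x = 0; x < 64; ++x) {
    if (channelMask & (1ull << x)) {
      // y = how many channels with a lower index are also active.
      int y = popcount64(channelMask & ((1ull << x) - 1));
      if (y == blockIdx) return x;
    }
  }
  return -1;
}
```

In other words, block i handles the channel given by the i-th set bit of channelMask. For the mask 0b101100, blocks 0, 1 and 2 handle channels 2, 3 and 5.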

template<int SpecializedFnId, typename SpecializedRunWork>
__device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
  ...
  while (true) {
    // Notify host that all fifo reads are complete.
    ...
    if (0 <= SpecializedFnId && ncclShmem.work.header.funcIndex == (unsigned)SpecializedFnId) {
      SpecializedRunWork().run(&ncclShmem.work);
    } else {
      ncclDevFuncTable[ncclShmem.work.header.funcIndex]();
    }

    int workIxNext = ncclShmem.work.header.workNext;
    __syncthreads();
    ...
  }
  ...
}

funcIndex is 485 while SpecializedFnId is 483, so the function for funcIndex is looked up in ncclDevFuncTable. It is ncclDevFunc_ReduceScatter_Sum_f32_RING_SIMPLE, which is the function that needs to run.
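The two dispatch paths can be sketched in plain C++. Everything here is illustrative: the three-entry table, the function names and RunLL are hypothetical stand-ins for ncclDevFuncTable and the RunWork specialization compiled into a kernel, and strings are returned only so that the chosen path is visible.

```cpp
#include <cassert>
#include <string>

// Hypothetical three-entry function table standing in for ncclDevFuncTable.
using DevFunc = std::string (*)();
std::string funcLL() { return "RING_LL"; }
std::string funcLL128() { return "RING_LL128"; }
std::string funcSimple() { return "RING_SIMPLE"; }
const DevFunc kFuncTable[] = {funcLL, funcLL128, funcSimple};

// Stand-in for the RunWork specialization compiled into the kernel.
struct RunLL {
  std::string run() const { return "RING_LL (inlined)"; }
};

template <int SpecializedFnId, typename SpecializedRunWork>
std::string kernelMain(int funcIndex) {
  assert(funcIndex >= 0 && funcIndex < 3);
  if (0 <= SpecializedFnId && funcIndex == SpecializedFnId) {
    return SpecializedRunWork().run();  // fast path, no indirect call
  }
  return kFuncTable[funcIndex]();  // generic path through the table
}
```

A kernel specialized for index 0 runs its inlined body when asked for index 0 and falls back to the table for any other index, which is the situation described above with 483 and 485.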

DEFINE_ncclDevFunc(ReduceScatter_Sum_f32_RING_SIMPLE, ncclFuncReduceScatter, FuncSum, float, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE)

#define DEFINE_ncclDevFunc(suffix, coll, redop, ty, algo, proto) \
  __device__ void ncclDevFunc_##suffix() { \
    RunWork<coll, ty, redop<ty>, algo, proto>().run(&ncclShmem.work); \
  }

template<ncclFunc_t Fn, typename T, typename RedOp, int Algo, int Proto>
struct RunWork {
  // This __forceinline__ is necessary. The compiler was inserting a function call
  // here from the LL ncclKernel.
  __device__ __forceinline__ void run(ncclWork *w) {
    int wid = threadIdx.x / WARP_SIZE;
    ncclWorkElem* we = w->header.type == ncclWorkTypeRegColl ? &w->regElems[0].elem : &w->elems[0];
    int stride = w->header.type == ncclWorkTypeRegColl ? sizeof(ncclWorkElemReg) : sizeof(ncclWorkElem);
    #pragma unroll 1
    while ((char*)we + stride <= (char*)(w+1) && we->isUsed) {
      if (wid < we->nWarps) {
        RunWorkElement<Fn, T, RedOp, Algo, Proto>().run(we);
      }
      we = (ncclWorkElem*)((char*)we + stride);
    }
  }
};

From this definition we get that Fn is ncclFuncReduceScatter, T is float, RedOp is FuncSum<float>, the algorithm is NCCL_ALGO_RING and the protocol is NCCL_PROTO_SIMPLE. runRing is then executed.

  template<typename T, typename RedOp, typename Proto>
  __device__ __forceinline__ void runRing(ncclWorkElem *args) {
    ...
    const ssize_t loopSize = nChannels*chunkSize;
    const ssize_t size = args->count;

    Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
      prims(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);

    for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
      ssize_t realChunkSize;
      ...
      realChunkSize = int(realChunkSize);

      ssize_t chunkOffset = gridOffset + bid*int(realChunkSize);

      /// begin ReduceScatter steps ///
      ssize_t offset;
      int nelem = min(realChunkSize, size-chunkOffset);
      int rankDest;

      // step 0: push data to next GPU
      rankDest = ringRanks[nranks-1];
      offset = chunkOffset + rankDest * size;
      prims.send(offset, nelem);

In the first step, the rank sends the data block selected by rankDest = ringRanks[nranks-1]; send copies the data into the buffer of the next rank.

      // k-2 steps: reduce and copy to next GPU
      for (int j=2; j<nranks; ++j) {
        rankDest = ringRanks[nranks-j];
        offset = chunkOffset + rankDest * size;
        prims.recvReduceSend(offset, nelem);
      }

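The next nranks - 2 steps execute recvReduceSend: the data the previous rank wrote into this rank's buffer is reduced with the corresponding position of this rank's user input and then sent to the next rank.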

      // step k-1: reduce this buffer and data, which will produce the final result
      rankDest = ringRanks[0];
      offset = chunkOffset + rankDest * size;
      prims.recvReduceCopy(offset, chunkOffset, nelem, /*postOp=*/true);
    }
  }
}

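The last step executes recvReduceCopy: the data sent by the previous rank is reduced with the corresponding position of this rank's user input and copied to the user output.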
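The three phases (send, recvReduceSend, recvReduceCopy) can be simulated sequentially on the CPU. This is a sketch under simplifying assumptions: one element per chunk, a sum reduction, all ranks advancing in lock step, and ringRanks for rank r assumed to be r, r+1, ... modulo nranks. The function name ringReduceScatter is hypothetical.

```cpp
#include <cassert>
#include <vector>

// in[r][c] is rank r's input value for chunk c (one element per chunk to keep
// the sketch short). Returns out[r], the reduced value of chunk r on rank r.
std::vector<long> ringReduceScatter(const std::vector<std::vector<long>>& in) {
  const int n = static_cast<int>(in.size());
  assert(n >= 2);
  // ringRanks for rank r is assumed to be r, r+1, ..., so ringRanks[i] = (r + i) % n.
  auto ringRank = [n](int r, int i) { return (r + i) % n; };

  std::vector<long> fifo(n);  // fifo[r]: value waiting in rank r's receive buffer
  // step 0: every rank pushes chunk ringRanks[n-1] to the next rank.
  for (int r = 0; r < n; ++r) fifo[(r + 1) % n] = in[r][ringRank(r, n - 1)];

  // k-2 steps: receive, reduce with the local chunk, send to the next rank.
  for (int j = 2; j < n; ++j) {
    std::vector<long> next(n);
    for (int r = 0; r < n; ++r) {
      next[(r + 1) % n] = fifo[r] + in[r][ringRank(r, n - j)];
    }
    fifo = next;
  }

  // last step: receive, reduce with chunk ringRanks[0], copy to the output.
  std::vector<long> out(n);
  for (int r = 0; r < n; ++r) out[r] = fifo[r] + in[r][ringRank(r, 0)];
  return out;
}
```

With four ranks and in[r][c] = 10 * r + c, rank r ends up with 60 + 4 * r, the sum of chunk r over all ranks.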

Primitive initialization

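Recall how primitives is constructed in ReduceScatter: recvPeers is the previous rank in the ring, sendPeers is the next rank in the ring, inputBuf and outputBuf are the input and output buffers the user passed to the API, and group takes its default value 0. redOpArg is used for operations such as mean, where it is set to nranks and reduceCopy divides by nranks. This example uses sum, so redOpArg can be ignored.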

      Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
        prims(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);

  __device__ Primitives(
      int tid, int nthreads, int const *recvPeers, int const *sendPeers,
      void const *inputBuf, void *outputBuf, uint64_t redOpArg, uint8_t group=0,
      uint8_t connIndexRecv = 0, uint8_t connIndexSend = 0, struct ncclWorkElem* e = nullptr, int stepSize_=0
    ):
    tid(tid), nthreads(nthreads), tidInBlock(threadIdx.x), group(group),
    stepSize(stepSize_ == 0 ? ncclShmem.comm.buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/sizeof(T) : stepSize_) {
  }

In the template parameters, Direct and P2p are 0 and Fan is FanSymmetric<1>, which records how many recv and send peers there are. Here MaxRecv and MaxSend are both 1.

template<int MaxArity>
struct FanSymmetric {
  static constexpr int MaxRecv = MaxArity, MaxSend = MaxArity;
  int n;
  FanSymmetric() = default;
  __device__ FanSymmetric(int nrecv, int nsend): n(nrecv) {
    // assert(nrecv == nsend && nrecv <= MaxArity);
  }
  __device__ int nrecv() const { return n; }
  __device__ int nsend() const { return n; }
};

The initialization then continues.

  __device__ Primitives(...) {
    // For send operations, we need an extra warp to overlap the threadfence and the copy
    this->nworkers = nthreads - (MaxSend > 0 && nthreads-WARP_SIZE >= 64 ? WARP_SIZE : 0);

    int nrecv=0, nsend=0;
    while (nrecv < MaxRecv && recvPeers[nrecv] != -1) nrecv++;
    while (nsend < MaxSend && sendPeers[nsend] != -1) nsend++;
    this->fan = Fan(nrecv, nsend);

    constexpr int ThreadPerSync = 8;
    static_assert(MaxSend <= ThreadPerSync && MaxRecv <= ThreadPerSync, "Not enough threads to cover all peers");

    int g = tid / ThreadPerSync;
    int ng = nthreads / ThreadPerSync;
    index = tid % ThreadPerSync;
    flags = 0;
    if (g == 0) {
      if (index < nrecv) flags |= RoleWaitRecv;
      if (index == nrecv) flags |= RoleInput;
    } else if (g == 1) {
      if (index < nsend) flags |= RoleWaitSend;
      if (index == nsend) flags |= RoleOutput;
    } else if (g == ng - 2) {
      if (index < nrecv) flags |= RolePostRecv;
    } else if (g == ng - 1) {
      if (index < nsend) flags |= RolePostSend;
    }
    ...
  }

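nthreads is the total number of threads, which in this example equals the number of threads in the block. nworkers is the number of threads that actually do the work: because sending needs one warp to execute the threadfence, nworkers is nthreads minus one warp, although when the total number of warps is small no separate synchronization warp is used. nsend and nrecv are then recorded; both are 1 here.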

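Each thread's role is then set. The nthreads threads are split into groups of 8. Suppose there are n groups in total and there are two recvPeers and two sendPeers; the roles are then assigned as shown in Figure 8. WaitRecv means the thread waits until the fifo has data to receive: in this example thr[0] of g[0] waits on recvPeer 0 and thr[1] waits on recvPeer 1. The Input thread is responsible for writing the address of the user buffer. PostRecv notifies the recvPeer after data has been received: thr[0] of g[n-2] notifies recvPeer 0 and thr[1] notifies recvPeer 1. The same applies to send.

Figure 8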
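The role assignment can be tried out on the host with the sketch below. It copies the branch structure of the constructor shown earlier; the numeric flag values and the function name roleFlags are illustrative assumptions, not NCCL's constants.

```cpp
#include <cassert>

// Illustrative flag values; the real constants live in NCCL's primitives code.
enum Role : unsigned {
  RoleInput = 1u << 0,
  RoleOutput = 1u << 1,
  RoleWaitRecv = 1u << 2,
  RoleWaitSend = 1u << 3,
  RolePostSend = 1u << 4,
  RolePostRecv = 1u << 5,
};

// Role flags of thread `tid` among `nthreads` threads with nrecv/nsend peers.
unsigned roleFlags(int tid, int nthreads, int nrecv, int nsend) {
  constexpr int ThreadPerSync = 8;
  assert(nrecv <= ThreadPerSync && nsend <= ThreadPerSync);
  const int g = tid / ThreadPerSync;       // group index of this thread
  const int ng = nthreads / ThreadPerSync; // number of groups
  const int index = tid % ThreadPerSync;   // position inside the group
  unsigned flags = 0;
  if (g == 0) {
    if (index < nrecv) flags |= RoleWaitRecv;
    if (index == nrecv) flags |= RoleInput;
  } else if (g == 1) {
    if (index < nsend) flags |= RoleWaitSend;
    if (index == nsend) flags |= RoleOutput;
  } else if (g == ng - 2) {
    if (index < nrecv) flags |= RolePostRecv;
  } else if (g == ng - 1) {
    if (index < nsend) flags |= RolePostSend;
  }
  return flags;
}
```

With 256 threads and two peers in each direction, threads 0 and 1 wait on the receive side, thread 2 is the Input thread, threads 8 and 9 wait on the send side, thread 10 is the Output thread, and the first two threads of the last two groups (240, 241 and 248, 249) post. All other threads carry no role flag.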

  __device__ __forceinline__ void loadRecvConn(ncclDevChannelPeer *peer, int connIndex, struct ncclWorkElem* e) {
    if (flags & (RoleWaitRecv|RolePostRecv)) {
      auto *conn = &peer->recv[connIndex];
      step = conn->step;
      step = roundUp(step, SlicePerChunk*StepPerSlice);
      if (flags & RolePostRecv) {
        connStepPtr = conn->head;
        *connStepPtr = step; // Return credits in case we rounded up.
      }
      if (flags & RoleWaitRecv) {
        ncclShmem.groups[group].recvConns[index] = conn; // WaitRecv role saves since that's who needs it in setDataPtrs()
        flags |= (conn->flags & NCCL_NVLS_MIN_POLL) ? NvlsMinPolling : 0;
        connStepPtr = conn->tail;
        connStepCache = loadStepValue(connStepPtr);
        flags |= (conn->offsFifo != nullptr) ? OffsFifoEnabled : 0;
        if (Direct) {
          ...
        }
        if (flags & OffsFifoEnabled)
          connOffsFifoPtr = conn->offsFifo;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
      }
    }
  }

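Only RoleWaitRecv and RolePostRecv threads execute loadRecvConn and read step; as described in earlier posts, step is the position in the fifo. A RolePostRecv thread notifies the recvPeer, so it saves the head pointer of conn in connStepPtr. A RoleWaitRecv thread waits until the fifo has new data, so it saves the tail pointer of conn in connStepPtr and caches its content in connStepCache to avoid frequent global memory reads. Finally conn->buff, i.e. the fifo, is recorded in connEltsFifo.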

  __device__ __forceinline__ void loadSendConn(ncclDevChannelPeer *peer, int connIndex, struct ncclWorkElem* e) {
    if (flags & (RoleWaitSend|RolePostSend)) {
      auto *conn = &peer->send[connIndex];
      step = conn->step;
      step = roundUp(step, SlicePerChunk*StepPerSlice);
      if (flags & RolePostSend) {
        connStepPtr = conn->tail;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
      }
      if (flags & RoleWaitSend) {
        ncclShmem.groups[group].sendConns[index] = conn; // WaitSend role saves since that's who needs it in setDataPtrs()
        flags |= (conn->flags & NCCL_NVLS_MIN_POLL) ? NvlsMinPolling : 0;
        connStepPtr = conn->head;
        connStepCache = loadStepValue(connStepPtr);
        flags |= (conn->offsFifo != nullptr) ? OffsFifoEnabled : 0;
        if (flags & OffsFifoEnabled)
          connOffsFifoPtr = conn->offsFifo;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
        ...
      }
    }
  }

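loadSendConn follows the same logic: a RolePostSend thread notifies the send peer and therefore holds the tail pointer, while a RoleWaitSend thread waits on the send peer and therefore holds the head pointer. The fifo is then recorded.
Finally setDataPtrs is executed, which sets userBuff to the user's input or output buffer.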

  __device__ void setDataPtrs(void const *inputBuf, void *outputBuf, uint64_t redOpArg, struct ncclWorkElemReg* e) {
    if (flags & RoleInput) {
      userBuff = (T*)inputBuf;
      ncclShmem.redOpArgs[0] = redOpArg;  // scaler for local input
    }
    if (flags & RoleOutput) userBuff = (T*)outputBuf;
    ...
  }

This completes the initialization.

recvReduceSend

  __device__ __forceinline__ void recvReduceSend(intptr_t inpIx, int eltN, bool postOp=false) {
    genericOp<0, 0, 1, 1, Input, -1>(inpIx, -1, eltN, postOp);
  }

  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp) {
    constexpr int DirectRecv = 1 && Direct && DirectRecv1;
    constexpr int DirectSend = 1 && Direct && DirectSend1;
    constexpr int Src = SrcBuf != -1;
    constexpr int Dst = DstBuf != -1;

    nelem = nelem < 0 ? 0 : nelem;
    int sliceSize = stepSize*StepPerSlice;
    sliceSize = max(divUp(nelem, 16*SlicePerChunk)*16, sliceSize/32);
    int slice = 0;
    int offset = 0;
    ...
  }

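In the template parameters, Recv indicates whether a recv is needed, Send whether a send is performed, SrcBuf whether the inputs include the user's src buffer, and DstBuf whether the outputs include the user's dst buffer. From these, DirectRecv and DirectSend are 0, Src is 1 and Dst is 0.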

  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp) {
    ...
    if (tid < nworkers && offset < nelem) {
      do {
        sliceSize = sliceSize < nelem-offset ? sliceSize : nelem-offset;
        if (Src && (flags & (SrcBuf==Input ? RoleInput : RoleOutput)))
          ncclShmem.groups[group].srcs[0] = userBuff + srcIx + offset;
        if (Dst && (flags & (DstBuf==Input ? RoleInput : RoleOutput)))
          ncclShmem.groups[group].dsts[0] = userBuff + dstIx + offset;
        waitPeer<DirectRecv, DirectSend, Recv, Send, Src, Dst>(srcIx, dstIx, offset, sliceSize);
        ...
      } while (slice < SlicePerChunk && offset < nelem);
    }
    ...
  }

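In 2.7.8 the worker threads that move data and the synchronization threads shared one loop, which introduced many branch instructions and hurt performance. The new version splits the logic into two loops to improve performance: worker threads run the first loop and synchronization threads run the second.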

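The RoleInput thread writes the user buffer into srcs[0], and waitPeer is then executed. waitPeer is the former waitSend and waitRecv: it waits until data can be sent and received, and fills the data addresses into srcs and dsts.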

  template <int DirectRecv, int DirectSend, int Recv, int Send, int Src, int Dst>
  __device__ __forceinline__ void waitPeer(intptr_t srcIx, intptr_t dstIx, int offset, int nelts) {
    const bool isSendNotRecv = (Send && Recv) ? (flags & RoleWaitSend) : Send;
    const bool noRecvWait = DirectRecv && Src && (flags & DirectRead);        // no wait when directly reading from remote input
    const bool noSendWait = DirectSend && (flags & (DirectRead|DirectWrite)); // no wait in empty send (e.g. directScatter) or direct remote write
    if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) ||
        ((flags & (Send*RoleWaitSend)) && !noSendWait)) {
      int spins = 0;
      while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) {
        connStepCache = loadStepValue(connStepPtr);
        if (checkAbort(spins)) break;
      }
    }
    ...
  }

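noRecvWait and noSendWait are both 0. For a RoleWaitSend thread, isSendNotRecv is 1 and the connStepPtr it holds is the head pointer, so it keeps waiting while head plus the queue capacity is less than step + StepPerSlice, because sending at that point would exceed the queue capacity. For a RoleWaitRecv thread, isSendNotRecv is 0 and the connStepPtr it holds is the tail pointer, so it keeps waiting while step + StepPerSlice is beyond the tail pointer, which means the queue has no data left.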
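The two wait conditions can be written as small predicates. This is a host-side sketch: the function names are hypothetical, and kSteps is an assumed fifo depth standing in for NCCL_STEPS.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint64_t kSteps = 8;  // assumed fifo depth, standing in for NCCL_STEPS

// Sender side: `head` is the last value posted by the receiver. The sender
// has to wait while writing one more slice would overrun the fifo.
bool senderMustWait(std::uint64_t head, std::uint64_t step, std::uint64_t stepPerSlice) {
  assert(stepPerSlice > 0);
  return head + kSteps < step + stepPerSlice;
}

// Receiver side: `tail` is the last value posted by the sender. The receiver
// has to wait while the next slice has not been published yet.
bool receiverMustWait(std::uint64_t tail, std::uint64_t step, std::uint64_t stepPerSlice) {
  assert(stepPerSlice > 0);
  return tail < step + stepPerSlice;
}
```

With head = 0 and one step per slice, the sender may write steps 0 through 7 and has to wait at step 8. With tail = 3, the receiver may consume steps 0 through 2 and has to wait at step 3.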

  template <int DirectRecv, int DirectSend, int Recv, int Send, int Src, int Dst>
  __device__ __forceinline__ void waitPeer(intptr_t srcIx, intptr_t dstIx, int offset, int nelts) {
    ...
    if (flags & (Recv*RoleWaitRecv | Send*RoleWaitSend)) {
      if (isSendNotRecv && (flags & SizesFifoEnabled))
        connSizesFifoPtr[step%NCCL_STEPS] = nelts*sizeof(T);

      void **ptrs = isSendNotRecv ? (ncclShmem.groups[group].dsts + Dst)
                                  : (ncclShmem.groups[group].srcs + Src);
      if (flags & OffsFifoEnabled) {
        ...
      } else if (isSendNotRecv && DirectSend) {
      } else if (!isSendNotRecv && DirectRecv) {
      } else {
        ptrs[index] = connEltsFifo + (step%NCCL_STEPS)*stepSize;
      }
      step += StepPerSlice;
    }
  }

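The srcs and dsts arrays are then filled: each thread writes in the slot of the fifo it holds and then updates step. For recvReduceSend, srcs[0] is the user buffer, srcs[1] is the fifo of the previous rank, and dsts[0] is the fifo of the next rank. This gives the behavior described earlier: receive data from the previous rank, reduce it with the user input buffer, and send it to the next rank.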

  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp) {
    ...
    if (tid < nworkers && offset < nelem) {
      do {
        ...
        subBarrier();
        int workSize = ncclShmem.aborted ? 0 : sliceSize;
        if (DirectRecv && ncclShmem.groups[group].srcs[0] == ncclShmem.groups[group].dsts[0] /* ... */) {
        } else if (DirectSend && !DirectRecv && SrcBuf != Input && ncclShmem.groups[group].dsts[Dst] == nullptr) {
        } else {
          constexpr int PreOpSrcs = SrcBuf != Input ? 0 :
                                    DirectRecv*MaxRecv == NCCL_MAX_DIRECT_ARITY ? (1+NCCL_MAX_DIRECT_ARITY) : 1;
          reduceCopy<Unroll, RedOp, T,
            MultimemSrcs, Recv+Src, Recv*MaxRecv+Src,
            MultimemDsts, Send+Dst, Send*MaxSend+Dst, PreOpSrcs>
            (tid, nworkers, ncclShmem.redOpArgs[0], ncclShmem.redOpArgs, postOp,
             Recv*fan.nrecv()+Src, ncclShmem.groups[group].srcs,
             Send*fan.nsend()+Dst, ncclShmem.groups[group].dsts,
             workSize);
        }
        barrier(); // This barrier has a counterpart in following loop
        postPeer<Recv, Send>(0 < sliceSize);
        offset += sliceSize;
        slice += 1;
      } while (slice < SlicePerChunk && offset < nelem);
    }

    while (slice < SlicePerChunk) {
      sliceSize = sliceSize < nelem-offset ? sliceSize : nelem-offset;
      barrier(); // Has couterpart in preceding worker-only loop.
      postPeer<Recv, Send>(0 < sliceSize);
      offset += sliceSize;
      slice += 1;
    }
  }

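Once waitPeer completes, data can be sent and received. subBarrier() is executed first; it synchronizes all worker threads so that the data movement logic is entered only after waitPeer has finished. reduceCopy then reduces the data from srcs and copies it to dsts. Next comes barrier(), which synchronizes all threads, i.e. worker threads plus synchronization threads, because the synchronization threads can only post after the data movement has finished. Now let's look at postPeer, which the synchronization threads execute.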

  template<int Recv, int Send>
  inline __device__ void postPeer(bool dataStored) {
    if (flags & (Recv*RolePostRecv | Send*RolePostSend)) {
      step += StepPerSlice;
      if (Send && (flags & RolePostSend) && dataStored) fence_acq_rel_sys();
      st_relaxed_sys_global(connStepPtr, step);
    }
  }

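RolePost threads update step and then write it to connStepPtr. A RolePostRecv thread holds the head pointer and simply writes it. A RolePostSend thread holds the tail pointer; to ensure the data writes complete before the post, a fence is needed, and an acq_rel fence is used here. Release semantics alone would be enough in this scenario, but from a look at the PTX documentation there does not seem to be a standalone release-only fence instruction. On the reading side a matching read barrier would normally be needed as well, but NCCL's implementation uses volatile, which bypasses the L1 cache, so no barrier is used there.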

nvls

ReduceScatter kernel

      if (tid < tidEndScatter) {
        // Scatter
        using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
        Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
          prims(tid, nThreadsScatter, NULL, nvls->up, args->sendbuff, NULL,
            args->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1);
        for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
          ssize_t offset = gridOffset + bid * chunkSize;
          int nelem = min(chunkSize, size - offset);
          prims.scatter(offset, nvls->nHeads * size, nelem, size, -1, 0);
        }
      }

The scatter threads execute the scatter operation through prims. sendPeers is up, i.e. all ranks; inputBuf is the user input args->sendbuff; connIndexSend is 1, so send conn 1 is loaded.

  __device__ __forceinline__ void
  scatter(intptr_t inpIx, ssize_t totalElem, int peerElem, ssize_t peerOffset, int skip, int shift) {
    ScatterGatherOp<0, 0, 0, 1>(inpIx, -1, totalElem, peerElem, peerOffset, skip, shift, /*postOp=*/false);
  }

  template <int DirectRecv1, int DirectSend1, int Recv, int Send>
  __device__ __forceinline__ void
  ScatterGatherOp(intptr_t inpIx, intptr_t outIx, ssize_t totalElem, int peerElem, ssize_t peerOffset, int skip, int shift, bool postOp) {
    constexpr int DirectRecv = 1 && Direct && DirectRecv1;
    constexpr int DirectSend = 1 && Direct && DirectSend1;
    int offset = 0; // slice offset
    int sliceSize = stepSize*StepPerSlice;
    int dataSize = max(DIVUP(peerElem, 16*SlicePerChunk)*16, sliceSize/32);  // per-peer slice size

    #pragma unroll
    for (int slice=0; slice<SlicePerChunk; ++slice) {
      ssize_t realSize = max(0, min(dataSize, peerElem-offset));
      bool fenceNeeded = false;
      if (tid < nworkers) {
        if (Send) {
          // Scatter pre-scales data of input buffer only in non-Direct case
          constexpr int PreOpSrcs = DirectSend ? 0 : 1;
          if (flags & RoleInput) ncclShmem.groups[group].srcs[0] = userBuff + inpIx + offset;
          // realSize is not accurate here; but intra-node does not rely on sizes FIFO
          waitPeer<0, DirectSend, 0, 1, 1, 0>(0, inpIx, offset, realSize);
          subBarrier();
          #pragma unroll
          // Loop over peers
          for (int j=0; j<fan.nsend(); j++) {
            int i = (j+shift)%fan.nsend();
            ssize_t pOffset = i*peerOffset;
            // Skip the data I am responsible of reducing myself
            if (skip >= 0 && i >= skip) pOffset += peerElem;
            void* src0 = (T*)ncclShmem.groups[group].srcs[0] + pOffset;
            ssize_t realPeerSize = min(realSize, totalElem-pOffset);
            if (realPeerSize > 0 && ncclShmem.groups[group].dsts[i] != nullptr) {
              reduceCopy<Unroll, RedOp, T, 0,1,1, 0,1,1, PreOpSrcs>(tid, nworkers, ncclShmem.redOpArgs[0], ncclShmem.redOpArgs, false, 1, &src0, 1, ncclShmem.groups[group].dsts+i, realPeerSize);
              // Mark for threadfence at the end
              fenceNeeded |= true;
            }
          }
        } else if (Recv) {
        }
      }
      fenceNeeded = barrierAny(fenceNeeded);
      postPeer<Recv, Send>(fenceNeeded);
      offset += realSize;
    }
  }

As shown in Figure 9, scatter sends the data in userBuff, at intervals of peerOffset, to the buffer of every send peer, i.e. peer[0]->send[1].buff and peer[1]->send[1].buff in the figure.

Figure 9
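The per-peer offset arithmetic in the "Loop over peers" part can be replayed on the host. The following is only a minimal sketch for a single slice: `PeerSlice` and `scatterPlan` are hypothetical names that do not exist in NCCL, and the offsets are element offsets relative to `srcs[0]`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical host-side helper (not part of NCCL): one entry per send peer.
struct PeerSlice {
  int peer;           // index i into the send-peer list
  int64_t srcOffset;  // element offset into the user buffer (pOffset)
  int64_t count;      // elements copied (realPeerSize, clamped at 0)
};

// Mirrors the "Loop over peers" arithmetic of ScatterGatherOp for one slice.
std::vector<PeerSlice> scatterPlan(int nsend, int64_t totalElem, int64_t peerElem,
                                   int64_t peerOffset, int skip, int shift,
                                   int64_t realSize) {
  assert(nsend > 0);
  std::vector<PeerSlice> plan;
  for (int j = 0; j < nsend; j++) {
    int i = (j + shift) % nsend;  // shift rotates which peer is served first
    int64_t pOffset = i * peerOffset;
    if (skip >= 0 && i >= skip) pOffset += peerElem;  // skip the chunk this rank reduces itself
    int64_t realPeerSize = std::min(realSize, totalElem - pOffset);
    plan.push_back({i, pOffset, std::max<int64_t>(realPeerSize, 0)});
  }
  return plan;
}
```

For example, with 2 peers, totalElem = 10 and peerOffset = 8, peer 0 receives 8 elements starting at offset 0 while peer 1 receives only the remaining 2 elements starting at offset 8.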

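For the reduce threads, sendPeers is NULL and recvPeers is nvls->down. connIndexRecv is 0 (the last two constructor arguments in the code below are both 0), so recv connection 0 is loaded and recv is executed.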

  else if (tid < tidEndReduce) {
    // Reduce through NVLS
    using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 0>;
    Primitives<T, RedOp, FanAsymmetric<1, 0>, /*Direct=*/0, Proto, 0>
      prims(tid - tidEndScatter, nThreadsReduce, &nvls->down, NULL, NULL, args->recvbuff,
            args->redOpArg, 3 * Proto::MaxGroupWidth, 0, 0);
    for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
      ssize_t offset = gridOffset + bid * chunkSize;
      int nelem = min(chunkSize, size - offset);
      prims.recv(offset, nelem);
    }
  }

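Inside recv, dst is set to args->recvbuff and src is the multicast buffer corresponding to this rank. As shown in Figure 10, once it completes, GPU 0's recvbuff holds the reduction of the corresponding values from all GPUs.

Figure 10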

reduceCopy kernel

Taking the ReduceScatter flow as an example, let's look at how the reduceCopy kernel supports unicast and multicast buffers at the same time.

template<int Unroll, typename RedFn, typename T,
         int MultimemSrcs, int MinSrcs, int MaxSrcs,
         int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
         typename IntBytes>
__device__ __forceinline__ void reduceCopy(
    int thread, int nThreads,
    uint64_t redArg, uint64_t *preOpArgs, bool postOp,
    int nSrcs, void **srcPtrs, int nDsts, void **dstPtrs,
    IntBytes nElts
  ) {
  int lane = thread%WARP_SIZE;
  // If a multimem src is present then our biggest pack size is limited to what
  // is supported for this redfn/type.
  constexpr int BigPackSize = (MultimemSrcs == 0) ? 16 : LoadMultimem_BigPackSize<RedFn>::BigPackSize;

  IntBytes nBytesBehind = 0;
  IntBytes nBytesAhead = nElts*sizeof(T);

  #if __cpp_if_constexpr
  if constexpr (BigPackSize > sizeof(T)) {
  #else
  if (BigPackSize > sizeof(T)) {
  #endif
    // Check that all pointers are BigPackSize aligned.
    bool aligned = true;
    if (lane < nSrcs) aligned &= 0 == cvta_to_global(srcPtrs[lane]) % (BigPackSize + !BigPackSize);
    if (lane < nDsts) aligned &= 0 == cvta_to_global(dstPtrs[lane]) % (BigPackSize + !BigPackSize);
    aligned = __all_sync(~0u, aligned);
    if (aligned) {
      reduceCopyPacks<RedFn, T, Unroll, BigPackSize,
        MultimemSrcs, MinSrcs, MaxSrcs, MultimemDsts, MinDsts, MaxDsts, PreOpSrcs>
        (nThreads, /*&*/thread, redArg, preOpArgs, postOp,
         nSrcs, srcPtrs, nDsts, dstPtrs, /*&*/nBytesBehind, /*&*/nBytesAhead);
      if (nBytesAhead == 0) return;

      reduceCopyPacks<RedFn, T, /*Unroll=*/1, BigPackSize,
        MultimemSrcs, MinSrcs, MaxSrcs, MultimemDsts, MinDsts, MaxDsts, PreOpSrcs>
        (nThreads, /*&*/thread, redArg, preOpArgs, postOp,
         nSrcs, srcPtrs, nDsts, dstPtrs, /*&*/nBytesBehind, /*&*/nBytesAhead);
      if (nBytesAhead == 0) return;
    }
  }
  ...
}

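Among the template parameters, MultimemSrcs is the number of inputs that are multimem and MultimemDsts is the number of outputs that are multimem. Among the arguments, thread is tid and nThreads is the total number of threads; there are nSrcs inputs whose addresses are in srcPtrs and nDsts outputs whose addresses are in dstPtrs; nElts is the number of elements. The function reduces all srcs and stores the result to every dst.
nBytesBehind is the number of bytes already processed, and nBytesAhead is the number of bytes still to be processed.
Next the function checks whether vectorized instructions can be used. BigPackSize is the granularity of the load/store instructions: if no input is multimem it tries 16 bytes (128 bits); if an input is multimem, the value depends on the reduction function and the data type. In this example the function is FuncSum on float, so it is also 16 bytes.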

  template<typename Fn>
  struct LoadMultimem_BigPackSize {
    using T = typename Fn::EltType;
    static constexpr bool IsSum = std::is_same<Fn, FuncSum<T>>::value ||
                                  std::is_same<Fn, FuncPreMulSum<T>>::value ||
                                  std::is_same<Fn, FuncSumPostDiv<T>>::value;
    static constexpr bool IsMinMax = std::is_same<Fn, FuncMinMax<T>>::value;
    static constexpr bool IsFloat = IsFloatingPoint<T>::value;
    static constexpr int BigPackSize =
      IsFloat && IsSum && sizeof(T) < 8 ? 16 :
      IsFloat && IsSum ? 8 :
      IsFloat && IsMinMax && sizeof(T)==2 ? 16 :
      !IsFloat && (IsSum||IsMinMax) && sizeof(T)>=4 ? sizeof(T) :
      /*multimem.ld_reduce not supported:*/ 0;
  };
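To make the selection rule easier to check by hand, here is a small host-side restatement of the same conditional chain with a few worked values. It is only a sketch: `RedKind` and `multimemBigPackSize` are hypothetical names, and the type traits of the real struct are replaced by plain arguments.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical restatement of LoadMultimem_BigPackSize; these names are
// illustrative and are not NCCL identifiers.
enum class RedKind { Sum, MinMax, Other };

constexpr int multimemBigPackSize(bool isFloat, RedKind kind, std::size_t eltSize) {
  return (isFloat && kind == RedKind::Sum && eltSize < 8) ? 16
       : (isFloat && kind == RedKind::Sum) ? 8
       : (isFloat && kind == RedKind::MinMax && eltSize == 2) ? 16
       : (!isFloat && kind != RedKind::Other && eltSize >= 4) ? static_cast<int>(eltSize)
       : 0;  // multimem.ld_reduce not supported
}

static_assert(multimemBigPackSize(true, RedKind::Sum, 4) == 16, "float sum uses 16-byte packs");
static_assert(multimemBigPackSize(true, RedKind::Sum, 8) == 8, "double sum uses 8-byte packs");
static_assert(multimemBigPackSize(false, RedKind::Sum, 4) == 4, "int32 sum uses 4-byte packs");
static_assert(multimemBigPackSize(false, RedKind::MinMax, 1) == 0, "int8 min/max is unsupported");
```

A result of 0 means the `BigPackSize > sizeof(T)` test in reduceCopy fails, so the vectorized multimem path shown above is skipped for that combination.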

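Vectorized access requires every input and output pointer to be aligned to BigPackSize. Taking the aligned case as an example, let's look at the logic of reduceCopyPacks.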

template<typename RedFn, typename T, int Unroll, int BytePerPack,
         int MultimemSrcs, int MinSrcs, int MaxSrcs,
         int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
         typename IntBytes>
__device__ __forceinline__ void reduceCopyPacks(
    int nThreads, int &thread,
    uint64_t redArg, uint64_t *preOpArgs, bool postOp,
    int nSrcs, void **srcPtrs, int nDsts, void **dstPtrs,
    IntBytes &nBytesBehind, IntBytes &nBytesAhead
  ) {
  // A hunk is the amount of contiguous data a warp consumes per loop iteration
  // assuming all threads partake.
  constexpr int BytePerHunk = Unroll*WARP_SIZE*BytePerPack;
  int nWarps = nThreads/WARP_SIZE;
  int warp = thread/WARP_SIZE;
  int lane = thread%WARP_SIZE;
  ...
}

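BytePerPack is BigPackSize, i.e. 16 bytes. Assuming Unroll is 4, the memory access pattern of one warp is shown in Figure 11: each blue box spans 32 packs of 16 bytes (512 bytes), and BytePerHunk is the length of contiguous data one warp processes per iteration, i.e. 4 blue boxes (2048 bytes). The first thread of the warp accesses the first 16 bytes of each of the 4 blue boxes, as the arrows indicate.

Figure 11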
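The access pattern can be reproduced with a few lines of host code. This is a sketch under the assumptions stated in the text (WARP_SIZE of 32, and nBytesBehind equal to 0 on entry); `threadOffsets` is a hypothetical helper, not an NCCL function.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

constexpr int kWarpSize = 32;  // assumed value of WARP_SIZE

// Byte offsets one thread touches during one loop iteration (hypothetical helper).
std::vector<int64_t> threadOffsets(int thread, int unroll, int bytePerPack) {
  assert(thread >= 0 && unroll > 0 && bytePerPack > 0);
  const int64_t bytePerHunk = int64_t(unroll) * kWarpSize * bytePerPack;
  const int warp = thread / kWarpSize;
  const int lane = thread % kWarpSize;
  // Same formula as threadBytesBehind, with nBytesBehind = 0.
  int64_t pos = warp * bytePerHunk + int64_t(lane) * bytePerPack;
  std::vector<int64_t> offsets;
  for (int u = 0; u < unroll; u++) {
    offsets.push_back(pos);
    pos += int64_t(kWarpSize) * bytePerPack;  // same lane, next "blue box"
  }
  return offsets;
}
```

With Unroll = 4 and 16-byte packs, thread 0 touches byte offsets 0, 512, 1024 and 1536, thread 1 starts at offset 16, and the first thread of warp 1 starts at offset 2048, i.e. one full hunk later.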

Next, the starting position of each thread is initialized.

__device__ __forceinline__ void reduceCopyPacks(...) {
  // This thread's initial position.
  IntBytes threadBytesBehind = nBytesBehind + (warp*BytePerHunk + lane*BytePerPack);
  IntBytes threadBytesAhead = nBytesAhead - (warp*BytePerHunk + lane*BytePerPack);
  // Number of hunks to be consumed over all warps.
  IntBytes nHunksAhead = nBytesAhead/(BytePerHunk + !BytePerHunk);
  // Advance collective position.
  nBytesBehind += nHunksAhead*BytePerHunk;
  nBytesAhead -= nHunksAhead*BytePerHunk;
  if (Unroll==1 && BytePerPack <= nBytesAhead) {
    // Only Unroll=1 can do partial hunks (where not all threads partake).
    nHunksAhead += 1;
    nBytesBehind += nBytesAhead - (nBytesAhead%(BytePerPack + !BytePerPack));
    nBytesAhead = nBytesAhead%(BytePerPack + !BytePerPack);
  }
  nHunksAhead -= warp;

  RedFn redFn(redArg);
  uintptr_t minSrcs[MinSrcs + !MinSrcs];
  uintptr_t minDsts[MinDsts + !MinDsts];
  #pragma unroll
  for (int s=0; s < MinSrcs; s++)
    minSrcs[s] = cvta_to_global(srcPtrs[s]) + threadBytesBehind;
  #pragma unroll
  for (int d=0; d < MinDsts; d++)
    minDsts[d] = cvta_to_global(dstPtrs[d]) + threadBytesBehind;
  ...
}

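threadBytesBehind is the starting position of the current thread, and threadBytesAhead is the number of bytes remaining from that position to the end of the data. The first MinSrcs src pointers and the first MinDsts dst pointers, each advanced to the thread's starting position, are then recorded in minSrcs and minDsts.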
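The "advance collective position" step determines how much data is left for the next call. Below is a minimal host-side replay of just that bookkeeping, assuming a warp size of 32; `Progress` and `advance` are hypothetical names, and the per-warp distribution of hunks is deliberately left out.

```cpp
#include <cassert>
#include <cstdint>

// nBytesBehind / nBytesAhead as seen by the caller (hypothetical struct).
struct Progress {
  int64_t behind;
  int64_t ahead;
};

// Replays only the collective-position update of reduceCopyPacks.
Progress advance(Progress p, int unroll, int bytePerPack) {
  assert(unroll > 0 && bytePerPack > 0);
  const int64_t bytePerHunk = int64_t(unroll) * 32 * bytePerPack;
  const int64_t nHunksAhead = p.ahead / bytePerHunk;
  p.behind += nHunksAhead * bytePerHunk;
  p.ahead -= nHunksAhead * bytePerHunk;
  if (unroll == 1 && bytePerPack <= p.ahead) {
    // Only Unroll=1 may consume a partial hunk, one pack at a time.
    p.behind += p.ahead - (p.ahead % bytePerPack);
    p.ahead = p.ahead % bytePerPack;
  }
  return p;
}
```

Starting from 5000 bytes with 16-byte packs, the Unroll = 4 call consumes two full hunks (4096 bytes) and leaves 904 bytes; the following Unroll = 1 call consumes every remaining whole pack and leaves 8 bytes, which is why reduceCopy calls reduceCopyPacks twice and returns early only when nBytesAhead reaches 0.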

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    BytePack<BytePerPack> acc[Unroll];

    { RedFn preFn(0 < PreOpSrcs ? preOpArgs[0] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (0 < MultimemSrcs) {
          // applyLoadMultimem uses relaxed semantics for same reason we use volatile below.
          acc[u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[0]);
        } else {
          // Use volatile loads in case credits are polled for with volatile (instead of acquire).
          acc[u] = ld_volatile_global<BytePerPack>(minSrcs[0]);
          if (0 < PreOpSrcs) acc[u] = applyPreOp(preFn, acc[u]);
        }
        minSrcs[0] += WARP_SIZE*BytePerPack;
      }
    }
    ...
  }
}

In this scenario BytePack is 16 bytes and is described by a union; acc holds the reduction result.

template<>
union alignas(16) BytePack<16> {
  BytePack<8> half[2];
  uint8_t u8[16];
  uint16_t u16[8];
  uint32_t u32[4];
  uint64_t u64[2];
  ulong2 ul2, native;
};

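acc is then initialized. If no input is multimem, ld_volatile_global loads the thread's 128 bits from the first blue box into acc[0], and the loop over Unroll loads the corresponding data from every blue box into acc. Let's look at ld_volatile_global.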

#define DEFINE_ld_st_16__space(space, addr_cxx_ty, addr_reg_ty) \
  template<> \
  __device__ __forceinline__ BytePack<16> ld_##space<16>(addr_cxx_ty addr) { \
    BytePack<16> ans; \
    asm("ld." #space ".v2.b64 {%0,%1}, [%2];" : "=l"(ans.u64[0]), "=l"(ans.u64[1]) : #addr_reg_ty(addr)); \
    return ans; \
  } \
  template<> \
  __device__ __forceinline__ BytePack<16> ld_volatile_##space<16>(addr_cxx_ty addr) { \
    BytePack<16> ans; \
    asm("ld.volatile." #space ".v2.b64 {%0,%1}, [%2];" : "=l"(ans.u64[0]), "=l"(ans.u64[1]) : #addr_reg_ty(addr)); \
    return ans; \
  } \
  template<> \
  __device__ __forceinline__ void st_##space<16>(addr_cxx_ty addr, BytePack<16> value) { \
    asm("st." #space ".v2.b64 [%0], {%1,%2};" :: #addr_reg_ty(addr), "l"(value.u64[0]), "l"(value.u64[1]) : "memory"); \
  }
DEFINE_ld_st_16__space(global, uintptr_t, l)

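BytePerPack is 16, so ld_volatile_global<16> is executed, which uses ld.volatile.global.v2.b64 to load 128 bits into u64[0] and u64[1] of ans. Likewise, st_global<16> stores u64[0] and u64[1] of the BytePack to addr with st.global.v2.b64.
When the inputs include multimem, applyLoadMultimem is used instead: it loads and reduces the data, and the result is stored in acc.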

#define SIZEOF_BytePack_field_u32 4
#define PTX_REG_BytePack_field_u32 "r"

#define DEFINE_Apply_LoadMultimem_sum_v4(T, ptx_ty, pack_field) \
  template<> \
  struct Apply_LoadMultimem<FuncSum<T>, 4*(SIZEOF_BytePack_field_##pack_field)> { \
    static constexpr int PackSize = 4*(SIZEOF_BytePack_field_##pack_field); \
    __device__ static BytePack<PackSize> load(FuncSum<T> fn, uintptr_t addr) { \
      BytePack<PackSize> ans; \
      asm("multimem.ld_reduce.relaxed.sys.global.add.v4." #ptx_ty " {%0,%1,%2,%3}, [%4];" \
        : "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[0]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[1]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[2]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[3]) \
        : "l"(addr)); \
      return ans; \
    } \
  };

DEFINE_Apply_LoadMultimem_sum_v4(float, f32, u32)

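As can be seen, multimem.ld_reduce.relaxed.sys.global.add.v4.f32 performs the reduction on 4 floats at once, and the result is stored in acc.

After the first src has been read, the remaining srcs are read into tmp and reduced via Apply_Reduce, which simply performs an element-wise sum of the 4 floats.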

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    ...
    #pragma unroll (MinSrcs-1 + !(MinSrcs-1))
    for (int s=1; s < MinSrcs; s++) {
      BytePack<BytePerPack> tmp[Unroll];
      RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < MultimemSrcs) {
          // applyLoadMultimem uses relaxed semantics for same reason we use volatile below.
          acc[u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[s]);
        } else {
          // Use volatile loads in case credits are polled for with volatile (instead of acquire).
          tmp[u] = ld_volatile_global<BytePerPack>(minSrcs[s]);
        }
        minSrcs[s] += WARP_SIZE*BytePerPack;
      }
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < PreOpSrcs) tmp[u] = applyPreOp(preFn, tmp[u]);
        acc[u] = applyReduce(redFn, acc[u], tmp[u]);
      }
    }

    for (int s=MinSrcs; (MinSrcs < MaxSrcs) && (s < MaxSrcs) && (s < nSrcs); s++) {
      uintptr_t src = cvta_to_global(srcPtrs[s]) + threadBytesBehind;
      BytePack<BytePerPack> tmp[Unroll];
      RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        // Use volatile loads in case credits are polled for with volatile (instead of acquire).
        tmp[u] = ld_volatile_global<BytePerPack>(src);
        src += WARP_SIZE*BytePerPack;
      }
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < PreOpSrcs) tmp[u] = applyPreOp(preFn, tmp[u]);
        acc[u] = applyReduce(redFn, acc[u], tmp[u]);
      }
    }
    ...
  }
  ...
}

template<typename Fn, int EltPerPack>
struct Apply_Reduce {
  template<int Size>
  __device__ static BytePack<Size> reduce(Fn fn, BytePack<Size> a, BytePack<Size> b) {
    a.half[0] = Apply_Reduce<Fn, EltPerPack/2>::reduce(fn, a.half[0], b.half[0]);
    a.half[1] = Apply_Reduce<Fn, EltPerPack/2>::reduce(fn, a.half[1], b.half[1]);
    return a;
  }
};

template<typename T>
struct Apply_Reduce<FuncSum<T>, /*EltPerPack=*/1> {
  __device__ static BytePack<sizeof(T)> reduce(FuncSum<T> fn, BytePack<sizeof(T)> a, BytePack<sizeof(T)> b) {
    return toPack<T>(fromPack<T>(a) + fromPack<T>(b));
  }
};
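The recursive halving used by Apply_Reduce can be tried on the host with a simplified stand-in for BytePack. This is a sketch only: `Pack`, `ReduceSum`, `makePack` and `at` are hypothetical names, the element type is fixed to float, and the element count is assumed to be a power of two.

```cpp
#include <cassert>
#include <cstring>

// Hypothetical stand-in for BytePack: N floats stored as two halves.
template <int N>
struct Pack {
  Pack<N / 2> half[2];
};
template <>
struct Pack<1> {
  float v;
};

// Same recursion shape as Apply_Reduce: split into halves until one element remains.
template <int N>
struct ReduceSum {
  static Pack<N> reduce(Pack<N> a, Pack<N> b) {
    a.half[0] = ReduceSum<N / 2>::reduce(a.half[0], b.half[0]);
    a.half[1] = ReduceSum<N / 2>::reduce(a.half[1], b.half[1]);
    return a;
  }
};
template <>
struct ReduceSum<1> {
  static Pack<1> reduce(Pack<1> a, Pack<1> b) { return Pack<1>{a.v + b.v}; }
};

// Helpers to move 4 floats in and out of a Pack<4> (16 bytes, like BytePack<16>).
Pack<4> makePack(const float (&x)[4]) {
  Pack<4> p;
  std::memcpy(&p, x, sizeof(p));
  return p;
}
float at(const Pack<4>& p, int i) {
  assert(0 <= i && i < 4);
  float out[4];
  std::memcpy(out, &p, sizeof(p));
  return out[i];
}
```

Reducing the packs {1, 2, 3, 4} and {10, 20, 30, 40} with `ReduceSum<4>::reduce` yields {11, 22, 33, 44}: the 16-byte pack is split into two 8-byte halves, each half into two single floats, and the additions happen at the leaves.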

At this point the reduction of all inputs is available, and it is then stored to every output dst.

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    ...
    #pragma unroll (MinDsts + !MinDsts)
    for (int d=0; d < MinDsts; d++) {
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (d < MultimemDsts) {
          multimem_st_global(minDsts[d], acc[u]);
        } else {
          st_global<BytePerPack>(minDsts[d], acc[u]);
        }
        minDsts[d] += WARP_SIZE*BytePerPack;
      }
    }
    for (int d=MinDsts; (MinDsts < MaxDsts) && (d < MaxDsts) && (d < nDsts); d++) {
      uintptr_t dst = cvta_to_global(dstPtrs[d]) + threadBytesBehind;
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        st_global<BytePerPack>(dst, acc[u]);
        dst += WARP_SIZE*BytePerPack;
      }
    }
  }
  ...
}

This completes the reduceCopy flow. One more note: in the NVLS case, flags such as head and tail all live in mcBuff. Let's look at how waitPeer decides whether it needs to wait.

  inline __device__ uint64_t loadStepValue(uint64_t* ptr) {
    #if __CUDA_ARCH__ >= 900 && CUDART_VERSION >= 12010
    if (flags & NvlsMinPolling) {
      uint64_t ans;
      asm("multimem.ld_reduce.acquire.sys.global.min.u64 %0, [%1];" : "=l"(ans) : "l"(cvta_to_global(ptr)));
      return ans;
    }
    #endif
    // volatile is faster than acquire but not as correct. Make sure reduceCopy
    // loads data using volatile so it doesn't see stale data in L1.
    return ld_volatile_global(ptr);
  }

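In the NVLS case, flags contains NvlsMinPolling. The read uses multimem.ld_reduce to load the step of every peer and take the minimum, so data is received only once all peers have their data ready. Acquire semantics are used here, pairing with the release in postPeer to guarantee memory ordering.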
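The effect of min-polling can be modeled on the host as follows. This is only a sketch: `minStep` and `canProceed` are hypothetical helpers, and the comparison against a wanted step is an assumption about how the polled value is used, since the waiting loop itself is not shown here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical model: each peer publishes the step it has reached, and the
// multimem.ld_reduce ... min.u64 load returns the minimum over all peers.
uint64_t minStep(const std::vector<uint64_t>& peerSteps) {
  assert(!peerSteps.empty());
  return *std::min_element(peerSteps.begin(), peerSteps.end());
}

// Assumed usage: a waiter that needs `wanted` proceeds only when the slowest peer got there.
bool canProceed(const std::vector<uint64_t>& peerSteps, uint64_t wanted) {
  return minStep(peerSteps) >= wanted;
}
```

With peer steps {4, 3, 4} a waiter that needs step 4 keeps polling, because one peer is still at step 3; with {4, 4, 5} it can go ahead.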

Next, let's look at postPeer.

  template<int Recv, int Send>
  inline __device__ void postPeer(bool dataStored) {
    if (flags & (Recv*RolePostRecv | Send*RolePostSend)) {
      step += StepPerSlice;
      if (Send && (flags & RolePostSend) && dataStored) fence_acq_rel_sys();
      st_relaxed_sys_global(connStepPtr, step);
    }
  }

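Here a non-multimem instruction is used to write mcBuff. The PTX manual describes this as undefined behavior, but the official word is that it is acceptable for write operations.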

AllReduce

The allreduce kernel has three main groups of threads. The logic of the scatter threads is as follows.

      using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
      Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
        prims(tid, nThreadsScatter, NULL, nvls->up, args->sendbuff, NULL,
              args->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1);
      for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
        ssize_t offset = gridOffset + bid * nvls->nHeads * chunkSize;
        int nelem = args->regUsed ? 0 : min(nvls->nHeads * chunkSize, size - offset);
        prims.scatter(offset, nelem, chunkSize, chunkSize, -1, 0);
      }

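As with ring allreduce, one NVLS loop iteration covers nranks * chunkSize elements, i.e. the variable nelem or loopSize. The scatter threads send the data in sendbuff to nvls->up; connIndexSend is 1, so send connection 1 is used. The state after this step is shown in Figure 12.

Figure 12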

The logic of the reduce thread group is as follows.

    else if (tid < tidEndReduce && nvls->headRank != -1) {
      if (!hasOut) {
        // Reduce, broadcast through NVLS
        using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 1>;
        Primitives<T, RedOp, FanSymmetric<1>, /*Direct=*/1, Proto, 0>
          prims(tid - tidEndGather, nThreadsReduce, &nvls->down, &nvls->down, NULL, NULL,
                args->redOpArg, 2 * Proto::MaxGroupWidth, 0, 0, args);
        for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
          ssize_t offset = gridOffset + (bid * nvls->nHeads + nvls->headRank) * chunkSize;
          int nelem = min(chunkSize, size - offset);
          prims.directRecvDirectSend(offset, offset, nelem);
        }
      }
      ...
    }

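Here MultimemSrcs and MultimemDsts are both 1 and connection 0 is used, so the effect of directRecvDirectSend is as shown in Figure 13. GPU 0 follows the yellow arrows: it reduces the light-yellow blocks on both GPUs into the dark-yellow block and broadcasts it to both GPUs with multimem.st. Likewise, GPU 1 follows the green arrows and reduces the light-blue blocks on both GPUs into the dark-blue block. At this point both GPUs hold the complete reduced data.

Figure 13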

Finally, the gather threads copy the complete reduced data into recvbuff.

    } else if (tid < tidEndGather) {
      // Gather
      using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
      Primitives<T, RedOp, FanAsymmetric<NCCL_MAX_NVLS_ARITY, 0>, /*Direct=*/0, Proto, 0>
        prims(tid - tidEndScatter, nThreadsGather, nvls->up, NULL, NULL, args->recvbuff,
              args->redOpArg, 1 * Proto::MaxGroupWidth, 1, 1);
      for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
        ssize_t offset = gridOffset + bid * nvls->nHeads * chunkSize;
        int nelem = args->regUsed ? 0 : min(nvls->nHeads * chunkSize, size - offset);
        prims.gather(offset, nelem, chunkSize, chunkSize, -1, 0);
      }

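The gather flow is simply ScatterGatherOp taking the recv branch, so it is not repeated here. The state after it runs is shown in Figure 14, which completes the allreduce.

Figure 14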
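The three thread groups can be summarized with a small host-side model. It is a heavily simplified sketch under several assumptions: every rank is a head (nHeads equals nRanks), each rank has one staging slot per head, multimem.ld_reduce is modeled as a sum of the same slot across all ranks, and multimem.st as a write of that slot on every rank. `Buf` and `nvlsAllReduceModel` are hypothetical names.

```cpp
#include <cassert>
#include <vector>

using Buf = std::vector<float>;

// send[r] is rank r's sendbuff with nRanks*chunk elements; returns recv[r] for every rank.
std::vector<Buf> nvlsAllReduceModel(const std::vector<Buf>& send, int chunk) {
  const int nRanks = static_cast<int>(send.size());
  assert(nRanks > 0 && chunk > 0);
  for (const Buf& b : send) assert(static_cast<int>(b.size()) == nRanks * chunk);

  // Scatter: every rank stages chunk h of its sendbuff in the slot owned by head h.
  std::vector<Buf> staging = send;

  // Reduce + broadcast: head h sums slot h across all ranks (modeled ld_reduce),
  // then writes the result into slot h on every rank (modeled multimem.st).
  for (int h = 0; h < nRanks; h++) {
    for (int e = h * chunk; e < (h + 1) * chunk; e++) {
      float acc = 0.0f;
      for (int r = 0; r < nRanks; r++) acc += staging[r][e];
      for (int r = 0; r < nRanks; r++) staging[r][e] = acc;
    }
  }

  // Gather: every rank copies all of its slots into its recvbuff.
  return staging;
}
```

With two ranks holding {1, 2} and {10, 20} and a chunk size of 1, head 0 produces 11 for slot 0 and head 1 produces 22 for slot 1, so both ranks end up with {11, 22}.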

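Bandwidth during search

Figure 15

As mentioned earlier, when searching for channels the bandwidth of the head must be multiplied by 2. Taking Figure 15 as an example and recalling the AllReduce flow just described, GPU 0 first executes ld_reduce; the bandwidth consumed at this point is shown in Figure 16.

Figure 16

It then executes multimem.st, and the bandwidth generated at this point is shown in Figure 17. The head carries both the ld_reduce traffic and the multimem.st traffic, so it needs twice the bandwidth.

Figure 17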
