Spark-ShuffleManager

news/2024/9/19 4:54:03/ 标签: spark, 大数据, 分布式

一、上下文

《Spark-Task启动流程》中我们讲到了ShuffleMapTask中会对这个Stage的结果进行磁盘的写入,并且从SparkEnv中得到了ShuffleManager,且调用了它的getWriter方法并在这个Stage的入口处(也就是RDD的迭代器数据源处)调用了它的getReader,下面我们来详细分析下ShuffleManager在整个Task中的形态。

二、ShuffleManager

Shuffle System 的可插拔接口。根据spark.shuffle.manager设置,位于SparkEnv中,在driver 和每个executor 上创建ShuffleManager。driver 来注册一个Shuffle,executor(或在driver中本地运行的任务)可以使用它来进行Shuffle Reader和Shuffle Writer

private[spark] trait ShuffleManager {//注册洗牌,并获得一个句柄,以便将其传递给任务def registerShuffle[K, V, C](shuffleId: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle//为给定分区找一个写入器。被 executor 上的Task调用执行//一个Stage最后写入时调用def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]//获取一个reduce分区范围(startPartition到endPartition-1,包括端点)的读取器,//以从map输出范围(startMapIndex到endMapIndex-1,包括两端)读取//被reduce端的Executor上的Task调用//一个Stage开始计算时调用final def getReader[K, C](handle: ShuffleHandle,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)}}

SortShuffleManager

它是ShuffleManager的唯一子类

在基于排序的Shuffle中,传入记录根据其目标分区ID进行排序,然后写入单个map输出文件,reducer获取此文件的连续区域,以便读取其在map输出中的部分。如果map输出数据太大而无法放入内存,则可以将输出的排序子集溢写到磁盘,并将磁盘上的文件合并以生成最终输出文件。

基于排序的Shuffle有两种不同的写入路径来生成对应的map输出文件:

1、序列化排序:当满足以下三个条件时使用

  • Shuffle依赖关系未指定Map侧组合。
  • Shuffle序列化程序支持重新定位序列化值(目前KryoSerialer和Spark SQL的自定义序列化程序支持此功能)
  • 洗牌产生的输出分区小于或等于16777216(2的24次方)个。

2、非序列化排序:用于处理所有其他情况

序列化排序模式

在序列化排序模式下,传入记录在传递给ShuffleWriter后立即被序列化,并在排序过程中以序列化形式缓冲。此写入路径实现了几个优化:

1、它的排序操作基于序列化的二进制数据,而不是Java对象,这减少了内存消耗和GC开销。此优化要求记录序列化器具有某些属性,以允许序列化记录重新排序,而不需要反序列化。有关更多详细信息,请参阅SPARK-4550,其中首次提出并实施了此优化。

2、它使用一个专门的缓存高效排序器([[ShuffleExternalSorter]])对压缩记录指针和分区ID的数组进行排序。通过在排序数组中每条记录只使用8个字节的空间,这可以将更多的数组放入缓存中。

3、溢写合并过程对属于同一分区的序列化记录块进行操作,在合并过程中不需要对记录进行反序列化。

4、当溢写压缩编解码器支持压缩数据的连接时,溢写合并只是将序列化和压缩的溢写分区连接起来,以产生最终的输出分区。这允许使用高效的数据复制方法,如NIO的“transferTo”,并避免了在合并过程中分配解压缩或复制缓冲区的需要。

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {//注册Shuffleoverride def registerShuffle[K, V, C](shuffleId: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {//如果有少于spark.shuffle.sort.bypassMergeThreshold 默认值200 的分区,并且我们不需要map侧聚合,那么直接编写numPartitions文件,并在最后将它们连接起来。这避免了两次进行序列化和反序列化以将溢写的文件合并在一起,这在正常的代码路径中会发生。缺点是一次打开多个文件,因此分配给缓冲区的内存更多。new BypassMergeSortShuffleHandle[K, V](shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {//否则,请尝试以序列化形式缓冲映射输出,因为这样更有效:new SerializedShuffleHandle[K, V](shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else {// 如果上面两种情况都不满足,走它:缓冲区数据将以反序列化的形式输出:new BaseShuffleHandle(shuffleId, dependency)}}//获取一个reduce分区范围(startPartition到endPartition-1,包括端点)的读取器,//以从map输出范围(startMapIndex到endMapIndex-1,包括两端)读取。如果endMapIndex=Int.MaxValue,则实际endMapIndex将更改为“getMapSizesByExecutorId”中Shuffle的总map输出长度。override def getReader[K, C](handle: ShuffleHandle,startMapIndex: Int,endMapIndex: Int,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]val (blocksByAddress, canEnableBatchFetch) =//如果启用了Push-based shuffle 且 rdd不是Barrier 那么可以直接构建迭代器,就不用拉取了if (baseShuffleHandle.dependency.shuffleMergeEnabled) {val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)(res.iter, res.enableBatchFetch)} else {//从mapOutputTracker处获取需要拉取数据的地址val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)(address, true)}new BlockStoreShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,shouldBatchFetch =canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))}//每个分区会获取一个ShuffleWriteroverride def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(handle.shuffleId, _ => new OpenHashSet[Long](16))mapTaskIds.synchronized { mapTaskIds.add(mapId) }val env = SparkEnv.gethandle match {case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>new UnsafeShuffleWriter(env.blockManager,context.taskMemoryManager(),unsafeShuffleHandle,mapId,context,env.conf,metrics,shuffleExecutorComponents)case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>new BypassMergeSortShuffleWriter(env.blockManager,bypassMergeSortHandle,mapId,env.conf,metrics,shuffleExecutorComponents)case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)}}}

在SortShuffleManager中的getReader()中我们可以看到,只有一种ShuffleReader,即:BlockStoreShuffleReader,但其因为ShuffleDependency的不同也会返回不同的Iterator

getWriter()中看到,根据ShuffleHandle可以分为三种ShuffleWriter,即:

unsafeShuffleHandle  -> UnsafeShuffleWriter

bypassMergeSortHandle -> BypassMergeSortShuffleWriter

BaseShuffleHandle -> SortShuffleWriter

三、谁注册的ShuffleManager

ShuffleDependency中注册的,并得到了一个ShuffleHandle

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false,val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)extends Dependency[Product2[K, V]] with Logging {val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)
}

那么什么时候会调用RDD的getDependencies?

我们回想下之前几篇博客的内容《Spark-SparkSubmit详细过程》、《Spark-driver和executor启动过程》、《Spark-Job启动、Stage划分》是不是在Stage划分时调用了这个方法来判断是否要创建一个ShuffleMapStage。因此在划分Stage时就确定了ShuffleHandle,换言之也就确定了这个Stage最后的结果要选用哪个ShuffleWriter。而ShuffleWriter又是Spark计算中一个大的瓶颈,因此调节ShuffledRDD的ShuffleDependency就成了调优的必要且重要渠道。后面展开分析ShuffleWriter时再具体讲

哪里用到了ShuffleHandle

1、getReader调用时

getReader是在一个Stage中读取上游Stage时调用的也就是ShuffleRDD中的compute()

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](@transient var prev: RDD[_ <: Product2[K, V]],part: Partitioner)extends RDD[(K, C)](prev.context, Nil) {override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]val metrics = context.taskMetrics().createTempShuffleReadMetrics()SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context, metrics).read().asInstanceOf[Iterator[(K, C)]]}}

2、getWriter调用时

getWriter是在一个Stage结束并将数据溢写磁盘时调用的也就是ShuffleWriteProcessor中的write()

private[spark] class ShuffleWriteProcessor extends Serializable with Logging {def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _],mapId: Long,context: TaskContext,partition: Partition): MapStatus = {var writer: ShuffleWriter[Any, Any] = nulltry {val manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle,mapId,context,createMetricsReporter(context))writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])}}

四、Shuffle场景描述

通过以上的梳理我们大致画下ShuffleManager在计算Task中的场景


http://www.ppmy.cn/news/1520516.html

相关文章

uniapp 自定义微信小程序 tabBar 导航栏

背景 做了一个校园招聘类小程序&#xff0c;使用 uniapp vue3 uview-plus pinia 构建&#xff0c;这个小程序要实现多角色登录&#xff0c;根据权限动态切换 tab 栏文字、图标。 使用pages.json中配置tabBar无法根据角色动态配置 tabBar&#xff0c;因此自定义tabBar&…

MySQL数据库增删查改(基础)CRUD

CRUD 即增加 (Create) 、查询 (Retrieve) 、更新 (Update) 、删除 (Delete) 四个单词的首字母缩写。 1. 新增&#xff08;Create&#xff09; 1.1单行数据&#xff08;全列插入&#xff09; 比如说&#xff1a;创建一张学生表&#xff0c;有姓名&#xff0c;学号。插入两个学…

新手c语言讲解及题目分享(十)——数组专项练习

C语言中的数组是一个用于存储多个同类型数据的集合。数组在内存中是连续分配的&#xff0c;可以通过索引访问其中的元素。以下是对C语言数组的详细讲解&#xff1a; 1. 数组的定义 数组的定义格式如下&#xff1a; type arrayName[arraySize]; - type&#xff1a;数组中元素…

数据结构---链表

指针和数组 数组的用途: 固定大小的存储: 数组用于存储固定大小的一组相同类型的元素。数组的大小在声明时必须指定&#xff0c;并且在程序运行期间不能改变。访问效率高: 数组允许通过下标进行快速访问&#xff0c;时间复杂度为 O(1)。内存连续性: 数组的元素在内存中是连续存…

网络安全面试经验分享:蘑菇街/网络安全

《网安面试指南》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484339&idx1&sn356300f169de74e7a778b04bfbbbd0ab&chksmc0e47aeff793f3f9a5f7abcfa57695e8944e52bca2de2c7a3eb1aecb3c1e6b9cb6abe509d51f&scene21#wechat_redirect 蘑菇街 介绍…

蓝牙协议栈API分析

蓝牙协议栈API分析是一个复杂但重要的任务&#xff0c;它涉及到蓝牙通信的各个方面&#xff0c;包括设备发现、连接建立、数据传输以及安全管理等。以下是对蓝牙协议栈API的详细分析&#xff0c;旨在提供一个全面的视角。 一、蓝牙协议栈概述 蓝牙协议栈是蓝牙技术实现的基础…

解决reCaptcha v2 Invisible:识别和参数

概述 reCaptcha v2 Invisible是一种旨在提供安全性而不打扰用户体验的验证码类型。与传统的验证码不同&#xff0c;reCaptcha v2 Invisible在检测到可疑活动时才会要求用户进行互动。本文将引导您如何使用CapSolver API识别并解决reCaptcha v2 Invisible挑战。 什么是reCaptc…

ChatGPT与R语言融合技术在生态环境数据统计分析、绘图、模型中的实践与进阶应用

自2022年GPT&#xff08;Generative Pre-trained Transformer&#xff09;大语言模型的发布以来&#xff0c;它以其卓越的自然语言处理能力和广泛的应用潜力&#xff0c;在学术界和工业界掀起了一场革命。在短短一年多的时间里&#xff0c;GPT已经在多个领域展现出其独特的价值…

计算机网络-VRRP切换与回切过程

前面我们学习了VRRP选举机制&#xff0c;根据VRRP优先级与IP地址确定主设备与备份设备&#xff0c;这里继续进行主备切换与主备回切以及VRRP抢占模式的学习。 一、VRRP主备切换 主备选举时根据优先级选择主设备&#xff0c;状态切换为Master状态&#xff0c;那当什么时候会切换…

科研学习|论文解读——OceanGPT:用于海洋科学任务的大型语言模型

摘要 海洋覆盖我们星球表面70%以上&#xff0c;对于理解生命的丰富储备和生物多样性至关重要。鉴于海洋在调节全球气候和支持经济中的关键作用&#xff0c;海洋科学研究具有重大意义。最近&#xff0c;大型语言模型&#xff08;LLMs&#xff09;的进步改变了科学的范式。尽管在…

Linux下递归设置目标目录及其子目录和文件的权限

〇、背景 本文旨在简单介绍一个在Linux环境下批量修改目录及其子目录和文件的权限的方法。 一、实现 首先新建一个shell脚本文件&#xff0c;使用指令$ vi chmod.sh&#xff0c;然后在文件中输入下述代码。 #!/bin/bashOFFSET_INDEX" " DIR_MODE755 FILE_MODE664…

Oracle---PAG程序全局区的组成:堆栈区、会话区、游标区、排序区

文章目录 PGA程序全局区PGA主要内容1、排序区&#xff08;SORT AREA&#xff09;**为什么给排序设置合理的排序区大小** 2、会话区&#xff08;USER SESSON DATA&#xff09;3、堆栈区保存变量信息(STACK SPACE)4、游标区 (CURSOR STATE) PGA程序全局区 程序全局区或进程全局区…

AN7536PT时钟电路

目录 1 时钟电路概述2 时钟晶振电路2.1 需求分析2.2 晶振选型&#xff08;Datasheet表5-7解读&#xff09;2.3 设计晶振电路&#xff08;表4-1、图5-4&#xff09; 1 时钟电路概述 时钟电路是一种用于产生稳定、周期性脉冲信号的电子电路。它通常由晶体振荡器和相关逻辑电路组…

Luminar Neo for Mac智能图像处理软件【操作简单,轻松上手】

Mac分享吧 文章目录 效果一、下载软件二、开始安装1、双击运行软件&#xff0c;将其从左侧拖入右侧文件夹中&#xff0c;等待安装完毕2、应用程序显示软件图标&#xff0c;表示安装成功 三、运行测试安装完成&#xff01;&#xff01;&#xff01; 效果 一、下载软件 下载软件…

Hackme靶场渗透攻略

步骤一&#xff0c;注册登录进去 步骤二&#xff0c;点击search 我们发现有很多书 步骤三&#xff0c;搜索一本书抓包发放到重放器 步骤四&#xff0c;数据改为1*&#xff0c;复制数据包到1.txt&#xff0c;然后打开sqlmap 步骤五&#xff0c;sqlmap查看当前数据库 python s…

阿尔茨海默病症识别+图像识别Python+人工智能+深度学习+TensorFlow+机器学习+卷积神经网络算法

一、介绍 阿尔茨海默病症识别。使用Python作为主要编程语言进行开发&#xff0c;基于深度学习等技术使用TensorFlow搭建ResNet50卷积神经网络算法&#xff0c;通过对病症图片4种数据集进行训练[‘轻度痴呆’, ‘中度痴呆’, ‘非痴呆’, ‘非常轻微的痴呆’]&#xff0c;最终得…

TeeChart助力科研软件:高效实现数据可视化

在当今的科学研究中&#xff0c;数据可视化已经成为理解和传播复杂信息的关键工具。尤其是在物理研究领域&#xff0c;科学家们经常需要处理大量的数据&#xff0c;并通过可视化将这些数据转化为更易理解的形式。TeeChart作为一个强大且灵活的图形展示工具&#xff0c;能够帮助…

前端按钮通过浏览器下载附件

html <a click"downloadAttach(record.memoryAddress)">下载附件</a> js downloadAttach(url){var fileUrl window._CONFIG[staticDomainURL] url;window.open(fileUrl); } 配置文件 window._CONFIG[staticDomainURL] http://127.0.0.1:3000/xxx…

Spring Cloud Gateway的使用

Spring Cloud Gateway的使用 1. Spring Cloud Gateway原理2. Spring Boot项目中集成Spring Cloud Gateway2.1 创建项目与添加依赖2.2 配置网关 3. 高级功能与实践**3.1 配置过滤器****3.2 分组路由** 4. 监控与故障处理5. 部署与持续集成 在微服务架构中&#xff0c;服务发现、…

计算机网络(一) —— 网络基础入门

目录 一&#xff0c;关于网络 二&#xff0c;协议 2.1 协议是什么&#xff0c;有什么用&#xff1f; 2.2 协议标准谁定的&#xff1f; 2.3 协议分层 2.4 OSI 七层模型 2.5 TCP/IP 四层模型 三&#xff0c;网络传输基本流程 3.1 局域网中两台主机通信* 3.2 报文的封装与…