一、上下文
《Spark-Task启动流程》中我们讲到了ShuffleMapTask中会对这个Stage的结果进行磁盘的写入,并且从SparkEnv中得到了ShuffleManager,且调用了它的getWriter方法并在这个Stage的入口处(也就是RDD的迭代器数据源处)调用了它的getReader,下面我们来详细分析下ShuffleManager在整个Task中的形态。
二、ShuffleManager
Shuffle System 的可插拔接口。根据spark.shuffle.manager设置,位于SparkEnv中,在driver 和每个executor 上创建ShuffleManager。driver 来注册一个Shuffle,executor(或在driver中本地运行的任务)可以使用它来进行Shuffle Reader和Shuffle Writer
private[spark] trait ShuffleManager {//注册洗牌,并获得一个句柄,以便将其传递给任务def registerShuffle[K, V, C](shuffleId: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle//为给定分区找一个写入器。被 executor 上的Task调用执行//一个Stage最后写入时调用def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]//获取一个reduce分区范围(startPartition到endPartition-1,包括端点)的读取器,//以从map输出范围(startMapIndex到endMapIndex-1,包括两端)读取//被reduce端的Executor上的Task调用//一个Stage开始计算时调用final def getReader[K, C](handle: ShuffleHandle,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)}}
SortShuffleManager
它是ShuffleManager的唯一子类
在基于排序的Shuffle中,传入记录根据其目标分区ID进行排序,然后写入单个map输出文件,reducer获取此文件的连续区域,以便读取其在map输出中的部分。如果map输出数据太大而无法放入内存,则可以将输出的排序子集溢写到磁盘,并将磁盘上的文件合并以生成最终输出文件。
基于排序的Shuffle有两种不同的写入路径来生成对应的map输出文件:
1、序列化排序:当满足以下三个条件时使用
- Shuffle依赖关系未指定Map侧组合。
- Shuffle序列化程序支持重新定位序列化值(目前KryoSerialer和Spark SQL的自定义序列化程序支持此功能)
-
洗牌产生的输出分区小于或等于16777216(2的24次方)个。
2、非序列化排序:用于处理所有其他情况
序列化排序模式
在序列化排序模式下,传入记录在传递给ShuffleWriter后立即被序列化,并在排序过程中以序列化形式缓冲。此写入路径实现了几个优化:
1、它的排序操作基于序列化的二进制数据,而不是Java对象,这减少了内存消耗和GC开销。此优化要求记录序列化器具有某些属性,以允许序列化记录重新排序,而不需要反序列化。有关更多详细信息,请参阅SPARK-4550,其中首次提出并实施了此优化。
2、它使用一个专门的缓存高效排序器([[ShuffleExternalSorter]])对压缩记录指针和分区ID的数组进行排序。通过在排序数组中每条记录只使用8个字节的空间,这可以将更多的数组放入缓存中。
3、溢写合并过程对属于同一分区的序列化记录块进行操作,在合并过程中不需要对记录进行反序列化。
4、当溢写压缩编解码器支持压缩数据的连接时,溢写合并只是将序列化和压缩的溢写分区连接起来,以产生最终的输出分区。这允许使用高效的数据复制方法,如NIO的“transferTo”,并避免了在合并过程中分配解压缩或复制缓冲区的需要。
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {//注册Shuffleoverride def registerShuffle[K, V, C](shuffleId: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {//如果有少于spark.shuffle.sort.bypassMergeThreshold 默认值200 的分区,并且我们不需要map侧聚合,那么直接编写numPartitions文件,并在最后将它们连接起来。这避免了两次进行序列化和反序列化以将溢写的文件合并在一起,这在正常的代码路径中会发生。缺点是一次打开多个文件,因此分配给缓冲区的内存更多。new BypassMergeSortShuffleHandle[K, V](shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {//否则,请尝试以序列化形式缓冲映射输出,因为这样更有效:new SerializedShuffleHandle[K, V](shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else {// 如果上面两种情况都不满足,走它:缓冲区数据将以反序列化的形式输出:new BaseShuffleHandle(shuffleId, dependency)}}//获取一个reduce分区范围(startPartition到endPartition-1,包括端点)的读取器,//以从map输出范围(startMapIndex到endMapIndex-1,包括两端)读取。如果endMapIndex=Int.MaxValue,则实际endMapIndex将更改为“getMapSizesByExecutorId”中Shuffle的总map输出长度。override def getReader[K, C](handle: ShuffleHandle,startMapIndex: Int,endMapIndex: Int,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]val (blocksByAddress, canEnableBatchFetch) =//如果启用了Push-based shuffle 且 rdd不是Barrier 那么可以直接构建迭代器,就不用拉取了if (baseShuffleHandle.dependency.shuffleMergeEnabled) {val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)(res.iter, res.enableBatchFetch)} else {//从mapOutputTracker处获取需要拉取数据的地址val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)(address, true)}new BlockStoreShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,shouldBatchFetch =canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))}//每个分区会获取一个ShuffleWriteroverride def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(handle.shuffleId, _ => new OpenHashSet[Long](16))mapTaskIds.synchronized { mapTaskIds.add(mapId) }val env = SparkEnv.gethandle match {case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>new UnsafeShuffleWriter(env.blockManager,context.taskMemoryManager(),unsafeShuffleHandle,mapId,context,env.conf,metrics,shuffleExecutorComponents)case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>new BypassMergeSortShuffleWriter(env.blockManager,bypassMergeSortHandle,mapId,env.conf,metrics,shuffleExecutorComponents)case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)}}}
在SortShuffleManager中的getReader()中我们可以看到,只有一种ShuffleReader,即:BlockStoreShuffleReader,但其因为ShuffleDependency的不同也会返回不同的Iterator
getWriter()中看到,根据ShuffleHandle可以分为三种ShuffleWriter,即:
unsafeShuffleHandle -> UnsafeShuffleWriter
bypassMergeSortHandle -> BypassMergeSortShuffleWriter
BaseShuffleHandle -> SortShuffleWriter
三、谁注册的ShuffleManager
ShuffleDependency中注册的,并得到了一个ShuffleHandle
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false,val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)extends Dependency[Product2[K, V]] with Logging {val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)
}
那么什么时候会调用RDD的getDependencies?
我们回想下之前几篇博客的内容《Spark-SparkSubmit详细过程》、《Spark-driver和executor启动过程》、《Spark-Job启动、Stage划分》是不是在Stage划分时调用了这个方法来判断是否要创建一个ShuffleMapStage。因此在划分Stage时就确定了ShuffleHandle,换言之也就确定了这个Stage最后的结果要选用哪个ShuffleWriter。而ShuffleWriter又是Spark计算中一个大的瓶颈,因此调节ShuffledRDD的ShuffleDependency就成了调优的必要且重要渠道。后面展开分析ShuffleWriter时再具体讲
哪里用到了ShuffleHandle
1、getReader调用时
getReader是在一个Stage中读取上游Stage时调用的也就是ShuffleRDD中的compute()
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](@transient var prev: RDD[_ <: Product2[K, V]],part: Partitioner)extends RDD[(K, C)](prev.context, Nil) {override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]val metrics = context.taskMetrics().createTempShuffleReadMetrics()SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context, metrics).read().asInstanceOf[Iterator[(K, C)]]}}
2、getWriter调用时
getWriter是在一个Stage结束并将数据溢写磁盘时调用的也就是ShuffleWriteProcessor中的write()
private[spark] class ShuffleWriteProcessor extends Serializable with Logging {def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _],mapId: Long,context: TaskContext,partition: Partition): MapStatus = {var writer: ShuffleWriter[Any, Any] = nulltry {val manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle,mapId,context,createMetricsReporter(context))writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])}}
四、Shuffle场景描述
通过以上的梳理我们大致画下ShuffleManager在计算Task中的场景