1. shuffle前言
对spark任务划分阶段,遇到宽依赖会断开,所以在stage 与 stage 之间会产生shuffle,大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。
负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代。
ShuffleManager 大概有两个: HashShuffleManager 和 SortShuffleManager。
历史:
在spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager;
在spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager;
在spark 2.0以后,抛弃了 HashShuffleManager。
2. HashShuffleManager
上游 stage 有 2个 Executor,每个Executor 有 2 个 task。
下游 stage 有 3个task。
shuffle write阶段:
将相当于mapreduce的shuffle write, 上游的mapTask任务的数据按照key的hash 分桶,写出中间文件(个数为下游reduceTask的任务,即下游RDD分区的个数)。
写出中间文件个数 = maptask的个数 * reducetask的个数。
shuffle read 阶段:
就相当于mapreduce 的 shuffle read, 每个reducetask 拉取自己的数据。
由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个reducetask只要从上游stage的所有maptask所在节点上,拉取属于自己的那一个磁盘文件即可。
弊端:
shuffle write阶段占用大量的内存空间,会导致频繁的GC,容易导致OOM(out of memory);也会产生大量的小文件,写入过程中会产生大量的磁盘IO,性能受到影响。适合小数据集的处理。
3. HashShuffleManager 优化
开启consolidate机制。
设置参数:spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。
shuffle write阶段:
我们知道,如果executor的个数为5个,一个executor上的核心是1个,有10个分区的数据要处理,即一个核心要处理2个任务。
开启consolidate机制后,上游的每个mapTask任务的数据仍然按照key的hashCode值分桶,但每个任务并不会形成很多个中间小文件,而是对于每个executor的每个核来说,只会产生下游reduceTask个数的文件。优化后,HashShuffleManager允许上游的交给由一个executor的一个core处理的多个maptask任务的数据以追加形式写入文件组,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
写出中间文件个数 = 上游的CPU核数 * 下游task的个数
shuffle read 阶段:
就相当于mapreduce 的 shuffle read, 每个reducetask 拉取自己的数据。
由每个reducetask只要从上游stage的所在节点上,拉取属于自己的那一个磁盘文件即可。
弊端:
优化后的HashShuffleManager,虽然比优化前减少了很多小文件,但在处理大量数据时,还是会产生很多的小文件。
4. SortShuffleManager
Spark在引入Sort-Based Shuffle以前,比较适用于中小规模的大数据处理。为了让Spark在更大规模的集群上更高性能处理更大规模的数据,于是就引入了SortShuffleManager。
shuffle write阶段:
shuffle操作之前,数据会被划分为多个分区。每个分区被发往不同的executor进行计算。在map阶段,每个executor会根据key的hashCode值将数据进行分桶产生小文件。每个桶对应的一个下游分区。在每个桶中,数据会被按照key进行局部排序(这个操作不是必须的),排序后这些小文件会写入到内存中的一个大的shuffle文件。在写入shuffle文件的同时,会生成一个index索引文件。索引文件可以快速定位和读取所需要的键值对数据,而不需要扫描整个文件。
SortShuffleManager不会为每个Reducer中的Task生成一个单独的文件,相反,会把上游中每个mapTask所有的输出数据Data只写到一个文件中。并使用了Index文件存储具体 mapTask 输出数据在该文件的位置。
因此 上游 中的每一个mapTask中产生两个文件:Data文件 和 Index 文件,其中Data文件是存储当前Task的Shuffle输出的,而Index文件中存储了data文件中的数据通过partitioner的分类索引。
写出文件数 = maptask的个数 * 2 (index 和 data )
可见,SortShuffle 的产生的中间文件的多少与 上个stage 的 maptask 数量有关。
shuffle read 阶段:
下游的Stage中的Task就是根据这个Index文件获取自己所要抓取的上游Stage中产生的数据。
在sortShuffleManager中,我们可以启动byPass机制,不排序的机制。开关的值默认是mapTask的个数是200.
触发bypass机制的条件:
shuffle map task的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认200)或者不是聚合类的shuffle算子(比如groupByKey)
5. 总结
回顾整个Shuffle的历史,Shuffle产生的临时文件的数量的变化以此为:
Hash Shuffle:M*R;
Consolidate 方式的Hash Shuffle:C*R;
Sort Shuffle:2*M;
其中:M:上游stage的task数量,R:下游stage的task数量,C:上游stage运行task的CPU核数