Spark RDD 中的 repartition
和 coalesce
是两种常用的分区调整算子,它们的功能是改变 RDD 的分区数量。以下从源码、原理和使用角度分析它们的异同点。
一、repartition
和 coalesce
的功能与区别
特性 | repartition | coalesce |
---|---|---|
主要功能 | 调整 RDD 分区数量,可以增加或减少。 | 调整 RDD 分区数量,主要用于减少分区。 |
是否触发 Shuffle | 一定会触发,生成新的分区数据分布。 | 在减少分区时默认不会触发,但可以选择触发 Shuffle(shuffle=true )。 |
是否增加分区 | 支持,增加分区会均匀分布数据(Shuffle)。 | 不推荐增加分区,分区数变多时需启用 Shuffle。 |
数据倾斜的影响 | 数据倾斜更低,分区较均匀。 | 不触发 Shuffle 时可能导致部分分区过大。 |
常见使用场景 | 动态增加或减少分区,用于优化性能或并行度。 | 减少分区,常用于窄依赖算子后优化下游性能。 |
二、实现原理
1. repartition
的源码分析
repartition
是通过调用 coalesce
并设置 shuffle = true
实现的,源码位于 RDD.scala
:
def repartition(numPartitions: Int): RDD[T] = {coalesce(numPartitions, shuffle = true)
}
2. coalesce
的源码分析
coalesce
的实现逻辑同样在 RDD.scala
中,具体如下:
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {if (shuffle) {// 使用 Shuffle 重新分区new CoalescedRDD(this, numPartitions, shuffle = true)} else {// 不触发 Shuffle 时,合并分区(窄依赖)new CoalescedRDD(this, numPartitions, shuffle = false)}
}
- 核心逻辑:
三、Shuffle 触发的影响
-
Shuffle 的作用:
-
repartition
一定会触发 Shuffle:- 原因是其目的是将数据分布均匀到目标分区。
-
coalesce
默认不触发 Shuffle:- 数据保持窄依赖,减少 Shuffle 开销。
- 缺点:分区数据可能不均匀,造成下游任务性能问题。
四、举例说明
数据集
val rdd = sc.parallelize(1 to 100, 10) // 创建一个有 10 个分区的 RDD
1. 使用 repartition
增加分区
val repartitionedRDD = rdd.repartition(20)
println(repartitionedRDD.getNumPartitions) // 输出:20
2. 使用 coalesce
减少分区
val coalescedRDD = rdd.coalesce(5)
println(coalescedRDD.getNumPartitions) // 输出:5
3. 使用 coalesce
增加分区(触发 Shuffle)
val shuffledRDD = rdd.coalesce(20, shuffle = true)
println(shuffledRDD.getNumPartitions) // 输出:20
五、优缺点总结
算子 | 优点 | 缺点 |
---|---|---|
repartition | 数据分布均匀,适合动态调整并行度。 | 一定会触发 Shuffle,开销较大。 |
coalesce | 不触发 Shuffle 时性能高,适合减少分区的场景。 | 数据可能不均匀,依赖父 RDD 分区情况。 |