文章目录
1、partitionBy
源码中的定义(部分)
调用方式
2、coalesce
源码中的定义
调用方式
3、repartition
源码中的定义
调用方式
repartition和coalesce的区别
代码演示 (跳转代码)
实现重新分区,本质上就是重新配置并行度,也就是说,如果我们将分区数设置为n,那么Spark作业的并行度就是n
1、partitionBy
传入自定义的分区策略 默认为按照key进行hash
源码中的定义(部分):
def partitionBy(self: "RDD[Tuple[K, V]]",numPartitions: Optional[int],partitionFunc: Callable[[K], int] = portable_hash,
) -> "RDD[Tuple[K, V]]":
调用方式
rdd.partitionBy(参数1,参数2)
参数1,分区数
参数2,传入自定义的分区策略 默认为按照key进行hash
2、coalesce
coalesce既可以实现RDD分区的合并缩小,也可以实现RDD分区的扩大
源码中的定义:
def coalesce(self: "RDD[T]", numPartitions: int, shuffle: bool = False) -> "RDD[T]":
调用方式
rdd.coalesce(参数1,参数2)
参数1,传入分区个数
参数2,传入 shuffle,默认为 False,为False则不进行shuffle,带有分区捆绑进行重新分区
若shuffle = True, 则进行shuffle操作,不带有分区捆绑进行重新分区 分区更加均匀(避免数据倾斜)
3、repartition
返回一个新的RDD,它刚好有numPartitions(参数1)个分区。
源码中的定义
def repartition(self: "RDD[T]", numPartitions: int) -> "RDD[T]":
调用方式
rdd.repartition(参数1)
参数1,分区数
可以增加或减少此RDD中的分区。在内部,它使用shuffle来重新分发数据。
如果正在减少这个RDD中的分区数量,考虑使用' coalesce',这可以避免执行shuffle。
repartition和coalesce的区别:
repartition默认开启shuffle,
coalesce默认不开启,但可用参数配置,
实际上repartition底层调用的就是coalesce
代码演示
# coding:utf8
import timefrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 1, 7, 8, 9, 10], 3)print(rdd.glom().collect())print(rdd.coalesce(2).glom().collect())print(rdd.coalesce(2, shuffle=True).glom().collect())print(rdd.repartition(2).glom().collect())
制作不易,点个赞吧~