【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)

ops/2024/11/23 18:31:39/

之前学过的kv类型上面的算子

groupby groupByKey reduceBykey sortBy sortByKey join[cogroup left inner right] shuffle的

mapValues keys values flatMapValues 普通算子,管道形式的算子

shuffle的过程是因为数据产生了打乱重分,分组、排序、join等算子需要将数据重新排版。

shuffle的过程是上游的数据处理完毕写出到自己的磁盘上,然后下游的数据从磁盘上面拉取。

重新排版打乱重分是需要存在规则的。

中间数据的流向规则叫做分区器 partitioner,这个分区器一般是存在于shuffle类算子中的,我们可以这么说,shuffle类算子一定会带有分区器,分区器也可以单独存在,人为定义分发规则。

groupBy groupBykey reduceBykey 自带的分区器HashPartitioner。

sortby sortBykey rangePartitioner

hashPartitioner

规则 按照key的hashCode %下游分区 = 分区编号

处理key-value类型数据,如果key为0,就分配去0号分区。否则调用nonNegativeMod函数。

保证取余的结果为正向结果。

hash取余的方式,不管数据分发到下游的什么分区中,最终结果都是相同的数据放入到一起。

演示结果:

scala> val arr = Array(1,2,3,4,5,6,7,8,9)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> sc.makeRDD(arr,3)
res78: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[73] at makeRDD at <console>:27scala> res78.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res79: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[74] at mapPartitionsWithIndex at <console>:26scala> res79.collect
res80: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))scala> res78.map(t=>(t,t))
res81: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[75] at map at <console>:26scala> res78.partitioner
res82: Option[org.apache.spark.Partitioner] = Nonescala> res81.reduceByKey(_+_)
res84: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[76] at reduceByKey at <console>:26scala> res84.partitioner
res85: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)scala> res84.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res88: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[77] at mapPartitionsWithIndex at <console>:26scala> res88.collect
res89: Array[(Int, (Int, Int))] = Array((0,(6,6)), (0,(3,3)), (0,(9,9)), (1,(4,4)), (1,(1,1)), (1,(7,7)), (2,(8,8)), (2,(5,5)), (2,(2,2)))

演示的逻辑,就是按照key.hashcode进行分区,int类型的hashcode值是自己的本身。

并且hash分区器的规则致使我们可以任意的修改下游的分区数量。

scala> res81.reduceByKey(_+_,100)
res91: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[78] at reduceByKey at <console>:26scala> res91.partitions.size
res92: Int = 100scala> res81.reduceByKey(_+_,2)
res93: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[79] at reduceByKey at <console>:26scala> res93.partitions.size
res94: Int = 2

rangePartitioner

hashPartitioner规则非常简单,直接规定来一个数据按照hashcode规则的分配,规则比较简答,但是会出现数据倾斜。

range分区规则中存在两个方法。

rangeBounds界限,在使用这个分区器之前先做一个界限划定。

首先使用水塘抽样算法,在未知的数据集中抽取能够代表整个数据集的样本,根据样本进行规则设定。

然后在使用getPartitions。

首先存在水塘抽样,规定数据的流向以后再执行整体逻辑,会先触发计算。

sortBykey是转换类的算子,不会触发计算。

但是我们发现它触发计算了,因为首先在计算之前先进行水塘抽样,能够规定下游的数据规则,然后再进行数据的计算。

scala> arr
res101: Array[Int] = Array(1, 9, 2, 8, 3, 7, 4, 6, 5)scala> arr.map(t=> (t,t))
res102: Array[(Int, Int)] = Array((1,1), (9,9), (2,2), (8,8), (3,3), (7,7), (4,4), (6,6), (5,5))scala> sc.makeRDD(res102)
res104: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[94] at makeRDD at <console>:27scala> res104.sortByKey()
res105: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[97] at sortByKey at <console>:26scala> res105.partitioner
res106: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@fe1f9dea)scala> res104.sortByKey(true,3)
res107: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[100] at sortByKey at <console>:26scala> res107.mapPartitionsWithIndex((index,it)=> it.map((index,_)))
res109: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:26scala> res109.collect
res110: Array[(Int, (Int, Int))] = Array((0,(1,1)), (0,(2,2)), (0,(3,3)), (1,(4,4)), (1,(5,5)), (1,(6,6)), (2,(7,7)), (2,(8,8)), (2,(9,9)))

range分区器,它是先做抽样然后指定下游分区的数据界限。

它可以修改分区数量,但是分区数量不能大于元素个数,必须要保证每个分区中都有元素。

scala> res104.sortByKey(true,3)
res111: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[104] at sortByKey at <console>:26scala> res104.sortByKey(true,300)
res112: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[107] at sortByKey at <console>:26scala> res111.partitions.size
res114: Int = 3scala> res112.part

自定义分区器

工作的过程中我们会遇见数据分类的情况,想要根据自己的需求定义分区的规则,让符合规则的数据发送到不同的分区中,这个时候我们就需要自定义分区器了。

定义分区器,让数据发送到不同的分区,从而不同的task任务输出的文件结果内容也不同

# 自己创建数据data/a.txt
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
hello tom hello jack
# 需求就是将数据按照规则进行分发到不同的分区中
# 存储的时候一个文件存储hello其他的文件存储tom jack

分区器的定义需要实现分区器的接口

class MyPartitioner extends Partitioner{override def numPartitions: Int = ???
// 设定下游存在几个分区override def getPartition(key: Any): Int = ???
// 按照key设定分区的位置
}

整体代码:

package com.hainiu.sparkimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object Test1 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("parse")conf.setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.textFile("data/a.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)val rdd1 = rdd.partitionBy(new MyPartitioner)val fs = FileSystem.get(new Configuration())val out = "data/res"if(fs.exists(new Path(out)))fs.delete(new Path(out),true)rdd1.saveAsTextFile(out)}
}
class MyPartitioner extends Partitioner{override def numPartitions: Int = 2override def getPartition(key: Any): Int = {if(key.toString.equals("hello"))0else1}
}


http://www.ppmy.cn/ops/136107.html

相关文章

mysql in查询大数据量业务无法避免情境下优化

在 MySQL 中&#xff0c;IN 查询操作广泛用于从数据库中检索符合条件的多条记录&#xff0c;但当涉及到大数据量的 IN 查询时&#xff0c;性能可能会显著下降。特别是当 IN 子句中的元素数量非常大时&#xff0c;MySQL 需要对每个元素进行匹配&#xff0c;这会导致查询变得非常…

ffmpeg视频滤镜:提取缩略图-framestep

滤镜描述 官网地址 > FFmpeg Filters Documentation 这个滤镜会间隔N帧抽取一帧图片&#xff0c;因此这个可以用于设置视频的缩略图。总体上这个滤镜比较简单。 滤镜使用 滤镜参数 framestep AVOptions:step <int> ..FV....... set frame st…

单片机学习笔记 8. 矩阵键盘按键检测

更多单片机学习笔记&#xff1a;单片机学习笔记 1. 点亮一个LED灯单片机学习笔记 2. LED灯闪烁单片机学习笔记 3. LED灯流水灯单片机学习笔记 4. 蜂鸣器滴~滴~滴~单片机学习笔记 5. 数码管静态显示单片机学习笔记 6. 数码管动态显示单片机学习笔记 7. 独立键盘 目录 0、实现的…

华为IPD流程管理体系L1至L5最佳实践-解读

该文档主要介绍了华为IPD流程管理体系&#xff0c;包括流程体系架构、流程框架实施方法、各业务流程框架示例以及相关案例等内容&#xff0c;旨在帮助企业建立高效、规范的流程管理体系&#xff0c;实现业务的持续优化和发展。具体内容如下&#xff1a; 1. 华为流程体系概述 -…

BY组态-低代码web可视化组件

简介 BY组态是集实时数据展示、动态交互等一体的全功能可视化平台。帮助物联网、工业互联网、电力能源、水利工程、智慧农业、智慧医疗、智慧城市等场景快速实现数字孪生、大屏可视化、Web组态、SCADA等解决方案。具有实时监控、多样、变化、动态交互、高效、可扩展、支持自动…

创建HTTPS网站

每天&#xff0c;我们都会听到网络上发生身份盗窃和数据侵权的案例&#xff0c;这导致用户对自己访问的网站更加怀疑。他们开始更加了解自己将个人信息放在哪里以及信任哪些类型的网站。了解如何使网站使用HTTPS变得比以往任何时候都更加重要。 解读缩略词&#xff1a;HTTP与HT…

从0开始机器学习--Day29--K-means算法以及PCA降维作业

题目1&#xff1a;计算聚类中心点并对图像进行重构 代码&#xff1a; import numpy as np import scipy.io as sio import matplotlib.pyplot as plt from skimage import iodef find_data_type(X, centers):idx []# 聚类中心和样本点不再变化后每个样本点的的类for i in ra…

瀚海微SD NAND之SD 协议(34)1.8V信号的时序

固定数据窗口输出时序(SDR12、SDR25、SDR50) 固定数据窗口插卡输出时序如下图所示&#xff0c;SDR12、SDR25、SDR50的输出时序 有效窗口由输出延迟(topy)的最小值和最大值指定。 无论温度和电压如何变化&#xff0c;与SDCLK同步的有效数据窗口都是可用的。 输出有效窗口由t…