03-240605-Spark笔记

news/2024/9/24 21:24:27/

03-240605

1. 行动算子-1

  • reduce

聚合

格式:

def reduce(f: (T, T) => T): T

例子:

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)
​val rdd = sc.makeRDD(List(1,2,3,4))
​// TODO - 行动算子
​//reduceval i: Int = rdd.reduce(_+_)println(i)

输出结果:

10

  • collect

采集

格式:

def collect(): Array[T]

例子:

        // collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组val ints: Array[Int] = rdd.collect()println(ints.mkString(","))

输出结果:

1,2,3,4

  • count

计数

格式:

def count(): Long

例子:

        // count : 数据源中数据的个数val cnt = rdd.count()println(cnt)

运行结果:

4

  • first

获取数据源的第一个数据

格式:

def first(): T

例子:

        // first : 获取数据源中数据的第一个val first = rdd.first()println(first)

输出结果:

1

  • take

获取数据源的N个数据

格式:

def take(num: Int): Array[T]

例子:

        // take : 获取N个数据val ints: Array[Int] = rdd.take(3)println(ints.mkString(","))

输出结果:

1,2,3

  • takeOrdered

数据排序后.再取第N个数据

格式:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

例子:

        // takeOrdered : 数据排序后,取N个数据val rdd1 = sc.makeRDD(List(4,2,3,1))val ints1: Array[Int] = rdd1.takeOrdered(3)println(ints1.mkString(","))

输出结果:

1,2,3

  • aggregate

给定初始值,初始值参与分区内与分区间的计算

格式:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

例子:

        val rdd = sc.makeRDD(List(1,2,3,4),2)//10 + 13 + 17 = 40// aggregateByKey : 初始值只会参与分区内计算// aggregate : 初始值会参与分区内计算,并且和参与分区间计算val result = rdd.aggregate(10)(_+_._+_)println(result)

输出结果:

40

  • fold

折叠操作,aggregate的简化版操作

格式:

def fold(zeroValue: T)(op: (T, T) => T): T

例子:

        //10 + 13 + 17 = 40// aggregateByKey : 初始值只会参与分区内计算// aggregate : 初始值会参与分区内计算,并且和参与分区间计算//val result = rdd.aggregate(10)(_+_, _+_)val result = rdd.fold(10)(_+_)println(result)

输出结果:

40

  • countByKey 与 countByValue

都是统计每种Key或者Value出现的个数

格式:

def countByKey(): Map[K, Long]

例子:

image-20240604213641365

        val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3)))//val intToLong: collection.Map[Int, Long] = rdd.countByValue()//println(intToLong)val stringToLong: collection.Map[String, Long] = rdd.countByKey()println(stringToLong)

输出结果:

Map(a -> 3)

  • WordCount 不同的实现方式:

运用9种不同的方式实现WordCount

  1. 使用groupBy:

    // groupBydef wordcount1(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val group: RDD[(String, Iterable[String])] = words.groupBy(word=>word)val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)}
  1. 使用groupByKey:

    // groupByKeydef wordcount2(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)}
  1. 使用reduceByKey:

    // reduceByKeydef wordcount3(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)}
  1. 使用aggregateByKey

    // aggregateByKeydef wordcount4(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)}
  1. 使用foldByKey:

    // foldByKeydef wordcount5(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_+_)}
  1. 使用combineByKey:

    // combineByKeydef wordcount6(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.combineByKey(v=>v,(x:Int, y) => x + y,(x:Int, y:Int) => x + y)}
  1. 使用countByKey:

    // countByKeydef wordcount7(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: collection.Map[String, Long] = wordOne.countByKey()}
  1. 使用countByValue:

    // countByValuedef wordcount8(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordCount: collection.Map[String, Long] = words.countByValue()}
  1. 使用reduce:

    def wordcount91011(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))// 【(word, count),(word, count)】// word => Map[(word,1)]val mapWord = words.map(word => {mutable.Map[String, Long]((word,1))})val wordCount = mapWord.reduce((map1, map2) => {map2.foreach{case (word, count) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}map1})println(wordCount)}

2. 序列化

算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。

  • RDD序列化

案例:

object Spark01_RDD_Serial {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))val search = new Search("h")//search.getMatch1(rdd).collect().foreach(println)search.getMatch2(rdd).collect().foreach(println)sc.stop()}// 查询对象// 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测class Search(query:String){def isMatch(s: String): Boolean = {s.contains(this.query)}// 函数序列化案例def getMatch1 (rdd: RDD[String]): RDD[String] = {rdd.filter(isMatch)}// 属性序列化案例def getMatch2(rdd: RDD[String]): RDD[String] = {val s = queryrdd.filter(x => x.contains(s))}}
}

输出结果:

image-20240605133427336

  • Kryo序列化框架

Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

了解一下就行

案例:

 def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SerDemo").setMaster("local[*]")// 替换默认的序列化机制.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")// 注册需要使用 kryo 序列化的自定义类.registerKryoClasses(Array(classOf[Searcher]))val sc = new SparkContext(conf)val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", 
"atguigu", "hahah"), 2)val searcher = new Searcher("hello")val result: RDD[String] = searcher.getMatchedRDD1(rdd)result.collect.foreach(println)}
}
case class Searcher(val query: String) {def isMatch(s: String) = {s.contains(query)}def getMatchedRDD1(rdd: RDD[String]) = {rdd.filter(isMatch) }def getMatchedRDD2(rdd: RDD[String]) = {val q = queryrdd.filter(_.contains(q))}

Kryo绕过了Java的序列化机制,Kryo比Java序列化小,适合大数据传输、存储

  • RDD 血缘关系

toDebugString查看血缘关系

image-20240605135251669

多个连续的RDD的依赖关系,称之为血缘关系

演示:

image-20240605135543501

关于如何将RDD间的关系保存下来:

image-20240605135759577

血缘关系演示:

image-20240605140042850

image-20240605140110991

image-20240605140124464

  • RDD的依赖关系

dependencies查看依赖关系

image-20240605140238899

OneToOne依赖(窄依赖)

image-20240605140706460

窄依赖我们形象的比喻为独生子女。

image-20240605141335525

Shuffle依赖(宽依赖):

image-20240605140820212

宽依赖我们形象的比喻为多生。

image-20240605141533442

  • RDD 阶级划分

image-20240605141721424

image-20240605142320693

  • RDD 任务划分

image-20240605142405972

源码演示:

image-20240605142747592

  • RDD 的持久化

这样的复用在底层不是很好用:

image-20240605143051395

image-20240605143137900

应该这样:

image-20240605143221932

image-20240605143241039

image-20240605143253432

放在内存中 mapRDD.cache()

放在磁盘中 mapRDD.persist()

Cache缓存:

image-20240605143410471

  • RDD CheckPoint 检查点

image-20240605143502870

image-20240605143516625

checkpoint 需要落盘,需要指定检查点保存路径

检查点路径保存的文件,当作业执行完毕后,不会被删除

一般保存路径都是在分布式存储系统: HDFS

  • checkpoint、Cache、Persist的区别:

以上三个都可以存储,关于他们的区别:

cache : 将数据临时存储在内存中进行数据重用

会在血缘关系中添加新的依赖。一旦出现问题,可以重新读取数据

persist : 将数据临时存储在硬盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

如果作业执行完毕,临时保存的数据文件就会丢失

checkpoint : 将数据长久地保存在磁盘文件中进行数据重用

涉及到磁盘IO,性能较低,但是数据安全

为了保证数据安全,所以一般情况下,会独立执行作业

为了能够提高效率,一般情况下,是需要和cache联合使用

执行过程中,会切断血缘关系,重新建立新的血缘关系

checkpoint等同于改变数据源


http://www.ppmy.cn/news/1468709.html

相关文章

蓄电池MSDS报告办理 锂电池运输鉴定中英文报告申请

MSDS 指的是化学产品安全技术说明书 MSDS 报告一般是由工厂所出具的,但也逐渐的应用在各种贸易过程当中,在海运过程当中,相关的产品也需要提供 MSDS 认证报告,不过有些人对于 MSDS 认证所规定的内容不是很了解,接下来大…

Web前端答辩PPT模板:全面解析与高效呈现

Web前端答辩PPT模板:全面解析与高效呈现 在Web前端领域,一个优秀的答辩PPT模板不仅能够充分展示项目成果,还能有效传达技术实力和创新思维。本文将从四个方面、五个方面、六个方面和七个方面,详细解析如何打造一个高质量的Web前端…

腾讯云和windows11安装frp,实现内网穿透

一、内网穿透目的 实现公网上,访问到windows上启动的web服务 二、内网穿透的环境准备 公网服务器、windows11的电脑、frp软件(需要准备两个软件,一个是安装到公网服务器上的,一个是安装到windows上的) frp下载地址下载版本 1.此版本(老版…

智慧城市的规划与实施:科技引领城市运行效率新飞跃

随着信息技术的飞速发展,智慧城市的构想正逐步成为现实。作为地理信息与遥感领域的研究者,我深知在这一转型过程中,技术的创新与应用是提升城市运行效率的关键。本文旨在探讨如何利用地理信息系统(GIS)、遥感技术、大数…

WPF Frame应用 实现页面跳转

需求: 有一个F0View主页面入口,三个子页面(First.xaml/Second.xaml/Third.xaml)用Frame默认加载第一个页面 First.xaml。实现三个页面之间顺序跳转,并且每个页面只初始化一次。 实现: 1,将三…

通过 CartPole 游戏详细说明 PPO 优化过程

CartPole 介绍 在一个光滑的轨道上有个推车,杆子垂直微置在推车上,随时有倒的风险。系统每次对推车施加向左或者向右的力,但我们的目标是让杆子保持直立。杆子保持直立的每个时间单位都会获得 1 的奖励。但是当杆子与垂直方向成 15 度以上的…

Ceph入门到精通-Ceph OSD 磁盘在系统重启后无法识别处理步骤

如果Ceph OSD磁盘在系统重启后无法识别,你可以按照以下步骤进行检查和解决: 1. 检查硬件状态 物理检查:首先进行物理检查,确保磁盘没有物理损坏,数据线和电源线连接正常。S.M.A.R.T状态:使用smartctl命令检查磁盘的S.M.A.R.T状态,以确定是否有硬件问题。2. 确认磁盘识别…

Mongodb中字段的删除

学习mongodb,体会mongodb的每一个使用细节,欢迎阅读威赞的文章。这是威赞发布的第61篇mongodb技术文章,欢迎浏览本专栏威赞发布的其他文章。 本篇文章,探讨UPDATE中的操作符$unset。Mongodb数据插入后,开发人员使用$u…