【大数据学习 | Spark-Core】Spark中的join原理

embedded/2024/11/27 16:21:52/

join是两个结果集之间的链接,需要进行数据的匹配。

演示一下join是否存在shuffle。

1. 如果两个rdd没有分区器,分区个数一致

,会发生shuffle。但分区数量不变。

scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))scala> val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))scala> sc.makeRDD(arr,3)
res116: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[108] at makeRDD at <console>:27scala> sc.makeRDD(arr1,3)
res117: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[109] at makeRDD at <console>:27scala> res116 join res117
res118: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[112] at join at <console>:28scala> res118.collect
res119: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

2. 如果分区个数不一致,有shuffle,且产生的rdd的分区个数以多的为主。

3. 如果分区个数一样并且分区器一样,那么是没有shuffle的

scala> sc.makeRDD(arr,3)
res128: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[118] at makeRDD at <console>:27scala> sc.makeRDD(arr1,3)
res129: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[119] at makeRDD at <console>:27scala> res128.reduceByKey(_+_)
res130: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[120] at reduceByKey at <console>:26scala> res129.reduceByKey(_+_)
res131: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[121] at reduceByKey at <console>:26scala> res130 join res131
res132: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[124] at join at <console>:28scala> res132.collect
res133: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))scala> res132.partitions.size
res134: Int = 3

4. 都存在分区器但是分区个数不同,也会存在shuffle

scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))scala>  val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))scala> sc.makeRDD(arr,3)
res0: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:27scala> sc.makeRDD(arr1,4)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:27scala> res0.reduceByKey(_+_)
res2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:26scala> res1.reduceByKey(_+_)
res3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:26scala> res2 join res3
res4: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[6] at join at <console>:28scala> res4.collect
res5: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))scala> res4.partitions.size
res6: Int = 4

这里为啥stage3里reduceByKey和join过程是连在一起的,因为分区多的RDD是不需要进行shuffle的,数据该在哪个分区就在哪个分区,反而是分区少的RDD要进行join,要进行数据的打散。

分区以多的为主。

5. 一个带有分区器一个没有分区器,那么以带有分区器的rdd分区数量为主,并且存在shuffle

scala> arr
res7: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))scala> arr1
res8: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))scala> sc.makeRDD(arr,3)
res9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27scala> sc.makeRDD(arr,4)
res10: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at makeRDD at <console>:27scala> res9.reduceByKey(_+_)
res11: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:26scala> res10 join res11
res12: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:28scala> res12.partitions.size
res13: Int = 3scala> res12.collect
res14: Array[(String, (Int, Int))] = Array((zhangsan,(300,300)), (wangwu,(350,350)), (lisi,(400,400)), (zhaosi,(450,450)))

同理,stage6的reduceByKey过程和join过程是连在一起的,是因为有分区器的RDD并不需要进行shuffle操作,原来的数据该在哪在哪,而没有分区器的RDD要进行join要进行数据的打散,有shuffle过程,所以有stage4到stage6的连线。


http://www.ppmy.cn/embedded/140949.html

相关文章

使用Python实现智能食品安全追溯系统的深度学习模型

食品安全一直是社会关注的重大问题,尤其在全球化供应链日益复杂的今天,食品安全追溯系统显得尤为重要。通过智能食品安全追溯系统,可以有效追溯食品来源、流通路径,及时发现和处理食品安全问题。本文将详细介绍如何使用Python构建一个智能食品安全追溯系统的深度学习模型,…

Rust学习笔记_01——基础

文章目录 1. Hello world2. 变量1. 变量声明2. 类型推断3. 变量绑定4. 重绑定&#xff08;Shadowing&#xff09; 3. 值4. 算术4.1 基础运算4.2 整数 && 浮点数运算4.3 标准库函数std::cmp 模块num_traits 模块 4.4 注意事项 5. 类型推导5.1 工作原理5.2 限制5.3 类型注…

UE5 Create Dynamic Material Instance(创建动态材质实例) 概述

在 Unreal Engine 5 (UE5) 中&#xff0c;Create Dynamic Material Instance 节点用于创建材质的动态实例。动态材质实例是基于某个已有的材质或材质实例的副本&#xff0c;并允许在运行时修改其属性&#xff08;例如颜色、纹理、参数等&#xff09;。这个节点在游戏开发中非常…

TCP/IP 协议:网络世界的基石(2/10)

一、引言 在当今数字化时代&#xff0c;互联网已经成为人们生活中不可或缺的一部分。而在互联网的背后&#xff0c;TCP/IP 协议扮演着至关重要的角色&#xff0c;堪称互联网的基石。 TCP/IP 协议是一组用于数据通信的协议集合&#xff0c;它的名字来源于其中最重要的两个协议…

深度学习-47-大语言模型LLM之常用的大模型微调框架选择建议

文章目录 1 微调框架1.1 LLaMA-Factory1.1.1 模块化设计(简化微调与推理流程)1.1.2 多硬件支持(适应不同环境需求)1.1.3 量化技术(降低内存占用提升推理速度)1.2 Unsloth1.2.1 加速微调(大幅提升微调速度)1.2.2 降低内存使用(突破资源限制)1.2.3 广泛的模型支持(兼容多种主流LL…

文件的处理(c语言)

首先了解下文件的作用 文件可以把数据直接放在电脑的硬盘上&#xff0c;实现了数据的持久化 什么是文件 文件就是磁盘上的文件。在程序设计中&#xff0c;文件通常有俩种&#xff0c;一种是程序文件&#xff0c;另一种是数据文件&#xff08;这是从文件功能来分类的&#xff…

如何寻找适合的HTTP代理IP资源?

一、怎么找代理IP资源&#xff1f; 在选择代理IP资源的时候&#xff0c;很多小伙伴往往将可用率作为首要的参考指标。事实上&#xff0c;市面上的住宅IP或拨号VPS代理IP资源&#xff0c;其可用率普遍在95%以上&#xff0c;因此IP可用率并不是唯一的评判标准 其实更应该关注的…

C++设计模式之组合模式中适用缓存机制提高遍历与查找速度

在组合设计模式中&#xff0c;为了提高反复遍历和查找的速度&#xff0c;可以引入缓存机制。缓存机制可以通过存储已经遍历过的子组件或计算过的结果来减少重复操作的开销。以下是一个示例&#xff0c;展示了如何在组合模式中使用缓存机制来提高性能。 示例&#xff1a;组合设…