广播变量在spark中的用法以及数据倾斜问题的解决方法

embedded/2024/9/23 1:56:04/
1. spark中的广播变量
  • 应用场景:广播变量用于在集群的各个节点的executor 中高效的分发一个只读的变量副本

  • 操作原理:创建一个广播变量时,spark会将变量序列化并发送到每一个executor,每一个executor存一个副本,而不需要每次执行任务重新重driver或其他节点来获取,spark任务涉及到对该变量的访问时就只需要从本地executor内存中来获取即可,避免了在网络中频繁的传输大量重复的数据

  • 目的: 就是为了减少冗余数据在网络中的传输,提升了分布式计算环境下的访问速度和整体的性能

  • 广播变量适合场景: 那些需要在多个任务中共享, 且数据量适中可以存在executor内存中的情况

2. 广播join (map-side join)  在map task 本地解决join
  • 应用场景: 在处理大规模的join操作时,其中一方的数据集明显较小,就是所谓的小表就可以通过广播join, 将小的数据集加载到每一个map task的executor的内存中,那种join操作中某些key对应的键的record数量远大于其他key的record数量,导致在reducer task 严重的负载不均衡问题

  • 操作原理:广播join 发生在map端非reduce 端无需通过网络传输shuffle就可以到达reduce 端,减少I/O,hive将小表的数据全部加载到map task 的内存中作为广播变量,然后对每一条大表的record ,在本地内存中查找是否有匹配的小表record ,并立即完成join操作,

    由于每个join 在每个map task内部独立完成 ,无需经过网络shuffle 就可以到达reduce 端,进而大大减少了网络传输和磁盘i/o,消除了因为数据倾斜造成的

  • 目的主要目的是利用小表数据可以被广播到所有的map task 内存的特点 提前在map 端完成了join 操作,避免了常规的join操作中需要的昂贵的shuffle过程,特别是适合于数据倾斜问题,显著提高了join的操作效率

    这种策略的前提是小表的数据必须要足够小,能够被全部加载到每个节点的内存中

  • 代码:

        def main(args: Array[String]): Unit = {/*** rdd 处理数据倾斜-广播join*/val spark = SparkSession.builder().appName("MapSideBroadcastJoin").getOrCreate()// 假设已经读取了大表和小表的数据,并转换为RDD[(String, String)]// 大表val bigTable: RDD[(String, String)] = spark.sparkContext.textFile("hdfs://path/to/big_table.csv").map(line => line.split(",")).map(arr => (arr(0), arr(1)))// 小表val smallTable: RDD[(String, String)] = spark.sparkContext.textFile("hdfs://path/to/small_table.csv").map(line => line.split(",")).map(arr => (arr(0), arr(1)))// 将小表转换为本地集合并广播val smallTableBroadcast: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(smallTable.collectAsMap().toMap)// 定义Map函数,执行Map-Side JOINdef joinWithSmallTable(bigRecord: (String, String)): Option[(String, (String, String))] = {smallTableBroadcast.value.get(bigRecord._1).map { smallValue =>(bigRecord._1, (bigRecord._2, smallValue))}}// 应用Map函数,过滤掉没有匹配的小表记录val joinedData: RDD[(String, (String, String))] = bigTable.flatMap(joinWithSmallTable)// 打印或进一步处理joinedDatajoinedData.foreach(println)spark.stop()

3 . reduce_side join

都是基于优化分布式计算思想

关于使用广播join  使用map-side 函数在map端进行merage 在本地executor 中完成join过程

减少网络传输, 以及减少磁盘的I/O


http://www.ppmy.cn/embedded/3564.html

相关文章

使用 Docker 部署 instantbox 轻量级 Linux 系统

1)instantbox 介绍 GitHub:https://github.com/instantbox/instantbox instantbox 是一款非常实用的项目,它能够让你在几秒内启动一个主流的 Linux 系统,随起随用,支持 Ubuntu,CentOS, Arch Li…

Jmeter 测试Dubbo接口-实例

1、Dubbo插件准备 ①把jmeter-plugins-dubbo-2.7.4.1-jar-with-dependencies.jar包放在D:\apache-jmeter-5.5\lib\ext目录 ②重新打开Jmeter客户端 在线程组-添加-取样器-dubbo simple,添加dubbo接口请求 2、Jmeter测试lottery接口 ①配置zookeeper参数 由于dub…

K8s: 在Pod中将configmap数据注入容器

configMap 概述 文档: https://kubernetes.io/zh-cn/docs/concepts/configuration/configmap/ Kubernetes 为我们提供了 ConfigMap,可以方便的配置一些变量 是一个存储键值对 key-value 对象的 创建一个可以包含多个键值对的 ConfigMap, 以下是:mul-c…

OpenHarmony实战开发-如何使用text组件的enableDataDetector属性实现文本特殊文字识别。

介绍 本示例介绍使用text组件的enableDataDetector属性实现文本特殊文字识别。 效果图预览 使用说明 1.进入页面,输入带有特殊文字的信息并发送,对话列表中文本会自动识别并标识特殊文字。目前支持识别的类型包括电话号码、链接、邮箱和地址&#xff…

【鸿蒙开发】第二十一章 Media媒体服务(二)--- 音频播放和录制

1 AVPlayer音频播放 使用AVPlayer可以实现端到端播放原始媒体资源,本开发指导将以完整地播放一首音乐作为示例,向开发者讲解AVPlayer音频播放相关功能。 以下指导仅介绍如何实现媒体资源播放,如果要实现后台播放或熄屏播放,需要…

零基础小白如何自学网络安全(入门)

一、为什么选择网络安全? 这几年随着我国《国家网络空间安全战略》《网络安全法》《网络安全等级保护2.0》等一系列政策/法规/标准的持续落地,网络安全行业地位、薪资随之水涨船高。 未来3-5年,是安全行业的黄金发展期,提前踏入…

Python5种算法回溯+剪枝、字典序、递归交换、计数回溯、迭代法 实现全排列ll【力扣题47】

题目描述 给定一个可能包含重复数字的数组 nums,返回所有可能的唯一全排列。你可以以任意顺序返回答案。 LeetCode 题目 47 “全排列 II” 与题目 46 “全排列” 的主要区别在于输入数组中的元素是否可以包含重复数字。这个差异对解题策略和算法实现有重大影响。 …

Python 网络与并发编程(二)

文章目录 线程Thread创建方式join()守护线程全局锁GIL问题线程同步和互斥锁死锁信号量事件(Event)生产者和消费者模式 线程Thread 创建方式 Python的标准库提供了两个模块: _thread 和threading , _thread 是低级模块, threading 是高级模块…