Spark 共享变量:广播变量与累加器解析

server/2024/11/14 20:44:31/

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客

PySpark 数据处理实战:从基础操作到案例分析-CSDN博客

Spark 的容错机制:保障数据处理的稳定性与高效性-CSDN博客

目录

一、需求背景

二、广播变量(Broadcast Variables)

(一)功能

(二)语法 / 用法

(三)示例代码修改

(四)本质与优势

三、累加器(Accumulators)

(一)需求示例

(二)原理与功能

(三)使用方法与示例代码修改

四、总结


        在 Spark 大数据处理框架中,共享变量是一个非常重要的概念。当我们处理一些涉及到不同计算节点(Executor)需要访问相同数据的场景时,共享变量就发挥了关键作用。本文将深入探讨 Spark 中的广播变量和累加器,包括它们的使用场景、原理以及如何在实际代码中应用。

一、需求背景

        假设我们有一份用户数据(user.tsv),其中包含用户的一些基本信息如用户 id、用户名、年龄和城市 id,同时我们还有一个城市字典(city_dict),它存储了城市 id 与城市名称的对应关系。我们的目标是将这两份数据进行处理,得到包含用户完整信息(用户 id、用户名、年龄、城市 id、城市名称)的结果集。

user.tsv数据如下

user001 陆家嘴 18 2
user002 羊毛 20 5
user003 爱丽丝 22 6
user004 蒸饭 24 8
user005 淘米 26 1
user006 小笼包 28 7
user007 凉粉 30 4
user008 泡腾片 25 10
user009 炒米 27 3
user010 颖火虫 29 9

 city中的字典如下

python">city_dict = {1: "北京",2: "上海",3: "广州",4: "深圳",5: "苏州",6: "无锡",7: "重庆",8: "厦门",9: "大理",10: "成都"}

示例代码如下

python">import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)fileRdd = sc.textFile("../datas/user.tsv")city_dict = {1: "北京",2: "上海",3: "广州",4: "深圳",5: "苏州",6: "无锡",7: "重庆",8: "厦门",9: "大理",10: "成都"}def getLine(line):list01 = line.split(" ")cityName = city_dict.get(int(list01[3]))# print(cityName)return line + " " + cityNamemapRdd = fileRdd.map(getLine)mapRdd.foreach(print)# 使用完后,记得关闭sc.stop()

结果如下

python">user007 凉粉 30 4 深圳
user008 泡腾片 25 10 成都
.....

        在 Spark 中,user_rdd 的计算处理在 Executor 中进行,而 city_dict 的数据存储在 Driver 的内存中。这就引发了一个问题:计算过程中每个 Task 是如何获取 city_dict 的数据呢?如果 city_dict 的数据量很大(例如 1G),每个 Task 都要从 Driver 中下载一份(假设存在多个 Task 导致总下载量达到 6G),那么网络传输的开销将非常大,性能会变得很差。

二、广播变量(Broadcast Variables)

(一)功能

        广播变量的主要功能就是将一个变量元素广播到每台 Worker 节点的 Executor 中。这样一来,每个 Task 就可以直接从本地读取数据,从而大大减少网络传输的 I/O。

(二)语法 / 用法

在 Spark 中使用广播变量,首先需要创建一个广播变量对象。例如:

python">broadcastValue = sc.broadcast(city_dict)

        这里的 sc 是 SparkContext 对象,city_dict 是我们想要广播的数据(在这个例子中是城市字典)。创建广播变量后,在需要使用该数据的地方,可以通过 broadcastValue.value 来获取广播的数据。

此链接是官方给的API文档:RDD Programming Guide - Spark 3.5.3 Documentation

(三)示例代码修改

        在我们的用户数据处理示例中,原始代码在处理每个用户数据行时,需要获取对应的城市名称。修改后的代码如下:

python">import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)fileRdd = sc.textFile("../datas/user.tsv",2)city_dict = {1: "北京",2: "上海",3: "广州",4: "深圳",5: "苏州",6: "无锡",7: "重庆",8: "厦门",9: "大理",10: "成都"}# 将一个变量广播出去,广播到executor中,不是task中city_dict_broad = sc.broadcast(city_dict)def getLine(line):list01 = line.split(" ")#cityName = city_dict.get(int(list01[3]))# 使用广播变量的变量获取数据cityName = city_dict_broad.value.get(int(list01[3]))# print(cityName)return line + " " + cityNamemapRdd = fileRdd.map(getLine)mapRdd.foreach(print)# 释放广播变量city_dict_broad.unpersist()# 使用完后,记得关闭sc.stop()

(四)本质与优势

广播变量本质上是一种优化手段。它的优势主要体现在两个方面:

  1. 减少数据传输量:通过广播一个 Driver 中较大的数据,可以减少每次从 Driver 复制的数据量,降低网络 I/O 损耗,从而提高整体性能。
  2. 优化表连接:在两张表进行 Join 操作时,如果一张表较小,可以将小表进行广播,然后与大表的每个部分进行 Join,这样就可以避免 Shuffle Join(Reduce Join),进一步提升性能。

需要注意的是,广播变量是只读变量,不能被修改

三、累加器(Accumulators)

(一)需求示例

        假设我们有搜狗日志的数据,现在需要统计 10 点搜索的数据一共有多少条。如果按照常规的方式编写代码,可能会出现问题。例如:

python">import os
import reimport jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \.filter(lambda line:len(re.split("\s+",line)) == 6) \.map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)# 统计一天每小时点击量并按照点击量降序排序_sum = 0def sumTotalLine(tuple1):global _sum # 把_sum 设置为全局变量timeStr = tuple1[0] # 10:19:18if timeStr[0:2] == '10':_sum += 1mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))print(_sum) # 结果是0# 使用完后,记得关闭

        在 Spark 中,上述代码最终结果会是 0。因为 sum = 0 是在 Driver 端的内存中的,Executor 中程序对其进行累加操作并不能改变 Driver 端的结果。

(二)原理与功能

        累加器的功能是实现分布式的计算。它在每个 Task 内部构建一个副本进行累加,并且在最后返回每个 Task 的结果并进行合并。

官方API截图

(三)使用方法与示例代码修改

在 Spark 中使用累加器,首先需要创建一个累加器对象:

python">accumulator = sc.accumulator(0)

然后在需要进行计数累加的地方使用 accumulator.add(1)。例如:

python">def getLines(line, accumulator):accumulator.add(1)# 对用户数据 RDD 进行处理并统计数据量
fileRdd.foreach(lambda line: getLines(line, accumulator))

最后可以通过 accumulator.value 获取累加的结果。完整代码如下:

python">import os
import reimport jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)accCounter = sc.accumulator(0)mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \.filter(lambda line:len(re.split("\s+",line)) == 6) \.map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)# 统计一天每小时点击量并按照点击量降序排序#_sum = 0def sumTotalLine(tuple1):#global _sum # 把_sum 设置为全局变量timeStr = tuple1[0] # 10:19:18if timeStr[0:2] == '10':accCounter.add(1)mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))print(accCounter.value) # 104694# 假如我不知道累加器这个操作,这个题目怎么做?print(mapRdd.filter(lambda tuple1: tuple1[0][0:2] == '10').count())# 使用完后,记得关闭sc.stop()

四、总结

        Spark 中的广播变量和累加器是处理分布式计算中共享数据问题的有效工具。广播变量主要用于在多个 Task 之间共享只读数据,减少网络传输开销;累加器则用于实现分布式环境下的计数或累加操作,确保在不同 Task 中的计算结果能够正确地合并到 Driver 端。在实际的 Spark 大数据处理项目中,合理地运用广播变量和累加器能够显著提高程序的性能和计算的准确性。

        希望通过本文的介绍,读者能够对 Spark 中的共享变量有更深入的理解,并能够在自己的项目中熟练运用广播变量和累加器来优化数据处理流程。


http://www.ppmy.cn/server/141221.html

相关文章

当一个Java线程进入一个对象的一个 synchronized 方法后,其它线程是否可进入此对象的其它方法?

大家好,我是锋哥。今天分享关于【当一个Java线程进入一个对象的一个 synchronized 方法后,其它线程是否可进入此对象的其它方法?】面试题。希望对大家有帮助; 当一个Java线程进入一个对象的一个 synchronized 方法后,其它线程是否…

AI赋能·创新视界——冠捷科技集团亮相第七届中国国际进口博览会

摘要:展示创新技术、前沿应用及一体化方案生态,助推数智化产业蓬勃发展! 新时代,共享未来;新风貌,全球共进!11月5日,第七届中国国际进口博览会于国家会展中心(上海)盛大启幕。此次进博会展览展示面积超42万平方米,152个国家、地区和国际组织参加国家展和企业展,集中展示400多项…

嵌入式学习-网络高级-Day01

嵌入式学习-网络高级-Day01 【1】Modbus协议 起源 分类 优势 应用场景 【2】Modbus TCP 特点 组成 报文头:7个字节 寄存器(存储数据) 功能码 总结 练习 【3】工具安装 Modbus Slave、Poll安装 网络调试助手 wireshark 练习 【1】Modbus协议 起…

浔川 AI 翻译 v5.0 上线时间相关公告

浔川 AI 翻译 v5.0 上线时间相关公告 尊敬的用户: 我们非常高兴地向大家宣布,浔川 AI 翻译 v5.0 存在提前上线的可能性! 目前预计的上线时间为 2024 年 11 月 16 日。 不过需要注意的是,由于开发和测试过程中可能会遇到各种不可预…

零基础Java第十五期:抽象类接口(一)

目录 一、抽象类 1.1. 抽象的概念 1.2. 抽象类语法 1.3. 抽象类的特性 1.4. 图形类例子 二、 接口 2.1. 接口的概念 2.2. 语法规则 2.3. 接口的特性 2.4. 接口的使用 2.5. 实现多个接口 2.6. 工作当中常用的接口 一、抽象类 1.1. 抽象的概念 如果 一个类中没…

推荐一款功能强大的视频修复软件:Apeaksoft Video Fixer

Apeaksoft Video Fixer是一款功能强大的视频修复软件,专门用于修复损坏、不可播放、卡顿、画面失真、黑屏等视频问题。只需提供一个准确且有效的样本视频作为参考,该软件就能将受损视频修复到与样本视频相同的质量。该软件目前支持MP4、MOV、3GP等格式的…

选择适合你的报表工具,山海鲸报表与Tableau深度对比

在数据分析和报表制作的领域,企业往往面临着选择合适工具的难题。尤其是当市场上有很多功能强大的工具时,如何从中挑选出最适合自己需求的报表软件成为了一个关键问题。今天,我们将对比两款报表工具——山海鲸报表和Tableau,看看它…

《现代网络技术》读书笔记:SDN数据平面和OpenFlow

本文部分内容来源于《现代网络技术:SDN,NFV,QoE、物联网和云计算:SDN,NFV,QoE,IoT,andcloud》 SDN数据平面 SDN 数据平面也称为基础设施层,而在ITU-T的Y3300标准中则称为资源层,它是网络转发设备根据 SDN控制平面的决策来执行数据…