Spark-广播变量源码分析

news/2024/9/22 13:57:04/

一、广播变量使用

源码中给的例子是:org.apache.spark.examples.BroadcastTest

其中我们关心的只有两行代码,即创建广播变量和使用广播变量

//准备测试数据
val arr1 = (0 until num).toArray
//创建广播变量
val barr1 = sc.broadcast(arr1)
//使用广播变量
val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.length)

二、创建广播变量

为了思路清晰,和主线关联不大的代码会剔除掉

1、SparkContext

  private[spark] def env: SparkEnv = _envdef broadcast[T: ClassTag](value: T): Broadcast[T] = {//isLocal 是一个 Boolean 类型//当设置master 为 local 或者 local[n] 时 为trueval bc = env.broadcastManager.newBroadcast[T](value, isLocal)bc}

2、BroadcastManager

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {//计算 BroadcastId val bid = nextBroadcastId.getAndIncrement()//三个参数 :1、数据 2、是否local 3、广播id//最终需要构造一个 TorrentBroadcast 对象broadcastFactory.newBroadcast[T](value_, isLocal, bid)}

3、TorrentBroadcast

机制如下:

Driver 将序列化对象划分为小块,并将这些块存储在Driver 的BlockManager中。

在每个Executor上首先尝试从其BlockManager中获取对象。如果获取不到,则使用远程获取(从Driver 和/或 其他Executor(如果可用)获取小块)。一旦获取成功,就会将块放入自己的BlockManager中,并准备好供其他Executor从中获取。

这可以防止Driver成为发送多个广播数据副本(每个Executor一个)的瓶颈。

private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)extends Broadcast[T](id) with Logging with Serializable {//广播对象的软引用//它通过从 Driver 和/或其他 Executor 读取块来构建此值。@transient private var _value: SoftReference[T] = _//将广播id 封装成 广播块idprivate val broadcastId = BroadcastBlockId(id)//构建该对象时就执行 writeBlocksprivate val numBlocks: Int = writeBlocks(obj)//每个区块的大小。默认值为4MB。此值仅由 broadcaster 读取@transient private var blockSize: Int = _private def setConf(conf: SparkConf): Unit = {//spark.broadcast.compress 默认 true//是否在发送广播变量之前对其进行压缩 , 通常都需要压缩 compressionCodec = if (conf.get(config.BROADCAST_COMPRESS)) {//最终会找 spark.io.compression.codec   默认  lz4//用于压缩内部数据的编解码器,如RDD分区、事件日志、广播变量和洗牌输出。//默认情况下,Spark提供四种编解码器:lz4、lzf、snappy和zstd。还可以使用完全限定的类名来指定编解码器//‌LZ4压缩算法以其极高的压缩和解压缩速度而闻名,但压缩比并不突出。‌Some(CompressionCodec.createCodec(conf))} else {None}//spark.broadcast.blockSize   默认 4m//TorrentBroadcastFactory每块块的大小,//值太大会降低广播过程中的并行性(使其变慢);//但是,如果它太小,BlockManager的性能可能会受到影响blockSize = conf.get(config.BROADCAST_BLOCKSIZE).toInt * 1024//spark.broadcast.checksum  默认 true//是否启用广播校验和。//如果启用,广播将包括一个校验和,这可以帮助检测损坏的块,//但代价是计算和发送更多的数据。如果网络有其他机制来保证数据在广播过程中不会损坏,则可以禁用它checksumEnabled = conf.get(config.BROADCAST_CHECKSUM)}setConf(SparkEnv.get.conf)//将对象划分为多个块,并将这些块放入块管理器中。private def writeBlocks(value: T): Int = {import StorageLevel._// 在Driver中存储广播变量的副本,这样在Driver上运行的Task就不会创建广播变量值的重复副本。//因为是 Driver 上的Task 用因此没有序列化,是直接放到了内存和磁盘val blockManager = SparkEnv.get.blockManagerif (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {throw new SparkException(s"Failed to store $broadcastId in BlockManager")}//如果 blockManager 放成功了 继续向下执行try {//获取支持此ChunkedByteBuffer的ByteBuffers的副本。val blocks =TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)//构建检验和if (checksumEnabled) {checksums = new Array[Int](blocks.length)}blocks.zipWithIndex.foreach { case (block, i) =>//计算每个块的校验和if (checksumEnabled) {checksums(i) = calcChecksum(block)}//为每个块计算一个idval pieceId = BroadcastBlockId(id, "piece" + i)//只读字节缓冲区,物理上存储为多个块,而不是单个连续数组val bytes = new ChunkedByteBuffer(block.duplicate())//调用blockManager 存储每个块 存储级别时 内存和磁盘 并序列化 (因为要分发,因此要序列化)if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {throw new SparkException(s"Failed to store $pieceId of $broadcastId " +s"in local BlockManager")}}blocks.length} catch {}}}

三、使用广播变量

1、Broadcast

  def value: T = {getValue()}//调用 TorrentBroadcast 获取protected def getValue(): T

2、TorrentBroadcast

  //广播对象的软引用//它通过从 Driver 和/或其他 Executor 读取块来构建此值。@transient private var _value: SoftReference[T] = _override protected def getValue() = synchronized {//如果有就直接获取,如果没有拉过来再获取val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.getif (memoized != null) {memoized} else {val newlyRead = readBroadcastBlock()_value = new SoftReference[T](newlyRead)newlyRead}}private def readBroadcastBlock(): T = Utils.tryOrIOException {TorrentBroadcast.torrentBroadcastLock.withLock(broadcastId) {//由于我们只根据“broadcastId”进行锁定,因此每当使用“broadcast Cache”时,我们应该只触摸“broadcastId”。//获取 broadcastManager 的缓存 因为存的时候选择的级别是  MEMORY_AND_DISK_SERval broadcastCache = SparkEnv.get.broadcastManager.cachedValues//根据 broadcastId 从broadcastCache 获取块Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {setConf(SparkEnv.get.conf)val blockManager = SparkEnv.get.blockManager//从本地块管理器获取块作为Java对象的迭代器。blockManager.getLocalValues(broadcastId) match {case Some(blockResult) =>if (blockResult.data.hasNext) {val x = blockResult.data.next().asInstanceOf[T]releaseBlockManagerLock(broadcastId)//如果从本地块管理取到了值,就将值返回,并放到缓存中if (x != null) {broadcastCache.put(broadcastId, x)}x} else {throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")}case None =>//本地块管理中没有该广播id对应的块数据val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)logInfo(s"Started reading broadcast variable $id with $numBlocks pieces " +s"(estimated total size $estimatedTotalSize)")val startTimeNs = System.nanoTime()//从Driver和/或其他Executor获取torrent块//请注意,所有这些块都存储在BlockManager中并报告给Driver,因此其他Executor也可以从该Executor中提取这些块。val blocks = readBlocks()logInfo(s"Reading broadcast variable $id took ${Utils.getUsedTimeNs(startTimeNs)}")try {val obj = TorrentBroadcast.unBlockifyObject[T](blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)// 将合并的副本存储在BlockManager中,这样此Executor上的其他Task就不需要重新获取它。val storageLevel = StorageLevel.MEMORY_AND_DISK//写一个由单个对象组成的块 ,该Executor其他Task使用时直接全量获取广播变量if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {throw new SparkException(s"Failed to store $broadcastId in BlockManager")}if (obj != null) {broadcastCache.put(broadcastId, obj)}obj} finally {blocks.foreach(_.dispose())}}}}}

四、总结

创建广播变量

        1、创建广播变量id

        2、创建TorrentBroadcast

        3、TorrentBroadcast会自动调用writeBlocks将数据写入到BlockManager(这里会写两份数据,一份是整体的结果用于本Executor使用广播变量时使用,存储级别是MEMORY_AND_DISK,一份是拆分成块用于其他Executor拉取使用,存储级别是MEMORY_AND_DISK_SER)

使用广播变量

        1、因为本身广播变量的类型就是TorrentBroadcast,因此直接调用其getValue方法获取

        2、查看本类_value属性是否有值,如果有值直接返回(加速同Task反复调用)

        3、查看缓存是否有值(BroadcastManager.cachedValues)如果有值直接返回(加速同Task反复调用)

        4、查看本Executor的BlockManager是否有该广播变量整体的数据,如果有值直接返回(加速同Executor的不同Task使用)

        5、从Driver或者其他Excutor获取该广播变量的块序列数据

        如果获取到广播变量的数据都会向本类_value属性、缓存、本Executor的BlockManager设置或存入整体数据用于重复使用时加速

广播变量的创建以及使用流程如下:(下载放大后就很清晰哟)


http://www.ppmy.cn/news/1528841.html

相关文章

python selenium网页操作

一、安装依赖 pip install -U seleniumselenium1.py: from selenium import webdriver from selenium.webdriver.common.by import Bydriver webdriver.Chrome() driver.get("https://www.selenium.dev/selenium/web/web-form.html") title driver.ti…

标准库标头 <bit>(C++20)学习

<bit>头文件是数值库的一部分。定义用于访问、操作和处理各个位和位序列的函数。例如&#xff0c;有函数可以旋转位、查找连续集或已清除位的数量、查看某个数是否为 2 的整数幂、查找表示数字的最小位数等。 类型 endian (C20) 指示标量类型的端序 (枚举) 函数 bit_ca…

微服务注册中⼼2

5.Nacos配置管理 Nacos除了可以做注册中⼼&#xff0c;同样可以做配置管理来使⽤ 5.1 统⼀配置管理 当微服务部署的实例越来越多&#xff0c;达到数⼗、数百时&#xff0c;逐个修改微服务配置就会让⼈抓狂&#xff0c;⽽且很容易出错。我们需要⼀种统⼀配置管理⽅案&#xf…

基于单片机的楼宇门禁系统的设计与实现

文章目录 前言资料获取设计介绍功能介绍设计程序具体实现截图参考文献设计获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师&#xff0c;一名热衷于单片机技术探索与分享的博主、专注于 精通51/STM32/MSP430/AVR等单片机设…

【数据结构】排序算法---计数排序

文章目录 1. 定义2. 算法步骤3. 动图演示4. 性质5. 算法分析6. 代码实现C语言PythonJavaGo 结语 1. 定义 计数排序又称为鸽巢原理&#xff0c;是对哈希直接定址法的变形应用。计数排序不是基于比较的排序算法&#xff0c;其核心在于将输入的数据值转化为键存储在额外开辟的数组…

基于stm32的四旋翼无人机控制系统设计系统设计与实现

文章目录 前言资料获取设计介绍功能介绍设计程序 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师&#xff0c;一名热衷于单片机技术探索与分享的博主、专注于 精通51/STM32/MSP430/AVR等单片机设计 主要对象是咱们电子相关专业…

Gitlab及Git使用说明

目 录 1 Gitlab及Git介绍说明 5 1.1 什么是 Gitlab 5 1.2 什么是Git 5 1.3 Git 家族成员 5 1.4 Gitlab版本 5 1.5 Gitlab 优势 5 1.6 Gitlab 主要服务构成 6 1.7 Gitlab 简单工作流程 6 1.8 Gitlab用户角色 6 2 Gitlab安装与使用 7 2.1 Gitlab安装说明&#xff08;管理员&#…

网络安全中GET和POST区别在哪?

《网安面试指南》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484339&idx1&sn356300f169de74e7a778b04bfbbbd0ab&chksmc0e47aeff793f3f9a5f7abcfa57695e8944e52bca2de2c7a3eb1aecb3c1e6b9cb6abe509d51f&scene21#wechat_redirect 《Java代码审…