一、广播变量使用
源码中给的例子是:org.apache.spark.examples.BroadcastTest
其中我们关心的只有两行代码,即创建广播变量和使用广播变量
//准备测试数据
val arr1 = (0 until num).toArray
//创建广播变量
val barr1 = sc.broadcast(arr1)
//使用广播变量
val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.length)
二、创建广播变量
为了思路清晰,和主线关联不大的代码会剔除掉
1、SparkContext
private[spark] def env: SparkEnv = _envdef broadcast[T: ClassTag](value: T): Broadcast[T] = {//isLocal 是一个 Boolean 类型//当设置master 为 local 或者 local[n] 时 为trueval bc = env.broadcastManager.newBroadcast[T](value, isLocal)bc}
2、BroadcastManager
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {//计算 BroadcastId val bid = nextBroadcastId.getAndIncrement()//三个参数 :1、数据 2、是否local 3、广播id//最终需要构造一个 TorrentBroadcast 对象broadcastFactory.newBroadcast[T](value_, isLocal, bid)}
3、TorrentBroadcast
机制如下:
Driver 将序列化对象划分为小块,并将这些块存储在Driver 的BlockManager中。
在每个Executor上首先尝试从其BlockManager中获取对象。如果获取不到,则使用远程获取(从Driver 和/或 其他Executor(如果可用)获取小块)。一旦获取成功,就会将块放入自己的BlockManager中,并准备好供其他Executor从中获取。
这可以防止Driver成为发送多个广播数据副本(每个Executor一个)的瓶颈。
private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)extends Broadcast[T](id) with Logging with Serializable {//广播对象的软引用//它通过从 Driver 和/或其他 Executor 读取块来构建此值。@transient private var _value: SoftReference[T] = _//将广播id 封装成 广播块idprivate val broadcastId = BroadcastBlockId(id)//构建该对象时就执行 writeBlocksprivate val numBlocks: Int = writeBlocks(obj)//每个区块的大小。默认值为4MB。此值仅由 broadcaster 读取@transient private var blockSize: Int = _private def setConf(conf: SparkConf): Unit = {//spark.broadcast.compress 默认 true//是否在发送广播变量之前对其进行压缩 , 通常都需要压缩 compressionCodec = if (conf.get(config.BROADCAST_COMPRESS)) {//最终会找 spark.io.compression.codec 默认 lz4//用于压缩内部数据的编解码器,如RDD分区、事件日志、广播变量和洗牌输出。//默认情况下,Spark提供四种编解码器:lz4、lzf、snappy和zstd。还可以使用完全限定的类名来指定编解码器//LZ4压缩算法以其极高的压缩和解压缩速度而闻名,但压缩比并不突出。Some(CompressionCodec.createCodec(conf))} else {None}//spark.broadcast.blockSize 默认 4m//TorrentBroadcastFactory每块块的大小,//值太大会降低广播过程中的并行性(使其变慢);//但是,如果它太小,BlockManager的性能可能会受到影响blockSize = conf.get(config.BROADCAST_BLOCKSIZE).toInt * 1024//spark.broadcast.checksum 默认 true//是否启用广播校验和。//如果启用,广播将包括一个校验和,这可以帮助检测损坏的块,//但代价是计算和发送更多的数据。如果网络有其他机制来保证数据在广播过程中不会损坏,则可以禁用它checksumEnabled = conf.get(config.BROADCAST_CHECKSUM)}setConf(SparkEnv.get.conf)//将对象划分为多个块,并将这些块放入块管理器中。private def writeBlocks(value: T): Int = {import StorageLevel._// 在Driver中存储广播变量的副本,这样在Driver上运行的Task就不会创建广播变量值的重复副本。//因为是 Driver 上的Task 用因此没有序列化,是直接放到了内存和磁盘val blockManager = SparkEnv.get.blockManagerif (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {throw new SparkException(s"Failed to store $broadcastId in BlockManager")}//如果 blockManager 放成功了 继续向下执行try {//获取支持此ChunkedByteBuffer的ByteBuffers的副本。val blocks =TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)//构建检验和if (checksumEnabled) {checksums = new Array[Int](blocks.length)}blocks.zipWithIndex.foreach { case (block, i) =>//计算每个块的校验和if (checksumEnabled) {checksums(i) = calcChecksum(block)}//为每个块计算一个idval pieceId = BroadcastBlockId(id, "piece" + i)//只读字节缓冲区,物理上存储为多个块,而不是单个连续数组val bytes = new ChunkedByteBuffer(block.duplicate())//调用blockManager 存储每个块 存储级别时 内存和磁盘 并序列化 (因为要分发,因此要序列化)if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {throw new SparkException(s"Failed to store $pieceId of $broadcastId " +s"in local BlockManager")}}blocks.length} catch {}}}
三、使用广播变量
1、Broadcast
def value: T = {getValue()}//调用 TorrentBroadcast 获取protected def getValue(): T
2、TorrentBroadcast
//广播对象的软引用//它通过从 Driver 和/或其他 Executor 读取块来构建此值。@transient private var _value: SoftReference[T] = _override protected def getValue() = synchronized {//如果有就直接获取,如果没有拉过来再获取val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.getif (memoized != null) {memoized} else {val newlyRead = readBroadcastBlock()_value = new SoftReference[T](newlyRead)newlyRead}}private def readBroadcastBlock(): T = Utils.tryOrIOException {TorrentBroadcast.torrentBroadcastLock.withLock(broadcastId) {//由于我们只根据“broadcastId”进行锁定,因此每当使用“broadcast Cache”时,我们应该只触摸“broadcastId”。//获取 broadcastManager 的缓存 因为存的时候选择的级别是 MEMORY_AND_DISK_SERval broadcastCache = SparkEnv.get.broadcastManager.cachedValues//根据 broadcastId 从broadcastCache 获取块Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {setConf(SparkEnv.get.conf)val blockManager = SparkEnv.get.blockManager//从本地块管理器获取块作为Java对象的迭代器。blockManager.getLocalValues(broadcastId) match {case Some(blockResult) =>if (blockResult.data.hasNext) {val x = blockResult.data.next().asInstanceOf[T]releaseBlockManagerLock(broadcastId)//如果从本地块管理取到了值,就将值返回,并放到缓存中if (x != null) {broadcastCache.put(broadcastId, x)}x} else {throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")}case None =>//本地块管理中没有该广播id对应的块数据val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)logInfo(s"Started reading broadcast variable $id with $numBlocks pieces " +s"(estimated total size $estimatedTotalSize)")val startTimeNs = System.nanoTime()//从Driver和/或其他Executor获取torrent块//请注意,所有这些块都存储在BlockManager中并报告给Driver,因此其他Executor也可以从该Executor中提取这些块。val blocks = readBlocks()logInfo(s"Reading broadcast variable $id took ${Utils.getUsedTimeNs(startTimeNs)}")try {val obj = TorrentBroadcast.unBlockifyObject[T](blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)// 将合并的副本存储在BlockManager中,这样此Executor上的其他Task就不需要重新获取它。val storageLevel = StorageLevel.MEMORY_AND_DISK//写一个由单个对象组成的块 ,该Executor其他Task使用时直接全量获取广播变量if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {throw new SparkException(s"Failed to store $broadcastId in BlockManager")}if (obj != null) {broadcastCache.put(broadcastId, obj)}obj} finally {blocks.foreach(_.dispose())}}}}}
四、总结
创建广播变量
1、创建广播变量id
2、创建TorrentBroadcast
3、TorrentBroadcast会自动调用writeBlocks将数据写入到BlockManager(这里会写两份数据,一份是整体的结果用于本Executor使用广播变量时使用,存储级别是MEMORY_AND_DISK,一份是拆分成块用于其他Executor拉取使用,存储级别是MEMORY_AND_DISK_SER)
使用广播变量
1、因为本身广播变量的类型就是TorrentBroadcast,因此直接调用其getValue方法获取
2、查看本类_value属性是否有值,如果有值直接返回(加速同Task反复调用)
3、查看缓存是否有值(BroadcastManager.cachedValues)如果有值直接返回(加速同Task反复调用)
4、查看本Executor的BlockManager是否有该广播变量整体的数据,如果有值直接返回(加速同Executor的不同Task使用)
5、从Driver或者其他Excutor获取该广播变量的块序列数据
如果获取到广播变量的数据都会向本类_value属性、缓存、本Executor的BlockManager设置或存入整体数据用于重复使用时加速
广播变量的创建以及使用流程如下:(下载放大后就很清晰哟)