Spark 中的 Shuffle 是分布式数据交换的核心流程,从源码角度分析 Shuffle 的执行路径

news/2024/11/24 18:56:34/

Spark 中的 Shuffle 是分布式数据交换的核心流程,涉及多个组件的协同工作。为了深入理解其处理过程,我们可以从源码角度分析 Shuffle 的执行路径,分为 Shuffle WriteShuffle Read 两个阶段。


1. Shuffle Write 阶段

Shuffle Write 的主要任务是将 Mapper 的数据按照分区规则(如 HashPartitioner)分割、排序并写入磁盘。

1.1 数据分区与序列化
  • 入口方法
    Mapper 阶段的 compute() 方法中调用了 ShuffleDependency 的相关逻辑。

    val partition = partitioner.getPartition(key)
    

    数据会根据 partitioner(如 HashPartitioner 或自定义分区器)计算目标分区。

  • 序列化
    每条数据会通过 serializerInstance.serialize() 进行序列化,将数据转换成字节流以便后续写入。

1.2 数据排序与溢写(Spill)
  • 排序
    SortShuffleWriter 中,数据会被放入内存中的 PartitionedAppendOnlyMapPartitionedPairBuffer 进行排序(根据键的自然顺序或用户指定的比较器)。

  • 溢写到磁盘
    当内存不足时,会触发溢写(spill)。溢写的数据会写到多个磁盘文件,每个文件对应多个分区。

1.3 合并分区数据
  • 归并操作
    在溢写文件较多时,Spark 会对这些临时文件执行归并排序,生成最终的分区文件。
    • 对于 BypassMergeSortShuffleWriter,会直接将分区文件写出,无需排序。
    • 对于 SortShuffleWriter,归并排序确保每个分区的数据有序。
1.4 写出索引文件
  • 最后,Shuffle Write 阶段会生成一个索引文件(shuffleId_0.index)和数据文件(shuffleId_0.data)。
    • 索引文件:记录每个分区在数据文件中的偏移量,用于快速定位分区数据。
    • 数据文件:存储分区后的数据。

2. Shuffle Read 阶段

Shuffle Read 阶段由 Reducer 执行,任务是从分布式存储中拉取相应分区的数据。

2.1 拉取数据
  • 入口方法
    Reducer 的 compute() 方法会调用 BlockStoreShuffleFetcher.fetch()MapOutputTracker 获取每个分区的数据位置。

    val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId)
    
  • 数据传输
    数据通过 Spark 的 BlockManager 拉取。如果目标数据在同一节点上,可以通过本地文件系统读取;如果在远程节点上,则通过 Netty 或 HTTP 传输。

2.2 数据解压与反序列化
  • 解压
    如果数据经过压缩(如 LZ4、Snappy),在读取时会被解压缩。

  • 反序列化
    使用与 Shuffle Write 相同的序列化器(如 Kryo 或 JavaSerializer)将字节流转换回对象。

2.3 数据聚合与处理
  • 拉取到的数据会根据 Reducer 的逻辑(如 reduceByKeyaggregateByKey)进行聚合或排序处理。

3. 关键组件的协作关系

  • ShuffleManager
    决定使用哪种类型的 Shuffle,如 SortShuffleManagerHashShuffleManager

  • ShuffleWriter
    负责数据写入磁盘,主要实现类:

    • BypassMergeSortShuffleWriter
    • SortShuffleWriter
  • ShuffleReader
    负责从不同节点拉取数据,主要实现类:

    • BlockStoreShuffleReader
  • MapOutputTracker
    负责跟踪每个 Mapper 的输出分区位置,Reducer 会通过它获取分区数据的位置。


4. Shuffle 设计的优缺点

特性优点缺点
分区文件索引减少数据读取时的随机 I/O 开销索引管理复杂度增加
排序优化提高数据局部性和读取效率需要更多的 CPU 和内存资源
溢写与归并避免内存溢出,支持大规模数据处理增加磁盘 I/O 开销
数据压缩减少网络传输和存储空间压缩和解压缩会增加 CPU 开销

源码路径及关键类

  1. Shuffle Write

    • SortShuffleWriterorg.apache.spark.shuffle.sort.SortShuffleWriter
    • BypassMergeSortShuffleWriterorg.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter
  2. Shuffle Read

    • BlockStoreShuffleReaderorg.apache.spark.shuffle.BlockStoreShuffleReader
  3. Shuffle 依赖与管理


5. 性能优化方向

  1. 调优分区数

    • 合理配置 spark.sql.shuffle.partitionsspark.default.parallelism,避免分区数过多或过少。
  2. 压缩与序列化

    • 优化序列化器(推荐使用 Kryo),并启用压缩来减少网络开销。
  3. 内存管理

    • 调整 spark.memory.fraction,确保 Shuffle 缓存有足够的内存。
  4. 使用外部 Shuffle 服务

    • 启用 ExternalShuffleService,减轻 Executor 的内存和磁盘压力。

通过以上分析,可以从源码和优化角度全面理解 Spark Shuffle 的设计与工作原理。


http://www.ppmy.cn/news/1549606.html

相关文章

11/19使用Spring,gradle实现前后端交互

创建 Gradle 项目 在你常用的 IDE(如 Intellij IDEA)中选择创建新的 Gradle 项目,按照向导进行相应的配置,选择合适的项目名称、目录等信息。配置 build.gradle 文件(Gradle 项目的配置文件),添…

蚁群算法(Ant Colony Optimization, ACO)

简介 蚁群算法(Ant Colony Optimization, ACO)是一种基于自然启发的优化算法,由意大利学者马可多里戈(Marco Dorigo)在1992年首次提出。它受自然界中蚂蚁觅食行为的启发,用于解决离散优化问题。 在自然界…

jquery还有其应用场景,智慧慢慢地被边缘化,但不会消亡

一、jQuery 的辉煌过往 jQuery 的诞生与崛起 在前端开发的漫长历史中,2006 年诞生的 jQuery 犹如一颗耀眼的新星划破天际。它由 John Resig 创造,一出现便以其独特的魅力迅速吸引了广大开发者的目光。在那个前端技术发展相对缓慢的时期,jQue…

ESP32移植Openharmony外设篇(6)光敏电阻ADC读取

光照传感器 模块简介 产品描述 光敏电阻(photoresistor orlight-dependent resistor,后者缩写为LDR)是一种基于内光电效应的半导体元件,它的阻值依赖于入射光强的变化 。入射光强增加,光敏电阻的阻值减小&#xff0…

apr共享内存

下载: Download - The Apache Portable Runtime Project 编译: 使用cmake-gui生成库: apr-1.lib aprapp-1.lib libapr-1.lib libaprapp-1.lib libapr-1.dll 在Developer PowerShell for VS 2019中: 执行nmake -f Makefile.win来…

什么是“数学活动”?

数学活动嘛,不就是关于数学的活动,还有什么可说的?非也,且看下面定义。 一、不同人的定义: ① “数学活动”是指在数学教学过程中,以学生学习兴趣和内在需要为基础,以主动探索、变革、改造对象…

win10局域网加密共享设置

1、创建共享账户 我的电脑右键选择管理 选择本地用户和组 -> 用户 双击用户 在空白区域右键,新建用户 然后创建用户 点击创建后 2、设置网络 右下角网络右键

C++结构型设计模式的作用和特征

在C面向对象软件设计中,结构型模式(Structural Patterns)主要关注对象和类之间的组合,以形成更大的结构。这些模式帮助我们管理和组织对象之间的关系,使得系统更加灵活、可扩展和易于维护。以下是几种常见的结构型模式…