spark的stage划分的原理

embedded/2024/9/25 9:29:13/

        在 Apache Spark 中,stage 是执行作业时的重要执行单元。一个 Spark 作业会被划分为若干个 stage,每个 stage 由一组可以并行执行的任务组成。这种划分主要依赖于 RDD 中的操作类型(窄依赖和宽依赖)。下面我们来讨论 Spark stage 的创建和划分的原理以及代码实现的核心逻辑。

Spark Stage 划分的原理

  1. RDD 依赖(窄依赖和宽依赖)

    • Spark 中,RDD 可以有两种依赖关系:
      • 窄依赖(narrow dependency):父 RDD 的每个分区至多被子 RDD 的一个分区使用,典型的操作如 mapfilter 等。
      • 宽依赖(wide dependency):父 RDD 的每个分区可能被多个子 RDD 的分区使用,典型的操作如 reduceByKeygroupByKey 等,这类操作会触发 shuffle
    • 窄依赖的 RDD 操作可以被划分到同一个 stage 中,而宽依赖的 RDD 操作会触发 shuffle,导致 stage 划分。
  2. DAG(有向无环图)

    Spark 的作业会构建一个 RDD 的依赖图(DAG)。这个 DAG 中每个 RDD 的窄依赖操作会被合并成一个 stage,宽依赖操作会划分出不同的 stage,并在两个 stage 之间插入 shuffle
  3. Stage 划分规则

    • 每当遇到一个宽依赖(如 reduceByKeyjoingroupByKey 等),Spark 会创建一个新的 stage,并将之前的 RDD 操作划分到一个 stage 中,形成一个有序的 stage 执行链。
    • stage 划分的核心任务是:将窄依赖操作尽可能合并到一起,直到遇到需要 shuffle 的宽依赖操作。

Spark Stage 划分的核心代码逻辑

        Spark 的 DAG 划分及 stage 划分主要在 DAGScheduler 中实现。DAGScheduler 是 Spark 作业调度的核心组件,负责将逻辑作业(job)划分为多个 stage,并调度这些 stage 执行。

以下是 Spark 3.x 版本中有关 stage 划分的核心逻辑及其简化代码片段。

1. DAGScheduler 类

  DAGScheduler 类位于 org.apache.spark.scheduler 包下,它负责管理 RDD 依赖关系并创建 stageDAGScheduler 会根据 RDD 的依赖图和操作类型,生成任务的 DAG 并划分 stage

class DAGScheduler(// 参数略...
) extends Logging {// stage 列表private val stages = new HashMap[StageId, Stage]()// 提交 Job 时触发的函数def submitJob(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Seq[Int],callSite: CallSite,allowLocal: Boolean,resultHandler: (Int, _) => Unit,properties: Properties = null): JobWaiter[_] = {// 根据 RDD 和依赖关系生成最终的 ResultStageval finalStage = createFinalStage(rdd, partitions, callSite)// 提交该 stage 执行submitStage(finalStage)}// 创建 ResultStage 和后续的 Stageprivate def createFinalStage(rdd: RDD[_],partitions: Seq[Int],callSite: CallSite): ResultStage = {// 创建该作业的最终的 stage,并递归创建所有依赖的 stageval finalStage = newStage(rdd, partitions)finalStage}// 递归生成各个 Stage,核心逻辑private def newStage(rdd: RDD[_], partitions: Seq[Int]): Stage = {// 检查缓存,避免重复生成 Stagestages.getOrElseUpdate(rdd.id, {val shuffleDeps = getShuffleDependencies(rdd)// 如果存在宽依赖,则要划分为不同的 stageif (shuffleDeps.nonEmpty) {val parentStages = shuffleDeps.map { dep =>newStage(dep.rdd, dep.rdd.partitions.indices)}val newStage = new ShuffleMapStage(rdd, parentStages)stages(newStage.id) = newStagenewStage} else {// 如果只有窄依赖,当前操作在同一个 stage 内val parentStages = getNarrowDependencies(rdd).map { dep =>newStage(dep.rdd, dep.rdd.partitions.indices)}val newStage = new ResultStage(rdd, parentStages)stages(newStage.id) = newStagenewStage}})}// 获取 RDD 的 shuffle 依赖(宽依赖)private def getShuffleDependencies(rdd: RDD[_]): List[ShuffleDependency[_, _, _]] = {rdd.dependencies.collect {case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep}}// 获取 RDD 的窄依赖private def getNarrowDependencies(rdd: RDD[_]): List[Dependency[_]] = {rdd.dependencies.collect {case narrowDep: NarrowDependency[_] => narrowDep}}
}
2. Stage 划分的基本过程
  • RDD 依赖遍历:通过 newStage 函数递归遍历 RDD 的依赖关系,将遇到的每一个 shuffle 依赖(宽依赖)创建一个新的 ShuffleMapStage,而 ResultStage 则用于最终计算结果。

  • 宽依赖处理:当遇到宽依赖(ShuffleDependency),说明需要进行 shuffle,因此要创建一个新的 stage

  • 窄依赖处理:当只有窄依赖时,RDD 可以继续合并在当前的 stage 中。

3. ShuffleMapStage 和 ResultStage

ShuffleMapStage 和 ResultStage 是 Spark 中两种类型的 Stage

  • ShuffleMapStage:处理宽依赖(shuffle),该 stage 会产生 shuffle 文件供下游 stage 使用。
  • ResultStage:最终计算 Action(如 collectsaveAsTextFile 等)结果的 stage,是 DAG 中的最后一个 stage

代码流程总结

  1. DAGScheduler 在收到作业时,会从最后的 Action 开始,通过递归函数 newStage,根据 RDD 的依赖关系逐步向上遍历。
  2. 当遇到 shuffle 依赖时,会将其划分为不同的 stage,每个 shuffle 依赖会产生一个 ShuffleMapStage
  3. 所有的窄依赖 RDD 操作则合并为一个 stage,在同一个 stage 中执行。
  4. submitStage 负责将划分好的 stage 发送给 TaskScheduler,TaskScheduler 则进一步调度任务到集群执行。

总结

  • 窄依赖操作:操作在同一个 stage 中执行,尽可能合并,减少 shuffle
  • 宽依赖操作:每个宽依赖会触发新的 stage,并引入 shuffle,每个 shuffle 会将数据重新分布给后续的 stage
  • DAGScheduler 的作用:DAG 调度器负责将 RDD 操作链划分为多个 stage,并根据依赖关系生成一个 DAG。

http://www.ppmy.cn/embedded/116556.html

相关文章

Linux 中 man手册基础 与 man.vim 和 ctags的安装使用

man手册基础 查找命令:Man [section] [所查找…的名字] Linux的man很强大,该手册分成很多section,使用man时可以指定不同的section来浏览,各个section意义如下: 1 - commands 2 - system calls 3 - library calls 4 -…

React 启动时webpack版本冲突报错

报错信息: 解决办法: 找到全局webpack的安装路径并cmd 删除全局webpack 安装所需要的版本

【Linux笔记】如何将内容从一个文件复制到另一个文件

比如:将文件tmp_file.txt中的部分数据,复制到file01.txt中去 tmp_file.txt文中内容: file01.txt为空文档 一、使用vi编辑器 I、文件中直接使用:e 目标文件进行切换文件复制 1、打开被复制文件 vi tmp_file.txt 2、进入一般命令模式 默认情况为…

Maven 编译和Nexus 构建私有仓库

Java 程序编译 编译流程 C 语言源码编译过程,对于单文件,我们可以使用 gcc 命令直接编译即可,但如果是大型商业项目,源码文件多,存在各种依赖,各种配置路径,各种库的支持等,几乎无法…

第十四届蓝桥杯嵌入式国赛

一. 前言 本篇博客主要讲述十四届蓝桥杯嵌入式的国赛题目,包括STM32CubeMx的相关配置以及相关功能实现代码以及我在做题过程中所遇到的一些问题和总结收获。如果有兴趣的伙伴还可以去做做其它届的真题,可去 蓝桥云课 上搜索历届真题即可。 二. 题目概述 …

传输层协议 —— TCP协议(上篇)

目录 1.认识TCP 2.TCP协议段格式 3.可靠性保证的机制 确认应答机制 超时重传机制 连接管理机制 三次握手 四次挥手 1.认识TCP 在网络通信模型中,传输层有两个经典的协议,分别是UDP协议和TCP协议。其中TCP协议全称为传输控制协议(Tra…

分布式算法

分布式场景下的核心问题 分布式场景下困扰我们的3个核心问题(CAP):一致性、可用性、分区容错性。 1、一致性(Consistency):无论服务如何拆分,所有实例节点同一时间看到是相同的数据。 2、可用性…

MySQL记录存储过程执行的错误信息

创建业务表 CREATE TABLE tb_user (id int NOT NULL primary key,name varchar(255) DEFAULT NULL,age int DEFAULT NULL );创建错误日志表 CREATE TABLE error_log (error_id INT AUTO_INCREMENT PRIMARY KEY,error_code char(5),error_message text,error_timestamp TIMEST…