Spark中为什么Left join比Full join 快

news/2025/1/8 15:23:22/

背景

最近在调优的过程中,发现了left outer joinfull outer join快很多的情况,
具体的sql如下:

from db.baseTb1 base  join db.tb1 a on base.id = a.idfull outer join db.tbl2 b on  a.id = b.id full outer join db.tbl3 c on  b.id = c.id full outer join db.tbl4 d on  c.id = d.id full outer join db.tbl5 e on  d.id = e.id -----from db.baseTb1 base  left join db.tb1 a on base.id = a.idleft outer join db.tbl2 b on  a.id = b.id left outer join db.tbl3 c on  a.id = c.id left outer join db.tbl4 d on  a.id = d.id left outer join db.tbl5 e on  a.id = e.id 

结论

先说结论:left join中4个join会在同一个Stage执行,也就是说会在同一个Task执行4个join,而full join每个join都是在单独的Stage中执行,是串行的,
如果在语意允许的情况下,选择left join可以大大加速任务运行,笔者遇到的情况就是 left join 运行了1个小时,而full join运行了6个小时

分析

对于full outer join的情况,运行的物理计划如下:
在这里插入图片描述

对于每个SortMergeJoin完后都会有一个Exchange的shuffle操作。

对于left outer join的情况,运行的物理计划如下:
在这里插入图片描述

只有在读取source文件完之后才会有Exchange的shuffle的操作。
这是为什么呢?
因为在RuleEnsureRequirements中,会对于不匹配的计划之间加上shuffle Exchange物理计划,具体代码如下:

  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistributionval requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrderingvar children: Seq[SparkPlan] = operator.childrenassert(requiredChildDistributions.length == children.length)assert(requiredChildOrderings.length == children.length)// Ensure that the operator's children satisfy their output distribution requirements.children = children.zip(requiredChildDistributions).map {case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>childcase (child, BroadcastDistribution(mode)) =>BroadcastExchangeExec(mode, child)case (child, distribution) =>val numPartitions = distribution.requiredNumPartitions.getOrElse(conf.numShufflePartitions)ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)}
  • child.outputPartitioning.satisfies(distribution)这块代码就是判断下游的输出分区是否满足当前计划所要求的分布
    我们分析第一个join的时候,也就是:
   FileSourceScanExec     FileSourceScanExec\                      /\/                    \/SortMergeJoin

这里SortMergeJoin的requiredChildDistributionClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys)
SortMergeJoin的child的outputPartitioning为FileSourceScanExec.outputPartitioning,即UnknownPartitioning

所以会引入Exchange,形成如下的物理计划:

FileSourceScanExec     FileSourceScanExec\                      /\/                    \/Exchange            Exchange\               /SortMergeJoin

而最终经过AQE以后会形成如下的物理计划:

   FileSourceScanExec     FileSourceScanExec\                      /\/                    \/Exchange                Exchange|                      |CustomShuffleReader CustomShuffleReader\               /SortMergeJoin

而对于接下来的第二个join,full join和left join的情况就不一样了:

  • 对于left join:

FileSourceScanExec     FileSourceScanExec\                      /\/                    \/Exchange                Exchange|                      |CustomShuffleReader CustomShuffleReader\               /                    SortMergeJoin                  FileSourceScanExec||                           Exchange|                              ||                          CustomShuffleReader|                              /SortMergeJoin(left outer join)

因为第二个SortMergeJoin的requiredChildDistributionClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys)
SortMergeJoin 的child的outputPartitioning为第一个SortMergeJoin.outputPartitioning,具体的代码如下:

  override def outputPartitioning: Partitioning = joinType match {case _: InnerLike =>PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))case LeftOuter => left.outputPartitioningcase RightOuter => right.outputPartitioningcase FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)case LeftExistence(_) => left.outputPartitioningcase x =>throw new IllegalArgumentException(s"ShuffledJoin should not take $x as the JoinType")}

所以是CustomShuffleReader.outputPartitioning,w为Exchange.outputPartitioningHashPartitioning,则能匹配satisfied上,所以不会引入额外的shuffle

  • 对于full outer join:

FileSourceScanExec     FileSourceScanExec\                      /\/                    \/Exchange                Exchange|                      |CustomShuffleReader CustomShuffleReader\               /                    SortMergeJoin                  FileSourceScanExec||                           Exchange|                              ||                          CustomShuffleReader|                              /SortMergeJoin(left outer join)

其他的都是left outer join一样,唯一不一样的是SortMergeJoin 的child的outputPartitioning是 第一个SortMergeJoin.outputPartitioning ,根据以上代码:
走的就是FullOuter的逻辑,也就是UnknownPartitioning,所以是不满足,得引入shuffle Exchange。

其实从逻辑上来说,full join后如果不重新shuffle,会导致一个任务中会有id为null值的存在,会导致join的结果不正确
而对于left join来说就不一样了,task join完后id还是保持原来的就不会变,所以就不必重新shuffle


http://www.ppmy.cn/news/908146.html

相关文章

smartprinter 这个绝对程序猿的福音啊

以前不知道有smartprinter这个东西,做套打程序的时候不知道浪费了多少纸张啊,想在想想都心疼。网上有下载中文破解版。特此记录。

倍福TwinCAT设置PLC的扫描周期,运行周期方法

倍福TwinCAT设置PLC的扫描周期,运行周期方法 双击PlcTask,然后再Cycle ticks中可以修改PLC的扫描周期,例如修改为2ms 为了验证是否真的是2ms,可以在程序中跟计数器绑定使用,PLC2ms扫描一次,计数器也是每个…

倍福PLC笔记

作为一个在自动化行业从事三年的工程师,我接触过的PLC主要有:德国倍福PLC、施耐德PLC、上海步科自动化的F1系列控制器等。 这几个控制平台大同小异,都是基于Codesys平台搭建的,其中倍福PLC主要基于Ethercat总线,施耐德…

SAP smartforms打印图片

注意:SAP只能上传打印bmp格式图片 1.标准程序上传 T-CODE:SE78 2.程序代码上传 DATA: P_FILENAME TYPE RLGRAP-FILENAME,P_NAME TYPE STXBITMAPS-TDNAME,P_TITLE LIKE BAPISIGNAT-PROP_VALUE,P_DOCID TYPE STXBITMAPS-DOCID,P_RESOLUTION TYPE …

倍福 (BeckHOFF)PLC 使用随笔小记1

我是一位工控小白,加入自动化这一行以来,参与的项目都是非标定制类,我的工作内容是电气设计、电气元件选型、PLC编程、设备调试等(我不清楚怎么称呼我的职位,他们说叫电气工程师),目前用的PLC主…

smartprinter注册版_SmartPrinter免费版

SmartPrinter是一款很经典的虚拟打印软件,专为转换文件而研发的高品质打印驱动,以运行稳定、转换速度快和图像质量高而著称,可以将任意可打印文档转换成 PDF、TIFF、JPEG,BMP、PNG、EMF、GIF、TXT格式,因其速度快&…

ABB机器人smart组件仿真码垛

01导入简易的输送链及产品模型库 02创建工具简易吸盘和tooldata 03Smart组件之创建动态输送链 04简易输送链的属性和信号连接 05验证传送带运行是否正常 06动态简易吸盘Smart组件属性和信号连接 07验证吸盘是否正常吸和放 08新建IO板和信号 09工作站逻辑的信号和连接 …

ABB AC500 系列 PLC 与上位机iFix 的通讯配置

ABB PLC IP 及 MODBUS TCP/IP 协议设置 通过 IP config tool 配置设备 IP 在 软件中,有 3 种方式可以进入 IP config tool 的配置界面  双击 IP_settings,点击 IP config tool,即可进入 IP config tool 界面 点击菜单栏的 Tool→IP confi…