PySpark用sort-merge join解决数据倾斜的完整案例

devtools/2025/1/14 18:39:51/

假设有两个大表 table1 和 table2 ,并通过 sort-merge join 来解决可能的数据倾斜问题。

python">from pyspark.sql import SparkSession
from pyspark.sql.functions import col# 初始化SparkSession
spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()# 加载数据,假设数据来自parquet文件
table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")# 查看表的大小
print("table1 size: ", table1.count())
print("table2 size: ", table2.count())# 为了演示数据倾斜,假设我们直接使用join,这里用inner join举例
joined = table1.join(table2, table1["id"] == table2["id"], "inner")# 先对连接键进行排序,为sort-merge join做准备sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")# 使用sort-merge join进行连接
joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")# 触发Action,查看执行计划,此时可以去Spark WebUI查看任务执行情况
joined.count()# 停止SparkSession
spark.stop()

代码解释

初始化SparkSession:创建一个SparkSession对象,这是与Spark交互的入口。

python">spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()

加载数据并查看表大小:从Parquet文件加载两张表,并打印出它们的行数,以此来了解表的规模。

python">table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")print("table1 size: ", table1.count())
print("table2 size: ", table2.count())

数据预处理:在进行 sort-merge join 之前,对两个表按照连接键 id 在每个分区内进行排序。

python">sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")

执行sort-merge join:利用排序后的表,执行 sort-merge join 操作,这里选择的是内连接。

python">joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")

触发Action并查看执行情况:调用 count() 方法触发一个Action,此时Spark会真正执行整个计算流程。与此同时,可以打开Spark WebUI(通常是 http://your-spark-master:4040 ),在 Stages 页面查看任务执行计划,尤其是查看各个阶段的数据分布情况,确认数据倾斜是否得到解决。

python">joined.count()

停止SparkSession:任务完成后,关闭SparkSession释放资源。

python">spark.stop()

要在Spark WebUI中查看数据倾斜:

  • 在执行 joined.count() 后,迅速打开浏览器访问Spark WebUI。进入 Stages 标签页,找到正在执行的 join 相关阶段。查看每个任务的处理数据量,如果之前存在数据倾斜,经过 sort-merge join 处理后,各个任务处理的数据量应该相对均匀。

http://www.ppmy.cn/devtools/150477.html

相关文章

Transmon

Transmon 是一种超导量子比特,由耶鲁大学的研究人员在2007年提出。它是基于约瑟夫森结的量子比特的改进版本,旨在提高量子比特的相干时间。Transmon 的设计和原理涉及多个物理学领域的知识,包括量子电动力学(QED)。以下…

Autodl安装tensorflow2.10.0记录

首先租用新实例(我选的是3080*2卡),由于基础镜像中没有2.10.0版本,选miniconda3的基础环境 创建虚拟环境:conda create --name xxx python3.8(环境名)激活虚拟环境:conda activate x…

SpringBoot之LazyInitializationBeanFactoryPostProcessor类源码学习

源码分析 /**** author Andy Wilkinson* author Madhura Bhave* author Tyler Van Gorder* author Phillip Webb* since 2.2.0* see LazyInitializationExcludeFilter** 主要用于延迟初始化 Bean 的配置。它通过修改 BeanFactory 的配置来确保某些 Bean 在实际需要时才进行初始…

MySQL核心揭秘:InnoDB存储引擎深度探索

一、InnoDB体系结构概述 InnoDB的整体架构分为三部分:缓冲池(Buffer Pool)、后台线程、文件,如下图所示三部分 1.缓冲池 缓冲池是什么? InnoDB存储引擎基于磁盘文件存储,访问物理硬盘和在内存中进行访问,速度相差很…

Java的Stream流和Option类

1. Stream 流 背景 Stream是Java 8引入的一个用于处理集合(或其他数据源)中的元素的API。它提供了一种声明式的方式来处理数据,并可以链式调用。Stream支持惰性求值,也支持并行流处理。 1.1 创建 Stream 创建一个Stream可以通…

【Uniapp-Vue3】onUnload页面卸载和onPageScroll页面监听滚动

一、onUnload函数 当页面卸载就会触发该函数: import {onUnload} from "dcloudio/uni-app"; onUnload(()>{...}) open-type"reLauch"会在跳转到别的页面的同时将当前页面销毁。 当我们跳转页面以后,就触发了onUnload函数&#…

基于Springboot+Vue的仓库管理系统

开发一个基于Spring Boot和Vue的仓库管理系统涉及到前端和后端的开发。本文呢,给出一个简单的开发步骤指南,用于指导初入的新手小白如何开始构建这样一个系统,如果**你想直接学习全部内容,可以直接拉到文末哦。** 开始之前呢给小…

【芯片设计- RTL 数字逻辑设计入门 9.2 -- flip flop 与 寄存器的关系详细介绍】

请阅读【嵌入式开发学习必备专栏 Cache | MMU | AMBA BUS | CoreSight | Trace32 | CoreLink | ARM GCC | CSH】 文章目录 Overview硬件角度的 Flip-Flop软件角度的寄存器举例说明硬件设计角度软件开发角度D Flip-Flop 实现基本原理:Verilog 代码:UT 示例JK Flip-Flop 实现基…