PySpark用sort-merge join解决数据倾斜的完整案例

ops/2025/1/14 17:49:39/

假设有两个大表 table1 和 table2 ,并通过 sort-merge join 来解决可能的数据倾斜问题。

python">from pyspark.sql import SparkSession
from pyspark.sql.functions import col# 初始化SparkSession
spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()# 加载数据,假设数据来自parquet文件
table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")# 查看表的大小
print("table1 size: ", table1.count())
print("table2 size: ", table2.count())# 为了演示数据倾斜,假设我们直接使用join,这里用inner join举例
joined = table1.join(table2, table1["id"] == table2["id"], "inner")# 先对连接键进行排序,为sort-merge join做准备sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")# 使用sort-merge join进行连接
joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")# 触发Action,查看执行计划,此时可以去Spark WebUI查看任务执行情况
joined.count()# 停止SparkSession
spark.stop()

代码解释

初始化SparkSession:创建一个SparkSession对象,这是与Spark交互的入口。

python">spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()

加载数据并查看表大小:从Parquet文件加载两张表,并打印出它们的行数,以此来了解表的规模。

python">table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")print("table1 size: ", table1.count())
print("table2 size: ", table2.count())

数据预处理:在进行 sort-merge join 之前,对两个表按照连接键 id 在每个分区内进行排序。

python">sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")

执行sort-merge join:利用排序后的表,执行 sort-merge join 操作,这里选择的是内连接。

python">joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")

触发Action并查看执行情况:调用 count() 方法触发一个Action,此时Spark会真正执行整个计算流程。与此同时,可以打开Spark WebUI(通常是 http://your-spark-master:4040 ),在 Stages 页面查看任务执行计划,尤其是查看各个阶段的数据分布情况,确认数据倾斜是否得到解决。

python">joined.count()

停止SparkSession:任务完成后,关闭SparkSession释放资源。

python">spark.stop()

要在Spark WebUI中查看数据倾斜:

  • 在执行 joined.count() 后,迅速打开浏览器访问Spark WebUI。进入 Stages 标签页,找到正在执行的 join 相关阶段。查看每个任务的处理数据量,如果之前存在数据倾斜,经过 sort-merge join 处理后,各个任务处理的数据量应该相对均匀。

http://www.ppmy.cn/ops/150064.html

相关文章

Blazor开发复杂信息管理系统的优势

随着现代企业信息管理需求的不断提升,开发高效、易维护、可扩展的系统变得尤为重要。在这个过程中,Blazor作为一种新兴的Web开发框架,因其独特的优势,逐渐成为开发复杂信息管理系统的首选技术之一。本文将结合Blazor在开发复杂信息…

IIS安全配置基线

IIS安全配置基线 1. 限制目录的执行权限2. 开启日志记录功能3. 自定义错误页面4. 关闭目录浏览功能5. 停用或删除默认站点6. 删除不必要的脚本映射7. 专职低权限用户运行网站8. 在独立的应用程序池中运行网站 IIS(Internet Information Services)安全部署…

【C语言】标准IO

目录 1. 什么是标准IO 1.1概念 1.2特点 1.3 操作 2. 缓存区 3. 函数接口 3.1打开文件fopen 3.2关闭文件 3.3读写文件操作 3.3.1每次读写一个字符:fgetc()、fputc() a. 针对文件 b. 针对终端 3.3.2 每次一串字符的读写fgets()和fputs() c. 针对终端 d…

基于微信小程序的书籍销售系统设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…

PHP RCE

靶场搭建 生成容器 docker run -p 18022:22 -p 18080:80 -p 18081:81 -p 18082:82 -p 18085:85 -i -t mcc0624/cmd:latest bash -c /etc/rc.local; /bin/bash 管理网站 http://<IP>:18085/CZKJ2022 用户名&#xff1a;admin 密码&#xff1a;Pssw0rd SSH 用户名root 密…

深度学习——pytorch基础入门

一、张量 在PyTorch中&#xff0c;张量是PyTorch中最基本的数据结构。张量可以看作是一个多维数组&#xff0c;可以在GPU上加速运算。PyTorch的张量和Numpy的数组非常类似&#xff0c;但是与Numpy不同的是&#xff0c;PyTorch的张量可以自动地在GPU上进行加速计算。 PyTorch中的…

解锁 C# 与 LiteDB 嵌入式 NoSQL 数据库

一、开篇&#xff1a;邂逅 C# 与 LiteDB 新世界 在当今的软件开发领域&#xff0c;数据管理如同建筑的基石&#xff0c;而选择一款合适的数据库则是项目成功与否的关键因素之一。对于 C# 开发者来说&#xff0c;面对琳琅满目的数据库选项&#xff0c;如何抉择常常令人头疼。今…

【Linux】Linux常见指令(下)

个人主页~ Linux常见命令&#xff08;上&#xff09;~ 初识Linux 一、Linux基本命令11、cat命令12、more指令13、less指令14、head指令15、tail指令16、时间相关的指令&#xff08;1&#xff09;date指令&#xff08;2&#xff09;cal指令 17、find指令18、grep指令19、压缩相…