PySpark用sort-merge join解决数据倾斜的完整案例

news/2025/1/15 16:58:24/

假设有两个大表 table1 和 table2 ,并通过 sort-merge join 来解决可能的数据倾斜问题。

python">from pyspark.sql import SparkSession
from pyspark.sql.functions import col# 初始化SparkSession
spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()# 加载数据,假设数据来自parquet文件
table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")# 查看表的大小
print("table1 size: ", table1.count())
print("table2 size: ", table2.count())# 为了演示数据倾斜,假设我们直接使用join,这里用inner join举例
joined = table1.join(table2, table1["id"] == table2["id"], "inner")# 先对连接键进行排序,为sort-merge join做准备sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")# 使用sort-merge join进行连接
joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")# 触发Action,查看执行计划,此时可以去Spark WebUI查看任务执行情况
joined.count()# 停止SparkSession
spark.stop()

代码解释

初始化SparkSession:创建一个SparkSession对象,这是与Spark交互的入口。

python">spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()

加载数据并查看表大小:从Parquet文件加载两张表,并打印出它们的行数,以此来了解表的规模。

python">table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")print("table1 size: ", table1.count())
print("table2 size: ", table2.count())

数据预处理:在进行 sort-merge join 之前,对两个表按照连接键 id 在每个分区内进行排序。

python">sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")

执行sort-merge join:利用排序后的表,执行 sort-merge join 操作,这里选择的是内连接。

python">joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")

触发Action并查看执行情况:调用 count() 方法触发一个Action,此时Spark会真正执行整个计算流程。与此同时,可以打开Spark WebUI(通常是 http://your-spark-master:4040 ),在 Stages 页面查看任务执行计划,尤其是查看各个阶段的数据分布情况,确认数据倾斜是否得到解决。

python">joined.count()

停止SparkSession:任务完成后,关闭SparkSession释放资源。

python">spark.stop()

要在Spark WebUI中查看数据倾斜:

  • 在执行 joined.count() 后,迅速打开浏览器访问Spark WebUI。进入 Stages 标签页,找到正在执行的 join 相关阶段。查看每个任务的处理数据量,如果之前存在数据倾斜,经过 sort-merge join 处理后,各个任务处理的数据量应该相对均匀。

http://www.ppmy.cn/news/1563368.html

相关文章

SpringBoot:SaToken的options预检请求鉴权失败

问题描述 使用如下sa-token配置,前端通过IP端口号的方式访问后端服务,会存在options预检请求鉴权失败的问题。 问题分析 http-options请求 HTTP OPTIONS 方法请求给定的 URL 或服务器的允许通信选项。客户端可以用这个方法指定一个 URL,或者…

MFC界面库ToolkitPro v15.3.1的编译和使用教程(支持VS2015和VS2017)

一、ToolkitPro v15.3.1库的下载 界面库全称为Codejock Xtreme Toolkit Pro,目前可以免费使用的版本为v15.3.1,可以在CSDN上搜索下载,有很多,比如 https://download.csdn.net/download/nizheng96/11151867 二、ToolkitPro v15.3…

IDEA的Java注释在Toggle Rendered View下的字号调整方式

记录IntelliJ IDEA的Java注释在Toggle Rendered View下的字号调整方式 如图,在Toggle Rendered View模式下的注释字号很大,与代码不协调,在此区域点击鼠标右键,选中 Adjust 出现一个滑动条,通过拖动游标调整字号大小…

单片机(MCU)-简单认识

简介: 内部集成了CPU,RAM,ROM,定时器,中断系统,通讯接口等一系列电脑的常用硬件功能。 单片机的任务是信息采集(依靠传感器),处理(依靠CPU)&…

什么是IDE,新手如何选择IDE?

IDE 是 Integrated Development Environment(集成开发环境)的缩写,它是一种软件应用程序,为程序员提供了一站式的开发环境,整合了多种工具和服务,以便高效地创建、修改、编译、调试和运行软件程序。IDE 集成…

搭建Node.js后端

从头开始搭建一个Node.js后端,并实现查询历史数据的功能,下面是详细的步骤说明,包括环境配置、项目初始化、代码编写、以及服务器启动。 1. 环境配置 1.1 安装 Node.js 和 npm 首先,你需要在你的电脑上安装 Node.js 和 npm&…

springmvc的获取请求数据

在使用 SpringMVC 开发 web 应用时,我们经常需要从用户的请求中获取数据。不管是表单提交、查询参数,还是路径上的数据,SpringMVC 都为我们提供了简单而强大的方式来获取这些数据。 1. 使用 RequestParam 获取查询参数 基本使用&#xff1a…

Agile Scrum 敏捷开发方法

Agile Scrum 是一种敏捷开发方法,广泛用于软件开发以及其他项目管理领域。它强调迭代式的工作流程、团队协作、灵活应对变化和持续改进,旨在通过快速交付和反馈来最大化项目价值。Scrum 是 Agile(敏捷)方法中的一种具体实践框架&a…