性能优化案例:通过合理设置spark.default.parallelism参数的值来优化PySpark程序的性能

embedded/2025/1/31 17:39:29/

在 PySpark 中,spark.default.parallelism 是一个关键参数,直接影响作业的并行度和资源利用率。
通过合理设置 spark.default.parallelism 并结合数据特征调整,可显著提升 PySpark 作业的并行效率和资源利用率。建议在开发和生产环境中进行多轮基准测试以确定最优值。以下是如何通过调整此参数优化性能的详细说明,结合案例和最佳实践:


1. 参数作用与问题场景

参数意义
  • spark.default.parallelism 决定了:
    • 新生成的 RDD 的默认分区数(如 sc.parallelize())。
    • Shuffle 操作(如 groupByKeyreduceByKey)后的分区数(如果未显式指定)。
  • 默认值
    • 本地模式:CPU 核心数(local[*] 时为逻辑核心数)。
    • 集群模式:max(2, total_executor_cores)(YARN/Mesos/K8s)。
常见性能问题
  1. 分区过少
    • 数据倾斜(少数分区处理大量数据)。
    • 无法充分利用集群资源(并行度低)。
    • 频繁的磁盘溢出(单个分区数据过大)。
  2. 分区过多
    • 调度开销增大(大量小任务)。
    • 增加 Shuffle 网络传输压力。
    • 潜在的内存溢出(如广播变量重复分发)。

2. 优化策略与设置方法

合理值计算
  • 经验公式

    python">spark.default.parallelism = max(# 基础值:总核心数 × 2~4 倍(IO密集型取高值,CPU密集型取低值)total_executor_cores * 3,  # 确保至少与数据输入分区数对齐(如 HDFS 文件块数)input_partitions  
    )
    
    • 例如:集群总资源为 100 个核心 → 建议设置为 300(3 倍核心数)。
  • 动态调整

    • 如果数据量极大(如 TB 级),可进一步提高至 total_executor_cores * 4
    • 使用 df.rdd.getNumPartitions() 检查输入数据的分区数。
设置方式
  • 代码中配置
    python">from pyspark.sql import SparkSession
    spark = SparkSession.builder \.config("spark.default.parallelism", "300") \.getOrCreate()
    
  • 提交作业时指定
    spark-submit --conf spark.default.parallelism=300 app.py
    

3. 案例分析与验证

场景描述
  • 问题作业:处理 1TB 的日志数据,进行 groupByKey 后聚合。
  • 原始配置spark.default.parallelism=200(集群总核心数 100)。
  • 症状
    • Shuffle 阶段耗时 2 小时,某些 Task 处理时间超过 30 分钟。
    • Executor 的 CPU 利用率低于 40%。
优化步骤
  1. 参数调整
    spark-submit --conf spark.default.parallelism=300 --conf spark.sql.shuffle.partitions=300 app.py
    
  2. 代码优化
    • groupByKey 替换为 reduceByKey(减少 Shuffle 数据量)。
    • 添加 Salt 处理倾斜键(如 key -> f"{key}_{random.randint(0,9)}")。
结果对比
指标优化前优化后
总执行时间4.2 小时1.8 小时
最长 Task 耗时32 分钟8 分钟
Executor CPU 利用率35%75%

4. 注意事项与进阶技巧

注意事项
  • spark.sql.shuffle.partitions 联动
    • 针对 DataFrame/SQL 操作,需同时设置此参数(默认 200)。
    python">spark.conf.set("spark.sql.shuffle.partitions", "300")
    
  • 避免过度分区
    • 检查任务执行计划(df.explain()),确保没有生成过多小文件。
    • 监控 Spark UI 中的 Task 耗时分布(避免 99% 的任务在 1 秒内完成)。
进阶优化
  1. 动态分区控制
    • 对倾斜数据使用 repartition(n)coalesce() 显式调整。
    • 使用 spark.adaptive.enabled=true(Spark 3.0+ 自适应查询优化)。
  2. 数据本地性
    • 确保输入数据分区与 HDFS 块分布对齐(避免跨节点传输)。
  3. 硬件资源匹配
    • 每个分区的数据量建议在 128MB~1GB 之间(HDFS 块大小对齐)。

5. 监控与调优验证

  1. 监控工具
    • Spark UI(任务时间分布、Shuffle 读写量)。
    • Ganglia/Prometheus(集群级 CPU/内存/网络监控)。
  2. 调优验证
    • 逐步增加 spark.default.parallelism,观察任务时间变化曲线。
    • 使用 spark.dynamicAllocation.enabled=true 自动扩展 Executor。

http://www.ppmy.cn/embedded/158417.html

相关文章

F. Ira and Flamenco

题目链接:Problem - F - Codeforces 题目大意:给n,m n个数让从中选m个数满足一下条件: 1.m个数互不相同 2.里面的任意两个数相减的绝对值不能超过m 求这n个数有多少组数据满足。 第一行包含一个整数 t ( 1 ≤ t ≤ 1e4 ) - 测试用例数。 …

跟李沐学AI:视频生成类论文精读(Movie Gen、HunyuanVideo)

Movie Gen:A Cast of Media Foundation Models 简介 Movie Gen是Meta公司提出的一系列内容生成模型,包含了 3.2.1 预训练数据 Movie Gen采用大约 100M 的视频-文本对和 1B 的图片-文本对进行预训练。 图片-文本对的预训练流程与Meta提出的 Emu: Enh…

青少年编程与数学 02-008 Pyhon语言编程基础 05课题、数据类型

青少年编程与数学 02-008 Pyhon语言编程基础 05课题、数据类型 一、数据类型1. 数字类型(Numeric Types)2. 序列类型(Sequence Types)3. 集合类型(Set Types)4. 映射类型(Mapping Type&#xff…

【redis进阶】redis 总结

目录 介绍一下什么是 Redis,有什么特点 Redis 支持哪些数据类型 Redis 数据类型底层的数据结构/编码方式是什么 ZSet 为什么使用跳表,而不是使用红黑树来实现 Redis 的常见应用场景有哪些 怎样测试 Redis 服务器的连通性 如何设置 key 的过期时间 Redis …

React第二十八章(css modules)

css modules 什么是 css modules 因为 React 没有Vue的Scoped,但是React又是SPA(单页面应用),所以需要一种方式来解决css的样式冲突问题,也就是把每个组件的样式做成单独的作用域,实现样式隔离,而css modules就是一种…

LeetCode题练习与总结:最长和谐子序列--594

一、题目描述 和谐数组是指一个数组里元素的最大值和最小值之间的差别 正好是 1 。 给你一个整数数组 nums ,请你在所有可能的 子序列 中找到最长的和谐子序列的长度。 数组的 子序列 是一个由数组派生出来的序列,它可以通过删除一些元素或不删除元素…

Linux C++

一、引言 冯诺依曼架构是现代计算机系统的基础,它的提出为计算机的发展奠定了理论基础。在学习 C 和 Linux 系统时,理解冯诺依曼架构有助于我们更好地理解程序是如何在计算机中运行的,包括程序的存储、执行和资源管理。这对于编写高效、可靠的…

在线课堂小程序设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…