写了一段非常简单的SQL。 数据量级一天大约5千万,拉取的30天的数据,按天 count(distinct)计算一系列指标。原本以为执行的效率会很快,结果发现运行了2h!所有探究其为什么运行慢,以及后续该如何规避这类问题。Spark使用的版本是3.x,SQL示例如下:
SELECT dt
, COUNT(DISTINCT a) as uv -- 用户数
, COUNT(DISTINCT b) as pay_uv_1d
, ... 类似诸如此类的count distinct 一共有30多个指标
FROM
(
SELECT *
FROM table
WHERE dt BETWEEN DATE_SUB('${dt}',29) AND '${dt}'
) tmp
GROUP BY dt
单个count distinct 和普通的shuffle类型,经历map端预聚合,然后shuffle到reduce端最终聚合的方式,就把最终的结果给统计出来了。
多个count distinct 执行原理:
但是多个 count distinct 的逻辑和普通的shuffle确有点不太一样,它会多了一个Expand节点,这个节点会将产生数据膨胀,有N个count distinct 数据就会膨胀N倍,最终这里导致的执行耗时非常之长。
例如有这样一条数据:
dt | a | b |
2024-12-15 | aa | bb |
会膨胀成如下样子,再按照a 和 b为key 分别进行group by去重,然后再count,就最终实现了多个count distinct 计算了:
dt | a | b |
2024-12-15 | aa | null |
2024-12-15 | null | a |
多个count distinct 优化:
了解了多个count distinct 原理之后,优化就大致有思路了。
1. 首先还是得简化SQL,看看能不能不通过count distinct处理 或者 提前过滤掉一些不必要的数据,尽量减少膨胀的数据量
2. 将多个count distinct 拆开分在多段SQL中写,防止膨胀不需要膨胀的数据。
3. 进行spark参数的优化,目前个人优化了如下几个参数:
spark.sql.files.maxPartitionBytes(读取数据单个task读取更少的数据,设置为16m 16777216,默认为128m)
spark.sql.shuffle.partitions 设置为1w,增大partition的数目,减少单个partiton处理的数据量
参考:
sparksql源码系列 | 一文搞懂with one count distinct 执行原理
再来说说sparksql中count(distinct)原理和优化手段吧~-腾讯云开发者社区-腾讯云
collect set函数 spark sql spark count distinct_mob6454cc716fb0的技术博客_51CTO博客