在shuffle的key中包含rand,一般有两种使用场景:
1、distribute by rand,将数据随机打散
2、rand出现在join的on条件中,避免数据倾斜
负面影响:如果Spark的shuffle条件中包含rand,rand的shuffle阶段发生fetch fail,有可能引起数据错误。
因此,在处理数据倾斜时将热点key打散需要注意:尽量不要在join时,对关联key使用rand()函数。因为在hive中当遇到map失败重算时,就会出现数据重复(数据丢失)的问题,spark引擎使用rand容易导致task失败重新计算的时候偶发不一致的问题。 可以使用md5加密唯一维度值的方式替代rand(), 比如: md5(concat(coalesce(sku_id, 0), ‘’, coalesce(dim_store_num, 0), ‘’, coalesce(store_id, 0), ‘_’,coalesce(delv_center_id, 0))),其中concat的字段是表的唯一粒度;也可以使用hash。
修改SQL解析
出现问题的原因是:作业在进行的shuffle的时候,同一行数据,shuffle的结果不是幂等的。如果shuffle的mapper task由于失败重算,就有可能导致shuffle的数据分配错误。
修改的目标就是:
- 对于同一行数据,需要shuffle的结果是幂等的。
- 具体的方式:把shuffle的key与数据中确定的列绑定。
对于需要根据rand进行shuffle来实现将数据进行随机打散的情况,可以把shuffle规则从rand改成与数据的确定列绑定,也就是deterministic_function(deterministic_col1,
deterministic_col2, …)
比如原本是想通过rand随机分散到20个分区里面, distribute by cast(rand(11)*20 as int) 可以修改成
distribute by rand
1、用一个整数的id对20取模
distribute by id % 20
2、 用任意一个类型字段的hash,然后对20取模
distribute by abs(hash(col) % 20)
3、避免某个字段倾斜,多考虑几个字段,降低倾斜的概率
distribute by abs(hash(col1, col2, …) % 20)