Spark RDD中常用聚合算子源码层面的对比分析

ops/2024/11/18 7:23:42/

在 Spark RDD 中,groupByKeyreduceByKeyfoldByKeyaggregateByKey 是常用的聚合算子,适用于按键进行数据分组和聚合。它们的实现方式各不相同,涉及底层调用的函数也有区别。以下是对这些算子在源码层面的分析,以及每个算子适用的场景和代码示例。


1. groupByKey

  • 功能:将相同键的值分组,形成一个 (key, Iterable<values>)RDD

  • 源码分析
    groupByKey 底层使用了 combineByKeyWithClassTag 方法进行数据分组。

    def groupByKey(): RDD[(K, Iterable[V])] = {combineByKeyWithClassTag((v: V) => mutable.ArrayBuffer(v),(c: mutable.ArrayBuffer[V], v: V) => { c += v; c },(c1: mutable.ArrayBuffer[V], c2: mutable.ArrayBuffer[V]) => { c1 ++= c2; c1 }).asInstanceOf[RDD[(K, Iterable[V])]]
    }
    
    • 适用场景:适合需要按键分组、无聚合的场景,但由于需要把所有键的值都传输到驱动端,数据量大时可能导致内存问题。
  • 示例

    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
    result = rdd.groupByKey().mapValues(list)
    print(result.collect())
    

    输出[('a', [1, 3]), ('b', [2])]


2. reduceByKey

  • 功能:基于给定的二元函数(如加法)对每个键的值进行聚合。

  • 源码分析
    reduceByKey 底层也是基于 combineByKeyWithClassTag 方法进行处理,但与 groupByKey 不同的是,它在每个分区内执行局部聚合,再进行全局聚合,减少了数据传输。

    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {combineByKeyWithClassTag[V]((v: V) => v, func, func)
    }
    
    • 适用场景:适用于需要对数据进行聚合计算的场景,能够有效减少 shuffle 数据量。
  • 示例

    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
    result = rdd.reduceByKey(lambda x, y: x + y)
    print(result.collect())
    

    输出[('a', 4), ('b', 2)]


3. foldByKey

  • 功能:与 reduceByKey 类似,但提供了初始值,分区内和分区间合并时都使用这个初始值。

  • 源码分析
    foldByKey 的实现中调用了 aggregateByKey 方法,初始值会在每个分区中传递,确保聚合逻辑一致。

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {aggregateByKey(zeroValue)(func, func)
    }
    
    • 适用场景:当聚合操作需要一个初始值时使用,如从初始值开始累积计算。
  • 示例

    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
    result = rdd.foldByKey(0, lambda x, y: x + y)
    print(result.collect())
    

    输出[('a', 4), ('b', 2)]


4. aggregateByKey

  • 功能:支持更复杂的聚合操作,提供了分区内和分区间不同的聚合函数。

  • 源码分析
    aggregateByKey 是最通用的聚合算子,调用了 combineByKeyWithClassTag 方法来控制分区内和分区间的计算方式。

    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = {// Implementation detail here
    }
    
    • 适用场景:适合复杂的聚合逻辑需求,例如在分区内和分区间使用不同的函数。
  • 示例

    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
    result = rdd.aggregateByKey(0,lambda x, y: x + y,   # 分区内加和lambda x, y: x + y)   # 分区间加和
    print(result.collect())
    

    输出[('a', 4), ('b', 2)]


区别总结

  • groupByKey:按键分组返回集合,适合分组场景,但内存消耗大。
  • reduceByKey:按键聚合,没有初始值,适用于聚合计算。
  • foldByKey:按键聚合,支持初始值,适合自定义累加计算。
  • aggregateByKey:最灵活的聚合算子,适合复杂逻辑。

http://www.ppmy.cn/ops/134646.html

相关文章

使用 Grafana api 查询 Datasource 数据

一、使用grafana 的api 接口 官方API 二、生成Api key 点击 Administration -》Users and accss -》Service accounts 进入页面 点击Add service account 创建 service account 点击Add service account token 点击 Generate token , 就可以生成 api key 了 三、进入grafana…

设计模式-行为型-常用-2:职责链模式、状态模式、迭代器模式

目录 职责链模式 概念 代码实现 使用场景 扩展-Servlet Filter 扩展-Spring Interceptor 状态模式 什么是有限状态机&#xff1f; 状态机实现方式一&#xff1a;分支逻辑法 状态机实现方式二&#xff1a;查表法 状态机实现方式三&#xff1a;状态模式 迭代器模式 定…

函数式组件和类组件的区别

函数式组件和类组件的区别 函数式组件和类组件的区别1. 语法2. 状态管理3. 生命周期管理4. this5. 创建组件6. 继承7. 性能优化 为什么推荐使用函数式组件函数式组件的优势 函数式组件和类组件的区别 1. 语法 函数式组件&#xff1a;基于函数式编程思想&#xff0c;使用 函数…

c语言学习21数组

1.1数组介绍 概念&#xff1a;数组就是 相同数据类型 的一组数据的集合 数组中每个数据 元素 用一个名字来命名这个集合 数组名 用编号区分 下表&#xff08;从0开始自动标号&#xff09; 当处理大量…

Unreal engine5实现类似鬼泣5维吉尔二段跳

系列文章目录 文章目录 系列文章目录前言一、实现思路二、具体使用蓝图状态机蓝图接口三、中间遇到的问题 前言 先看下使用Unreal engine5实现二段跳的效果 一、实现思路 在Unreal Engine 5 (UE5) 中使用蓝图系统实现类似于《鬼泣5》中维吉尔的二段跳效果&#xff0c;可以通…

IDEA报包不存在,但实际存在

IDEA版本2024.2.1 现象 在IDEA里启动运行项目&#xff0c;报某个类有问题&#xff0c;引入的包不存在。 点击这个引入的包&#xff0c;可以看到它在左侧外部库里存在。 试过的无效方法 双击ctrl&#xff0c;在弹出框中mvn idea:idea在文件里&#xff0c;清空缓存并重启在右…

⽂件内容的读写

文件 InputStream &#xff08;字节流读出 抽象类&#xff09; InputStream 只是⼀个抽象类&#xff0c;要使⽤还需要具体的实现类 FileInputStream&#xff08;字节流读出&#xff09; OutputStream&#xff08;字节流写入&#xff09; Reader&#xff08;字符流读入&#xff…

vue2和vue3:diff算法的区别?

Vue 2 和 Vue 3 在 diff 算法方面的主要区别是&#xff1a; Vue 2 使用普通的 diff 算法&#xff0c;它会遍历所有的节点进行比对。 Vue 3 引入了 patch flag 的概念&#xff0c;并且对 diff 算法进行了优化&#xff0c;比如在相同层级的节点间不会去递归比对已经被移除的节点…