Spark 中agg的用法

ops/2025/3/19 18:54:30/

在 Spark 中,agg 是用于对 DataFrame 进行聚合操作的函数。它可以同时对多个列应用多个聚合函数,并返回一个新的 DataFrame。agg 通常与 groupBy 结合使用,用于对分组后的数据进行聚合操作。

以下是 agg 的详细用法和示例。


1. agg 的基本用法

语法
val aggregatedDF = df.agg(F.sum("column1").as("total_column1"),F.avg("column2").as("average_column2")
)
  • F.sum("column1"):对 column1 列求和。

  • F.avg("column2"):对 column2 列计算平均值。

  • as("alias"):为聚合结果指定别名。


2. agg 与 groupBy 结合使用

agg 通常与 groupBy 结合使用,用于对分组后的数据进行聚合操作。

示例

假设有一个 DataFrame,包含用户的姓名、部门和工资:

import org.apache.spark.sql.{SparkSession, functions => F}val spark = SparkSession.builder().appName("Agg Example").master("local[*]").getOrCreate()// 示例数据
val data = Seq(("Alice", "HR", 3000),("Bob", "IT", 4000),("Charlie", "HR", 3500),("David", "IT", 4500),("Eva", "Finance", 5000)
)// 创建 DataFrame
val df = spark.createDataFrame(data).toDF("name", "department", "salary")// 按部门分组,并计算工资总和、平均工资、最高工资和最低工资
val aggregatedDF = df.groupBy("department").agg(F.sum("salary").as("total_salary"),F.avg("salary").as("average_salary"),F.max("salary").as("max_salary"),F.min("salary").as("min_salary")
)// 显示结果
aggregatedDF.show()

输出:

+----------+------------+--------------+----------+----------+
|department|total_salary|average_salary|max_salary|min_salary|
+----------+------------+--------------+----------+----------+
|        HR|        6500|        3250.0|      3500|      3000|
|        IT|        8500|        4250.0|      4500|      4000|
|   Finance|        5000|        5000.0|      5000|      5000|
+----------+------------+--------------+----------+----------+
  • groupBy("department"):按部门分组。

  • agg:对每个部门计算工资总和、平均工资、最高工资和最低工资。


3. agg 的多种聚合函数

agg 可以同时应用多个聚合函数。以下是一些常用的聚合函数:

聚合函数描述
F.sum("column")对列求和
F.avg("column")计算列的平均值
F.min("column")计算列的最小值
F.max("column")计算列的最大值
F.count("column")统计列的非空值数量
F.collect_list("column")将列的值收集为列表
F.collect_set("column")将列的值收集为集合(去重)
示例

统计每个部门的员工数量、工资总和、平均工资、最高工资、最低工资,以及员工姓名列表:

val aggregatedDF = df.groupBy("department").agg(F.count("name").as("employee_count"),F.sum("salary").as("total_salary"),F.avg("salary").as("average_salary"),F.max("salary").as("max_salary"),F.min("salary").as("min_salary"),F.collect_list("name").as("employees")
)aggregatedDF.show(truncate = false)

输出:

+----------+--------------+------------+--------------+----------+----------+----------------------+
|department|employee_count|total_salary|average_salary|max_salary|min_salary|employees             |
+----------+--------------+------------+--------------+----------+----------+----------------------+
|HR        |2             |6500        |3250.0        |3500      |3000      |[Alice, Charlie]      |
|IT        |2             |8500        |4250.0        |4500      |4000      |[Bob, David]          |
|Finance   |1             |5000        |5000.0        |5000      |5000      |[Eva]                 |
+----------+--------------+------------+--------------+----------+----------+----------------------+

4. 全局聚合(不分组)

如果不使用 groupByagg 会对整个 DataFrame 进行全局聚合。

示例

计算所有员工的工资总和、平均工资、最高工资和最低工资:

val globalAggDF = df.agg(F.sum("salary").as("total_salary"),F.avg("salary").as("average_salary"),F.max("salary").as("max_salary"),F.min("salary").as("min_salary")
)globalAggDF.show()

输出:

+------------+--------------+----------+----------+
|total_salary|average_salary|max_salary|min_salary|
+------------+--------------+----------+----------+
|       20000|        4000.0|      5000|      3000|
+------------+--------------+----------+----------+

5. 多列分组和聚合

可以对多列进行分组,并对多列应用聚合函数。

示例

假设有一个 DataFrame,包含用户的姓名、部门、职位和工资:

val data = Seq(("Alice", "HR", "Manager", 3000),("Bob", "IT", "Developer", 4000),("Charlie", "HR", "Analyst", 3500),("David", "IT", "Developer", 4500),("Eva", "Finance", "Manager", 5000)
)val df = spark.createDataFrame(data).toDF("name", "department", "role", "salary")// 按部门和职位分组,并计算工资总和和平均工资
val multiGroupDF = df.groupBy("department", "role").agg(F.sum("salary").as("total_salary"),F.avg("salary").as("average_salary")
)multiGroupDF.show()

输出:

+----------+---------+------------+--------------+
|department|     role|total_salary|average_salary|
+----------+---------+------------+--------------+
|        HR|  Manager|        3000|        3000.0|
|        IT|Developer|        8500|        4250.0|
|        HR|  Analyst|        3500|        3500.0|
|   Finance|  Manager|        5000|        5000.0|
+----------+---------+------------+--------------+

6. 使用表达式字符串

除了使用函数外,agg 还支持使用表达式字符串。

示例
val aggregatedDF = df.groupBy("department").agg("salary" -> "sum","salary" -> "avg","salary" -> "max","salary" -> "min"
)aggregatedDF.show()

输出:

+----------+-----------+-----------+-----------+-----------+
|department|sum(salary)|avg(salary)|max(salary)|min(salary)|
+----------+-----------+-----------+-----------+-----------+
|        HR|       6500|     3250.0|       3500|       3000|
|        IT|       8500|     4250.0|       4500|       4000|
|   Finance|       5000|     5000.0|       5000|       5000|
+----------+-----------+-----------+-----------+-----------+

总结

  • agg 用于对 DataFrame 进行聚合操作,通常与 groupBy 结合使用。

  • 可以同时应用多个聚合函数,并为结果指定别名。

  • 支持全局聚合(不分组)和多列分组聚合。

  • 可以使用函数或表达式字符串定义聚合操作。


http://www.ppmy.cn/ops/167086.html

相关文章

仿“东方甄选”直播商城小程序运营平台

在公域直播流量红利趋于饱和、流量成本大幅攀升的当下,私域直播为企业开辟了新的流量聚集和转化渠道,特别是对于那些希望在私域流量领域取得突破的品牌商家来说,直播场景以其独特的高频互动氛围,相比其他运营方式,展现…

1.angular介绍

初級使用视频添加链接描述 angular工具 angular.module(‘名’, [依赖模块]) 模块 angular.bind(*) : 修改this指向 angualr.copy() // a angular.copy(a, b) —a完全覆盖了b,c就是a angular.extend(a, b) a里面集成了b属性 angular.isArray angular.isDate angular.isDefin…

涨薪技术|Kubernetes(k8s)之Pod生命周期(下)

上次推文我们学习了Pod生命周期(上)知识:相位、创建与终止、初始化容器,今天继续分享完余下的3个知识:钩子函数、容器探测、重启策略。 01钩子函数 钩子函数能够感知自身生命周期中的事件,并在相应的时刻…

【机器学习】基于conda虚拟环境的gcc、g++版本升级

最近在学习大模型部署,需要安装flash-attn,在编译时报错 c: error: unrecognized command line option ‘-stdc17’centos7.9默认gcc最高版本为4.8.5 (base) [rootxx ~]# cat /proc/version Linux version 3.10.0-1160.el7.x86_64 (mockbuildkbuilder.…

排错 -- FISCO BCOS区块链网络 -- 3. 编译智能合约

文章为FISCO BCOS2.0搭建区块链平台中发现的问题与总结,出错原因不唯一 ,解决办法不唯一 目前社区缺少完整,稳定的搭建平台和教程 ,欢迎各位及时补充,如有错误请及时评论纠正! 感谢各位搜索到这里&#…

【STM32】从新建一个工程开始:STM32 新建工程的详细步骤

STM32 开发通常使用 Keil MDK、STM32CubeMX、IAR 等工具来创建和管理工程。此处是 使用 Keil MDK5 STM32CubeMX 创建 STM32 工程的详细步骤。 新建的标准库工程文件已上传至资源中,下载后即可直接使用。 标准库新建 STM32 工程的基本目录结构:STD_STM…

Vue3中正确解析RefImpl对象

在 Vue 3 中,当你看到 RefImpl 对象时,说明这是一个通过 ref() 创建的响应式引用。要获取它的实际值,直接访问 .value 属性即可。以下是具体方法: 直接获取值(标准方式) javascript 复制 console.log(&q…

【蓝桥杯】每天一题,理解逻辑(4/90)【Leetcode 二进制求和】

题目描述 我们解析一下题目 我们可以理解到两个主要信息 给的是二进制的字符串返回他们的和 我们知道,十进制的加减法需要进位,例如:9716是因为91之后进了一位,二进制也是如此,只不过十进制是逢10进1,二…