在 Spark 中,agg
是用于对 DataFrame 进行聚合操作的函数。它可以同时对多个列应用多个聚合函数,并返回一个新的 DataFrame。agg
通常与 groupBy
结合使用,用于对分组后的数据进行聚合操作。
以下是 agg
的详细用法和示例。
1. agg
的基本用法
语法
val aggregatedDF = df.agg(F.sum("column1").as("total_column1"),F.avg("column2").as("average_column2")
)
-
F.sum("column1")
:对column1
列求和。 -
F.avg("column2")
:对column2
列计算平均值。 -
as("alias")
:为聚合结果指定别名。
2. agg
与 groupBy
结合使用
agg
通常与 groupBy
结合使用,用于对分组后的数据进行聚合操作。
示例
假设有一个 DataFrame,包含用户的姓名、部门和工资:
import org.apache.spark.sql.{SparkSession, functions => F}val spark = SparkSession.builder().appName("Agg Example").master("local[*]").getOrCreate()// 示例数据
val data = Seq(("Alice", "HR", 3000),("Bob", "IT", 4000),("Charlie", "HR", 3500),("David", "IT", 4500),("Eva", "Finance", 5000)
)// 创建 DataFrame
val df = spark.createDataFrame(data).toDF("name", "department", "salary")// 按部门分组,并计算工资总和、平均工资、最高工资和最低工资
val aggregatedDF = df.groupBy("department").agg(F.sum("salary").as("total_salary"),F.avg("salary").as("average_salary"),F.max("salary").as("max_salary"),F.min("salary").as("min_salary")
)// 显示结果
aggregatedDF.show()
输出:
+----------+------------+--------------+----------+----------+
|department|total_salary|average_salary|max_salary|min_salary|
+----------+------------+--------------+----------+----------+
| HR| 6500| 3250.0| 3500| 3000|
| IT| 8500| 4250.0| 4500| 4000|
| Finance| 5000| 5000.0| 5000| 5000|
+----------+------------+--------------+----------+----------+
-
groupBy("department")
:按部门分组。 -
agg
:对每个部门计算工资总和、平均工资、最高工资和最低工资。
3. agg
的多种聚合函数
agg
可以同时应用多个聚合函数。以下是一些常用的聚合函数:
聚合函数 | 描述 |
---|---|
F.sum("column") | 对列求和 |
F.avg("column") | 计算列的平均值 |
F.min("column") | 计算列的最小值 |
F.max("column") | 计算列的最大值 |
F.count("column") | 统计列的非空值数量 |
F.collect_list("column") | 将列的值收集为列表 |
F.collect_set("column") | 将列的值收集为集合(去重) |
示例
统计每个部门的员工数量、工资总和、平均工资、最高工资、最低工资,以及员工姓名列表:
val aggregatedDF = df.groupBy("department").agg(F.count("name").as("employee_count"),F.sum("salary").as("total_salary"),F.avg("salary").as("average_salary"),F.max("salary").as("max_salary"),F.min("salary").as("min_salary"),F.collect_list("name").as("employees")
)aggregatedDF.show(truncate = false)
输出:
+----------+--------------+------------+--------------+----------+----------+----------------------+
|department|employee_count|total_salary|average_salary|max_salary|min_salary|employees |
+----------+--------------+------------+--------------+----------+----------+----------------------+
|HR |2 |6500 |3250.0 |3500 |3000 |[Alice, Charlie] |
|IT |2 |8500 |4250.0 |4500 |4000 |[Bob, David] |
|Finance |1 |5000 |5000.0 |5000 |5000 |[Eva] |
+----------+--------------+------------+--------------+----------+----------+----------------------+
4. 全局聚合(不分组)
如果不使用 groupBy
,agg
会对整个 DataFrame 进行全局聚合。
示例
计算所有员工的工资总和、平均工资、最高工资和最低工资:
val globalAggDF = df.agg(F.sum("salary").as("total_salary"),F.avg("salary").as("average_salary"),F.max("salary").as("max_salary"),F.min("salary").as("min_salary")
)globalAggDF.show()
输出:
+------------+--------------+----------+----------+
|total_salary|average_salary|max_salary|min_salary|
+------------+--------------+----------+----------+
| 20000| 4000.0| 5000| 3000|
+------------+--------------+----------+----------+
5. 多列分组和聚合
可以对多列进行分组,并对多列应用聚合函数。
示例
假设有一个 DataFrame,包含用户的姓名、部门、职位和工资:
val data = Seq(("Alice", "HR", "Manager", 3000),("Bob", "IT", "Developer", 4000),("Charlie", "HR", "Analyst", 3500),("David", "IT", "Developer", 4500),("Eva", "Finance", "Manager", 5000)
)val df = spark.createDataFrame(data).toDF("name", "department", "role", "salary")// 按部门和职位分组,并计算工资总和和平均工资
val multiGroupDF = df.groupBy("department", "role").agg(F.sum("salary").as("total_salary"),F.avg("salary").as("average_salary")
)multiGroupDF.show()
输出:
+----------+---------+------------+--------------+
|department| role|total_salary|average_salary|
+----------+---------+------------+--------------+
| HR| Manager| 3000| 3000.0|
| IT|Developer| 8500| 4250.0|
| HR| Analyst| 3500| 3500.0|
| Finance| Manager| 5000| 5000.0|
+----------+---------+------------+--------------+
6. 使用表达式字符串
除了使用函数外,agg
还支持使用表达式字符串。
示例
val aggregatedDF = df.groupBy("department").agg("salary" -> "sum","salary" -> "avg","salary" -> "max","salary" -> "min"
)aggregatedDF.show()
输出:
+----------+-----------+-----------+-----------+-----------+
|department|sum(salary)|avg(salary)|max(salary)|min(salary)|
+----------+-----------+-----------+-----------+-----------+
| HR| 6500| 3250.0| 3500| 3000|
| IT| 8500| 4250.0| 4500| 4000|
| Finance| 5000| 5000.0| 5000| 5000|
+----------+-----------+-----------+-----------+-----------+
总结
-
agg
用于对 DataFrame 进行聚合操作,通常与groupBy
结合使用。 -
可以同时应用多个聚合函数,并为结果指定别名。
-
支持全局聚合(不分组)和多列分组聚合。
-
可以使用函数或表达式字符串定义聚合操作。