Flink中aggregate[AggregateFunction]的使用及讲解

news/2024/10/18 5:49:33/

Flink的aggregate()方法一般是通过实现AggregateFunction接口对数据流进行聚合计算的场景。例如,在使用 Flink 的DataStream API时,用户经常需要对输入数据进行分组操作,并按照一组 key对数据进行汇总、运算或聚合计算。对于这些场景,可以使用 aggregate()方法来实现聚合计算。通过指定一个AggregateFunction类型的函数作为聚合操作来调用aggregate()方法,可以对元素流进行聚合和处理,生成新的输出流。在具体应用中,根据不同的业务需求,可以根据实际情况选择不同类型的AggregateFunction来完成聚合计算任务。
接下来先对AggregateFunction中的需要实现的4个方法进行说明
1. createAccumulator()
此方法用于创建累加器,并将其初始化为默认值
2. add()
此方法将输入的元素添加到累加器,返回更新后的累加器
3. getResult()
此方法用于从累加器中提取操作的结果
4. merge()
此方法将两个累加器合并为一个新的累加器

下面在通过代码实例说明AggregateFunction的使用,这里都以Tuple2类型作为举例说明

  • 求平均值
public static class AverageAggregate implements AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {@Overridepublic Tuple2<String, Double> createAccumulator() {// 先将累加器进行初始化,这里给了一个""作为key, 0.0作为值return Tuple2.of("", 0.0); }@Overridepublic Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) {// 这里的实现是将输入的元素和累加器中的元素相加,并返回一个新的元素return Tuple2.of(value.f0, accumulator.f1 + value.f1); }@Overridepublic Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {// 这里返回一个包含平均值的 Tuple2 对象,这里是将累加器中的元素除以2,然后返回一个新元素。return Tuple2.of(accumulator.f0, accumulator.f1 / 2.0);}@Overridepublic Tuple2<String, Double> merge(Tuple2<String, Double> a, Tuple2<String, Double> b) {// 这里是将两个累加器中的元素相加并除以2,然后返回一个新的元素对。return Tuple2.of(a.f0, (a.f1 + b.f1) / 2);}}
  • 求最大值
public class MaxAggregateFunction implements AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {@Overridepublic Tuple2<String, Double> createAccumulator() {return Tuple2.of("", Double.MIN_VALUE); // 将累加器初始化为最小值}@Overridepublic Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) {if (value.f1 > accumulator.f1) {return Tuple2.of(value.f0, value.f1);} else {return accumulator;}}@Overridepublic Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {return accumulator; // 返回最大值}@Overridepublic Tuple2<String, Double> merge(Tuple2<String, Double> a,Tuple2<String, Double> b) {if (a.f1 > b.f1) {return a;} else {return b;}}
}
  • 求最小值
public class MinAggregateFunction implements AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {@Overridepublic Tuple2<String, Double> createAccumulator() {return Tuple2.of("", Double.MAX_VALUE); // 将累加器初始化为最大值}@Overridepublic Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) {if (value.f1 < accumulator.f1) {return Tuple2.of(value.f0, value.f1);} else {return accumulator;}}@Overridepublic Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {return accumulator; // 返回最小值}@Overridepublic Tuple2<String, Double> merge(Tuple2<String, Double> a,Tuple2<String, Double> b) {if (a.f1 < b.f1) {return a;} else {return b;}}
}
  • 求和
public class SumAggregateFunction implements AggregateFunction<Tuple2<String, Double>, Double, Double> {@Overridepublic Double createAccumulator() {return 0.0; // 将累加器初始化为0}@Overridepublic Double add(Tuple2<String, Double> value, Double accumulator) {return value.f1 + accumulator; // 将输入元素和累加器中的元素相加}@Overridepublic Double getResult(Double accumulator) {return accumulator; // 返回总和}@Overridepublic Double merge(Double a,Double b) {return a + b; // 合并两个累加器中的元素相加}
}

以上代码就是通过实现AggregateFunction接口,自定义不同的逻辑达到求平均值、最大值、最小值、总和的目的。

  • 方法调用演示
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/*** @Author: J* @Version: 1.0* @CreateTime: 2023/2/1* @Description: 测试**/
public class Demo1 {public static void main(String[] args) throws Exception {Properties prop = new Properties();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里以kafka作为数据源KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("lx01:9092").setTopics("topic-01").setGroupId("g02").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource");// 先将数据转成需要的Tuple2的形式SingleOutputStreamOperator<Tuple2<String, Double>> mapStream = stream.map((MapFunction<String, Tuple2<String, Double>>) value -> {JSONObject data = JSONObject.parseObject(JSONObject.parseObject(value).get("data").toString());return Tuple2.of(data.getString("gender"), data.getDouble("salary"));}).returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}));// 这里先通过keyBy将数据根据性别进行分组,然后5秒为一个窗口,再求不同性别对应的工资平均值SingleOutputStreamOperator<Tuple2<String, Double>> avg = mapStream.keyBy((KeySelector<Tuple2<String, Double>, String>) value -> {String key = value.f0;return key;}).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AverageAggregate()); // 这里调用平均值的AggregateFunctionavg.print();env.execute();}
  • AggregateFunction的这4个方法在flink中执行的原理
    在 Flink 中AggregateFunction的这四个方法在执行过程中会被转化为对应的内部Function对象,用于Flink的运行时执行计算。

    1. 在数据输入流进入 Flink 的过程中,Flink会为每一个key创建对应的累加器。键值对流会按照键所在的组进行分区,然后把每一个组的所有元素分配到一个task slot中,并为每个key创建一个累加器。累加器的类型是任务的一个状态的函数,Flink根据累加器函数的类型来决定使用哪种累加器。
    2. 当每个数据元素输入到累加器中时,add()方法会被调用。add()方法对输入的元素进行变换,然后更新累加器中的结果,返回新的结果给Flink 的相应算子。
    3. 在结果计算完毕后,getResult()方法将被调用,并将结果返回给 Flink。最后,如果有多个累加器需要合并的情况,Flink 会调用merge()方法将结果进行合并。通过这样的执行机制,AggregateFunction对象可以更加灵活快捷地处理数据。
  • 累加器的选择
    当 Flink 创建累加器时,它会根据AggregateFunction的类型来确定使用哪种类型的累加器。具体来说,Flink支持两种类型的累加器:heap-basedincremental
    heap-based累加器需要在内存中存储完整的所有元素,对于数据量较小的情况,它可以提供最好的性能。对于数据量更大的情况,它可能会导致内存不足的问题。
    incremental累加器可以在输入元素上进行增量操作,并在内存中保存仅仅是必要的元素。它可以处理更大的数据量,并且在内存使用上更加高效。在使用增量式累加器时,用户需要重写accumulate()retract()方法。
    根据AggregateFunction的类型,Flink会自动选择合适的累加器类型来进行计算,以提高计算的效率和性能。

以上就是对aggregate方法的使用讲解及简单的原理介绍


http://www.ppmy.cn/news/239593.html

相关文章

爬虫学习笔记02-基本模块

爬虫学习笔记02-基本模块 Request 概念&#xff1a;Request是python中原生的一款基于网络请求的模块。 特点&#xff1a;功能强大&#xff0c;简单便捷&#xff0c;效率极高。 作用&#xff1a;模拟浏览器发请求。 使用方式&#xff1a;&#xff08;requests 模块的编码流程&…

tp6支付宝支付

公司要整一个扫码支付然后有个后台能查看交易记录&#xff0c;然后百度搜寻&#xff0c;决定使用laytp2.0框架搭后台。 配置啥的不阐述了&#xff0c;支付宝支付相对微信来说简单一点&#xff0c;就支付&#xff0c;异步回调&#xff0c;同步回调三个方法。 前端代码&#xf…

PHP实现小程序微信支付(v3版本)

PS:本篇文章是PHP对小程序进行微信支付v3版本的实现,仅用于对支付流程的了解,具体使用方面需要大家自行调整 小程序端JS代码: getPrepayID(){var that thiswx.getStorage({key:openid,success(res){that.setData({openid:res.data})}})wx.getStorage({key:username,success(…

oppoa57计算机有存储功能吗,虽是入门级产品,但OPPO A57这两个功能值得称赞

原标题&#xff1a;虽是入门级产品&#xff0c;但OPPO A57这两个功能值得称赞 一向低调的OPPO随着R11s新机的预热再次走到了消费者的视野中&#xff0c;作为其首款全面屏旗舰&#xff0c;该机无疑成为了用户新的追求对象。当然&#xff0c;如果你是实用主义者觉得该机价格较高&…

中兴f660有没有千兆_电信F660是不是千兆光纤猫?200兆光宽带能不能用电信中兴F660光纤猫?...

展开全部 F660光纤猫可用于千兆光纤宽带,自带无线路由器,但只有lan1口支持32313133353236313431303231363533e59b9ee7ad9431333365633934千兆输出,其余的只能支持百兆输出。 中国电信集团公司是我国特大型国有通信企业、上海世博会全球合作伙伴&#xff0c;连续多年入选"世…

phpexcel 在php7中出现500错误

PHPExcel在PHP7中&#xff0c;Writer->save出现ERR_INVALID_RESPONSE错误的解决方法 这个Writer->save错误可能由于很多原因导致&#xff0c;其中有一部分是因为header和缓冲区的错误导致的。 这部分具体讨论可以看这里 http://stackoverflow.com/questions/8566196/php…

一代机皇Redmi K20 Pro销量破500万 Redmi K30 Pro即将正式发布

2020年2月24日&#xff0c;Redmi通过官方微博宣布&#xff0c;深受500多万用户喜爱的Redmi K20 Pro谢幕。Redmi K20 Pro于2019年5月28日发布&#xff0c;全球持续热销&#xff0c;总销量突破500万。受行业5G转型大环境的影响&#xff0c;过去一年4G旗舰市场竞争尤其激烈&#x…

移动光猫试玩

移动自带光猫中移物联吉比特GM620&#xff0c;网上查不到这个型号&#xff0c;还好CMCCAdmin密码网上查的到&#xff0c;登录进去看看&#xff0c;改掉TR069连接的VLAN ID和RMS上报URL。 这光猫配置还行&#xff0c;实际是ZTE ZX279128&#xff0c;双核1.5G&#xff0c;512M R…