Flink自定义函数之表值聚合函数(UDTAGG函数)

news/2025/1/12 19:43:26/

1.表值聚合函数概念

自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。

理解:假设有一个饮料的表,这个表有 3 列,分别是 id、name 和 price,一共有 5 行。假设你需要找到价格最高的两个饮料,类似于 top2() 表值聚合函数。你需要遍历所有 5 行数据,结果是有 2 行数据的一个表。

2.表值聚合函数实现

表值聚合函数是通过扩展 TableAggregateFunction 类来实现。

TableAggregateFunction实现原理:

  1. 构造accumulator,它负责存储聚合的中间结果。 通过调用 TableAggregateFunction 的 createAccumulator 方法来构造一个空的 accumulator。
  2. 对于每一行数据,调用 accumulate 方法来更新 accumulator。
  3. 当所有数据都处理完之后,调用 emitValue 方法来计算和返回最终的结果。

对应必须实现以下三个方法:

  • createAccumulator():创建累加器
  • accumulate():更新累加器

下面几个 TableAggregateFunction 的方法在某些特定场景下是必须要实现的:

  • retract() :在 bounded OVER 窗口中的聚合函数必须要实现。
  • merge() :在许多批式聚合和以及流式会话和滑动窗口聚合中是必须要实现
  • resetAccumulator() :在许多批式聚合中是必须要实现的。
  • emitValue() :在批式聚合以及窗口聚合中是必须要实现的。

提升流式任务的效率方式:

emitUpdateWithRetract() 在 retract 模式下,该方法负责发送被更新的值。emitValue 方法会发送所有 accumulator 给出的结果。拿 TopN 来说,emitValue 每次都会发送所有的最大的 n 个值。这在流式任务中可能会有一些性能问题。为了提升性能,用户可以实现 emitUpdateWithRetract 方法。这个方法在 retract 模式下会增量的输出结果,比如有数据更新了,我们必须要撤回老的数据,然后再发送新的数据。如果定义了 emitUpdateWithRetract 方法,那它会优先于 emitValue 方法被使用,因为一般认为 emitUpdateWithRetract 会更加高效,因为它的输出是增量的。

注意:

  1. TableAggregateFunction 的所有方法都必须是 public 的、非 static 的,而且名字必须跟上面提到的一样。
  2. createAccumulator、getResultType 和 getAccumulatorType 这三个方法是在抽象父类 TableAggregateFunction 中定义的,而其他的方法都是约定的方法。
  3. 要实现一个表值聚合函数,必须扩展org.apache.flink.table.functions.TableAggregateFunction,并且实现一个(或者多个)accumulate 方法。
  4. accumulate 方法可以有多个重载的方法,也可以支持变长参数。


http://www.ppmy.cn/news/85122.html

相关文章

百度API实现自动写诗

作者介绍 张琪,男,西安工程大学电子信息学院,2022级研究生 研究方向:机器视觉与人工智能 电子邮件:3126743452qq.com 王泽宇,男,西安工程大学电子信息学院,2022级研究生&#xff0…

IOC初始化 IOC启动阶段 (Spring容器的启动流程)

[toc](IOC初始化 IOC启动阶段 (Spring容器的启动流程)) IOC初始化 IOC启动阶段 (Spring容器的启动流程) Resource定位过程:这个过程是指定位BeanDefinition的资源,也就是配置文件(如xml)的位置,并将其封装成Resource对…

ROS学习——rviz打开bag文件

一、首先可以在这个网站中下载.bag文件用于学习。 二、显示.bag文件信息 在终端1中启动&#xff1a; roscore 在终端2中输入&#xff1a; rosbag info <bag_file_name> 这里要把<bag_file_name>换成你自己的bag文件名字&#xff0c;之后在终端就会显示出bag文件…

水下图像0

d_r_1_.jpg 一个拖着电线的水下六足机器人在海水中作业 A robot is exploring the reef on the sea floor A hexapod robot works next to reef at the bottom of the sea A rectangular deep-sea robot swims past a patch of reef An underwater robot is detecting coral …

比赛记录:Codeforces Round 874 (Div. 3) A~G

传送门:CF 前题提要:赛时A出了5道题,并且都是一遍过的,F题也已经找到了解决方法,但是没时间完成了.以为应该能上分,但是没想到赛后E题被hack掉了…绝了.然后打完这场 d i v 3 div3 div3后立马阳了,加上一大堆烦心事(包括但不限于各类考试).就导致现在才写出这篇题解. A题:A. …

自学软件测试怎么学?新增软件测试(全栈),笔试及面试全套方法

既然是自学&#xff0c;那就如下方面着手吧。 1、面试(此篇文章的重磅) 2、思路 3、心态 4、技能 真所谓&#xff0c;“面试造飞机&#xff0c;工作拧螺丝”。咱们先从第一个&#xff0c;面试着手&#xff0c;这就好比写文章先列好提纲一样&#xff0c;要知道你这个行业具体有那…

五、AOP(1)

一、AOP基本概念 1.什么是AOP 面向切面编程&#xff08;方面&#xff09;&#xff0c;利用AOP可以对业务逻辑的各个部分进行隔离&#xff0c;从而使得业务逻辑各部分之间的耦合度降低&#xff0c;提高程序的可重用性&#xff0c;同时提高了开发的效率。不通过修改源代码方式添…

CVE-2018-2894WebLogic未授权任意文件上传

CVE-2018-2894WebLogic未授权任意文件上传 这个洞的限制就比较多了 限制版本 Oracle WebLogic Server版本 10.3.6.0 12.1.3.0 12.2.1.2 12.2.1.3 限制配置 该漏洞的影响模块为web服务测试页&#xff0c;在默认情况下不启用。 /ws_utc/config.do /ws_utc/begin.do 默认情况下不…