大数据Flink(一百二十):Flink SQL自定义函数(UDF)

devtools/2024/9/25 3:19:38/

文章目录

Flink SQL自定义函数(UDF)

一、概述

二、​​​​​​​自定义标量函数(UDSF)

三、​​​​​​​​​​​​​​自定义聚合函数(UDAF)

四、 ​​​​​​​​​​​​​​自定义表值函数(UDTF)


Flink SQL自定义函数(UDF)

Flink全托管支持在SQL作业中使用Python自定义函数,Flink支持以下3类自定义函数:UDSF(User Defined Scalar Function)、UDAF(User Defined Aggregation Function)、UDTF(User Defined Table-valued Function)。

一、概述

在资料udf函数中可以看到udx.zip压缩包,将其解压后可以看到有以下文件:

其中udfs.py udafs.py udtfs.py分别对应了UDSF、UDAF、UDTF三个函数的示例。

进入阿里云Flink开发平台,点击左侧导航栏SQL开发,点击左侧的函数页签,单击注册UDF,将udx.zip上传,如下图所示。

点击确定后,Flink开发控制台会解析UDF文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。可以看到这里识别出了三个函数。 

点击创建函数,可以看到函数页签下出现了udx目录,下面有三个自定义函数,此时自定义函数创建完成。

二、​​​​​​​​​​​​​​自定义标量函数(UDSF)

  • 自定义标量函数(UDSF)将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系,即读入一行数据,写出一条输出值。

udfs.py内容如下:

from pyflink.table import DataTypes
from pyflink.table.udf import udf@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):return s[begin:end]

 说明:

  • sub_string定义了获取每条数据中从begin~end位的字符的代码;
  • 需要通过名字为 “ udf ” 的装饰器,声明这是一个 scalar function;
  • 需要通过装饰器中的 result_type 参数,声明 scalar function 的结果类型;

进入阿里云Flink开发平台,在test作业草稿下,进行建表,语句如下:

sql">CREATE TABLE function_udf(a VARCHAR,b INT,c INT
) WITH ('connector' = 'socket','hostname' = '178.23.146.213','port' = '9999','format' = 'csv'
);
  • 查询语句如下
sql">SELECT sub_string(a,2,5)
FROM function_udf;
  • 在ecs监听9999端口:nc -lk 9999,然后选中查询语句,点击调试.
  • 在ecs向9999发送数据
123|456,4,2
12|3456,7,1

结果如下:

查询结果是function_udf表中a字段每行字符串的第3-5个字符。

三、​​​​​​​​​​​​​​自定义聚合函数(UDAF)

  • 自定义聚合函数(UDAF),将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。

udafs.py内容如下:

from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udafclass WeightedAvg(AggregateFunction):def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator: Row) -> float:if accumulator[1] == 0:return 0else:return accumulator[0] / accumulator[1]def accumulate(self, accumulator: Row, value, weight):accumulator[0] += value * weightaccumulator[1] += weightdef retract(self, accumulator: Row, value, weight):accumulator[0] -= value * weightaccumulator[1] -= weightweighted_avg = udaf(f=WeightedAvg(),result_type=DataTypes.DOUBLE(),accumulator_type=DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()),DataTypes.FIELD("f1", DataTypes.BIGINT())]))

说明:

  • 该示例中,weighted_avg定义了当前数据和历史数据求含权重的均值的代码。
  • 需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,
  • 需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型;
  • create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义;需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义。

仍然使用function_udf表,查询语句如下:

sql">SELECT weighted_avg(b,c)
FROM function_udf;

选中查询语句运行之后,向9999端口依次发送数据,如下:

123|456,4,2

12|3456,7,1

 

查询结果是以c字段为权重的b字段当前数据和历史数据的均值。

四、 ​​​​​​​​​​​​​​自定义表值函数(UDTF)

自定义表值函数(UDTF),将0个、1个或多个标量值作为输入参数(可以是变长参数)。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。与自定义的标量函数类似,但与标量函数不同。

udtfs.py内容如下:

from pyflink.table import DataTypes
from pyflink.table.udf import udtf@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str):splits = s.split("|")yield splits[0], splits[1]

说明:

  • 该示例中,split定义了将一行字符串按照竖线(|)分割成多列字符串的代码。
  • 需要通过名字为 “ udtf ” 的装饰器,声明这是一个 table function。
  • 需要通过装饰器中的 result_types 参数,声明 table function 的结果类型。由于 table function 每条输出可以包含多个列,result_types 需要指定所有输出列的类型。

 仍然使用function_udf表,查询语句如下:

sql">SELECT a,b,c,d,e
FROM function_udf,lateral table(split(a)) as T(d,e);
  • 选中查询语句运行之后,向9999端口发送数据,如下
123|456,4,2
12|3456,7,1

结果:

查询结果中,会将function_udf表中每行字符串的a字段按照竖线(|)分割成d,e两列。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

http://www.ppmy.cn/devtools/116782.html

相关文章

自动化学习3:日志记录及测试报告的生成--自动化框架搭建

一.日志记录 1.配置文件pytest.ini:将日志写入文件方便日后查询或查看执行信息。 需要将文件处理器(文件存放位置/时间/格式等等)添加到配置文件中的【日志记录器】 # pytest.ini [pytest] # ---------------日志文件,需要配合…

力扣算法题总结

lc253 题目:求最多重叠(x,y)的数量 思路:按y排序,把y放入优先队列,逐个比较x,x大于优先队列的堆顶元素就弹出堆顶。 lc148 题目:对链表排序 思路:归并排序。快慢指针找到链表中点&#xff0c…

为什么现在的LLM都是Decoder only的架构?

在我看来,Encoder-Decoder架构和Decoder only架构的主要区别,在于它们的灵活性和专业性有所差别。 Encoder-Decoder架构通常用于处理一些需要在输入和输出间建立精确映射的任务,比如机器翻译、文本摘要等。在这些任务中,理解输入…

蓝桥杯算法之暴力

暴力 1.十进制数转换成罗马数字 2.判断给出的罗马数字是否正确 小知识 %(模除): % 符号用作模除(或取模)运算符。模除运算是一种数学运算,它返回两个数相除的余数。 具体来说,如果 a 和 b 是…

Android CarrierConfig 配置问题的解决流程

开发步骤 确认代码路径 查看编译用的CarrierConfig APK在项目代码的path,一般是源码或者厂商定制的: packages/apps/CarrierConfig/vendor/mediatek/proprietary/packages/apps/CarrierConfig Note:一些overlay的方式是替换xml文件&#…

C++ STL容器(三) —— 迭代器底层剖析

本篇聚焦于STL中的迭代器,同样基于MSVC源码。 文章目录 迭代器模式应用场景实现方式优缺点 UML类图代码解析list 迭代器const 迭代器非 const 迭代器 vector 迭代器const 迭代器非const迭代器 反向迭代器 迭代器失效参考资料 迭代器模式 首先迭代器模式是设计模式中…

[数据结构]无头单向非循环链表的实现与应用

文章目录 一、引言二、线性表的基本概念1、线性表是什么2、链表与顺序表的区别3、无头单向非循环链表 三、无头单向非循环链表的实现1、结构体定义2、初始化3、销毁4、显示5、增删查改 四、分析无头单向非循环链表1、存储方式2、优点3、缺点 五、总结1、练习题2、源代码 一、引…

Redis中的setnx的使用场景

Redis中的SETNX命令是一个非常有用的工具,特别是在处理分布式系统和并发控制时。SETNX是“Set if Not Exists”的缩写,用于设置键的值,但仅当键不存在时。以下是SETNX命令的一些主要使用场景: 1. 分布式锁 在分布式环境中&#…