TableAPI

news/2024/11/29 13:27:46/

序言

我个人还是觉得直接使用DataStream更加直观,编程也更有灵活,控制起来也更方便.如果全靠一堆SQL或者方法来替代,你完全没法确定最终会转变成什么算子.编程逻辑不直观.而且肯定会造成跟多的资源浪费,比如状态存储的空间,算子间的转发.cuiyaonan2000@163.com

TableAPI

Table API 是 SQL 语言的超集,并且是针对 Apache Flink 专门设计的。Table API 集成了 Scala,Java 和 Python 语言的 API。Table API 的查询是使用 Java,Scala 或 Python 语言嵌入的风格定义的,有诸如自动补全和语法校验的 IDE 支持,而不是像普通 SQL 一样使用字符串类型的值来指定查询。---有点想说TableAPI 比SQL更好的感觉cuiyaonan2000@163.com

示例1

简单点的示例,通过代码都能猜出来

序扫描了 Orders 表,通过字段 a 进行分组,并计算了每组结果的行数。

import org.apache.flink.table.api.*;import static org.apache.flink.table.api.Expressions.*;EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 在表环境中注册 Orders 表
// ...// 指定表程序
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)Table counts = orders.groupBy($("a")).select($("a"), $("b").count().as("cnt"));// 打印
counts.execute().print();

示例2

这就不好猜了.

这个程序也扫描 Orders 表。程序过滤了空值,使字符串类型的字段 a 标准化,并且每个小时进行一次计算并返回 a 的平均账单金额 b

// 环境配置
// ...// 指定表程序
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)Table result = orders.filter(and($("a").isNotNull(),$("b").isNotNull(),$("c").isNotNull())).select($("a").lowerCase().as("a"), $("b"), $("rowtime")).window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow")).groupBy($("hourlyWindow"), $("a")).select($("a"), $("hourlyWindow").end().as("hour"), $("b").avg().as("avgBillingAmount"));

选择|过滤|别名

From #

Batch Streaming

和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。

Table orders = tableEnv.from("Orders");

FromValues #

Batch Streaming

和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。

你可以使用 row(...) 表达式创建复合行:

//Flink会自动匹配数据类型
Table table = tEnv.fromValues(row(1, "ABC"),row(2L, "ABCDE")
);//指定数据类型
Table table = tEnv.fromValues(DataTypes.ROW(DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),DataTypes.FIELD("name", DataTypes.STRING())),row(1, "ABC"),row(2L, "ABCDE")
);

关于TableApi的数据类型可以查看Flink TableAPI Aggregation And DataType_cuiyaonan2000的博客-CSDN博客

Select #

Batch Streaming

和 SQL 的 SELECT 子句类似。 执行一个 select 操作。

//获取制定的列 并设置别名
Table orders = tableEnv.from("Orders");
Table result = orders.select($("a"), $("c").as("d"));//获取所有列
Table result = orders.select($("*"));

As #

Batch Streaming

重命名字段。


//这个设置别名比较有意思
//同时如上所有的操作都是返还一个Table对象
Table orders = tableEnv.from("Orders");
Table result = orders.as("x, y, z, t");

Where / Filter #

Batch Streaming

和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。

//where可能更像sql
orders = tableEnv.from("Orders");
Table result = orders.where($("b").isEqual("red"));//filter
Table orders = tableEnv.from("Orders");
Table result = orders.filter($("b").isEqual("red"));

列操作

AddColumns #

Batch Streaming

执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。(这个就是Sql中的子查询返回一个结果列cuiyaonan2000@163.com)

Table orders = tableEnv.from("Orders");//这表示新增一个字段,是orders的c字段内容加上sunny 组成结果列cuiyaonan2000@163.com
Table result = orders.addColumns(concat($("c"), "sunny"));//如下a1 是别名
Table result = table.addColumns(  $("mycount").plus(1).as("a1"));

AddOrReplaceColumns #

Batch Streaming

执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换,之前的字段自动消失。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。

Table orders = tableEnv.from("Orders");
Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc"));

DropColumns #

Batch Streaming

删除字段

Table orders = tableEnv.from("Orders");
Table result = orders.dropColumns($("b"), $("c"));

RenameColumns #

Batch Streaming

执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。

Table orders = tableEnv.from("Orders");
Table result = orders.renameColumns($("b").as("b2"), $("c").as("c2"));

Group Windows #

调整下顺序,官网的就不能把其模块一定会用的东西先说明下么,看的云里雾里的cuiyaonan2000@163.com

Group window 聚合根据时间行计数间隔将行分为有限组,并为每个分组进行一次聚合函数计算。对于批处理表,窗口是按时间间隔对记录进行分组的便捷方式。

定义窗口

窗口是使用 window(GroupWindow w) 子句定义的,并且需要使用 as 子句来指定别名(窗口一定要设置别名cuiyaonan2000@163.com)。为了按窗口对表进行分组,窗口别名的引用必须像常规分组属性一样在 groupBy(...) 子句中(如此这般相当于增加了一个字段,并按照该字段进行分组,分组的条件是该字段 + 时间窗口的开始和结束cuiyoanna2000@163.com)。 以下示例展示了如何在表上定义窗口聚合。

Table table = input.window([GroupWindow w].as("w"))  // 定义窗口并指定别名为 w.groupBy($("w"))  // 以窗口 w 对表进行分组.select($("b").sum());  // 聚合

在流环境中(是否只能在流环境中????),如果窗口聚合除了窗口之外还根据一个或多个属性进行分组,则它们只能并行计算,例如,groupBy(...) 子句引用了一个窗口别名和至少一个附加属性。仅引用窗口别名(例如在上面的示例中)的 groupBy(...) 子句只能由单个非并行任务进行计算。 以下示例展示了如何定义有附加分组属性的窗口聚合。

Table table = input.window([GroupWindow w].as("w"))  // 定义窗口并指定别名为 w.groupBy($("w"), $("a"))  // 以属性 a 和窗口 w 对表进行分组.select($("a"), $("b").sum());  // 聚合

时间窗口的开始、结束或行时间戳等窗口属性可以作为窗口别名的属性添加到 select 子句中,如 w.startw.end 和 w.rowtime。窗口开始和行时间戳是包含的上下窗口边界。相反,窗口结束时间戳是唯一的上窗口边界。例如,从下午 2 点开始的 30 分钟滚动窗口将 “14:00:00.000” 作为开始时间戳,“14:29:59.999” 作为行时间时间戳,“14:30:00.000” 作为结束时间戳。

Table table = input.window([GroupWindow w].as("w"))  // 定义窗口并指定别名为 w.groupBy($("w"), $("a"))  // 以属性 a 和窗口 w 对表进行分组.select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count()); // 聚合并添加窗口开始、结束和 rowtime 时间戳

Tumble (Tumbling Windows 即滚动窗口) #

滚动窗口将行分配给固定长度的非重叠连续窗口。例如,一个 5 分钟的滚动窗口以 5 分钟的间隔对行进行分组。滚动窗口可以定义在事件时间、处理时间或行数上。

滚动窗口是通过 Tumble 类定义的,具体如下:

MethodDescription
over将窗口的长度定义为时间行计数间隔
on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。
as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。
// Tumbling Event-time Window
.window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"));// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w"));// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));

Slide (Sliding Windows即滑动窗口) #

滑动窗口可以定义在事件时间、处理时间或行数上。

滑动窗口具有固定大小并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。--换句话说 如果滑动间隔时间跟窗口时间大小一致,就跟滚动一样了.

因此,行可能分配给多个窗口。例如,15 分钟大小和 5 分钟滑动间隔的滑动窗口将每一行分配给 3 个不同的 15 分钟大小的窗口,以 5 分钟的间隔进行一次计算。

滑动窗口是通过 Slide 类定义的,具体如下:

MethodDescription
over将窗口的长度定义为时间或行计数间隔。
every将窗口的长度定义为时间或行计数间隔。滑动间隔的类型必须与窗口长度的类型相同。
on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。
as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。
/ Sliding Event-time Window
.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($("rowtime")).as("w"));// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($("proctime")).as("w"));// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));

Session (Session Windows) #

会话窗口没有固定的大小,其边界是由不活动的间隔定义的,例如,如果在定义的间隔期内没有事件出现,则会话窗口将关闭。例如,定义30 分钟间隔的会话窗口,当观察到一行在 30 分钟内不活动(否则该行将被添加到现有窗口中)且30 分钟内没有添加新行,窗口会关闭。会话窗口支持事件时间和处理时间-----那如果会话窗口定义的很长,岂不是缓存的数据就非常多了????

会话窗口是通过 Session 类定义的,具体如下:

MethodDescription
withGap将两个窗口之间的间隙定义为时间间隔。
on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。
as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。
// Session Event-time Window
.window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"));// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));

Over Windows #

Partition By #

可选的

在一个或多个属性上定义输入的分区。每个分区单独排序聚合函数分别应用于每个分区(这里必须就是多线程么????)

注意:在流环境中,如果窗口包含 partition by 子句,则只能并行计算 over window 聚合。如果没有 partitionBy(…),数据流将由单个非并行任务处理。------我理解这里的并行不是 算子上的并行,而是将原来的流拆分成多个流的并行.跟在介绍DataStream中的keyBy会自动并行一样cuiyaonan2000@163.com

Order By #

必须的

定义每个分区内行的顺序,从而定义聚合函数应用于行的顺序。

注意:对于流处理查询,必须声明事件时间或处理时间属性。目前,仅支持单个排序属性。---流如果有延迟到的数据这个数据怎么保持?????

Preceding #

可选的

定义了包含在窗口中并位于当前行之前的行的间隔。间隔可以是时间或行计数间隔。-

  • 有界 over window 用间隔的大小指定,例如,时间间隔为10分钟或行计数间隔为10行。
  • 无界 over window 通过常量来指定,例如,用UNBOUNDED_RANGE指定时间间隔或用 UNBOUNDED_ROW 指定行计数间隔。无界 over windows 从分区的第一行开始。

如果省略前面的子句,则使用 UNBOUNDED_RANGE 和 CURRENT_RANGE 作为窗口前后的默认值。

Following #

可选的

定义包含在窗口中并在当前行之后的行的窗口间隔。间隔必须以与前一个间隔(时间或行计数)相同的单位指定。

目前,不支持在当前行之后有行的 over window。相反,你可以指定两个常量之一:

  • CURRENT_ROW 将窗口的上限设置为当前行。
  • CURRENT_RANGE 将窗口的上限设置为当前行的排序键,例如,与当前行具有相同排序键的所有行都包含在窗口中。

如果省略后面的子句,则时间间隔窗口的上限定义为 CURRENT_RANGE,行计数间隔窗口的上限定义为CURRENT_ROW。

As #

必须的

为 over window 指定别名。别名用于在之后的 select() 子句中引用该 over window。

注意:目前,同一个 select() 调用中的所有聚合函数必须在同一个 over window 上计算。

Unbounded Over Windows #

// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w"));// 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"));// 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w"));// 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w"));

Bounded Over Windows #

// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w"));// 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as("w"));// 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w"));// 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));

Aggregations 

这才是重头戏

GroupBy Aggregation #

Batch Streaming Result Updating

和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。

Table orders = tableEnv.from("Orders");
Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d"));


http://www.ppmy.cn/news/403809.html

相关文章

最优化方法Python计算:一元函数搜索算法——二次插值法

已知连续函数 f ( x ) f(x) f(x)在 x ∗ x^* x∗近旁存在最优解 x 0 x_0 x0​。对博文《最优化方法Python计算:连续函数的单峰区间计算》讨论的 f ( x ) f(x) f(x)单峰区间的包围算法稍加修改,可算得 f ( x ) f(x) f(x)包含 x 0 x_0 x0​的单峰区间 [ a …

为什么看了那么多测试技术帖,自己都没有提升?

作为测试新手,最爱莫过于看各大牛发的技术贴,这篇很牛叉,那篇也很有道理,似乎自己看着看着也会成为高手。然而几年后,发现自己对专业知识的理解乱的很,里面更有很多自相矛盾的地方,这到底是哪里…

游泳耳机买什么牌子好一点?推荐四款出色的游泳耳机

游泳和跑步类似,短距离冲刺时,大脑没什么想法,而中长距离的有氧运动时,肉体是疲惫的,大脑是异常清晰的,时间却是格外难熬的。如何打发时间,让游泳锻炼变得不无聊,这是我从孩子时期就…

Esxi直通A40显卡给ubuntu20.4系统驱动安装过程记录

Esxi直通A40显卡给ubuntu20.4系统驱动安装过程记录 背景描述 PowerEdge R750(esxi虚拟化) 服务器已有一张T4显卡,后期新增一张A40显卡,开一台ubuntu20.4系统直通A40显卡无法开机! 开机问题解决后安装显卡驱动也各种报…

airpods pro是按压还是触摸_数码产品:airpodspro触摸区域在哪里 触摸感应在哪

最近小编发现大家对于airpodspro触摸区域在哪里 触摸感应在哪都很感兴趣,那么小编也是特地整理了一些跟airpodspro触摸区域在哪里 触摸感应在哪相关的知识,那么今天就来分享给大家关于airpodspro触摸区域在哪里 触摸感应在哪的知识吧。 airpodspro触摸区…

超火的数码产品犀牛rhino模型素材网站合集看过来

你还在为找质量高又精致的数码电子产品模型而发愁吗?看这里,小编早就贴心的为你们准备好咯~关注我,收割更多好看又实用的犀牛模型素材,之后也会陆续更新其他实用的素材哒~ 1、爱给网(高品质 全品类 最推荐)…

产品分析报告 | 二手市场面临着什么痛点?

一、竞品分析目的 象牙互GOapp是满足用户校园内安全交易闲置物品、任务悬赏的闲置交易平台。 本文旨在主要研究二手市场产品的产品定位、功能、等方面,探讨友物app的发展趋势,制定产品策略提供决策辅助,同时为象牙互GOapp交互形式提供参考方案…

2022-2028全球与中国数码配件市场现状及未来发展趋势

2021年全球数码配件市场销售额达到了 亿美元,预计2028年将达到 亿美元,年复合增长率(CAGR)为 %(2022-2028)。地区层面来看,中国市场在过去几年变化较快,2021年市场规模为 百万美元&a…