序言
我个人还是觉得直接使用DataStream更加直观,编程也更有灵活,控制起来也更方便.如果全靠一堆SQL或者方法来替代,你完全没法确定最终会转变成什么算子.编程逻辑不直观.而且肯定会造成跟多的资源浪费,比如状态存储的空间,算子间的转发.cuiyaonan2000@163.com
TableAPI
Table API 是 SQL 语言的超集,并且是针对 Apache Flink 专门设计的。Table API 集成了 Scala,Java 和 Python 语言的 API。Table API 的查询是使用 Java,Scala 或 Python 语言嵌入的风格定义的,有诸如自动补全和语法校验的 IDE 支持,而不是像普通 SQL 一样使用字符串类型的值来指定查询。---有点想说TableAPI 比SQL更好的感觉cuiyaonan2000@163.com
示例1
简单点的示例,通过代码都能猜出来
序扫描了 Orders
表,通过字段 a
进行分组,并计算了每组结果的行数。
import org.apache.flink.table.api.*;import static org.apache.flink.table.api.Expressions.*;EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 在表环境中注册 Orders 表
// ...// 指定表程序
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)Table counts = orders.groupBy($("a")).select($("a"), $("b").count().as("cnt"));// 打印
counts.execute().print();
示例2
这就不好猜了.
这个程序也扫描 Orders
表。程序过滤了空值,使字符串类型的字段 a
标准化,并且每个小时进行一次计算并返回 a
的平均账单金额 b
。
// 环境配置
// ...// 指定表程序
Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)Table result = orders.filter(and($("a").isNotNull(),$("b").isNotNull(),$("c").isNotNull())).select($("a").lowerCase().as("a"), $("b"), $("rowtime")).window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow")).groupBy($("hourlyWindow"), $("a")).select($("a"), $("hourlyWindow").end().as("hour"), $("b").avg().as("avgBillingAmount"));
选择|过滤|别名
From #
Batch Streaming
和 SQL 查询的 FROM
子句类似。 执行一个注册过的表的扫描。
Table orders = tableEnv.from("Orders");
FromValues #
Batch Streaming
和 SQL 查询中的 VALUES
子句类似。 基于提供的行生成一张内联表。
你可以使用 row(...)
表达式创建复合行:
//Flink会自动匹配数据类型
Table table = tEnv.fromValues(row(1, "ABC"),row(2L, "ABCDE")
);//指定数据类型
Table table = tEnv.fromValues(DataTypes.ROW(DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),DataTypes.FIELD("name", DataTypes.STRING())),row(1, "ABC"),row(2L, "ABCDE")
);
关于TableApi的数据类型可以查看Flink TableAPI Aggregation And DataType_cuiyaonan2000的博客-CSDN博客
Select #
Batch Streaming
和 SQL 的 SELECT
子句类似。 执行一个 select 操作。
//获取制定的列 并设置别名
Table orders = tableEnv.from("Orders");
Table result = orders.select($("a"), $("c").as("d"));//获取所有列
Table result = orders.select($("*"));
As #
Batch Streaming
重命名字段。
//这个设置别名比较有意思
//同时如上所有的操作都是返还一个Table对象
Table orders = tableEnv.from("Orders");
Table result = orders.as("x, y, z, t");
Where / Filter #
Batch Streaming
和 SQL 的 WHERE
子句类似。 过滤掉未验证通过过滤谓词的行。
//where可能更像sql
orders = tableEnv.from("Orders");
Table result = orders.where($("b").isEqual("red"));//filter
Table orders = tableEnv.from("Orders");
Table result = orders.filter($("b").isEqual("red"));
列操作
AddColumns #
Batch Streaming
执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。(这个就是Sql中的子查询返回一个结果列cuiyaonan2000@163.com)
Table orders = tableEnv.from("Orders");//这表示新增一个字段,是orders的c字段内容加上sunny 组成结果列cuiyaonan2000@163.com
Table result = orders.addColumns(concat($("c"), "sunny"));//如下a1 是别名
Table result = table.addColumns( $("mycount").plus(1).as("a1"));
AddOrReplaceColumns #
Batch Streaming
执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换,之前的字段自动消失。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。
Table orders = tableEnv.from("Orders");
Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc"));
DropColumns #
Batch Streaming
删除字段
Table orders = tableEnv.from("Orders");
Table result = orders.dropColumns($("b"), $("c"));
RenameColumns #
Batch Streaming
执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。
Table orders = tableEnv.from("Orders");
Table result = orders.renameColumns($("b").as("b2"), $("c").as("c2"));
Group Windows #
调整下顺序,官网的就不能把其模块一定会用的东西先说明下么,看的云里雾里的cuiyaonan2000@163.com
Group window 聚合根据时间或行计数间隔将行分为有限组,并为每个分组进行一次聚合函数计算。对于批处理表,窗口是按时间间隔对记录进行分组的便捷方式。
定义窗口
窗口是使用 window(GroupWindow w)
子句定义的,并且需要使用 as
子句来指定别名(窗口一定要设置别名cuiyaonan2000@163.com)。为了按窗口对表进行分组,窗口别名的引用必须像常规分组属性一样在 groupBy(...)
子句中(如此这般相当于增加了一个字段,并按照该字段进行分组,分组的条件是该字段 + 时间窗口的开始和结束cuiyoanna2000@163.com)。 以下示例展示了如何在表上定义窗口聚合。
Table table = input.window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w.groupBy($("w")) // 以窗口 w 对表进行分组.select($("b").sum()); // 聚合
在流环境中(是否只能在流环境中????),如果窗口聚合除了窗口之外还根据一个或多个属性进行分组,则它们只能并行计算,例如,groupBy(...)
子句引用了一个窗口别名和至少一个附加属性。仅引用窗口别名(例如在上面的示例中)的 groupBy(...)
子句只能由单个非并行任务进行计算。 以下示例展示了如何定义有附加分组属性的窗口聚合。
Table table = input.window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w.groupBy($("w"), $("a")) // 以属性 a 和窗口 w 对表进行分组.select($("a"), $("b").sum()); // 聚合
时间窗口的开始、结束或行时间戳等窗口属性可以作为窗口别名的属性添加到 select 子句中,如 w.start
、w.end
和 w.rowtime
。窗口开始和行时间戳是包含的上下窗口边界。相反,窗口结束时间戳是唯一的上窗口边界。例如,从下午 2 点开始的 30 分钟滚动窗口将 “14:00:00.000” 作为开始时间戳,“14:29:59.999” 作为行时间时间戳,“14:30:00.000” 作为结束时间戳。
Table table = input.window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w.groupBy($("w"), $("a")) // 以属性 a 和窗口 w 对表进行分组.select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count()); // 聚合并添加窗口开始、结束和 rowtime 时间戳
Tumble (Tumbling Windows 即滚动窗口) #
滚动窗口将行分配给固定长度的非重叠连续窗口。例如,一个 5 分钟的滚动窗口以 5 分钟的间隔对行进行分组。滚动窗口可以定义在事件时间、处理时间或行数上。
滚动窗口是通过 Tumble
类定义的,具体如下:
Method | Description |
---|---|
over | 将窗口的长度定义为时间或行计数间隔。 |
on | 要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。 |
as | 指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。 |
// Tumbling Event-time Window
.window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"));// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w"));// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));
Slide (Sliding Windows即滑动窗口) #
滑动窗口可以定义在事件时间、处理时间或行数上。
滑动窗口具有固定大小并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。--换句话说 如果滑动间隔时间跟窗口时间大小一致,就跟滚动一样了.
因此,行可能分配给多个窗口。例如,15 分钟大小和 5 分钟滑动间隔的滑动窗口将每一行分配给 3 个不同的 15 分钟大小的窗口,以 5 分钟的间隔进行一次计算。
滑动窗口是通过 Slide
类定义的,具体如下:
Method | Description |
---|---|
over | 将窗口的长度定义为时间或行计数间隔。 |
every | 将窗口的长度定义为时间或行计数间隔。滑动间隔的类型必须与窗口长度的类型相同。 |
on | 要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。 |
as | 指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。 |
/ Sliding Event-time Window
.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($("rowtime")).as("w"));// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($("proctime")).as("w"));// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));
Session (Session Windows) #
会话窗口没有固定的大小,其边界是由不活动的间隔定义的,例如,如果在定义的间隔期内没有事件出现,则会话窗口将关闭。例如,定义30 分钟间隔的会话窗口,当观察到一行在 30 分钟内不活动(否则该行将被添加到现有窗口中)且30 分钟内没有添加新行,窗口会关闭。会话窗口支持事件时间和处理时间-----那如果会话窗口定义的很长,岂不是缓存的数据就非常多了????
会话窗口是通过 Session 类定义的,具体如下:
Method | Description |
---|---|
withGap | 将两个窗口之间的间隙定义为时间间隔。 |
on | 要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。 |
as | 指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。 |
// Session Event-time Window
.window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"));// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));
Over Windows #
Partition By #
可选的
在一个或多个属性上定义输入的分区。每个分区单独排序,聚合函数分别应用于每个分区(这里必须就是多线程么????)。
注意:在流环境中,如果窗口包含 partition by 子句,则只能并行计算 over window 聚合。如果没有 partitionBy(…),数据流将由单个非并行任务处理。------我理解这里的并行不是 算子上的并行,而是将原来的流拆分成多个流的并行.跟在介绍DataStream中的keyBy会自动并行一样cuiyaonan2000@163.com
Order By #
必须的
定义每个分区内行的顺序,从而定义聚合函数应用于行的顺序。
注意:对于流处理查询,必须声明事件时间或处理时间属性。目前,仅支持单个排序属性。---流如果有延迟到的数据这个数据怎么保持?????
Preceding #
可选的
定义了包含在窗口中并位于当前行之前的行的间隔。间隔可以是时间或行计数间隔。-
- 有界 over window 用间隔的大小指定,例如,时间间隔为10分钟或行计数间隔为10行。
- 无界 over window 通过常量来指定,例如,用UNBOUNDED_RANGE指定时间间隔或用 UNBOUNDED_ROW 指定行计数间隔。无界 over windows 从分区的第一行开始。
如果省略前面的子句,则使用 UNBOUNDED_RANGE 和 CURRENT_RANGE 作为窗口前后的默认值。
Following #
可选的
定义包含在窗口中并在当前行之后的行的窗口间隔。间隔必须以与前一个间隔(时间或行计数)相同的单位指定。
目前,不支持在当前行之后有行的 over window。相反,你可以指定两个常量之一:
CURRENT_ROW
将窗口的上限设置为当前行。CURRENT_RANGE
将窗口的上限设置为当前行的排序键,例如,与当前行具有相同排序键的所有行都包含在窗口中。
如果省略后面的子句,则时间间隔窗口的上限定义为 CURRENT_RANGE
,行计数间隔窗口的上限定义为CURRENT_ROW。
As #
必须的
为 over window 指定别名。别名用于在之后的 select()
子句中引用该 over window。
注意:目前,同一个 select() 调用中的所有聚合函数必须在同一个 over window 上计算。
Unbounded Over Windows #
// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w"));// 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"));// 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w"));// 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w"));
Bounded Over Windows #
// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w"));// 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as("w"));// 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w"));// 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));
Aggregations
这才是重头戏
GroupBy Aggregation #
Batch Streaming Result Updating
和 SQL 的 GROUP BY
子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。
Table orders = tableEnv.from("Orders");
Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d"));