Flink时间窗口程序骨架结构

devtools/2024/10/20 15:14:13/

前言

Flink 作业的基本骨架结构包含三部分:创建执行环境、定义数据处理逻辑、提交并执行Flink作业。

日常大部分 Flink 作业是基于时间窗口计算模型的,同样的,开发一个Flink时间窗口作业也有一套基本的骨架结构,了解这套结构有助于我们更快地上手时间窗口作业开发。

窗口程序的基本骨架

一个Flink时间窗口作业的代码基本骨架如下所示:

java">stream.keyBy(...)               <-  仅 keyed 窗口需要.window(...)              <-  必填项:"assigner"[.trigger(...)]            <-  可选项:"trigger" (省略则使用默认 trigger)[.evictor(...)]            <-  可选项:"evictor" (省略则不使用 evictor)[.allowedLateness(...)]    <-  可选项:"lateness" (省略则为 0)[.sideOutputLateData(...)] <-  可选项:"output tag" (省略则不对迟到数据使用 side output).reduce/aggregate/apply() <-  必填项:"function"[.getSideOutput(...)]      <-  可选项:"output tag"

时间窗口作业对数据逻辑的处理,主要包含以下步骤:

  • 对数据流进行分组,将DataStream装换为KeyedStream
  • 指定窗口分配器 WindowAssigner,将数据划分到对应的窗口
  • 指定窗口触发器 Trigger,决定了窗口何时关闭并计算
  • 指定窗口移除器 Evictor,它可以在窗口计算前后对窗口内的数据进行移除
  • allowedLateness 允许迟到的数据,事件时间语义下,即使事件时钟到达窗口关闭时间,窗口仍会保留一段时间以等待迟到的数据
  • sideOutputLateData 针对窗口关闭后到达的迟到数据,可以将其输出到另外一条数据流,对计算结果做修正
  • ProcessFunction 窗口内数据的处理函数

时间窗口作业实战

了解了时间窗口作业的基本骨架,以及相关组件的作用,接下来就实战一把。

如下示例程序,数据源每秒会生成2个一百以内的随机数,然后数据经过 keyBy 算子分组,这里为了简单,数据全部划分为一组,KeySelector 统一返回 “all”。

分组后,窗口分配器将数据划分到对应的窗口。这里基于处理时间语义,统一分配10秒大小的时间窗口,时间窗口被Flink封装为 TimeWindow 对象,包含两个属性,分别是起始时间戳和结束时间戳。

一旦有数据进入窗口,Trigger#onElement 就会触发,返回值决定了Flink如何处理窗口。显然我们的逻辑是时间到达窗口的结束时间,窗口就会触发计算并关闭,所以我们会注册一个 ProcessingTime 事件,窗口结束时间一到,Trigger#onProcessingTime 就会触发,窗口就会开始计算。

窗口计算前,还需要经过移除器Evictor。它有两个方法,分别在窗口计算前和计算后调用,在这里你可以根据条件移除窗口内无须计算的数据。示例程序中,把小于10的数移除掉了。

最终,窗口内的数据会交给 ProcessWindowFunction 处理,窗口内的数据被Flink封装成迭代器Iterable,通过它可以获得所有窗口内的数据。示例程序 中,我们所有元素打印出来并求和。

java">public class TimeWindowStructure {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Long>() {@Overridepublic void run(SourceContext<Long> sourceContext) throws Exception {while (true) {Threads.sleep(500);sourceContext.collect(ThreadLocalRandom.current().nextLong(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all")// 窗口分配器.window(new WindowAssigner<Long, TimeWindow>() {static final long WINDOW_SIZE = 10_000L;@Overridepublic Collection<TimeWindow> assignWindows(Long event, long timestamp, WindowAssignerContext windowAssignerContext) {// 把数据分配到对应的窗口,一条数据甚至可以分配到多个窗口// 这里根据处理时间 分配10秒大小的窗口final long processingTime = windowAssignerContext.getCurrentProcessingTime();long start = processingTime / WINDOW_SIZE * WINDOW_SIZE;long end = start + WINDOW_SIZE;return List.of(new TimeWindow(start, end));}@Overridepublic Trigger<Long, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {// 默认触发器,废弃了return null;}@Overridepublic TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {// 窗口序列化器return new TimeWindow.Serializer();}@Overridepublic boolean isEventTime() {// 是否基于事件时间语义return false;}})// 窗口触发器.trigger(new Trigger<Long, TimeWindow>() {private long max_register_processing_time = 0L;@Overridepublic TriggerResult onElement(Long element, long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {// 每个元素进入窗口,都会触发该方法 返回结果决定了窗口是否计算或关闭// 我们是根据处理时间窗口结束时间来判断是否触发的,所以注册一个处理时间事件即可if (timeWindow.maxTimestamp() > max_register_processing_time) {max_register_processing_time = timeWindow.maxTimestamp();triggerContext.registerProcessingTimeTimer(max_register_processing_time);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {// 窗口计算并清除数据return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {triggerContext.deleteProcessingTimeTimer(timeWindow.maxTimestamp());}})// 窗口移除器.evictor(new Evictor<Long, TimeWindow>() {@Overridepublic void evictBefore(Iterable<TimestampedValue<Long>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {// 窗口计算前触发Iterator<TimestampedValue<Long>> iterator = iterable.iterator();while (iterator.hasNext()) {TimestampedValue<Long> next = iterator.next();Long value = next.getValue();if (value < 10) {iterator.remove();System.err.println("Evicted:" + value);}}}@Overridepublic void evictAfter(Iterable<TimestampedValue<Long>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {// 窗口计算后触发}})// 因为是基于事件时间语义,不存在迟到数据,所以无须设置 allowedLateness、sideOutputLateData// 窗口处理函数.process(new ProcessWindowFunction<Long, String, String, TimeWindow>() {@Overridepublic void process(String group, ProcessWindowFunction<Long, String, String, TimeWindow>.Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {TimeWindow window = context.window();StringBuilder builder = new StringBuilder();builder.append("[" + window.getStart() + "-" + window.maxTimestamp() + "] elements:");Iterator<Long> iterator = iterable.iterator();Long sum = 0L;while (iterator.hasNext()) {Long value = iterator.next();sum += value;builder.append(value + " ");}builder.append(", sum:" + sum);System.err.println(builder.toString());}});environment.execute();}
}

运行Flink作业,控制台输出:

java">Evicted:3
Evicted:6
Evicted:1
[1722665800000-1722665809999] elements:89 17 16 57 94 47 67 98 , sum:485
Evicted:6
Evicted:4
[1722665810000-1722665819999] elements:86 50 71 95 36 10 55 43 96 36 28 87 89 50 53 35 63 95 , sum:1078
Evicted:4
Evicted:8
Evicted:0
[1722665820000-1722665829999] elements:85 20 42 86 46 20 32 45 91 59 57 64 31 67 78 71 28 , sum:922

尾巴

了解Flink时间窗口作业的基本骨架结构,理清Flink时间窗口的数据流转过程,有助于我们更快上手Flink时间窗口作业的开发。

Flink时间窗口作业包含的核心组件有:WindowAssigner、Window、Trigger、Evictor、ProcessWindowFunction。


http://www.ppmy.cn/devtools/127313.html

相关文章

车载软件架构---软件定义汽车的复杂性

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明自己,无利益不试图说服别人,是精神上的节…

构建后端为etcd的CoreDNS的容器集群(二)、下载最新的etcd容器镜像

在尝试获取etcd的容器的最新版本镜像时&#xff0c;使用latest作为tag取到的并非最新版本&#xff0c;本文尝试用实际最新版本的版本号进行pull&#xff0c;从而取到想的最新版etcd容器镜像。 一、用latest作为tag尝试下载最新etcd的镜像 1、下载镜像 [rootlocalhost opt]# …

28——循环结构之累加应用(配套练习后续更新~~~~~)

例28.1 统计奖牌 (Standard IO 3167) 时间限制: 1000 ms 空间限制: 262144 KB 具体限制题目&#xff1a;2008年北京奥运会&#xff0c;Y国的运动员参与了n天的决赛项目(1≤n≤20)。现在要统计一下Y国所获得的金、银、铜牌数目及总奖牌数。 输入 输入n&#xff0b;1行&#xf…

springboot030甘肃非物质文化网站的设计与开发(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 题目&#xff1a;甘肃非物质文化网站设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本甘肃非物质文化…

Camera系统知识整理

本文是用来记录常用的Camera系统知识相关的文章 -- ing 1 图像格式 视频存储格式YUV420 NV12 NV21 i420 YV12详解 视频存储格式YUV420 NV12 NV21 i420 YV12详解_yuv420sp-CSDN博客 YUV&#xff08;YCbCr&#xff09;色彩空间详解 YUV&#xff08;YCbCr&#xff09;色彩空间…

学习虚幻C++开发日志——TSet

TSet 官方文档&#xff1a;虚幻引擎中的Set容器 | 虚幻引擎 5.5 文档 | Epic Developer Community (epicgames.com) TSet 是通过对元素求值的可覆盖函数&#xff0c;使用数据值本身作为键&#xff0c;而不是将数据值与独立的键相关联。 默认情况下&#xff0c;TSet 不支持重…

《深度学习》OpenCV EigenFaces算法 人脸识别

目录 一、EigenFaces算法 1、什么是EigenFaces算法 2、原理 3、实现步骤 1&#xff09;数据预处理 2&#xff09;特征提取 3&#xff09;构建模型 4&#xff09;识别 4、优缺点 1&#xff09;优点 2&#xff09;缺点 二、案例实现 1、完整代码 运行结果&#xff…

【星闪开发连载】WS63E模组的速度测试

目录 ​编辑 引言 程序工作原理 客户端 服务器端 测试记录 近距离测试 相邻两个房间之间的测试 相隔一个房间的两个房间之间的测试 结语 引言 今天终于又有点时间了&#xff0c;来测试一下星闪的数据传输速度。前面的博文已经分析了星闪的传输示例sle_uuid_client和…