flink的EventTime和Watermark

news/2025/1/8 5:01:24/

时间机制

Flink中的时间机制主要用在判断是否触发时间窗口window的计算。

在Flink中有三种时间概念:ProcessTime、IngestionTime、EventTime。

ProcessTime:是在数据抵达算子产生的时间(Flink默认使用ProcessTime)

IngestionTime:是在DataSource生成数据产生的时间

EventTime:是数据本身携带的时间,具有实际业务含义,不是Flink框架产生的时间

水位机制

由于网络原因、故障等原因,数据的EventTIme并不是单调递增的,是乱序的,有时与当前实际时间相差很大。

水位(watermark)用在EventTime语义的窗口计算,可以当作当前计算节点的时间。当水位超过窗口的endtime,表示事件时间t <= T的数据都**已经到达,**这个窗口就会触发WindowFunction计算。当水位超过窗口的endtime+允许迟到的时间,窗口就会消亡。本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。

  1. 队列中是乱序的数据,流入长度3s的窗口。2s的数据进入[0,4)的窗口中
  2. 2s、3s、1s的数据进入[0,4)的窗口,7s的数据分配到[4,8)的窗口中
  3. 水印4s到达,代表4s以前的数据都已经到达。触发[0,4)的窗口计算,[4,8)的窗口等待数据
  4. 水印9s到达,[4,8)的窗口触发

多并行情况下,不同的watermark流到算子,取最小的wartermark当作当前算子的watermark。

如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间,那么所有流的数据肯定已经全部收齐,就可以安全地触发窗口计算了。

生成水位

首先设置env为事件时间

使用 DataStream API 实现 Flink 任务时,Watermark Assigner 能靠近 Source 节点就靠近 Source 节点,能前置尽量前置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//测试数据,间隔1s发送
DataStreamSource<Tuple2<String, Long>> source = env.addSource(new SourceFunction<Tuple2<String, Long>>() {@Overridepublic void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {ctx.collect(Tuple2.of("aa", 1681909200000L));//2023-04-19 21:00:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681909500000L));//2023-04-19 21:05:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681909800000L));//2023-04-19 21:10:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910100000L));//2023-04-19 21:15:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910400000L));//2023-04-19 21:20:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910700000L));//2023-04-19 21:25:00Thread.sleep(Long.MAX_VALUE);}@Overridepublic void cancel() {}});

抽取EventTime、生成Watermark

周期性水位–AssignerWithPeriodicWatermarks�(常用)

周期性生成水位。周期默认的时间是 200ms.

源码如下:

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}

自定义实现AssignerWithPeriodicWatermarks,代码如下:

source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {private long currentTimestamp;@Nullable@Override// 生成watermarkpublic Watermark getCurrentWatermark() {return new Watermark(currentTimestamp);}@Override//获取事件时间public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {if (element.f1>=currentTimestamp){currentTimestamp = element.f1;}return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:"+ DateUtil.date());System.out.println("当前水位时间:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:"+DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:"+DateUtil.date(element.f1)));}}).print();

运行结果如下:

水位时间到达2023-04-19 21:10:00触发窗口2023-04-19 21:00:00到2023-04-19 21:10:00,窗口中的数据为2023-04-19 21:00:00和2023-04-19 21:05:00

水位时间到达2023-04-19 21:20:00触发窗口2023-04-19 21:10:00到2023-04-19 21:20:00,窗口中的数据为2023-04-19 21:10:00和2023-04-19 21:15:00

长时间等待后,2023-04-19 21:20:00到2023-04-19 21:30:00是存在一个2023-04-19 21:25:00的数据,一直没有触发。这是因为没有新的数据进入,周期性生成的watermark一直是2023-04-19 21:20:00。所以后面窗口即使有数据也没有触发计算。

BoundedOutOfOrdernessTimestampExtractor�

BoundedOutOfOrdernessTimestampExtractor实现了AssignerWithPeriodicWatermarks接口,是flink内置的实现类。

主要源码如下:

public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {if (maxOutOfOrderness.toMilliseconds() < 0) {throw new RuntimeException("Tried to set the maximum allowed " +"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");}this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;}public abstract long extractTimestamp(T element);@Overridepublic final Watermark getCurrentWatermark() {long potentialWM = currentMaxTimestamp - maxOutOfOrderness;if (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);}@Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {long timestamp = extractTimestamp(element);if (timestamp > currentMaxTimestamp) {currentMaxTimestamp = timestamp;}return timestamp;}

BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。

当然,乱序区间的长度要根据实际环境谨慎设定,设定得太短会丢较多的数据,设定得太长会导致窗口触发延迟,实时性减弱。

设置maxOutOfOrderness为5min,代码如下:

source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:"+ DateUtil.date());System.out.println("当前水位时间:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:"+DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:"+DateUtil.date(element.f1)));}}).print();

运行结果如下:

看起来和我们自定义实现结果一样。但是10min的水位时间是来自数据15min减去延迟时间5min得来的。

同理20min的水位时间是来自数据25min减去延迟时间5min得来的。

我们可以设置延迟时间为10min,看一下结果。最后一条数据是25min,那么最后的水位线就是25min-10min=15min。只会触发00-10的窗口。

同样的,由于没有后续数据导致后面的窗口没有触发。

AscendingTimestampExtractor

AscendingTimestampExtractor要求生成的时间戳和水印都是单调递增的。用户实现从数据中获取自增的时间戳extractAscendingTimestamp与上一次时间戳比较。如果出现减少,则打印warn日志。

源码如下:

    public abstract long extractAscendingTimestamp(T element);@Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {final long newTimestamp = extractAscendingTimestamp(element);if (newTimestamp >= this.currentTimestamp) {this.currentTimestamp = newTimestamp;return newTimestamp;} else {violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}@Overridepublic final Watermark getCurrentWatermark() {return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}

间断性水位线

适用于根据接收到的消息判断是否需要产生水位线的情况,用这种水印生成的方式并不多见。

举例如下,数据为15min的时候生成水位。

source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {DateTime date = DateUtil.date(lastElement.f1);return date.minute()==15?new Watermark(lastElement.f1):null;}@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:"+ DateUtil.date());System.out.println("当前水位时间:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:"+DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:"+DateUtil.date(element.f1)));}}).print();

结果如下:

15min的数据生成了15min的水位,只触发了00-10的窗口。

窗口处理迟到的数据

allowedLateness�

Flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说,正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算。当然,窗口也是吃资源大户,所以allowedLateness的值要适当。给个完整的代码示例如下。

source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:"+ DateUtil.date());System.out.println("当前水位时间:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:"+DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:"+DateUtil.date(element.f1)));}}).print();

side output

侧输出(side output)是Flink的分流机制。迟到数据本身可以当做特殊的流,我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去,再进行下一步处理(比如存到外部存储或消息队列)。代码如下。

// 侧输出的OutputTagOutputTag<Tuple2<String, Long>> lateOutputTag = new OutputTag<>("late_data_output_tag");SingleOutputStreamOperator<Object> process = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).sideOutputLateData(lateOutputTag).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:" + DateUtil.date());System.out.println("当前水位时间:" + DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:" + DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:" + DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:" + DateUtil.date(element.f1)));}});//处理侧输出数据
//        process.getSideOutput(lateOutputTag).addSink()

最后的window不触发解决方法

自定义自增水位

周期性获取watermark时,自定义增加水位

source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {private long currentTimestamp;@Nullable@Override// 生成watermarkpublic Watermark getCurrentWatermark() {currentTimestamp+=60000;return new Watermark(currentTimestamp);}@Override//获取事件时间public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {if (element.f1>=currentTimestamp){currentTimestamp = element.f1;}return element.f1;}})

结果如下:

自定义trigger

当watermark不能满足关窗条件时,我们给注册一个晚于事件时间的处理时间定时器使它一定能达到关窗条件。

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class MyTrigger extends Trigger<Object, TimeWindow> {@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());ctx.registerProcessingTimeTimer(window.maxTimestamp() + 30000L);return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());return TriggerResult.FIRE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (time == window.maxTimestamp()) {ctx.deleteProcessingTimeTimer(window.maxTimestamp() + 30000L);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());ctx.deleteProcessingTimeTimer(window.maxTimestamp() + 30000L);}
}

Test.java

参考链接:

https://www.jianshu.com/p/c612e95a5028

https://blog.csdn.net/lixinkuan328/article/details/104129671

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/event-time/generating_watermarks/

https://cloud.tencent.com/developer/article/1573079

https://blog.csdn.net/m0_73707775/article/details/129560540?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3&utm_relevant_index=10


http://www.ppmy.cn/news/1561464.html

相关文章

哦?将文本转换为专业流程图的终极解决方案?

前言 今天介绍的这款工具号称是将文本转换成专业流程图的终极解决方案。一起来看看是否能满足你的需求吧。 首页 平平无奇的首页&#xff08;通常地下隐藏着不平常的东西&#xff09;&#xff0c;和我们之前介绍过的工具类似&#xff0c;核心就是我们中间的文本输入框。在输入…

【Python】基于blind-watermark库添加图片盲水印

blind-watermark 是一个用于在图像中添加和提取盲水印的 Python 库。盲水印是一种嵌入信息&#xff08;如水印&#xff09;到图像中的方法&#xff0c;使得水印在视觉上不可见&#xff0c;但在需要时可以通过特定的算法进行提取。以下是如何使用 blind-watermark 库来添加和提取…

君正T41交叉编译ffmpeg、opencv并做h264软解,利用君正SDK做h264硬件编码

目录 1 交叉编译ffmpeg----错误解决过程&#xff0c;不要看 1.1 下载源码 1.2 配置 1.3 编译 安装 1.3.1 报错&#xff1a;libavfilter/libavfilter.so: undefined reference to fminf 1.3.2 报错&#xff1a;error: unknown type name HEVCContext; did you mean HEVCPr…

Fastlio_localization的完整跑通记录,包括源码获取、编译、地图制作、运行及运行视频、脚本修改、结果保存等(跑通官方数据集以及Apollo数据集)

这篇文章做一个Fastlio_localization(以下简称fastlio_lc)的跑通记录文档。 步骤包括&#xff1a;源码获取、数据集获取、环境配置、源码编译、定位地图制作、基于数据集的脚本修改、源码运行、定位结果保存txt 等步骤 1.源码获取 1.直接进入github上下载&#xff0c;地址为&…

爬虫基础之爬取表情包GIF

网站地址 : 热门表情_发表情&#xff0c;表情包大全fabiaoqing.com 爬取思路: 通过开发者工具找到包含页面表情包的数据包 通过re parsel css等解析提取数据 爬取步骤: 一. 请求数据 模拟浏览器向服务器发送请求 打开F12 or 右击开发者模式 CtrlF 打开搜索框 输入要…

深入浅出 Pytest:自动化测试的最佳实践 pytest教程 程序测试 单元化测试

一、用法 .1 断言 在测试函数中用assert&#xff08;断言&#xff09;来判断测试是否符合预期。 # 一个成功的测试函数def test_passing():assert (1, 2, 3) (1, 2, 3)# 一个失败的测试函数def test_failing():assert (1, 2, 3) (3, 2, 1)assert 语句本质上是用于检查一个表…

【保姆级爬虫】微博关键词搜索并获取博文和评论内容(python+selenium+chorme)

微博爬虫记录 写这个主要是为了防止自己忘记以及之后的组内工作交接&#xff0c;至于代码美不美观&#xff0c;写的好不好&#xff0c;统统不考虑&#xff0c;我只能说&#xff0c;能跑就不错了&#xff0c;上学压根没学过python好吧&#xff0c;基本上是crtlc&ctrlv丝滑小…

JavaScript网页设计案例:响应式动态购物车

在现代网页开发中&#xff0c;购物车是电子商务网站的重要功能之一。通过JavaScript&#xff0c;我们可以实现一个响应式动态购物车&#xff0c;提供用户友好的体验&#xff0c;并展示前端开发的核心能力。 案例需求 我们的购物车需要实现以下功能&#xff1a; 动态添加商品&…