Flink-DataStreamAPI-生成水印

devtools/2025/3/11 9:01:51/

下面我们将学习Flink提供的用于处理事件时间戳和水印的API,也会介绍有关事件时间、流转时长和摄取时间,下面就让我们跟着官网来学习吧

一、水印策略介绍

为了处理事件时间,Flink需要知道事件时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过使用TimestampAssigner从元素中的某个字段访问/提取时间戳来完成的。

时间戳分配与生成水印密切相关,水印告诉系统事件时间的进度。我们可以通过指定WatermarkGenerator来配置它。

Flink API需要一个包含TimestampAssigner和WatermarkGenerator的WatermarkStrategy。WatermarkStrategy上有许多开箱即用的常用策略作为静态方法,用户也可以在需要时构建自己的策略。

以下是一个接口示例:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{/***实例化一个{@link Timestamp Assigner},用于根据此策略分配时间戳。*/@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);/*** 实例化一个水印生成器,根据此策略生成水印。*/@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

通常不会自己实现此接口,而是使用WatermarkStrategy上的静态帮助方法来实现常见的水印策略,或者将自定义TimestampAssigner与WatermarkGenerator捆绑在一起。例如,要将bounded-out-of-orderness水印和lambda函数用作时间戳赋值器,我们可以使用以下内容:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.f0);

指定TimestampAssigner是可选的,比如当使用Kafka或Kinesis时,将直接从Kafka/Kinesis记录中获取时间戳。

注意:时间戳和水印都指定为自1970-01-01T00:00:00Z的Java纪元以来的毫秒。

二、使用水印策略

Flink应用程序中有两个地方可以使用WatermarkStrategy:

        1)直接在源代码上,

        2)在非源代码操作之后。

第一个选项更可取,因为它允许源利用水印逻辑中有关分片/分区/拆分的知识。然后,源通常可以在更精细的级别跟踪水印,源生成的整体水印将更准确。直接在源上指定水印策略通常意味着我们必须使用特定于源的接口,我们我们详细看下水印策略和Kafka连接器,了解它在Kafka连接器上的工作原理,以及有关每个分区水印如何工作的更多详细信息。

第二个选项(在任意操作后设置WatermarkStrategy)仅在您不能直接在源上设置策略时才应使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>);withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) -> a.add(b) ).addSink(...);

以这种方式使用WatermarkStrategy获取一个流并生成一个带有时间戳元素和水印的新流。如果原始流已经有时间戳和/或水印,则时间戳分配程序会覆盖它们。

三、处理空闲源

如果其中一个输入拆分/分区/分片有一段时间没有携带事件,这意味着WatermarkGenerator也没有获得任何新信息来建立水印。我们称之为空闲输入或空闲源。这是一个问题,因为我们的一些分区可能仍然携带事件。在这种情况下,水印将被保留,因为它被计算为所有不同并行水印的最小值。

为了解决这个问题,我们可以使用WatermarkStrategy来检测空闲并将输入标记为空闲。WatermarkStrategy为此提供了一个方便的代码

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

四、水印对齐

上面我知道拆分/分区/分片或源处于空闲状态并且可能会停止增加水印的情况。在光谱的另一方面,拆分/分区/分片或源可能会非常快地处理记录,从而相对更快地增加其水印。这本身不是问题。然而,对于使用水印发出一些数据的下游处理者来说,可能会成为一个问题。

在这种情况下,与空闲源相反,这种下游运算符的水印(如聚合上的窗口连接)可以继续。然而,这种运算符可能需要缓冲来自快速输入的过多数据,因为来自其所有输入的最小水印被滞后输入所抑制。因此,快速输入发出的所有记录都必须在所述下游运算符状态下进行缓冲,这可能导致运算符状态的不可控增长。

为了解决这个问题,我们可以启用水印对齐,这将确保没有源/拆分/分片/分区的水印比其他水印增加得太多。可以分别为每个源启用对齐:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));

注意:我们只能为FLIP-27源启用水印对齐

启用对齐时,我们需要告诉Flink,源应该属于哪个组。通过提供一个标签(例如对齐组-1)来做到这一点,该标签将共享它的所有源绑定在一起。此外,我们必须告诉属于该组的所有源的当前最小水印的最大漂移。第三个参数描述了当前最大水印应该更新的频率。频繁更新的缺点是会有更多的RPC消息在TM和JM之间传输。

为了实现对齐,Flink将暂停从源/任务中的消费,这会产生太远的未来水印。与此同时,它将继续从其他来源/任务中读取记录,这些记录可以向前移动组合水印,从而解除屏蔽更快的水印。

注意:从Flink 1.17开始,FLIP-27源框架支持拆分级水印对齐。源连接器必须实现一个接口来恢复和暂停拆分,以便拆分/分区/分片可以在同一个任务中对齐。

如果从1.15. x和1.16.x之间的Flink版本升级,可以通过设置管道.水印对齐来禁用拆分级别对齐。allow-unaligned-source-splits为真。此外,可以通过检查源代码是否在运行时抛出UnsupportedOperationException或读取javadocs来判断它是否支持拆分级别对齐。在这种情况下,最好禁用拆分级别水印对齐以避免致命异常。

当将标志设置为true时,只有当拆分/分片/分区的数量等于源运算符的并行性时,水印对齐才会正常工作。这会导致每个子任务都被分配一个工作单元。另一方面,如果有两个Kafka分区,它们以不同的速度产生水印并被分配给同一个任务,那么水印可能不会按预期运行。幸运的是,即使在最坏的情况下,基本对齐的性能也不会比没有对齐差。

此外,Flink还支持跨相同源和/或不同源的任务对齐,这在我们有两个不同的源(例如Kafka和File)以不同的速度生成水印时很有用。

五、编写水印生成器

TimestampAssigner是一个从事件中提取字段的简单函数,因此我们不需要详细了解它们。另一方面,WatermarkGenerator的编写有点复杂。这是WatermarkGenerator接口:

/*** WatermarkGenerator 根据事件或定期生成水印**/
@Public
public interface WatermarkGenerator<T> {/*** 为每个事件调用,允许水印生成器检查和记住事件时间戳,或根据事件本身发出水印。*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 定期调用,并且可能会发出新的水印,或者不** 调用此方法和生成水印的间隔取决于{@link ExecutionConfig#getAutoWatermarkInterval()}*/void onPeriodicEmit(WatermarkOutput output);
}

水印生成有两种不同的风格:周期性和标点

周期性生成器通常通过onEvent()观察传入的事件,然后在框架调用onperiodicEmit()时发出水印。

穿孔生成器将查看onEvent()中的事件,并等待流中携带水印信息的特殊标记事件或标点符号。当它看到这些事件之一时,它会立即发出水印。通常,标点生成器不会从onPeriodicEmit()发出水印。

接下来我们将看看如何为每种样式实现生成器。

1、编写周期性水印生成器

周期性生成器观察流事件并周期性地生成水印(可能取决于流元素,或者纯粹基于流转时长)。

生成水印的间隔(每n毫秒)是通过ExecutionConfig定义的。setAutoWatermarkInterval(…)。每次都会调用生成器的onperiodicEmit()方法,如果返回的水印非空且大于前一个水印,则会发出一个新的水印。

/*** 该生成器生成水印,假设元素到达时顺序错误,但仅在一定程度上。某个时间戳t的最新元素将在时间戳t最早元素之后最多n毫秒到达。*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 以当前最高时间戳减去无序界限的形式发出水印output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));}}/*** 此生成器生成的水印比处理时间滞后固定量。它假设元素在有界延迟后到达Flink。*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// 不需要做任何事情,因为我们需要处理时间}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}

注意:可以在每个事件上生成水印。但是,由于每个水印都会导致下游的一些计算,过多的水印会降低性能。

六、水印策略和Kafka连接器

当使用Apache Kafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(升序时间戳或有界无秩序)。然而,当使用来自Kafka的流时,多个分区通常会被并行使用,交错来自分区的事件并破坏每个分区的模式(这是Kafka的消费者客户端工作方式所固有的)。

在这种情况下,我们可以使用Flink的Kafka-partition-aware水印生成。使用该功能,水印在Kafka消费者内部,每个Kafka分区生成,每个分区的水印被合并,就像水印在流洗牌上合并一样。

例如,如果每个Kafka分区的事件时间戳严格升序,则使用升序时间戳水印生成器生成每个分区的水印将产生完美的整体水印。请注意,我们在示例中没有提供TimestampAssigner,而是使用Kafka记录本身的时间戳。

下面的插图展示了如何使用每个Kafka分区的水印生成,以及在这种情况下水印如何通过流数据流传播。

KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("my-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");

七、如何处理水印

一般来说,需要在向下游转发之前去处理给定的水印。例如,WindowOperator将首先评估应触发的所有窗口,只有在产生水印触发的所有输出后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素都将在水印之前发出。

同样的规则也适用于TwoInputStreamOperator。但是,在这种情况下,运算符的当前水印定义为其两个输入的最小值。

 -------------------------------------------------------------------------------------------------------------------------------

大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

2025年人工智能、数字媒体技术与社会计算国际学术会议

https://ais.cn/u/byAVfu

第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025)

https://ais.cn/u/77FJ3u

2025人工智能与计算机网络技术国际学术会议(ICAICN 2025)

https://ais.cn/u/jUfAVz

2025年数据挖掘与项目管理国际研讨会

https://ais.cn/u/nIbMvm


http://www.ppmy.cn/devtools/166244.html

相关文章

Rust 模式匹配中的可反驳性与不可反驳性

1. 什么是可反驳模式和不可反驳模式&#xff1f; 1.1.不可反驳模式&#xff08;Irrefutable Patterns&#xff09; 不可反驳模式是 总能匹配任何可能值 的模式。例如&#xff0c;下面的 let 语句&#xff1a; let x 5;x 是一个不可反驳模式&#xff0c;它匹配 任何值&#…

#函数探幽

c内联函数 内联函数与其他函数的区别&#xff08;这必须深入到程序的内部&#xff09;&#xff1a;编译的最终产品是可执行程序-----它是由机械语言指令组成。运行时程序&#xff0c;操作系统会把这些指令载入到计算机内存中&#xff0c;分配内存逐步执行。在调用函数时&#x…

深入解析K8s VolumeMounts中的subPath字段及其应用

文章目录 前言一、什么是subPath二、subPath使用场景三、场景一示例1.资源准备2.使用subPath字段 四、场景二示例1.资源准备2.测试 前言 在Kubernetes中&#xff0c;挂载存储卷是容器化应用的常见需求。然而当我们将整个卷挂载到容器中的某个目录时&#xff0c;可能会覆盖目标…

深入理解网络通信:从OSI七层模型到TCP/IP协议栈

在网络技术飞速发展的今天&#xff0c;无论是日常浏览网页、在线观看视频&#xff0c;还是企业级的数据交换和云计算服务&#xff0c;背后都离不开复杂而精密的网络通信机制。这些机制确保了数据能够在全球范围内的不同设备间准确无误地传输。为了更好地理解和掌握这一过程&…

HTML页面中divborder-bottom不占用整个底边,只占用部分宽度

根据豆包提示&#xff0c;有2个方案&#xff1a;使用使用伪元素 ::after&#xff0c;使用 linear-gradient 背景 方案1&#xff1a;通过伪元素 ::after 可以创建一个新的元素&#xff0c;并为其设置样式&#xff0c;模拟只显示一半宽度的底部边框。 解释&#xff1a; .half-…

反射是什么?

反射是Java语言的一个强大特性&#xff0c;它允许程序在运行时动态地检查和操作类、方法、字段等。通过反射&#xff0c;你可以实现一些非常灵活的功能&#xff0c;比如动态调用方法、访问私有字段等。 1. 什么是反射&#xff1f; 定义 反射&#xff08;Reflection) 是Java语…

边缘计算盒子:解决交通拥堵的智能方案

在当今的智能交通系统中&#xff0c;边缘计算盒子&#xff08;Edge Computing Box&#xff09;正逐渐成为不可或缺的核心组件。这种设备通过将计算能力下沉到网络边缘&#xff0c;极大地提升了数据处理的速度和效率&#xff0c;特别适用于实时性要求极高的交通监控场景。本文将…

【Python机器学习】1.10. 逻辑回归实战(高阶):超多阶(大于2)的逻辑回归

喜欢的话别忘了点赞、收藏加关注哦&#xff08;关注即可查看全文&#xff09;&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 1.10.1. 一些准备工作 这篇文章我们会在 1.9. 逻辑回归实战(进阶) 的基础上再进一步&#xf…