聊聊Flink:这次把Flink的触发器(Trigger)、移除器(Evictor)讲透

server/2024/12/2 9:38:49/

一、触发器(Trigger)

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用中指定自定义的 trigger。

1.1 Flink中预置的Trigger

窗口的计算触发依赖于窗口触发器,每种类型的窗口都有对应的窗口触发机制,都有一个默认的窗口触发器,触发器的作用就是去控制什么时候来触发计算。flink内部定义多种触发器,每种触发器对应于不同的WindowAssigner。常见的触发器如下:

  • EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ProcessingTimeoutTrigger:可以将任何触发器转变为超时触发器。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算

1.2 Trigger的抽象类

Trigger 接口提供了五个方法来响应不同的事件:

  • onElement() 方法在每个元素被加入窗口时调用。
  • onEventTime() 方法在注册的 event-time timer 触发时调用。
  • onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  • canMerge() 方法判断是否可以合并。
  • onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  • clear() 方法处理在对应窗口被移除时所需的逻辑。

触发器接口的源码如下:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {private static final long serialVersionUID = -4104633972991191369L;/*** Called for every element that gets added to a pane. The result of this will determine whether* the pane is evaluated to emit results.** @param element The element that arrived.* @param timestamp The timestamp of the element that arrived.* @param window The window to which the element is being added.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)throws Exception;/*** Called when a processing-time timer that was set using the trigger context fires.** @param time The timestamp at which the timer fired.* @param window The window for which the timer fired.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception;/*** Called when an event-time timer that was set using the trigger context fires.** @param time The timestamp at which the timer fired.* @param window The window for which the timer fired.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)throws Exception;/*** Returns true if this trigger supports merging of trigger state and can therefore be used with* a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.** <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,* OnMergeContext)}*/public boolean canMerge() {return false;}/*** Called when several windows have been merged into one window by the {@link* org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.** @param window The new window that results from the merge.* @param ctx A context object that can be used to register timer callbacks and access state.*/public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}/*** Clears any state that the trigger might still hold for the given window. This is called when* a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and* {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as* state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.*/public abstract void clear(W window, TriggerContext ctx) throws Exception;// ------------------------------------------------------------------------/*** A context object that is given to {@link Trigger} methods to allow them to register timer* callbacks and deal with state.*/public interface TriggerContext {// ...}/*** Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,* OnMergeContext)}.*/public interface OnMergeContext extends TriggerContext {<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);}
}

关于上述方法,需要注意三件事:

(1)前三个方法返回TriggerResult枚举类型,其包含四个枚举值:

  • CONTINUE:表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
  • FIRE:触发窗口计算,但是保留窗口元素。
  • PURGE:不触发窗口计算,丢弃窗口,并且删除窗口的元素。
  • FIRE_AND_PURGE:触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。

源码如下:

public enum TriggerResult {// 不触发,也不删除元素CONTINUE(false, false),// 触发窗口,窗口出发后删除窗口中的元素FIRE_AND_PURGE(true, true),// 触发窗口,但是保留窗口元素FIRE(true, false),// 不触发窗口,丢弃窗口,并且删除窗口的元素PURGE(false, true);// ------------------------------------------------------------------------private final boolean fire;private final boolean purge;TriggerResult(boolean fire, boolean purge) {this.purge = purge;this.fire = fire;}public boolean isFire() {return fire;}public boolean isPurge() {return purge;}
}

(2) 每一个窗口分配器都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除,当定时器触发后,会调用对应的回调返回,返回TriggerResult。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。

1.3 ProcessingTimeTrigger源码分析

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private ProcessingTimeTrigger() {}@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteProcessingTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {ctx.registerProcessingTimeTimer(windowMaxTimestamp);}}@Overridepublic String toString() {return "ProcessingTimeTrigger()";}/** Creates a new trigger that fires once system time passes the end of the window. */public static ProcessingTimeTrigger create() {return new ProcessingTimeTrigger();}
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算,但是会保留窗口元素。

需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。

EventTimeTriggerr在onElement设置的定时器:

在这里插入图片描述
EventTime通过registerEventTimeTimer注册定时器,在内部Watermark达到或超过Timer设定的时间戳时触发。

二、移除器(Evictor)

2.1 Evictor扮演的角色
在这里插入图片描述
当一个元素进入stream中之后,一般要经历Window(开窗)、Trigger(触发器)、Evitor(移除器)、Windowfunction(窗口计算操作),具体过程如下:

  • Window中的WindowAssigner(窗口分配器)定义了数据应该被分配到哪个窗口中,每一个 WindowAssigner都会有一个默认的Trigger,如果用户在代码中指定了窗口的trigger,默认的 trigger 将会被覆盖。
  • Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。
  • 当Trigger fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给WindowFunction进行计算。
  • WindowFunction收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口计算操作有很多,比如预定义的sum(),min(),max(),还有 ReduceFunction,WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

现在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪个位置,让我们继续看为何使用Evictor。

Evictor接口定义如下:

在这里插入图片描述
evictBefore()包含要在窗口函数之前应用的清除逻辑,而evictAfter()包含要在窗口函数之后应用的清除逻辑。应用窗口函数之前清除的元素将不会被窗口函数处理。

窗格是具有相同Key和相同窗口的元素组成的桶,即同一个窗口中相同Key的元素一定属于同一个窗格。一个元素可以在多个窗格中(当一个元素被分配给多个窗口时),这些窗格都有自己的清除器实例。

注:window默认没有evictor,一旦把window指定Evictor,该window会由EvictWindowOperator类来负责操作。

2.2 Flink内置的Evitor

  • CountEvictor:保留窗口中用户指定的元素数量,并丢弃窗口缓冲区剩余的元素。
  • DeltaEvictor:依次计算窗口缓冲区中的最后一个元素与其余每个元素之间的delta值,若delta值大于等于指定的阈值,则该元素会被移除。使用DeltaEvictor清除器需要指定两个参数,一个是double类型的阈值;另一个是DeltaFunction接口的实例,DeltaFunction用于指定具体的delta值计算逻辑。
  • TimeEvictor:传入一个以毫秒为单位的时间间隔参数(例如以size表示),对于给定的窗口,取窗口中元素的最大时间戳(例如以max表示),使用TimeEvictor清除器将删除所有时间戳小于或等于max-size的元素(即清除从窗口开头到指定的截止时间之间的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (size <= maxCount) {// 小于最大数量,不做处理return;} else {int evictedCount = 0;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){iterator.next();evictedCount++;if (evictedCount > size - maxCount) {break;} else {// 移除前size - maxCount个元素,只剩下最后maxCount个元素iterator.remove();}}}
}

2.2.2 DeltaEvictor

DeltaEvictor通过计算DeltaFunction的值(依次传入每个元素和最后一个元素),并将其与threshold进行对比,如果DeltaFunction计算结果大于等于threshold,则该元素会被移除。DeltaEvictor的实现如下:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {// 获取最后一个元素TimestampedValue<T> lastElement = Iterables.getLast(elements);for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){TimestampedValue<T> element = iterator.next();// 依次计算每个元素和最后一个元素的delta值,同时和threshold的值进行比较// 若计算结果大于threshold值或者是相等,则该元素会被移除if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {iterator.remove();}}
}

2.2.3 TimeEvictor

TimeEvictor以时间为判断标准,决定元素是否会被移除。TimeEvictor会获取窗口中所有元素的最大时间戳currentTime,currentTime减去窗口大小(windowSize) 可得到能保留最久的元素的时间戳evictCutoff,然后再遍历窗口中的元素,如果元素的时间戳小于evictCutoff,就执行移除操作,否则不移除。具体逻辑如下图所示:

在这里插入图片描述
TimeEvictor的代码实现如下:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {// 如果element没有timestamp,直接返回if (!hasTimestamp(elements)) {return;}// 获取elements中最大的时间戳(到来最晚的元素的时间)long currentTime = getMaxTimestamp(elements);// 截止时间为: 到来最晚的元素的时间 - 窗口大小(可以理解为保留最近的多久的元素)long evictCutoff = currentTime - windowSize;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();// 清除所有时间戳小于截止时间的元素if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}
}

http://www.ppmy.cn/server/146677.html

相关文章

基于卷积网络结构的火灾检测系统实现

1.摘要 本文实现了实现了一个完整的火灾检测工作流&#xff0c;从数据预处理、模型训练到最终的推理和报警功能。首先基于卷积神经网络&#xff08;CNN&#xff09;模型&#xff0c;设计实现了一个可分离卷积&#xff08;SeparableConv2D&#xff09;和残差连接的卷积神经网络模…

【Git】Git 完全指南:从入门到精通

Git 完全指南&#xff1a;从入门到精通 Git 是现代软件开发中最重要的版本控制工具之一&#xff0c;它帮助开发者高效地管理项目&#xff0c;支持分布式协作和版本控制。无论是个人项目还是团队开发&#xff0c;Git 都能提供强大的功能来跟踪、管理代码变更&#xff0c;并保障…

封闭解(Closed-Form Solution)与复杂数值优化(Complex Numerical Optimization)的比较:中英双语

中文版 什么是封闭解&#xff1f; 在数学和统计学中&#xff0c;封闭解&#xff08;Closed-Form Solution&#xff09; 是指通过有限次基本运算&#xff08;如加减乘除、开方、对数、指数运算等&#xff09;即可明确表达的解。这意味着&#xff0c;当我们遇到一个数学问题或模…

[免费]SpringBoot+Vue景区订票(购票)系统【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的SpringBootVue大景区订票(购票)系统&#xff0c;分享下哈。 项目视频演示 【免费】SpringBootVue景区订票(购票)系统 Java毕业设计_哔哩哔哩_bilibili 项目介绍 现代经济快节奏发展以及不断完善升级的信息…

企业网站面临的爬虫攻击及安全防护策略

在当今数字化时代&#xff0c;企业网站不仅是展示企业形象的窗口&#xff0c;更是进行商业活动的重要平台。然而&#xff0c;企业网站在日常运营中面临着多种类型的爬虫攻击&#xff0c;这些攻击不仅会对网站的正常访问造成影响&#xff0c;还可能窃取敏感数据&#xff0c;给企…

#渗透测试#SRC漏洞挖掘#红蓝攻防#黑客工具之XSStrike介绍01

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

网络安全 社会工程学 敏感信息搜集 密码心理学攻击 密码字典生成

网络安全 社会工程学 敏感信息搜集 密码心理学攻击 理解社会工程学的概念掌握获取敏感信息的方法提高自我信息保护的意识和方法理解密码心理学的概念理解密码特征分析掌握黑客猜解密码的切入方法掌握如何提高密码强壮性 敏感信息搜集 「注」由于对实验环境的限制&#xff0c;…

利用边缘计算网关轻松解决线缆生产企业设备数据采集

边缘计算网关集成了数据采集、处理和传输等功能&#xff0c;位于传感器和执行器组成的设备层与云计算平台之间。它能够实时处理和响应本地设备的数据请求&#xff0c;减轻云平台的压力&#xff0c;提高数据处理的速度和效率。边缘计算网关的主要原理是将大部分计算和存储任务从…