Flink移除器Evictor

embedded/2024/10/18 11:20:55/

前言

在 Flink 窗口计算模型中,数据被 WindowAssigner 划分到对应的窗口后,再经过触发器 Trigger 判断窗口是否要 fire 计算,如果窗口要计算,会把数据丢给移除器 Evictor,Evictor 可以先移除部分元素再交给 ProcessFunction 处理,也可以等 ProcessFunction 处理完成后再移除数据。

认识Evictor

Flink中所有的移除器都是org.apache.flink.streaming.api.windowing.evictors.Evictor的子类

public interface Evictor<T, W extends Window> extends Serializable {void evictBefore(Iterable<TimestampedValue<T>> var1, int var2, W var3, EvictorContext var4);void evictAfter(Iterable<TimestampedValue<T>> var1, int var2, W var3, EvictorContext var4);public interface EvictorContext {long getCurrentProcessingTime();MetricGroup getMetricGroup();long getCurrentWatermark();}
}

Evictor定义了两个方法:

  • evictBefore ProcessFunction处理前调用,用于移除无须计算的元素
  • evictAfter ProcessFunction处理后调用

内置的Evictor

Flink内置了三个Evictor,当这些Evictor不满足业务场景时,也可以自定义Evictor。

1、TimeEvictor

给定一个时间窗口大小,仅保留该时间窗口范围内的元素,对于超过了窗口时间范围的元素,会一律移除。

public class TimeEvictor<W extends Window> implements Evictor<Object, W> {private static final long serialVersionUID = 1L;private final long windowSize;private final boolean doEvictAfter;public TimeEvictor(long windowSize) {this.windowSize = windowSize;this.doEvictAfter = false;}public TimeEvictor(long windowSize, boolean doEvictAfter) {this.windowSize = windowSize;this.doEvictAfter = doEvictAfter;}public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {if (!this.doEvictAfter) {this.evict(elements, size, ctx);}}public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {if (this.doEvictAfter) {this.evict(elements, size, ctx);}}private void evict(Iterable<TimestampedValue<Object>> elements, int size, Evictor.EvictorContext ctx) {if (this.hasTimestamp(elements)) {long currentTime = this.getMaxTimestamp(elements);long evictCutoff = currentTime - this.windowSize;Iterator<TimestampedValue<Object>> iterator = elements.iterator();while(iterator.hasNext()) {TimestampedValue<Object> record = (TimestampedValue)iterator.next();if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}}}
}

2、DeltaEvictor

给定一个 double 阈值和一个差值计算函数 DeltaFunction,依次计算窗口内元素和最后一个元素的差值 delta,所有 delta 超过阈值的元素都会被移除。

public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {private static final long serialVersionUID = 1L;DeltaFunction<T> deltaFunction;private double threshold;private final boolean doEvictAfter;private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {this.deltaFunction = deltaFunction;this.threshold = threshold;this.doEvictAfter = false;}private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {this.deltaFunction = deltaFunction;this.threshold = threshold;this.doEvictAfter = doEvictAfter;}public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, Evictor.EvictorContext ctx) {if (!this.doEvictAfter) {this.evict(elements, size, ctx);}}public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, Evictor.EvictorContext ctx) {if (this.doEvictAfter) {this.evict(elements, size, ctx);}}private void evict(Iterable<TimestampedValue<T>> elements, int size, Evictor.EvictorContext ctx) {TimestampedValue<T> lastElement = (TimestampedValue)Iterables.getLast(elements);Iterator<TimestampedValue<T>> iterator = elements.iterator();while(iterator.hasNext()) {TimestampedValue<T> element = (TimestampedValue)iterator.next();if (this.deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {iterator.remove();}}}
}

3、CountEvictor

给定一个 maxCount,依次遍历窗口内的元素,数量超过 maxCount 后的所有元素全部移除。

public class CountEvictor<W extends Window> implements Evictor<Object, W> {private static final long serialVersionUID = 1L;private final long maxCount;private final boolean doEvictAfter;private CountEvictor(long count, boolean doEvictAfter) {this.maxCount = count;this.doEvictAfter = doEvictAfter;}private CountEvictor(long count) {this.maxCount = count;this.doEvictAfter = false;}public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {if (!this.doEvictAfter) {this.evict(elements, size, ctx);}}public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {if (this.doEvictAfter) {this.evict(elements, size, ctx);}}private void evict(Iterable<TimestampedValue<Object>> elements, int size, Evictor.EvictorContext ctx) {if ((long)size > this.maxCount) {int evictedCount = 0;Iterator<TimestampedValue<Object>> iterator = elements.iterator();while(iterator.hasNext()) {iterator.next();++evictedCount;if ((long)evictedCount > (long)size - this.maxCount) {break;}iterator.remove();}}}
}

自定义Evictor

实现org.apache.flink.streaming.api.windowing.evictors.Evictor接口即可自定义 Evictor,泛型要注意,第一个是元素类型,第二个是窗口类型。

举个例子,我们定义一个 Evictor,它在 ProcessFunction 计算前把窗口内所有的奇数全部移除掉,只保留偶数。

public static class MyEvictor implements Evictor<Integer, GlobalWindow> {@Overridepublic void evictBefore(Iterable<TimestampedValue<Integer>> iterable, int i, GlobalWindow globalWindow, EvictorContext evictorContext) {Iterator<TimestampedValue<Integer>> iterator = iterable.iterator();while (iterator.hasNext()) {TimestampedValue<Integer> value = iterator.next();if (value.getValue() % 2 != 0) {iterator.remove();}}}@Overridepublic void evictAfter(Iterable<TimestampedValue<Integer>> iterable, int i, GlobalWindow globalWindow, EvictorContext evictorContext) {}
}

编写一个简单的 Flink 作业验证一下我们自定义的 Evictor,数据源手动指定为数字1到6,统一分配到 GlobalWindow 窗口,Trigger 元素等于6个就出发计算,最终输出窗口内的元素

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.fromElements(1, 2, 3, 4, 5, 6).windowAll(GlobalWindows.create()).trigger(CountTrigger.of(6)).evictor(new MyEvictor()).process(new ProcessAllWindowFunction<Integer, Object, GlobalWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Integer, Object, GlobalWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {String elements = StringUtils.joinWith(",", iterable);System.err.println(elements);}});environment.execute();
}

运行Flink作业,控制台输出[2, 4, 6],奇数在计算前就被移除掉了。

尾巴

Flink 的 Evictor 主要用于在窗口计算过程中,对窗口中的元素进行筛选和剔除。通过定义特定的 Evictor 策略,可以有效地控制窗口内数据的留存和输出。 Evictor 有助于提高数据处理的准确性和效率。它能够根据业务需求,如时间、数据特征等,去除不符合条件的数据,从而使窗口的计算结果更具针对性和可靠性。


http://www.ppmy.cn/embedded/128434.html

相关文章

【Neo4j】图数据库Neo4j 认证专家考试题目总结(判断/单选/多选),正确率高达99%

目录 前言 1. 证书展示 2. 考试介绍 3. 题目荟萃 Ⅰ判断题 Ⅱ单选题 Ⅲ多选题 前言 旅程中重要的也许不是终点,而是沿途的风景。我们总是追求实现目标时的释怀,却没有认真体会过程中心境的变化和丰富的经历。 最近一直忙于学习和工作,好长一段时间没更新了,便静下…

在Oxygen编辑器中支持数学公式

在编写文档时&#xff0c;经常需要插入公式。虽然将公式作为图片插入到文档中是可以的&#xff0c;但这会使后续的修改变得非常不便。目前&#xff0c;MathML (Mathematical Markup Language) 和 LaTeX 是两种常用的数学公式描述语言&#xff0c;它们各自具有不同的特点和适用场…

Node.js基础(二)

1. NodeJs操作Mongodb 1.1. 连接数据库 const mongoose require("mongoose") mongoose.connect("mongodb://127.0.0.1:27017/company-system")1.2. 创建模型 const mongoose require("mongoose") const Schema mongoose.Schemaconst UserT…

标准IO输入输出

1、完成标准io的单字符、字符串、格式化、模块化实现两个文件的拷贝&#xff1b; //单字符拷贝 #include <myhead.h> int main(int argc, const char *argv[]) {//判断是否有3个文件传入if(3 ! argc){fputs("input file error:",stderr);return -1;}//判断源文…

使用OpenCV实现光流追踪

在计算机视觉领域&#xff0c;光流追踪是一种重要的技术&#xff0c;用于估计图像中像素运动的瞬时速度。它广泛应用于视频分析、运动检测、物体跟踪和姿态估计等领域。本文将介绍如何使用OpenCV库实现光流追踪&#xff0c;并展示一个简单的示例代码。 什么是光流追踪&#xf…

Nginx + RTMP Module搭建流媒体服务器简单步骤

Nginx RTMP Module搭建流媒体服务器的步骤如下&#xff1a; 一、准备工作 安装Nginx&#xff1a; 首先&#xff0c;需要确保服务器上已经安装了Nginx。如果尚未安装&#xff0c;可以通过包管理器&#xff08;如yum、apt等&#xff09;或从Nginx官方网站下载源代码进行编译安装…

WPF组件的自定义模板和触发器全面解析

Windows Presentation Foundation&#xff08;WPF&#xff09;是微软提供的一个用于构建桌面客户端应用程序的UI框架。其依赖于XAML&#xff08;Extensible Application Markup Language&#xff09;进行用户界面设计&#xff0c;提供了一套强大的控件和组件模型。在WPF开发中&…

在生产制造领域,可视化大屏的作用可以说无可替代。

在生产制造领域&#xff0c;可视化大屏的作用确实无可替代。 可视化大屏能够实时展示生产过程中的关键数据。如生产进度、产量、设备运行状态、质量指标等。管理人员可以通过大屏一目了然地掌握生产的整体情况&#xff0c;及时发现问题并采取措施&#xff0c;确保生产的高效进…