3. Flink 窗口

devtools/2025/1/16 17:55:39/
一. 基本概念

窗口是处理无限流的核心。窗口将流划分为固定大小的“桶”,方便程序员在上面应用各种计算。Window操作是流式数据处理的一种非常核心的抽象,它把一个无限流数据集分割成一个个有界的Window,然后就可以非常方便地定义作用于Window之上的各种计算操作。

二. 窗口分类

Window分为两类,窗口程序的通用结构详见下面,主要区别是针对Keyed Window,需要首先使用keyBy将stream转化为Keyed stream,然后使用window处理,但Non-Keyed Window直接使用windowAll处理。

1. Keyed Windows

  • 将输入原始流stream转换成多个Keyed stream
  • 每个Keyed stream会独立进行计算,这样多个Task可以对Windowing操作进行并行处理
  • 具有相同Key的数据元素会被发到同一个Task中进行处理
stream.keyBy(...)               <-  keyed versus non-keyed windows.window(...)              <-  required: "assigner"[.trigger(...)]            <-  optional: "trigger" (else default trigger)[.evictor(...)]            <-  optional: "evictor" (else no evictor)[.allowedLateness(...)]    <-  optional: "lateness" (else zero)[.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data).reduce/aggregate/apply()      <-  required: "function"[.getSideOutput(...)]      <-  optional: "output tag"

2. Non-Keyed Windows

  • 原始流stream不会被分割成多个逻辑流
  • 所有的Windowing操作逻辑只能在一个Task中进行处理,计算并行度为1
stream.windowAll(...)           <-  required: "assigner"[.trigger(...)]            <-  optional: "trigger" (else default trigger)[.evictor(...)]            <-  optional: "evictor" (else no evictor)[.allowedLateness(...)]    <-  optional: "lateness" (else zero)[.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data).reduce/aggregate/apply()      <-  required: "function"[.getSideOutput(...)]      <-  optional: "output tag"
三. 窗口分配器

窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。
1.滚动窗口
在这里插入图片描述

滚动窗口赋值器将每个元素赋给一个固定长度大小的窗口。滚动窗口有一个固定的大小且相互不重叠
特点:

  • 时间对齐 : 默认是aligned with epoch(整点、整分、整秒等),可以通过offset参数改变对齐方式。
  • 窗口长度固定
  • 每条数据只属于一个窗口,无重叠

代码示例:

stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
stream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

2.滑动窗口

在这里插入图片描述

滑动窗口赋值器将每个元素赋值给一个或多个固定长度大小的窗口
特点:

  • 时间对齐 : 默认是aligned with epoch(整点、整分、整秒等),可以通过offset参数改变对齐方式。
  • 窗口长度固定
  • 数据存在重叠

代码示例:

stream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
stream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

3.会话窗口
在这里插入图片描述

与滚动窗口和滑动窗口相比,会话窗口不重叠且没有固定的开始和结束时间。当会话窗口在一段时间内没有接收到元素时,即发生不活动间隙【session gap】时,会话窗口将关闭。
特点

  • 时间无对齐
  • event不重叠
  • 没有固定开始和结束时间

代码示例:

stream.window(EventTimeSessionWindows.withGap(Time.minutes(10)))

4.全局窗口
在这里插入图片描述

全局窗口赋值器将具有相同键的所有元素分配给同一个全局窗口。此窗口模式仅在您指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有一个可以处理聚合元素的自然结束。

代码示例:

stream.window(GlobalWindows.create())

5.计数窗口

计数窗口很好理解,当窗口的数据量达到了设定的窗口大小时,窗口函数就会被触发

代码示例:

stream.countWindowAll(10);
四. 窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

1. 增量聚合函数(ReduceFunction / AggregateFunction)
增量聚合函数就是窗口中的数据每来一个就在之前的结果上聚合一次,典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

ReduceFunction指定了来自输入的两个元素如何组合以产生相同类型的输出元素。Flink使用ReduceFunction来增量聚合窗口中的元素

需求:求最近10秒输入数字的总和
代码示例:

package org.example.test.window;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class WindowReduceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> source = env.socketTextStream("192.168.47.130", 8888).map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String s) throws Exception {return Integer.parseInt(s);}});source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer t1, Integer t2) throws Exception {System.out.println("前值: " + t1 +  ", 最新数值: " + t2);return new Integer(t1 + t2);}}).print("sum:");env.execute("WindowReduceFunction");}
}

运行结果:
在这里插入图片描述

前值: 10, 最新数值: 20
sum::5> 30

AggregateFunction是ReduceFunction的泛化版本,它有三种类型:
输入类型(IN):输入流中元素的类型
累加类型(ACC): 聚合过程中的中间结果类型
输出类型(OUT): 聚合的最终结果类型

需求:求最近10秒输入数字的平均值

代码示例:

package org.example.test.window;import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class WindowAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> source = env.socketTextStream("192.168.47.130", 8888).map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String s) throws Exception {return Integer.parseInt(s);}});source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).aggregate(new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {@Overridepublic Tuple2<Integer, Integer> createAccumulator() {System.out.println("创建累加器");return Tuple2.of(0, 0);}@Overridepublic Tuple2<Integer, Integer> add(Integer integer, Tuple2<Integer, Integer> accumulator) {System.out.println("调动add方法, value = " + integer);return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + integer);}@Overridepublic Double getResult(Tuple2<Integer, Integer> accumulator) {System.out.println("调用getResult方法");return  new Double((double)accumulator.f1 / accumulator.f0);}@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> acc1) {System.out.println("调用merge方法");return null;}}).print("avg:");env.execute("WindowAggregateFunction");}
}

运行结果:
在这里插入图片描述

创建累加器
调动add方法, value = 10
调动add方法, value = 20
调用getResult方法
avg::8> 15.0

2. 聚合函数(ProcessWindowFunction)
ProcessWindowFunction获得一个包含窗口所有元素的Iterable,以及一个可以访问时间和状态信息的Context对象,这使得它比其他窗口函数提供更大的灵活性。窗口内的数据需要在内部缓冲,直到窗口触发时间到达,因此需要耗费更多的资源。

需求:求最近10秒输入数字的平均值

代码示例:

package org.example.test.window;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;public class WindowProcessFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> source = env.socketTextStream("192.168.47.130", 8888).map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String s) throws Exception {return Integer.parseInt(s);}});source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Integer, String, TimeWindow>.Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {int count = 0;long sum = 0L;for (Integer ele : elements) {count += 1;sum += ele;}long winodwStart = context.window().getStart();long windowEnd = context.window().getEnd();String windowStartStr = DateFormatUtils.format(winodwStart, "yyyy-MM-dd HH:mm:ss");String windowEndStr = DateFormatUtils.format(windowEnd, "yyyy-MM-dd HH:mm:ss");out.collect("窗口[" + windowStartStr + "," + windowEndStr + ")包含" + count + "条数据,平均值: " + ((double)sum/count));}}).print("process:");env.execute("WindowProcessFunction");}
}

执行结果:
在这里插入图片描述

process::8> 窗口[2025-01-15 11:14:50,2025-01-15 11:15:00)包含3条数据,平均值: 17.666666666666668

3. 增量聚合和全窗口函数的结合使用
在实际应用中,我们往往希望兼具增量聚合和全量聚合的优点,Flink的Window API就给我们实现了这样的用法。

需求:求最近10秒输入数字的总和,并输出窗口信息

代码示例:

package org.example.test.window;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;public class WindowProcessFunctionWithIncrementalAggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> source = env.socketTextStream("192.168.47.130", 8888).map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String s) throws Exception {return Integer.parseInt(s);}});source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).aggregate(new MyAggregateFunction(), new MyProcessFunction()).print("sum:");env.execute("WindowProcessFunctionWithIncrementalAggregation");}public static class MyAggregateFunction implements AggregateFunction<Integer, Integer, Integer> {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(Integer t1, Integer t2) {System.out.println("调用add方法, value = " + t1);return t1 + t2;}@Overridepublic Integer getResult(Integer integer) {System.out.println("调用getResult方法");return integer;}@Overridepublic Integer merge(Integer integer, Integer acc1) {System.out.println("调用merge方法");return 0;}}public static class MyProcessFunction extends ProcessAllWindowFunction<Integer, String, TimeWindow> {@Overridepublic void process(ProcessAllWindowFunction<Integer, String, TimeWindow>.Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {long winodwStart = context.window().getStart();long windowEnd = context.window().getEnd();String windowStartStr = DateFormatUtils.format(winodwStart, "yyyy-MM-dd HH:mm:ss");String windowEndStr = DateFormatUtils.format(windowEnd, "yyyy-MM-dd HH:mm:ss");long count = elements.spliterator().estimateSize();out.collect("窗口[" + windowStartStr + "," + windowEndStr + ")包含" + count + "条数据===>" + elements.toString());}}}

执行结果:
在这里插入图片描述

创建累加器
调用add方法, value = 10
调用add方法, value = 25
调用getResult方法
sum::4> 窗口[2025-01-15 11:29:10,2025-01-15 11:29:20)包含1条数据===>[35]

http://www.ppmy.cn/devtools/151015.html

相关文章

Python海龟绘图库:从入门到精通 - Python官方文档(三万字解析!)

turtle --- 海龟绘图 源码&#xff1a; Lib/turtle.py 概述 海龟绘图是对 最早在 Logo 中引入的受欢迎的几何绘图工具 的实现&#xff0c;它由 Wally Feurzeig, Seymour Papert 和 Cynthia Solomon 在 1967 年开发。 入门 请想象绘图区有一只机器海龟&#xff0c;起始位置在…

Flask-SQLAlchemy 基于一个base表 - 动态创建使用相同字段的其他业务表

1 安装 首先&#xff0c;确保您安装了 Flask 和 SQLAlchemy&#xff0c;以及 MySQL 的驱动程序&#xff08;例如 mysql-connector-python 或 PyMySQL&#xff09;&#xff1a; pip install Flask Flask-SQLAlchemy mysql-connector-python2 创建项目结构 创建一个简单的项目…

【数模学习笔记】插值算法和拟合算法

声明&#xff1a;以下笔记中的图片以及内容 均整理自“数学建模学习交流”清风老师的课程资料&#xff0c;仅用作学习交流使用 文章目录 插值算法定义三个类型插值举例插值多项式分段插值三角插值 一般插值多项式原理拉格朗日插值法龙格现象分段线性插值 牛顿插值法 Hermite埃尔…

java集合面试题

java集合面试题 java集合面试题java集合框架的基础接口有哪些&#xff1f;单列集合的基础接口List, Set, Map 是否继承自 Collection 接口List、Map、Set 三个接口&#xff0c;存取元素时&#xff0c;各有什么特点&#xff1f;你所知道的集合类都有哪些&#xff1f;主要方法&am…

(即插即用模块-Attention部分) 四十四、(ICIP 2022) HWA 半小波注意力

文章目录 1、Half Wavelet Attention2、代码实现 paper&#xff1a;HALFWAVELET ATTENTION ON M-NET FOR LOW-LIGHT IMAGE ENHANCEMENT Code&#xff1a;https://github.com/FanChiMao/HWMNet 1、Half Wavelet Attention 传统的图像增强方法主要关注图像在空间域的特征信息&am…

DFT可测性设置与Tetramax测试笔记

1 DFT 1.1 DFT类型 1、扫描链&#xff08;SCAN&#xff09;&#xff1a; 扫描路径法是一种针对时序电路芯片的DFT方案.其基本原理是时序电路可以模型化为一个组合电路网络和带触发器(Flip-Flop&#xff0c;简称FF)的时序电路网络的反馈。 Scan 包括两个步骤&#xff0c;scan…

蓝桥杯第二天学习笔记

二维码生成&#xff1a; import qrcode from PIL import Image, ImageDraw, ImageFont import osdef generate_custom_qr_code(data, qr_file_path, logo_file_pathNone, textNone):# 创建QRCode对象qr qrcode.QRCode(version1,error_correctionqrcode.constants.ERROR_CORRE…

Qt 各版本选择

嵌入式推荐用 Qt4.8&#xff0c;打包的程序小&#xff1a;Qt4.8.7是Qt4的终结版本&#xff0c;是Qt4系列版本中最稳定最经典的 最后支持xp系统的长期支持版本&#xff1a;Qt5.6.3&#xff1b;Qt5.7.0是最后支持xp系统的非长期支持版本。 最后提供mysql数据库插件的版本&#xf…