Flink滑动窗口(Sliding)中window和windowAll的区别

news/2024/11/14 1:41:24/

滑动窗口的使用,主要是计算,在reduce之前添加滑动窗口,设置好间隔和所统计的时间,然后再进行reduce计算数据即可。

窗口设置好时间间隔,和处理时间窗口的时间,比如将滑动窗口的时间间隔都设置为5s,处理时间为15s,意思是每隔五秒,就处理15s秒的数据

滑动窗口(window)

比如打了3s的输入,到了第五秒的时候,滑动window开始处理15秒的数据,数据就像滑动一样,用一个线段展示。

代码展示:


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo4Window {public static void main(String[] args) throws Exception {//1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2、读取数据DataStream<String> linesDS = env.socketTextStream("master", 8888);//使用lambda表达式处理数据DataStream<String> wordsDS = linesDS.flatMap((line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING);DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1))//指定返回类型.returns(Types.TUPLE(Types.STRING, Types.INT));KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);/** SlidingProcessingTimeWindows:滑动的处理时间窗口*/WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS//每隔5秒计算最近15秒的数据.window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));//kv1代表之前的结果(状态),kv2代码最新一条数据//reduce:有状态计算DataStream<Tuple2<String, Integer>> countDS = windowDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));countDS.print();//execute方法会触发任务执行(任务调度)env.execute("lambda");}
}

滑动窗口(windowAll) 

将同一个窗口的数据放在一起计算,将之前计算的结果与最新统计的结果相加

 代码展示:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo4WindowAll {public static void main(String[] args) throws Exception {//1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2、读取数据DataStream<String> linesDS = env.socketTextStream("master", 8888);//使用lambda表达式处理数据DataStream<String> wordsDS = linesDS.flatMap((line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING);DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1))//指定返回类型.returns(Types.TUPLE(Types.STRING, Types.INT));/** SlidingProcessingTimeWindows:滑动的处理时间窗口*/AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowAllDS = kvDS//每隔5秒计算最近15秒的数据//windowAll:将同一个窗口的数据发一起进行计算.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));//kv1代表之前的结果(状态),kv2代码最新一条数据//reduce:有状态计算DataStream<Tuple2<String, Integer>> countDS = windowAllDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));countDS.print();//execute方法会触发任务执行(任务调度)env.execute("lambda");}
}


http://www.ppmy.cn/news/1545942.html

相关文章

软考系统架构设计师论文:云上自动化运维及其应用

论文四 云上自动化运维及其应用 1简要说明你参与开发的软件项目,以及你所承担的主要工作 2概要叙述云上自动化运维(如CloudOps)的主要衡量指标 3详细描述你参与开发的项目如何实现云上自动化运维。 论文四:云上自动化运维及其应用 简要说明你参与开发的软件项目,以及你所…

排序算法详细总结

算法 定义&#xff1a;算法是解决特定问题的明确步骤集合。算法的效率通常用时间复杂度和空间复杂度来衡量。 排序算法 定义&#xff1a;排序算法是计算机科学中用于对元素序列进行排序的一系列算法。排序算法在各种应用中都非常常见&#xff0c;从简单的数据处理到复杂的数…

软件工程 软考

开发大型软件系统适用螺旋模型或者RUP模型 螺旋模型强调了风险分析&#xff0c;特别适用于庞大而复杂的、高风险的管理信息系统的开发。喷泉模型是一种以用户需求为动力&#xff0c;以对象为为驱动的模型&#xff0c;主要用于描述面向对象的软件开发过程。该模型的各个阶段没有…

网络安全应急响应(归纳)

目录 一、概述二、理论 系统排查 系统基本信息 windowsLinux用户信息 WindowsLinux启动项&#xff1a;开机系统在前台或者后台运行的程序&#xff0c;是病毒等实现持久化驻留的常用方法。 WindowsLinux任务计划&#xff1a;由于很多计算机都会自动加载“任务计划”&#xff0c…

线上问题的排查之频繁FullGC问题如何排查

0 详细问题案例 假设我们有一个大型电子商务网站&#xff0c;最近用户反馈系统响应变慢。运维团队发现服务器频繁出现FullGC&#xff0c;严重影响性能。 1.收集日志 首先&#xff0c;我们需要开启详细的GC日志。在JVM参数中添加。 2. 分析GC日志 使用工具(如GCViewer)分析GC…

NoSQL大数据存储技术测试(1)绪论

写在前面&#xff1a;未完成测试的同学&#xff0c;请先完成测试&#xff0c;此博文供大家复习使用&#xff0c;&#xff08;我的答案&#xff09;均为正确答案&#xff0c;大家可以放心复习 单项选择题 第1题 以下不属于云计算部署模型的是&#xff08; &#xff09; 公…

Cesium实现雷达效果

目录 项目地址实现效果核心代码 项目地址 https://github.com/zhengjie9510/webgis-demo 实现效果 核心代码 var radar scene.primitives.add(new Primitive({geometryInstances: redCone,appearance: new MaterialAppearance({// 自定义纹理material: new Material({fabri…

计算机课程管理:Spring Boot与工程认证的整合之道

3系统分析 3.1可行性分析 通过对本基于工程教育认证的计算机课程管理平台实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于工程教育认证的计算机课程管理平…