深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

news/2025/2/23 2:28:32/

Flink Window 常见需求背景

需求描述

每隔 5 秒,计算最近 10 秒单词出现的次数 —— 滑动窗口
每隔 5 秒,计算最近 5 秒单词出现的次数 —— 滚动窗口
在这里插入图片描述

关于 Flink time 种类 TimeCharacteristic

在这里插入图片描述

  • ProcessingTime
  • IngestionTime
  • EventTime

WindowAssigner 的子类

  • SlidingProcessingTimeWindows
  • SlidingEventTimeWindows
  • TumblingEventTimeWindows
  • TumblingProcessingTimeWindows

使用 EventTime + WaterMark 处理乱序数据

示意图:
在这里插入图片描述

  • 使用 onPeriodicEmit 方法发送 watermark,默认每 200ms 发一次。
  • 窗口起始时间默认按各个时区的整点时间,支持自定义 offset。

Flink Watermark 机制定义

有序的流的 Watermarks

在这里插入图片描述

无序的流的 Watermarks

在这里插入图片描述

多并行度流的 Watermarks

在这里插入图片描述

深入理解 Flink Watermark

Flink Window 触发的条件:

  1. watermark 时间 >= window_end_time
  2. 在 [window_start_time, window_end_time) 区间中有数据存在(注意是左闭右开的区间),而且是以 event time 来计算的

Flink 处理太过延迟数据

Flink 丢弃延迟太多的数据

企业生产中一般不用。

Flink 指定允许再次迟到的时间

治标不治本,企业生产中一般不用。

Flink 收集迟到的数据单独处理

企业生产中应用较为广泛。

Flink 多并行度 Watermark

一个 window 可能会接受到多个 waterMark,我们以最小的为准。
在这里插入图片描述

Flink Window 概述

官网介绍

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
在这里插入图片描述

Flink Window 分类

Flink 的 window 分为两种类型的 Window,分别是:Keyed Windows 和 Non-Keyed Windows,他们的使用方式不同:

// Keyed Windows 
stream.keyBy(...) <- keyed versus non-keyed windows.window(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"
// Non-Keyed Windows
stream.windowAll(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"

Window 的生命周期

  1. 当属于某个窗口的第一个元素到达的时候,就会创建一个窗口。
  2. 当时间(event or processing time)超过 window 的结束时间戳加上用户指定的允许延迟(Allowed Lateness)时,窗口将被完全删除。
  3. 每个 Window 之上,都绑定有一个 Trigger 或者一个 Function(ProcessWindowFunction, ReduceFunction, or AggregateFunction)用来执行窗口内数据的计算。
  4. 可以给 Window 指定一个 Evictor,它能够在 after the trigger fires 以及 before and/or after the function is applied 从窗口中删除元素。

Flink Window 类型

Flink 流批同一前后的 Window 分类:
在这里插入图片描述

tumblingwindows —— 滚动窗口

在这里插入图片描述

slidingwindows —— 滑动窗口

在这里插入图片描述

session windows —— 会话窗口

在这里插入图片描述

global windows —— 全局窗口

在这里插入图片描述

Flink Window 操作使用

高级玩法:自定义 Trigger、自定义 Evictor,读者可自行搜索相关文章与代码。

Flink Window 增量聚合

  • reduce(ReduceFunction)
  • aggregate(AggregateFunction)
  • sum()
  • min()
  • max()
  • sum()

Flink Window 全量聚合

  • apply(WindowFunction)
  • process(ProcessWindowFunction)

Flink Window Join

// 在 Flink 中对两个 DataStream 做 Join
// 1、指定两张表
// 2、指定这两张表的链接字段
stream.join(otherStream) // 两个流进行关联.where(<KeySelector>) // 选择第一个流的key作为关联字段.equalTo(<KeySelector>) // 选择第二个流的key作为关联字段.window(<WindowAssigner>) // 设置窗口的类型.apply(<JoinFunction>) // 对结果做操作 process apply = foreach

Tumbling Window Join

在这里插入图片描述

Sliding Window Join

在这里插入图片描述

Session Window Join

在这里插入图片描述

Interval Join

在这里插入图片描述
核心代码示例:

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});

http://www.ppmy.cn/news/1299253.html

相关文章

基于ChatGPT4+Python近红外光谱数据分析及机器学习与深度学习建模

2022年11月30日&#xff0c;可能将成为一个改变人类历史的日子——美国人工智能开发机构OpenAI推出了聊天机器人ChatGPT3.5&#xff0c;将人工智能的发展推向了一个新的高度。2023年4月&#xff0c;更强版本的ChatGPT4.0上线&#xff0c;文本、语音、图像等多模态交互方式使其在…

RTX20系开启超分辨率

我的显卡是2070s现在也支持了超分辨率,根据网上的教程一通折腾后发现了不少的坑,记一下.希望有缘人可以少走点弯路. 机器配置如下: [CPU] CPU #1: 3600MHz, AMD Ryzen 7 3700X 8-Core Processor , AMD64 Family 23 Model 113 Stepping Instruction set: MMX MMX…

如何在没有密码的情况下将 iPhone 13/14/15 恢复出厂设置

您想知道如何在没有密码的情况下将 iPhone 13/14/15 恢复出厂设置吗&#xff1f; 出厂重置 iPhone 13/14/15 成为所有 iPhone 机型中最简单的。大多数情况下&#xff0c;iPhone 13/14/15 是在 iOS 15 或更高版本的 iOS 版本上&#xff0c;Apple 更新了无需密码重置 iPhone 13/…

【开源】基于JAVA、微信小程序的音乐平台

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示 四、核心代码4.1 查询单首音乐4.2 新增音乐4.3 新增音乐订单4.4 查询音乐订单4.5 新增音乐收藏 五、免责说明 一、摘要 1.1 项目介绍 基于微信小程序JAVAVueSpringBootMySQL的音乐平台&#xff0c;包含了音乐…

Java程序员面试-场景篇

前言 裁员增效潮滚滚而来&#xff0c;特总结一些实际场景方案的面试题&#xff0c;希望对大家找工作有一些帮助。 注册中心 题目&#xff1a; 有三台机器&#xff0c;分别部署了微服务A、微服务B、注册中心&#xff0c;其中A和B都有服务接口提供并正常注册到了注册中心&…

hadoop自动获取时间

1、自动获取前15分钟 substr(from_unixtime(unix_timestamp(concat(substr(20240107100000,1,4),-,substr(20240107100000,5,2),-,substr(20240107100000,7,2), ,substr(20240107100000,9,2),:,substr(20240107100000,11,2),:,00))-15*60,yyyyMMddHHmmss),1) unix_timestam…

代码随想录day1:①二分查找 ②删除数组中的元素

一、二分查找 两种写法 1、维护[left&#xff0c;right] class Solution { public:int search(vector<int>& nums, int target) {int left0;int rightnums.size()-1;while(left<right) // 维护[left,right]{int mid(leftright)/2;if(nums[mid]>target) righ…

【angular教程240108】05 js的DOM与 @ViewChild获取DOM节点和子组件的实例、调用css3动画(侧边栏 actionSheet)

Angular中的Dom操作以及ViewChild、Angular调用css3动画(侧边栏 actionSheet) 0Angular中的Dom操作以及ViewChild、Angular调用css3动画(侧边栏 actionSheet) 〇、复习 JavaScript 的 dom是什么 一、Angular中通过原生JS获取Dom 生命周期函数介绍 二、Angular 中通过 ViewChil…