Flink实时开发添加水印的案例分析

devtools/2024/10/16 2:25:36/

在Flink中,处理时间序列数据时,通常需要考虑事件时间和水印(watermarks)的处理。以下是修改前后的代码对比分析:

修改前的代码:

val systemDS = unitDS.map(dp => {dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))dp
}).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
  1. unitDS 经过一个 map 操作,将每个元素的 deviceCode 转换为系统设备码。
  2. 使用 keyBy(_.getDeviceCode) 对转换后的设备码进行分组。
  3. 定义了一个基于事件时间的滚动窗口,窗口大小为60秒。
  4. 使用 process 操作应用自定义的窗口函数 HPageSystemWinF 来处理每个窗口中的数据。

注意:修改前的代码没有显示地处理水印(watermarks),这可能导致在处理乱序数据或延迟数据时出现问题。

修改后的代码:

val systemDS = unitDS.map(dp => {dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))dp
}).keyBy(_.getDeviceCode)
.assignTimestampsAndWatermarks(WatermarkStrategy.<boundedOutOfOrdernessDaysPower>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 假设这里应该是.forBoundedOutOfOrderness而不是.forBoundedOutOfOrdernessDaysPower.withIdleness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner[DaysPower] {override def extractTimestamp(element: DaysPower, recordTimestamp: Long): Long = {Math.max(element.getEventTime, recordTimestamp)}})
).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
  1. 与修改前相同的部分:mapkeyBy, 和 window 操作。
  2. 添加了 assignTimestampsAndWatermarks 方法来处理事件时间和水印:
    • 使用 WatermarkStrategy.forBoundedOutOfOrderness 允许一定程度的乱序数据(这里是5秒)。
    • .withIdleness(Duration.ofSeconds(5)) 设置了空闲超时时间为5秒,用于处理不活跃的键。
    • 使用 withTimestampAssigner 自定义了时间戳分配器,确保使用的事件时间是元素中的 eventTime 和记录的 recordTimestamp 中的较大值。

不同点和适用场景:

  • 事件时间和水印处理:修改后的代码显式地处理了事件时间和水印,这对于处理乱序数据、延迟数据以及确保正确的时间窗口计算是非常重要的。如果您的数据流中存在乱序或延迟数据,或者您希望更严格地保证处理时间窗口的正确性,那么应该使用修改后的代码。
  • 空闲超时:通过设置空闲超时,可以处理那些长时间不活跃的键,避免因为某些键长时间没有新数据而导致整个程序挂起。
  • 延迟数据处理:如果数据有可能晚到,但仍然需要被纳入正确的窗口进行计算,水印可以帮助界定数据的“迟到”界限。
    精确的时间窗口分析:对于需要基于事件实际发生时间而非数据处理时间进行分析的场景,如实时监控、金融交易分析等,事件时间模型是必须的。

http://www.ppmy.cn/devtools/58747.html

相关文章

基于STM32设计的药品柜温湿度监测系统(华为云IOT)(184)

基于STM32设计的药品柜温湿度监测系统(华为云IOT)(184) 文章目录 一、前言1.1 项目介绍【1】项目功能介绍【2】整体需求总结【3】项目硬件模块组成1.2 设计思路【1】整体设计思路【2】ESP8266工作模式配置【3】华为云IOT手机APP界面开发思路1.3 项目开发背景【1】选题的意义【2…

eNSP:防火墙设置模拟公司配置(二)

实验拓扑&#xff1a; 实验要求&#xff08;二&#xff09;&#xff1a; 7&#xff1a; 办公设备可以通过电信连接和移动上网&#xff08;多对多NAT&#xff0c;并且需要保留一个公网IP&#xff09; 8&#xff1a; 分公司通过公网移动电信&#xff0c;访问DMZ的http服务器 9&a…

虚幻引擎ue5如何调节物体锚点

当发现锚点不在物体上时&#xff0c;如何调节瞄点在物体上。 步骤1&#xff1a;按住鼠标中键拖动锚点&#xff0c;在透视图中多次调节锚点位置。 步骤2:在物体上点击鼠标右键点击-》锚定--》“设置为枢轴偏移”即可。

C++进阶:继承和多态

文章目录 ❤️继承&#x1fa77;继承与友元&#x1f9e1;继承和静态成员&#x1f49b;菱形继承及菱形虚拟继承&#x1f49a;继承和组合 ❤️多态&#x1fa77;什么是多态&#xff1f;&#x1f9e1;多态的定义以及实现&#x1f49b;虚函数&#x1f49a;虚函数的重写&#x1f499…

【C++进阶学习】第六弹——set和map——体会用C++来构建二叉搜索树

set和map基础&#xff1a;【C进阶学习】第五弹——二叉搜索树——二叉树进阶及set和map的铺垫-CSDN博客 前言&#xff1a; 在上篇的学习中&#xff0c;我们已经学习了如何使用C语言来实现二叉搜索树&#xff0c;在C中&#xff0c;我们是有现成的封装好的类模板来实现二叉搜索树…

设计模式使用场景实现示例及优缺点(结构型模式——组合模式)

结构型模式 组合模式&#xff08;Composite Pattern&#xff09; 组合模式使得用户对单个对象和组合对象的使用具有一致性。 有时候又叫做部分-整体模式&#xff0c;它使我们树型结构的问题中&#xff0c;模糊了简单元素和复杂元素的概念&#xff0c;客户程序可以像处理简单元…

传输层协议之UDP

1、端口号 我们在应用层创建的套接字&#xff0c;是需要通过bind()接口绑定我们的IP地址与端口号的&#xff0c;这是因为数据从传输层向上交付到应用层时&#xff0c;需要用端口号来查找特定的服务进程。一般在网络通信时&#xff0c;用IP地址标识一台主机&#xff0c;用端口号…

react学习——29react之useState使用

useState 是 React Hooks 中的一个重要函数&#xff0c;它用于在函数组件中添加状态。在类组件中&#xff0c;我们通常使用 this.state 和 this.setState 来管理组件的状态&#xff0c;而在函数组件中&#xff0c;我们可以使用 useState 来达到同样的目的。 1、导入 useState&…