Flink SQL中窗口和水印触发机制

news/2024/12/29 18:39:59/

下面是一个使用窗口的例子,按说明写入了2条数据,各个窗口的开始和结束时间规则,以及水印的使用,代码如下:

-- 第一条数据09:00:25,第二条数据09:01:10
CREATE TEMPORARY TABLE kafka_test_report (`my_id` VARCHAR(64),`event_time` VARCHAR(20),`my_num` BIGINT,`ts` AS TO_TIMESTAMP(event_time),WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH ('connector' = 'kafka',...
);CREATE TEMPORARY TABLE print_sink (`window_start` TIMESTAMP(3),`window_end` TIMESTAMP(3),`my_id` VARCHAR(64),`my_value` BIGINT
) WITH ('connector' = 'print'
);BEGIN STATEMENT SET;-- 滚动窗口,09:00:00作为窗口开始时间
INSERT INTO print_sink
SELECTTUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start,TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end,my_id,COUNT(1) AS my_num
FROM kafka_test_report
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),my_id;-- 滑动窗口,首个更新窗口08:51:00作为开始时间,09:01:00作为结束时间
INSERT INTO print_sink
SELECTHOP_START (ts, INTERVAL '60' SECOND, INTERVAL '10' MINUTE) AS window_start,HOP_END (ts, INTERVAL '60' SECOND, INTERVAL '10' MINUTE) AS window_end,my_id,SUM(my_num) AS my_num
FROM kafka_test_report
GROUP BY HOP(ts, INTERVAL '60' SECOND, INTERVAL '10' MINUTE),my_id;-- 累计窗口,首个更新窗口09:00:00->09:00:40
INSERT INTO print_sink
SELECTwindow_start,window_end,'101' AS my_id,SUM(my_num) AS my_num
FROM TABLE(CUMULATE(TABLE kafka_test_report,DESCRIPTOR(ts), -- 时间戳INTERVAL '40' SECOND,INTERVAL '1' HOUR
))
GROUP BY window_start,window_end;END;

数据源为Kafka,有2个分区(并发),有1个分区一直没有数据,会导致无法触发窗口结束,导致结果数据异常。解决办法为:

  • 根据数据乱序的程度设置合理的offset大小,并保证所有并发都有数据。
  • 如果某个源表并发或源表上游partition因没有数据导致窗口始终无法被触发,可在Flink配置中添加table.exec.source.idle-timeout: s来触发窗口结束。table.exec.source.idle-timeout表示某个并发来源如果在指定时间内没有数据进来则临时设为空闲,此时下游任务无需等待该空闲并发源去增长他们的watermark,默认为0表示空闲并发源是禁用的。

综上,Flink解决一个分区没有数据无法触发窗口结束的方式是设置一个过期时间告诉Flink系统这个Partition没数据了,那么计算Watermark的时候就不考虑他了,等他有数据再把他列入计算Watermark的范畴。但如果每个分区都没有数据,还是无法更新watermark,即无法触发下游。此时结束标识数据可使用Sink的RDS延迟输出参数的方式配置。


http://www.ppmy.cn/news/667499.html

相关文章

pytorch 实现a3c算法

Asynchronous Advantage Actor-Critic 主要学习资源来自莫烦: github连接 等有时间更新了, 发一下自己团队的项目实现的部分a3c代码

奇瑞a3中控按键图解_奇瑞A3空调三个键中间键是如何使用?

展开全部 奇瑞A3空调e68a843231313335323631343130323136353331333431363665三个键中间键是用于调节风量的。发动机上的空气过滤器是为了过滤发动机运转所需要的空气,在汽油发动机上都有此装置,在玩车人看来,原车所安装的空气过滤器似有进气量…

激光雷达在机器人中的避障方案

如今,在各种商用场景中服务机器人已屡见不鲜,对于一些在餐厅、酒店等地的服务机器人来说,往往会面临应用环境复杂多变的情况,这就对机器人的避障能力提出了很大的挑战,避障是指移动机器人根据采集的障碍物的状态信息&a…

[A3C]:算法原理详解

强化学习: A3C算法原理 深度强化学习框架使用异步梯度下降来优化深度神经网络控制器。提出了四种标准强化学习算法的异步变体,并证明并行actor-learners在训练中具有稳定作用,使得四种方法都能成功地训练神经网络控制器。首先明确什么是A3C?…

激光雷达RPLIDAR A1使用教程

激光雷达RPLIDAR A1使用教程 一.雷达硬件连接 1.A1雷达包含组件 RPLIDAR A1开发套装包含了如下组件: o RPLIDAR A1模组(内置 PWM电机驱动器) o USB适配器 o RPLIDAR A1模组通讯排线 注意:另需自备 USB线缆用于连接。 2.A1雷达…

DJI A3飞控

A3飞行控制系统 集智可靠 创造非凡 为行业应用与专业级航拍打造 技术参数 1、主要模块 模块尺寸主控64mm x 42mm x 19.5mmPMU51 mm x 34mm x 13.5 mmIMU34mm x 26.5mm x 20mmGPS-Compass Pro61mm(直径) x 13mmLED27mm x 27mm x 8mm IMU:惯性测量单元&#xff…

STM32c8t6驱动激光雷达(一)

思岚A1激光雷达 前言 先来了解激光雷达 RPLIDAR A1M8 360 度激光扫描测距雷达是由 SLAMTEC 公司开发的低成本二维激光雷达(LIDAR)解决方案。它可以实现在二维平面的 12 米半径范围内进行 360度全方位的激光测距扫描,并产生所在空间的平面点云地图信息。这些云地图…

思岚激光雷达+cartographer建图

系统环境: Ubuntu18.04 ROS Melodic gcc 7.5.0 1.安装思岚ROS包 1.1 clone并编译 cd catkin_ws/src/ git clone https://github.com/Slamtec/rplidar_ros.git cd .. catkin_make1.2 修改rplidar_ros/launch/rplidar.launch中的波特率 1.3 试一下效果 source …