Flink解决延迟数据问题

devtools/2024/11/27 13:27:29/

总结:

水印:对于迟到数据不长

allowedLateness: 迟到时间很长

侧道输出:对于迟到时间特别长

对于延迟数据的理解:

水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。

这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。

但是水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口

这个等待不能一直等,因为会一直缓着数据不计算。

一般水印也就是几秒钟最多几分钟而已(看业务)

那么,在现实世界中,延迟数据除了有短期延迟外,长期延迟也是很常见的。

比如:

l 客户端断网,等了好几个小时才恢复

l 车联网系统进入隧道后没有信号无法上报数据

l 手机欠费没有网

等等,这些场景下数据的迟到就不是简单的网络堵塞造成的几秒延迟了

而是小时、天级别的延迟

对于水印来说,这样的长期延迟数据是无法很好处理的。

那么有没有什么办法去处理这些长期延迟的数据呢?让其可以找到其所属的窗口正常完成计算,哪怕晚了几个小时。

这个场景的解决方式就是:延迟数据处理机制(allowedLateness方法)

水印:乱序数据处理(时间很短的延迟)

延迟处理:长期延迟数据的处理机制。

延迟数据的处理:

waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法,

主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据

设置允许延迟的时间是通过allowedLateness(lateness: Time)设置

保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存

获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取

1)allowedLateness(lateness: Time)

当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法

该方法传入一个Time值,设置允许的长期延迟(迟到)的时间。

和watermark不同。

未设置allowedLateness(为0),当watermark满足条件,会触发窗口的 执行 + 关闭

当设置了allowedLateness,当watermark满足条件后,只会触发窗口的执行,不会触发窗口关闭

也就是,watermark满足条件后会正常触发窗口计算,将已有的数据完成计算。

但是,不会关闭窗口。如果在allowedLateness允许的时间内仍有这个窗口的数据进来,那么每进来一条,会和已经计算过的(被watermark触发的)数据一起在计算一次

水印:短期延迟,达到条件后触发计算并且关闭窗口(触发+关闭同时进行)

水印+allowedLateness : 短期延迟+ 等待长期延迟效果, 达到水印条件后,会触发窗口计算,但是不关闭窗口。事件时间延迟达到水印+allowedLateness之和后会关闭窗口。

2) 侧道输出-SideOutput

Flink 通过watermark在短时间内允许了乱序到来的数据

通过延迟数据处理机制,可以处理长期迟到的数据。

但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。

对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?

不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来

保存成为一个DataStream,然后,交由开发人员自行处理。

那么这个机制就叫做侧输出机制(Side Output)

侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开发人员可以自定逻辑对这些超级迟到数据进行处理。

处理主要使用两个方式:

对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方

对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理的数据的DataStream

注意,取到的是一个DataStream,这意味着你可以对这些超级迟到数据继续写 如keyBy, window等处理逻辑。

sideOutputLateData方法:


使用方式:
先定义OutputTag对象(注意,必须new一个匿名内部类形式的OutputTag对象的实例)
然后调用sideOutputLateData方法
// side output   OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side output"){};
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> sideOutputLateData =allowedLateness.sideOutputLateData(outputTag);

DataStream.getSideOutput方法:

用法:
DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
// 对得到的保存超级迟到数据的DataStream进行处理
sideOutput.print("late>>>");

代码演示

使用Watermark + AllowedLateness + SideOutput ,即使用侧道输出机制来单独收集延迟/迟到/乱序严重的数据,避免数据丢失!

package com.bigdata.day05;import com.bigdata.day04.OrderInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.UUID;class MyOrderSource2 implements SourceFunction<OrderInfo> {@Overridepublic void run(SourceContext<OrderInfo> ctx) throws Exception {Random random = new Random();while(true){OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString().replace("-",""));// 在这个地方可以模拟迟到数据long orderTime = System.currentTimeMillis() - 1000*random.nextInt(100);orderInfo.setOrdertime(orderTime);int money = random.nextInt(10);System.out.println("订单产生的时间:"+ DateFormatUtils.format(orderTime,"yyyy-MM-dd HH:mm:ss")+",金额:"+money);orderInfo.setMoney(money);orderInfo.setUserId(random.nextInt(2));ctx.collect(orderInfo);Thread.sleep(500);}}@Overridepublic void cancel() {}
}
public class Demo01 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// 每隔五秒统计每个用户的前面5秒的订单的总金额//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MyOrderSource2());//-2.告诉Flink最大允许的延迟时间/乱序时间为多少SingleOutputStreamOperator<OrderInfo> orderDSWithWatermark = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))//-3.告诉Flink哪一列是事件时间.withTimestampAssigner((order, time) -> order.getOrdertime()));OutputTag<OrderInfo> outputTag = new OutputTag<OrderInfo>("side output"){};//3. transformation-数据处理转换SingleOutputStreamOperator<String> result = orderDSWithWatermark.keyBy(orderInfo -> orderInfo.getUserId()).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(4)).sideOutputLateData(outputTag).apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer key,  // 代表分组key值     五旬老太守国门TimeWindow window, // 代表窗口对象Iterable<OrderInfo> input, // 分组过之后的数据 [1,1,1,1,1]Collector<String> out  // 用于输出的对象) throws Exception {SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long start = window.getStart();long end = window.getEnd();int sum = 0;// 专门存放迟到的订单时间for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(key + ",窗口开始:" + dateFormat.format(new Date(start)) + ",结束时间:" + dateFormat.format(new Date(end)) + "," + sum);//out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);}});result.print("流中的数据,包含迟到的数据:");result.getSideOutput(outputTag).print("严重迟到的数据:");//4. sink-数据输出//5. execute-执行env.execute();}
}

虽然我们添加了延迟的效果,就是说侧道输出,侧道输出不能触发窗口的执行,窗口的执行只能通过水印时间触发 ,允许迟到的数据,不放入到当前窗口中,而是作为一个触发条件看到,它需要放入到它对应的窗口中。

只考虑 1 个并行度的问题

订单发生的真实事件:窗口5秒,间隔5秒,允许迟到 3秒 最晚允许迟到4秒

10:44:00 第一个区间就应该是10:44:00 10:44:05

10:44:01

10:44:02

10:44:03

10:44:04

10:44:05

10:44:07 第一个区间就应该是10:44:05 10:44:10

10:44:22 第一个区间就应该是10:44:20 10:44:25

10:44:30

10:44:28

10:44:20

通过上面这个图可以知道,44:07没有办法触发00~05的执行,但是07不放入00~05区间,而是放入10:44:05 10:44:10

44:22 一个数据触发了两个区间的执行 00~05 05~10

假如有一个订单时44:10产生的,放入哪个区间?应该放入10~15这个区间

 


http://www.ppmy.cn/devtools/137398.html

相关文章

SpringBoot(四十)SpringBoot集成RabbitMQ使用过期时间+死信队列实现延迟队列

前边我们使用RabbitMQ实现了高并发下对流量的削峰填谷。正常在实际应用中大概也就够用了。 有的时候呢&#xff0c;我们需要使用到延迟队列&#xff0c;RabbitMQ不像RocketMQ一样默认就支持延迟队列&#xff0c;RabbitMQ是不支持延迟队列的&#xff0c;但是呢&#xff1f;我们可…

Android8设置拔出充电器自动关机

通常Android机器拔出充电后&#xff0c;将进入断开充电流程&#xff0c;关闭充电灯和充电图标。 那么需要实现拔出充电器直接进入关机&#xff0c;则需要在充电判断机制中额外增加实现代码。 || || 修改方案如下&#xff1a; 在系统中存在服务时刻监听的充电状态&#xff…

24.11.23 Ajax

1动态网页技术与静态网页技术对比: 静态网页: 如果数据库中有用户列表 html中要显示 如果用户列表数据变化 html要改代码才能显示完整数据 (不能使用动态数据 ) 动态网页: servlet可以通过代码 以输出流显示数据 当数据库数据改变时 不需要改代码 2.为了解决html不能使用动…

Cocos编辑器

1、下载 下载地址&#xff1a;https://www.cocos.com/creator-download 2、编辑器界面介绍 官方链接&#xff1a;https://docs.cocos.com/creator/3.8/manual/zh/editor/ 3、项目结构 官方链接&#xff1a;https://docs.cocos.com/creator/3.8/manual/zh/getting-started/…

冷却小型电子设备

TLDR&#xff1a;间隙较小&#xff08;~1 毫米&#xff09;的设备可以进行基于传导的热模拟。 虚构的 ANSYS 腕带上的温度。 这些小玩意儿很酷&#xff0c;而且卖得很热。 小型设备的一个长期问题是它们容易发热&#xff0c;而且很难散热。大多数设备需要通过外表面散热。当你…

C# 程序来计算三角形的面积(Program to find area of a triangle)

给定一个三角形的边&#xff0c;任务是求出该三角形的面积。 例如&#xff1a; 输入&#xff1a;a 5, b 7, c 8 输出&#xff1a;三角形面积为 17.320508 输入&#xff1a;a 3, b 4, c 5 输出&#xff1a;三角形面积为 6.000000 方法&#xff1a;可以使用以下公式…

html+css+js打字游戏网页

1. 效果 2. html代码 <!doctype html> <html><head><meta charset"utf-8" /><title>打字练习</title><!--引入第三方动画库--><link rel"stylesheet" href"animate.css"><style>html {h…

网站布局编辑器前端开发:设计要点与关键考量

一、设计说明 &#xff08;一&#xff09;功能模块 可视化操作区域 这是用户进行网站布局设计的主要画布。通过拖放各种页面元素&#xff08;如文本框、图片、按钮、导航栏等&#xff09;到该区域&#xff0c;用户能够直观地构建网站页面的布局结构。支持对元素的实时缩放、旋…