Flink之Watermark策略代码模板

news/2024/11/8 17:14:03/
方式作用
WatermarkStrategy.noWatermarks()不生成watermark
WatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略
WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略
WatermarkStrategy.forGenerator()自定义watermark生成策略
  • noWatermarks
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark生成策略,选择不生成watermarkWatermarkStrategy<UserEvent2> watermark = WatermarkStrategy.noWatermarks();// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    关于noWaterMarks()的使用没有太多内容.
  • forMonotonousTimestamps
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用紧跟最大事件时间策略WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于forMonotonousTimestamps()可说内容并不多,如果选择了forMonotonousTimestamps这种方式就必须保证事件时间严格有序,如果出现乱序的情况可能存在大量数据丢失的问题.
    通过源码内容可以看到forMonotonousTimestamps底层也是使用的forBoundedOutOfOrderness方式,只不过将容错时间设置为了0,源码如下:
    // 首先看这里,继承的BoundedOutOfOrdernessWatermarks
    public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {/** Creates a new watermark generator with for ascending timestamps. */public AscendingTimestampsWatermarks() {super(Duration.ofMillis(0)); // 这里将容错时间设置为了0}
    }
    
  • forBoundedOutOfOrderness
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用允许水位线乱序策略,并设置最大容错时间为2sWatermarkStrategy<String> watermark = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2000))// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于允许乱序策略前面文章有介绍过其原理,比如代码中设置容错时间为2S,那么前后的数据差最大只能是2S,如果差值大于2S,后来的这条数据就会被抛弃.

http://www.ppmy.cn/news/1140148.html

相关文章

卷积神经网络的发展历史-ResNet

ResNet的产生 2015 年&#xff0c;Kaiming He 提出了ResNet&#xff08;拿到了 2016 年 CVPR Best Paper Award&#xff09;&#xff0c;不仅解决了神经网络中的退化问题还在同年的ILSVRC和COCO 竞赛横扫竞争对手&#xff0c;分别拿下分类、定位、检测、分割任务的第一名。 R…

计算机视觉: 基于隐式BRDF自编码器的文生三维技术

论文链接: MATLABER: Material-Aware Text-to-3D via LAtent BRDF auto-EncodeR 背景 得益扩散模型和大量的text - image 成对的图片&#xff0c; 现在文生2D的模型已经比较成熟的框架和模型&#xff0c;主流的技术比如说stable diffusion 和 midjourney 以及工业领域runway 等…

字符串思维题练习 DAY5(CF1137 B , CF 733D , CF 1360 F)

字符串思维题练习 DAY5(CF1137 B , CF 733D , CF 1360 F) CF 1137 B. Camp Schedule(border) Problem - B - Codeforces 大意&#xff1a;给出一个字符串 S 和一个字符串 T &#xff0c; 要求重排 S 使得在 S中和 T 相等的子串出现次数最多。 不妨先考虑最暴力的重排方式 &…

乌班图22.04 kubeadm简单搭建k8s集群

1. 我遇到的问题 任何部署类问题实际上对于萌新来说都不算简单&#xff0c;因为没有经验&#xff0c;这里我简单将部署的步骤和想法给大家讲述一下 2. 简单安装步骤 准备 3台标准安装的乌班图server22.04&#xff08;采用vm虚拟机安装&#xff0c;ip为192.168.50.3&#xff0…

Centos 服务器 MySQL 8.0 快速开启远程访问

环境&#xff1a; MySQL 8.0&#xff08;低版本会有些不同&#xff09;&#xff0c; Rocky Linux 9.0&#xff08;CentOS&#xff09; 直接上干货&#xff0c;相信大家看到这个文章的时候都已经安装完了。 1. 先从服务器上使用 root 进行登录&#xff08;刚安装完默认只能本地…

(论文调研) Multi-task的网络结构 在图像去噪问题中的应用

1.SNIDER: Single Noisy Image Denoising and Rectification for Improving License Plate Recognition 这是一篇用于实现端到端的车牌恢复 (LPR: License Plate Recognition) 网络, 其中使用去噪和校正网络来生成清晰的恢复图像, 以实现稳健的 LPR 性能. 这个网络的名称为SN…

安装配置deep learning开发环境

1. 下载安装anacondahttps://www.anaconda.com/download-success vim ~/.condarcchannels: - bioconda - https://mirrors.ustc.edu.cn/anaconda/pkgs/main/ - https://mirrors.ustc.edu.cn/anaconda/cloud/conda-forge/ - https://mirrors.tuna.tsinghua.edu.cn/anaco…

高级 I/O【Linux】

阅读前导&#xff1a; “高级 I/O”处于知识树中网络和操作系统的最后&#xff0c;因此本文默认读者有计算机网络和操作系统的基础。 1. 什么是 I/O 下面以“流”&#xff08;stream&#xff09;和冯诺依曼体系架构的视角来简单回顾一下什么是 I/O&#xff1a; I/O可以理解…