方式 | 作用 |
---|---|
WatermarkStrategy.noWatermarks() | 不生成watermark |
WatermarkStrategy.forMonotonousTimestamps() | 紧跟最大事件时间watermark生成策略 |
WatermarkStrategy.forBoundedOutOfOrderness() | 允许乱序watermark生成策略 |
WatermarkStrategy.forGenerator() | 自定义watermark生成策略 |
- noWatermarks
关于public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark生成策略,选择不生成watermarkWatermarkStrategy<UserEvent2> watermark = WatermarkStrategy.noWatermarks();// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();} }
noWaterMarks()
的使用没有太多内容. - forMonotonousTimestamps
对于public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用紧跟最大事件时间策略WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();} }
forMonotonousTimestamps()
可说内容并不多,如果选择了forMonotonousTimestamps
这种方式就必须保证事件时间严格有序,如果出现乱序的情况可能存在大量数据丢失的问题.
通过源码内容可以看到forMonotonousTimestamps
底层也是使用的forBoundedOutOfOrderness
方式,只不过将容错时间设置为了0
,源码如下:// 首先看这里,继承的BoundedOutOfOrdernessWatermarks public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {/** Creates a new watermark generator with for ascending timestamps. */public AscendingTimestampsWatermarks() {super(Duration.ofMillis(0)); // 这里将容错时间设置为了0} }
- forBoundedOutOfOrderness
对于允许乱序策略前面文章有介绍过其原理,比如代码中设置容错时间为public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用允许水位线乱序策略,并设置最大容错时间为2sWatermarkStrategy<String> watermark = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2000))// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();} }
2S
,那么前后的数据差最大只能是2S
,如果差值大于2S
,后来的这条数据就会被抛弃.