文章目录
- 出现场景:
- 表现:
- 问题:
- 解决:
tombstone : Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息.
出现场景:
双流join时,采用的是left join的方式,众所周知该方式会产生回撤流,下游kafka连接器使用的是upsert-kafka,在产生回撤流时,kafka会删除未join上的消息,填充join后的消息进去。
表现:
问题:
此时消费该topic的flink程序会出现,空指针异常
DataStream Api会出现,Table Api 未发现
解决:
自定义kafka反序列化器过滤Null值,flink1.14.4
代码:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("").setTopics("test").setGroupId("gid").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new MySimpleStringSchema()).setProperty("auto.offset.commit", "false").build();DataStreamSource<String> kfkDs = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk");kfkDs.print();env.execute();}// 自定义反序列化器static class MySimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String>{@Overridepublic String deserialize(byte[] message) {if (message != null) return new String(message, StandardCharsets.UTF_8);else{return deserialize(new byte[1]); // 返回空 不是Null}}@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic byte[] serialize(String element) {return element.getBytes(StandardCharsets.UTF_8);}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}