Flink消费kafka出现空指针异常

news/2025/1/16 1:43:10/

文章目录

      • 出现场景:
      • 表现:
      • 问题:
      • 解决:

tombstone : Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息.

出现场景:

双流join时,采用的是left join的方式,众所周知该方式会产生回撤流,下游kafka连接器使用的是upsert-kafka,在产生回撤流时,kafka会删除未join上的消息,填充join后的消息进去。

表现:

在这里插入图片描述

问题:

此时消费该topic的flink程序会出现,空指针异常

DataStream Api会出现,Table Api 未发现

解决:

自定义kafka反序列化器过滤Null值,flink1.14.4
代码:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("").setTopics("test").setGroupId("gid").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new MySimpleStringSchema()).setProperty("auto.offset.commit", "false").build();DataStreamSource<String> kfkDs = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk");kfkDs.print();env.execute();}// 自定义反序列化器static class MySimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String>{@Overridepublic String deserialize(byte[] message) {if (message != null) return new String(message, StandardCharsets.UTF_8);else{return deserialize(new byte[1]); // 返回空 不是Null}}@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic byte[] serialize(String element) {return element.getBytes(StandardCharsets.UTF_8);}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}

http://www.ppmy.cn/news/663526.html

相关文章

查看p10文件内容

1.通过openssl查看 [rootahdjg ~]#openssl req -in cerseq.csr -text #cerseq.csr存放的是p10的base64格式数据 unable to load X509 request 140239849273232:error:0906D06C:PEM routines:PEM_read_bio:no start line:pem_lib.c:707:Expecting: CERTIFICATE REQUEST 但是…

mac 配置 gradle 、gradle-wrapper.properties文件 distributionUrl 指定本地gradle.zip

mac配置 gradle https://blog.csdn.net/RreamigOfGirls/article/details/126300196 gradle-wrapper.properties 文件&#xff1a; distributionUrl 指定本地gradle.zip 注意 如果是从官网下载&#xff0c;用的是https #Thu May 10 21:25:29 CST 2018 distributionBaseGRADLE…

Pyside6-QtCharts+psutil实战-绘制一个CPU监测工具

今天是实战篇章&#xff0c;我们结合可以快速提升我们开发效率的工具一起开实战一波实时读取系统CPU使用情况的折线图。 使用的开发工具Qt Designer来开发UI界面。 十分便捷。使用起来也算比较的简单了&#xff0c;虽然也存在不少的BUG。 ❝ 对所需要的控件进行拖拽式&#xff…

最强神作!Crysis深度剖析与优化指南(18-25)

http://bak2.beareyes.com.cn/2/lib/200711/06/20071106421_18.htm 第18页&#xff1a;无限创造力&#xff1a;Sandbox 2编辑器概览 1 集成的CryENGINE Sandbox2编辑器 运行时间引擎是完全集成在CryENGINE Sandbox2编辑器给了设计者“所见既所玩”特性。 开发者在开发过程中可以…

代理服务器之 squid、lvs、nginx、haproxy之间的区别

代理服务器之 squid、lvs、nginx、haproxy之间的区别 代理服务可简单的分为正向代理和反向代理 1、正向代理 正向代理服务器&#xff1a;squid 用于代理内部网络对 Internet 的连接请求(如 VPN/NAT)&#xff0c;客户端指定代理服务器,并将本来要直接发送给目标 Web 服务器的 HT…

Stage模型HarmonyOS服务卡片开发ArkTS卡片相关模块

图1 ArkTS卡片相关模块 FormExtensionAbility&#xff1a;卡片扩展模块&#xff0c;提供卡片创建、销毁、刷新等生命周期回调。 FormExtensionContext&#xff1a;FormExtensionAbility的上下文环境&#xff0c;提供FormExtensionAbility具有的接口和能力。 formProvider&…

如何在网上挣钱,这几个项目让你月入过万元

在我国互联网高速发达的时代&#xff0c;上网也已经非常普及。互联网给我们的日常生活带了了无比的便利&#xff0c;也为很多创业者提供了不少的创业机会。互联网可以让你的产品得到更为广阔的销售市场&#xff0c;也可以让你在网络上购买到你所需要的产品。那么如何在网上挣钱…

程序员如何快速赚钱

开发高质量的软件产品&#xff1a; 这是一个长期而又有前途的途径&#xff0c;程序员可以利用自己的专业技能和经验&#xff0c;开发出具有竞争力的软件产品&#xff0c;并通过在线市场或直接销售的方式&#xff0c;获得相应的收入。需要注意的是&#xff0c;开发出高质量的产品…