数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)

news/2024/10/26 13:22:32/

数据湖Iceberg-简介(1)
数据湖Iceberg-存储结构(2)
数据湖Iceberg-Hive集成Iceberg(3)
数据湖Iceberg-SparkSQL集成(4)
数据湖Iceberg-FlinkSQL集成(5)
数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)
数据湖Iceberg-Flink DataFrame集成(7)

数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入

数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入版本问题问题原因解决方法

版本

Iceberg:1.1.0

Flink:1.14.3

问题

Kafka类型的Iceberg表创建完成后,通过语句写入其他表中执行成功,但是没数据

问题原因

当前版本的BUG(存疑)

解决方法

Kafka表必须要在default_catalog.default_database下,即catalog名为default_catalog,数据库(命名空间)为default_database下,否则kafka类型的表读取不到数据。

如果都在我们自己创建的catalog下创建,则执行INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;后,在Flink任务中看不到一个持续执行的Flink Job,而正常执行该命令Flink会执行一个持续执行的任务,去消费kafka数据写入Iceberg,正常情况如下图:

在这里插入图片描述

所以这里我们kafka表在default_catalog.default_database下,写入数据的表在我们自己创建的hadoop_catalog.iceberg_db

create table default_catalog.default_database.kafka1(id int,data string
) with ('connector' = 'kafka','topic' = 'ttt','properties.zookeeper.connect' = '172.16.24.194:2181','properties.bootstrap.servers' = '172.16.24.194:9092','format' = 'json','properties.group.id'='iceberg1','scan.startup.mode'='earliest-offset'
);CREATE TABLE `hadoop_catalog`.`iceberg_db`.`sample6` (`id`  INT UNIQUE COMMENT 'unique id',`data` STRING NOT NULL,PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2', 
'write.upsert.enabled'='true'
);INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;

此时我们往Kafka发送数据:

{"id":123,"data":"llalalala"}
{"id":1123,"data":"asdfasfds"}

查看表中数据可以看到写入成功

select * from hadoop_catalog.iceberg_db.sample6;

在这里插入图片描述

再次发送数据

{"id":123,"data":"JastData"}

查看表中数据,发现修改成功

在这里插入图片描述


http://www.ppmy.cn/news/50787.html

相关文章

数字中国建设2522整体框架

作为影响中国未来发展的重磅文件,《数字中国建设整体布局规划》明确了两个重要时间节点: 到 2025 年,基本形成横向打通、纵向贯通、协调有力的一体化推进格局,数字中国建设取得重要进展; 到 2035 年,数字化…

中医诊所一定要去尝试软文营销,效果简直不要太好

中医诊所是一种传统的医疗机构,随着互联网时代的发展,软文营销已经成为了中医诊所宣传推广的一种重要方式。通过撰写高质量的软文,中医诊所可以提升品牌知名度、增加患者数量、提高医疗服务质量等方面取得良好的效果。今天结合我10年营销经验…

国产数字温度传感芯片M117 Pin to Pin替代PT100和PT1000

高精度数字温度传感芯片 - M117,可Pin to Pin替代PT100/PT1000,且具功能差异化优势,支持行业应用的定制化需求。高测温精度0.1℃,用户无需进行校准。芯片感温原理基于CMOS半导体PN节温度与带隙电压的特性关系,经过小信…

【历史上的今天】3 月 24 日:苹果推出 Mac OS X;微软前任 CEO 出生;Spring 1.0 正式发布

整理 | 王启隆 透过「历史上的今天」,从过去看未来,从现在亦可以改变未来。 今天是 2023 年 3 月 24 日,在 2016 年的今天,暴雪娱乐公司发布了第一人称射击多人游戏《守望先锋》。根据评分汇总网站 Metacritic 的统计&#xff0c…

设计模式 - 责任链模式

设计模式 - 责任链模式 1、责任链模式的应用1.1、啥是责任链模式1.2、责任链模式的优缺点 2、责任链模式小试牛刀2.1、实现场景描述2.2、常规实现2.3、责任链模式实现2.3.1、请求方2.3.2、处理方2.3.3、业务方法2.3.4、执行结果 3、总结 1、责任链模式的应用 1.1、啥是责任链模…

RocketMQ的学习历程(二)----MQ基本构架

文章目录 1.MQ的基本要素1.1.消息(Message)1.2.主题(Topic)1.3.标签(Tag)1.4.队列(MessageQueue)1.5.消息标识(MessageId) 2.MQ中的主要角色和相关联系2.1.Pr…

医院影像图像科室工作站PACS系统 DICOM 三维图像后处理与重建

PACS报告系统的主要任务是通过运用不断积累诊断常用语,减轻出报告的劳动强度,并且将报告保存成电子文档以便日后查阅。在PACS的报告系统中,有三种不同层次的方法输入文字—“高级模板”、“分类词条”和“短语词典”。这三种方法的内容都可以…

卡尔曼滤波原理及代码

目录 一.简介 二.原理 1.先验估计原理 2.后验估计原理 3.总结 三.示例 一.简介 卡尔曼滤波(Kalman filtering)是一种利用线性系统状态方程,通过系统输入输出观测数据,对系统状态进行最优估计的算法,它可以在任意…