Flink+Iceberg环境搭建及生产问题处理

news/2024/12/21 22:38:44/

5d5a614dc98a331cd54d68bcb1f79756.png全网最全大数据面试提升手册!

概述

作为实时计算的新贵,Flink受到越来越多公司的青睐,它强大的流批一体的处理能力可以很好地解决流处理和批处理需要构建实时和离线两套处理平台的问题,可以通过一套Flink处理完成,降低成本,Flink结合数据湖的处理方式可以满足我们实时数仓和离线数仓的需求,构建一套数据湖,存储多样化的数据,实现离线查询和实时查询的需求。目前数据湖方面有Hudi和Iceberg,Hudi属于相对成熟的数据湖方案,主要用于增量的数据处理,它跟spark结合比较紧密,Flink结合Hudi的方案目前应用不多。Iceberg属于数据湖的后起之秀,可以实现高性能的分析与可靠的数据管理,目前跟Flink集合方面相对较好。

安装

本次主要基于flink+iceberg进行环境搭建。

1.安装flink

安装并启动hadoop、hive等相关环境。

下载flink安装包,解压后安装:

下载地址: https://archive.apache.org/dist/flink/flink-1.11.3/

wget https://downloads.apache.org/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz
tar xzvf flink-1.11.1-bin-scala_2.12.tgz

导入hadoop的环境包,flink-sql会使用到hdfs和hive等相关依赖包进行通讯。

export HADOOP_CLASSPATH=$HADOOP_HOME/bin/hadoop classpath

启动flink集群

./bin/start-cluster.sh

注:这里会遇到第一个坑,iceberg-0.11.1支持的是flink1.11的版本,如果使用过高的版本,会报一堆找不到类和方法的异常(因为flink1.12版本删掉了许多API)。请使用Flink1.11.x版本进行安装。

2.下载Iceberg环境包

主要是/iceberg-flink-runtime-xxx.jar和flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar两个jar包。

下载地址:

https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/0.11.1/

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive

3.启动Flink-sql

执行命令启动flink-sql。

./bin/sql-client.sh embedded
-j /iceberg-flink-runtime-xxx.jar
-j /flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
shell
62d1c4b018ec0054ae156dd2a6f59322.png

4.创建Catalog

Flink支持hadoop、hive、自定义三种Catalog。这里以Hive为例。

注:这里会遇到第二个坑,iceberg和flink当前版本支持的是hive2.3.x的版本,推荐安装hive2.3.8版本。不然也会遇到一堆找不到方法和类的异常。

执行命令,创建hive类型的Catalog。

CREATE CATALOG hive_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://server1:9083','clients'='5','property-version'='1','warehouse'='hdfs://server1/user/hive/warehouse'
);
6abe7e4d3f6b868cad9c68a21319ec66.png

创建成功后的提示

5.创建表

创建DataBase:

create iceberg_db;
use iceberg_db;

创建表:

CREATE TABLE test (id BIGINT COMMENT 'unique id',busi_date STRING
)

6.插入数据和Flink任务执行情况

执行sql插入数据。

5308112d051c1dcfc84b2295a165bc09.png

可以在Flink任务中看到相应的Job。

c5f8759b25a34d418fe84b9646bcdf29.png 2dc6f146d895531b272bbbda1fddb70c.png

7.Iceberg组件介绍

6ebf82b4636801dffe94d0161f21b9d2.png

IcebergStreamWriter

主要用来写入记录到对应的 avro、parquet、orc 文件,生成一个对应的 Iceberg DataFile,并发送给下游算子。

另外一个叫做 IcebergFilesCommitter,主要用来在 checkpoint 到来时把所有的 DataFile 文件收集起来,并提交 Transaction 到 Apache Iceberg,完成本次 checkpoint 的数据写入,生成 DataFile。

IcebergFilesCommitter

为每个 checkpointId 维护了一个 DataFile 文件列表,即 map,这样即使中间有某个 checkpoint 的 transaction 提交失败了,它的 DataFile 文件仍然维护在 State 中,依然可以通过后续的 checkpoint 来提交数据到 Iceberg 表中。

在Flink的任务日志中,可以看到对应IcebergStreamWriter和IcebergFilesCommitter的信息,以及snap的ID(3509023638495847835)。

0804b2c0498123a9d9d982e55c9ded4e.png

8.Iceberg文件结构介绍

在HDFS系统中观察Iceberg的整个目录结构,可以看到分为data和metadata两个目录,对应开篇介绍的Iceberg文件结构。

下图中可看到Iceberg文件包含了数据文件、元数据和快照、manifest清单和manifest。

6d2905dd1f4347919e41e95190cb6083.png

观察Iceberg的表元数据文件

hadoop dfs text /user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00004-afffb920-e788-437e-80f0-4187a42ae74b.metadata.json

7e734e03b2debf64fb00b6da11aaea1c.png

可以看到对应的快照信息,表的版本、更新时间戳、manifest清单文件地址等信息。具体的字段描述可以参考官网介绍:https://iceberg.apache.org/spec/#iceberg-table-spec

这里可以看到刚刚Flink任务插入的快照信息(3509023638495847835)

2844034f7de896e29bdf2f6dc9b32ac6.png

观察manifest清单和manifest文件

2efb6295969bc4075d3f5fbd49d44d84.png

9.分区表

采集分区表并插入数据。

CREATE TABLE t_partition (id BIGINT COMMENT 'unique id',busi_date STRING
) PARTITIONED BY (busi_date);
af4b48e5a8d7c04a8aac8d594f39327c.png

可以看到表文件通过分区目录进行了划分,提高查询效率。

e1f5ffe5c366cca993f31c5f01468c2f.png

10.Iceberg执行计划

5f47352ea6f204858dd168f7b8b5b86f.png

11.通过Flink代码的方式操作Iceberg

package com.hyr.flink.icebergimport org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject IcebergDemo {def main(args: Array[String]): Unit = {val conf: Configuration = new Configuration()// 自定义web端口conf.setInteger(RestOptions.PORT, 9000)val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)streamEnv.setParallelism(1)val tenv = StreamTableEnvironment.create(streamEnv)// add hadoop config filetenv.executeSql("CREATE CATALOG hive_catalog WITH (\n  'type'='iceberg',\n  'catalog-type'='hive',\n  'uri'='thrift://server1:9083',\n  'clients'='5',\n  'property-version'='1',\n  'warehouse'='hdfs://server1:8020/user/hive/warehouse'\n)");tenv.useCatalog("hive_catalog");tenv.executeSql("show databases").print()tenv.useDatabase("iceberg_db")tenv.executeSql("show tables").print()tenv.executeSql("select id from test").print() }
}

完整的一个表元数据信息文件:

{"format-version" : 1,"table-uuid" : "cfa12929-0f4c-475c-aca0-7c9cc411a1ac","location" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test","last-updated-ms" : 1622771727393,"last-column-id" : 2,"schema" : {"type" : "struct","fields" : [ {"id" : 1,"name" : "id","required" : false,"type" : "long"}, {"id" : 2,"name" : "data","required" : false,"type" : "string"} ]},"partition-spec" : [ ],"default-spec-id" : 0,"partition-specs" : [ {"spec-id" : 0,"fields" : [ ]} ],"default-sort-order-id" : 0,"sort-orders" : [ {"order-id" : 0,"fields" : [ ]} ],"properties" : { },"current-snapshot-id" : 555628243696744305,"snapshots" : [ {"snapshot-id" : 8531001366494199026,"timestamp-ms" : 1622770732247,"summary" : {"operation" : "append","flink.job-id" : "371316a9b274ff09f85957afe730e25d","flink.max-committed-checkpoint-id" : "9223372036854775807","added-data-files" : "1","added-records" : "1","added-files-size" : "637","changed-partition-count" : "1","total-records" : "1","total-data-files" : "1","total-delete-files" : "0","total-position-deletes" : "0","total-equality-deletes" : "0"},"manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-8531001366494199026-1-63278140-aca4-4cd7-bffc-1e7d0e4b4b1b.avro"}, {"snapshot-id" : 626484522728673979,"parent-snapshot-id" : 8531001366494199026,"timestamp-ms" : 1622770733546,"summary" : {"operation" : "append","flink.job-id" : "bbdcfb52195a0b0b556c6a167fc3de9f","flink.max-committed-checkpoint-id" : "9223372036854775807","added-data-files" : "1","added-records" : "1","added-files-size" : "636","changed-partition-count" : "1","total-records" : "2","total-data-files" : "2","total-delete-files" : "0","total-position-deletes" : "0","total-equality-deletes" : "0"},"manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-626484522728673979-1-a79e0789-dbc0-4a45-b57d-73575cdccb1d.avro"}, {"snapshot-id" : 4382866461439510817,"parent-snapshot-id" : 626484522728673979,"timestamp-ms" : 1622770735121,"summary" : {"operation" : "append","flink.job-id" : "947829c23ca09fba470204f5b146c191","flink.max-committed-checkpoint-id" : "9223372036854775807","added-data-files" : "1","added-records" : "1","added-files-size" : "637","changed-partition-count" : "1","total-records" : "3","total-data-files" : "3","total-delete-files" : "0","total-position-deletes" : "0","total-equality-deletes" : "0"},"manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-4382866461439510817-1-1d78fbb1-7c97-4c3a-b47d-deef81273d0e.avro"}, {"snapshot-id" : 555628243696744305,"parent-snapshot-id" : 4382866461439510817,"timestamp-ms" : 1622771727393,"summary" : {"operation" : "append","flink.job-id" : "f30e7cd040204f737ba8aaf0350340f7","flink.max-committed-checkpoint-id" : "9223372036854775807","added-data-files" : "1","added-records" : "1","added-files-size" : "637","changed-partition-count" : "1","total-records" : "4","total-data-files" : "4","total-delete-files" : "0","total-position-deletes" : "0","total-equality-deletes" : "0"},"manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-555628243696744305-1-194acdcb-abdf-4acd-8bc1-5de6d4bb76b0.avro"} ],"snapshot-log" : [ {"timestamp-ms" : 1622770732247,"snapshot-id" : 8531001366494199026}, {"timestamp-ms" : 1622770733546,"snapshot-id" : 626484522728673979}, {"timestamp-ms" : 1622770735121,"snapshot-id" : 4382866461439510817}, {"timestamp-ms" : 1622771727393,"snapshot-id" : 555628243696744305} ],"metadata-log" : [ {"timestamp-ms" : 1622770665028,"metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00000-8bec1008-0d2d-4dac-82a6-387d9354b2bc.metadata.json"}, {"timestamp-ms" : 1622770732247,"metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00001-66552574-1458-40a6-8ebc-2d5f2c58a65e.metadata.json"}, {"timestamp-ms" : 1622770733546,"metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00002-1b1fb277-d165-46cd-a3ee-f2b6c358213f.metadata.json"}, {"timestamp-ms" : 1622770735121,"metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00003-56ffa5b6-bac5-4a1b-9e8d-2f36fe379610.metadata.json"} ]
}

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

88f5c9aa70b23f2a5e2bb4c45ebd8694.png

a7fcd5aff1c3b8e1a4a8f64e2fdd8b4c.jpeg

2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)

互联网最坏的时代可能真的来了

我在B站读大学,大数据专业

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」


http://www.ppmy.cn/news/168101.html

相关文章

linux上怎么实现ssh免密登录

这里直接写步骤,下面的有兴趣可以看看 1.进入到.ssh目录下 [rootwangjian /]# cd /root/.ssh/ [rootwangjian .ssh]# 2. 执行生成密钥,所有的参数都可以为空,也就是一直回车确认: ssh-keygen -t rsa [rootwangjian .ssh]# ssh-keygen -t rsa Generating…

【LED子系统深度剖析】八、小试牛刀

个人主页:董哥聊技术 我是董哥,高级嵌入式软件开发工程师,从事嵌入式Linux驱动开发和系统开发,曾就职于世界500强公司! 创作理念:专注分享高质量嵌入式文章,让大家读有所得! 文章目录 1、硬件管脚确定2、设备树配置3、子系统配置4、编译烧录5、验证5.1 设备树验证5.2 驱…

人工智能数学基础之线性代数(二)

前言 本文只会记录人工智能中所用到的线性代数知识,并不会记录大学线性代数教材中的所有知识。 现在CSDN不能发超长的文章了,只能分成多篇发布。 人工智能数学基础之线性代数(一) 人工智能数学基础之线性代数(二) 人工智能数学基础之线性代数(三) 行列…

华硕java安装教程win10_华硕电脑怎么安装win10?华硕电脑安装win10的图文教程

华硕电脑的性价比比较高,所以很多用户在购买电脑时都选择了这个品牌。现在有很多用户都喜欢使用win10系统,当新买的华硕电脑预装系统不是win10系统时,我们就需要给电脑重装系统了,那么具体应该怎么操作做呢?下面&#…

四阶代数余子式怎么求_已知四阶行列式,试求A41+A42与A43+A44,其中A4j(j=1,2,3,4)是D4中第4行第j个元素的代数余子式. - 搜题宝...

阅读理解:选择题 FREE MASSIVE ONLINE OPEN COURSES (MOOCS) A class with hundreds or even thousands of students might sound like a teachers worst nightmare. But a big idea in higher education these days is Massive Open Online Courses, or MOOCs. Som…

log4j2漏洞复现

log4j2漏洞复现 漏洞描述影响版本漏洞复现环境准备靶场搭建执行命令 修复建议 漏洞描述 Apache Log4j2是一款Java日志框架,是Log4j 的升级版。可以控制每一条日志的输出格式。通过定义每一条日志信息的级别,能够更加细致地控制日志的生成过程。 Apache …

ubuntu20安装xrdp以及解决黑屏问题

1、安装xrdp sudo apt-get install xrdp 2、将xrdp用户加入到如下用户组 sudo adduser xrdp ssl-cert 3、重启xrdp sudo service xrdp restart 4、打开windows远程面,连接,如果出现黑屏 sudo -s sudo vim /etc/xrdp/startwm.sh 加入如下内容&#xff…

Redis五大数据结构的底层实现

一)String类型:可以使用object encoding name就可以查看字符串的编码 SDS,flags的值不同,那么len和alloc所表示的值的数据范围也不同,所以flags的只是为了标识SDS头的总大小; alloc和len刚开始进行申请内存空间的时候都是相同的 S…