Apache Hudi 在袋鼠云数据湖平台的设计与实践

news/2024/11/19 10:34:08/

在大数据处理中,实时数据分析是一个重要的需求。随着数据量的不断增长,对于实时分析的挑战也在不断加大,传统的批处理方式已经不能满足实时数据处理的需求,需要一种更加高效的技术来解决这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一种技术,提供了高效的实时数据仓库管理功能。

本文将介绍袋鼠云基于 Hudi 构建数据湖的整体方案架构及其在实时数据仓库处理方面的特点,并且为大家展示一个使用 Apache Hudi 的简单示例,便于新手上路。

Apache Hudi 介绍

Apache Hudi 是一个开源的数据湖存储系统,可以在 Hadoop 生态系统中提供实时数据仓库处理功能。Hudi 最早由 Uber 开发,后来成为 Apache 顶级项目。

Hudi 主要特性

· 支持快速插入和更新操作,以便在数据仓库中实时处理数据;

· 提供增量查询功能,可有效提高数据分析效率;

· 支持时间点查询,以便查看数据在某一时刻的状态;

· 与 Apache Spark、Hive 等大数据分析工具兼容。

Hudi 架构

Apache Hudi 的架构包括以下几个主要组件:

· Hudi 数据存储:Hudi 数据存储是 Hudi 的核心组件,负责存储数据,数据存储有两种类型:Copy-On-Write(COW)和 Merge-On-Read(MOR);

· Copy-On-Write:COW 存储类型会在对数据进行更新时,创建一个新的数据文件副本,将更新的数据写入副本中,之后,新的数据文件副本会替换原始数据文件;

· Merge-On-Read:MOR 存储类型会在查询时,将更新的数据与原始数据进行合并,这种方式可以减少数据存储的写入延迟,但会增加查询的计算量;

· Hudi 索引:Hudi 索引用于维护数据记录的位置信息,索引有两种类型:内置索引(如 Bloom 过滤器)和外部索引(如 HBase 索引);

· Hudi 查询引擎:Hudi 查询引擎负责处理查询请求,Hudi 支持多种查询引擎,如 Spark SQL、Hive、Presto 等。

file

Hudi 的使用场景

Apache Hudi 可以帮助企业和组织实现实时数据处理和分析。实时数据处理需要快速地处理和查询数据,同时还需要保证数据的一致性和可靠性。

Apache Hudi 的增量数据处理、ACID 事务性保证、写时合并等技术特性可以帮助企业更好地实现实时数据处理和分析,基于 Hudi 的特性可以在一定程度上在实时数仓的构建过程中承担上下游数据链路的对接(类似 Kafka 的角色)。既能实现增量的数据处理,也能为批流一体的处理提供存储基础。

Hudi 的优势和劣势

● 优势

· 高效处理大规模数据集;

· 支持实时数据更新和查询;

· 实现了增量写入机制,提高了数据访问效率;

· Hudi 可以与流处理管道集成;

· Hudi 提供了时间旅行功能,允许回溯数据的历史版本。

● 劣势

· 在读写数据时需要付出额外的代价;

· 操作比较复杂,需要使用专业的编程语言和工具。

Hudi 在袋鼠云数据湖平台上的实践

Hudi 在袋鼠云数据湖的技术架构

Hudi 在袋鼠云的数据湖平台上主要对数据湖管理提供助力:

· 元数据的接入,让用户可以快速的对表进行管理;

· 数据快速接入,包括对符合条件的原有表数据进行转换,快速搭建数据湖能力;

· 湖表的管理,监控小文件定期进行合并,提升表的查询性能,内在丰富的表操作功能,包括 time travel ,孤儿文件清理,过期快照清理等;

· 索引构建,提供多种索引包括 bloom filter,zorder 等,提升计算引擎的查询性能。

file

Hudi 使用示例

在介绍了 Hudi 的基本信息和袋鼠云数据湖平台的结构之后,我们来看一个使用示例,替换 Flink 在内存中的 join 过程。

在 Flink 中对多流 join 往往是比较头疼的场景,需要考虑 state ttl 时间设置,设置太小数据经常关联不上,设置太大内存又需要很高才能保留,我们通过 Hudi 的方式来换个思路实现。

● 构建 catalog

public String createCatalog(){String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" +"    'type' = 'hudi',\n" +"    'mode' = 'hms',\n" +"    'default-database' = 'default',\n" +"    'hive.conf.dir' = '/hive_conf_dir',\n" +"    'table.external' = 'true'\n" +")";return createCatalog;
}

● 创建 hudi 表

public String createHudiTable(){String createTable = "CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (\n" +"  id int ,\n" +"  name VARCHAR(10),\n" +"  age int ,\n" +"  address VARCHAR(10),\n" +"  dt VARCHAR(10),\n" +"  primary key(id) not enforced\n" +")\n" +"PARTITIONED BY (dt)\n" +"WITH (\n" +"  'connector' = 'hudi',\n" +"  'table.type' = 'MERGE_ON_READ',\n" +"  'changelog.enabled' = 'true',\n" +"  'index.type' = 'BUCKET',\n" +"  'hoodie.bucket.index.num.buckets' = '2',\n" +String.format("  '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) +"  'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "'\n" +");";return createTable;
}

● 更新 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列

01 从 kafka 中读取 topic1

public String createKafkaTable1(){String kafkaSource1 = "CREATE TABLE source1\n" +"(\n" +"    id        INT,\n" +"    name      STRING,\n" +"    age        INT,\n" +"    dt        String,\n" +"    PROCTIME AS PROCTIME()\n" +") WITH (\n" +"      'connector' = 'kafka'\n" +"      ,'topic' = 'join_topic1'\n" +"      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +"      ,'scan.startup.mode' = 'earliest-offset'\n" +"      ,'format' = 'json'\n" +"      ,'json.timestamp-format.standard' = 'SQL'\n" +"      )";return kafkaSource1;
}

02 从 kafka 中读取 topic2

public String createKafkaTable2(){String kafkaSource2 = "CREATE TABLE source2\n" +"(\n" +"    id        INT,\n" +"    name      STRING,\n" +"    address        string,\n" +"    dt        String,\n" +"    PROCTIME AS PROCTIME()\n" +") WITH (\n" +"      'connector' = 'kafka'\n" +"      ,'topic' = 'join_topic2'\n" +"      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +"      ,'scan.startup.mode' = 'earliest-offset'\n" +"      ,'format' = 'json'\n" +"      ,'json.timestamp-format.standard' = 'SQL'\n" +"      )";return kafkaSource2;
}

● 执行插入逻辑1

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt) " +"select id, name,age,dt from source1";

● 通过 spark 查询数据

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1

● 执行插入逻辑2

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt) " +"select id, name, address,dt from source2";

● 运行成功

运行成功后在 spark 中查询对应的表数据:

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1

可以发现在第二次数据运行之后,表数据的对应字段 address 已经更新,达到了类似在 Flink 中直接执行 join 的效果。

insert into hudi_catalog.flink_db.test_hudi_flink_join_2 select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id

《数栈产品白皮书》:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szcsdn

同时,欢迎对大数据开源项目有兴趣的同学加入我们,一起交流最新开源技术信息,号码:30537511,项目地址:https://github.com/DTStack


http://www.ppmy.cn/news/75796.html

相关文章

【华为OD机试真题2023B卷 JAVA】字符串摘要

华为OD2023(B卷)机试题库全覆盖,刷题指南点这里 字符串摘要 知识点字符串排序 时间限制:1s 空间限制:256MB 限定语言:不限 题目描述: 给定一个字符串的摘要算法,请输出给定字符串的摘要值。 1、去除字符串中非字母的符号。 2、如果出现连续字符(不区分大小写),则输…

vc++内部排序算法比较,排序的六种算法之希尔排序,快速排序,堆排序,堆排序.冒泡泡排序

各种内部排序算法的时间复杂度分析结果只给出了算法执行时间的阶,或大概执行时间。试通过随机的数据比较各算法的关键字比较次数和关键字移动次数,以取得直观感受。 2.2基本要求: (1) 对以下6种常用的内部排序算法进…

Git常用命令clone和init和add

Git常用命令clone和init和add 1、clone 拷贝一个 Git 仓库到本地。 # 下载一个项目和它的整个代码历史 # 该命令可用于通过指定的URL获取一个代码库 $ git clone repository_url# 创建一个本地仓库的克隆版本 # 使用本地的一个仓库来创建一个仓库 $ git clone /path/to/repo…

小黑子—Java从入门到入土过程:第十一章 - 网络编程、反射及动态代理

Java零基础入门11.0 网络编程1. 初识网络编程2. 网络编程三要素3.IP三要素3.1 IPV4的细节3.1.1特殊的IP地址3.1.2 常用的CMD命令 3.2 InetAddress 的使用3.3 端口号3.4 协议3.4.1 UDP协议3.4.1 - I UDP 发送数据3.4.1 - II UDP 接收数据3.4.1 - III UDP 练习(聊天室…

智能排班系统 【数据库设计】

文章目录 数据库设计规范ER图物理模型数据表登录日志表操作日志表菜单表角色表企业表门店表省市区表门店节日表消息表职位表排班规则表排班任务表排班结果存储scheduling_date排班日表scheduling_shift排班班次表shift_user班次员工中间表 定时通知表用户表中间表role_menu角色…

Python对Excel文件多表对多表之间的匹配(两种不同表头)——之json版

首先Excel文件多表对多表之间的匹配(VLOOKUP),有多种办法, 1:将Excel文件导入Mysql或其他数据库,然后将两种表合并成一张表,接着用数据库匹配 2:将两种表内容,复制粘贴到一起,各自分别保存成一张表&#xf…

项目实战(cloud)--配置中心Config(码云来做一个配置中心)

服务的拆分原则&#xff1a; 单体应用向微服的一个改造&#xff1a; 搭建一个聚合项目 创建一个maven项目 父项目 pom <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"…

月薪从10k到30k,一个普通测试工程师的3年涨薪之路...

“要涨薪&#xff0c;先跳槽”各个行业都存在这一共识&#xff0c;但是任何行业也都没有像程序员这样更为适用且好用的了。 前不久&#xff0c;就有网友分享了自己作为一个普通的自动化测试工程师的三年真实涨薪经历。但看看这个三年涨薪之路&#xff0c;好像并不普通啊&#…