Flink创建Hudi的Sink动态表

news/2024/11/29 19:33:03/

工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 。

createDynamicTableSink

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSink createDynamicTableSink(Context context) {Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),"Option [path] should not be empty.");setupTableOptions(conf.getString(FlinkOptions.PATH), conf);ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();sanityCheck(conf, schema);setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);setupSortOptions(conf, context.getConfiguration());return new HoodieTableSink(conf, schema);}

createDynamicTableSource

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSource createDynamicTableSource(Context context) {Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->new ValidationException("Option [path] should not be empty.")));setupTableOptions(conf.getString(FlinkOptions.PATH), conf);ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);return new HoodieTableSource(schema,path,context.getCatalogTable().getPartitionKeys(),conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),conf);}

创建Sink表过程

1、检查是否设置了 path 选项(checkArgument),没有的话抛异常“Option [path] should not be empty.”。

2、做兼容性设置(setupTableOptions):

2.1、如果设置了 hoodie.table.recordkey.fields,但没有设置 hoodie.datasource.write.recordkey.field,则将 hoodie.datasource.write.recordkey.field 的值设置为 hoodie.table.recordkey.fields 的值;

2.2、如果设置了 hoodie.table.precombine.field,但没有设置 precombine.field,则将 precombine.field 的值设置为 hoodie.table.precombine.field 的值;

2.3、如果设置了 hoodie.datasource.write.hive_style_partitioning,但没有设置 hoodie.datasource.write.hive_style_partitioning,则将 hoodie.datasource.write.hive_style_partitioning 的值设置为 hoodie.datasource.write.hive_style_partitioning 的值。

3、必要选项检查:

3.1、检查表的类型(checkTableType),如果 table.type 的值为空,则不做处理,否则必须为 COPY_ON_WRITE 或者 MERGE_ON_READ,不然抛异常Invalid table type: TABLETYPE . Table type should be either MERGE_ON_READ or COPY_ON_WRITE.“;

3.2、如果为非 Append 模式,则检查是否设置了 hoodie.datasource.write.recordkey.field 和 precombine.field。

4、依次设置:

4.1、表名(hoodie.table.name);

4.2、主键(hoodie.datasource.write.recordkey.field);

4.3、分区(hoodie.datasource.write.partitionpath.field);

4.4、如果是 index 类型为 BUCKET,则设置桶(bucket)的键 hoodie.bucket.index.hash.field;

  4.4.1、如果还没有设置 hoodie.bucket.index.hash.field,则使用 hoodie.datasource.write.recordkey.field 的值作为 hoodie.bucket.index.hash.field 的值;4.4.2、否则进一步检查 hoodie.bucket.index.hash.field 的值是否为 hoodie.datasource.write.recordkey.field 值的子集。假设 hoodie.datasource.write.recordkey.field 值为“ds,dh”,则 hoodie.bucket.index.hash.field 值可以为“ds”、“dh”或“ds,dh”。4.5、设置压缩选项:4.5.1、设置 archive.min_commits,4.5.1、设置 archive.max_commits。4.6、设置Hive选项:4.6.1、如果没有设置 hive_sync.db,则设置 hive_sync.db;4.6.2、如果没有设置 hive_sync.table,则设置 hive_sync.table。4.7、设置read选项,如果不是增量查询则什么也不做;否则设置 hoodie.datasource.query.type 值为 incremental 。4.8、设置write选项:如果 write.operation 为默认值且为 cow 表,则实则 write.precombine 为 true 。4.9、如果没有设置 source.avro-schema.path 和 source.avro-schema,则设置 source.avro-schema 。

5、设置排序选项(flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java):

5.1、设置 Flink 的 table.exec.sort.max-num-file-handles

5.2、设置 Flink 的 table.exec.spill-compression.enabled

5.3、设置 Flink 的 table.exec.spill-compression.block-size

5.4、设置 Flink 的 table.exec.sort.async-merge-enabled

Append 模式

write.operation 值为 insert,并且为 mor 表;或则为 cow 表,但是 write.insert.cluster 值为 false。

  • write.insert.cluster

该选项用于控制是否在写入时合并小文件,仅对 cow 类型表有效,默认为 false。如果设置为 true,则每次写入前先合并小文件,这会降低写吞吐量,但可提高读性能。


http://www.ppmy.cn/news/62730.html

相关文章

家里网速越来越慢?路由器附近千万别放这几样东西

我们在日常生活中常常会使用到WiFi,当遇到网络卡顿、网速缓慢时往往感到疑惑,是不是自己的WiFi速度不够?其实除了无线路由器、终端等自身产品质量问题,还有许多外在因素。 在网络本身没有问题的情况下WiFi卡顿或不稳定&#xff0…

Spring Security 6.x 系列【35】认证篇之基于JWT的集成方案

有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 3.0.4 本系列Spring Security 版本 6.0.2 源码地址:https://gitee.com/pearl-organization/study-spring-security-demo 文章目录 1. 前言2. 方案2.1 Cookie Session2.2 Token2.3 JWT3. 案例演示3.1 登录成功…

rk3568 修改开机动画

rk3568 修改开机动画 bootanimation.zip是一个Android设备启动时播放的动画文件,它包含了一系列的图片和描述文件,用于描述动画的播放顺序和持续时间。 文章目录 rk3568 修改开机动画1. 开机动画(bootanimation.zip)构成1.1 资源文件夹1.2 描述文件--de…

Haoop集群的搭建(小白教学)

搭建hadoop集群我们必须拥有自己的虚拟机,下列我会给大家奉上超详细的集群搭建以及我在搭建的时候碰到的问题以及对应解决办法,正所谓自己走过的错路是曲折的,也是防止大家做弯路,不仅浪费时间还心态爆炸,下面带走入ha…

css行内元素、块元素、行内块元素的区别

行内元素的特点: 1.和其他元素在一行显示 2.元素的宽度、高度、行高及底部边距不可编辑 3.元素的宽度就是它包含的文字或图片的宽度,不可改变 4.行内元素只能容纳纯文本或者是其他的行内元素(a标签除外,a里面不能放自己&#xff0…

Java线程池编码示例

第1步:自定义线程实现类 Java中多线程编码时,定义线程类有两种方式: 继承Thread类实现Runnable接口(由于Java的单继承特性,一般推荐使用此方式) public class BizThread implements Runnable {private int …

k8s基础4——deployment控制器、应用部署、升级、回滚、水平扩容缩容

文章目录 一、基本介绍二、应用程序生命周期2.1 部署应用2.2 应用升级2.2.1 修改YAML文件升级(交互式)2.2.2 命令指定镜像版本升级(免交互式)2.2.3 调用vim升级 2.3 滚动升级2.3.1 升级流程 2.4 应用回滚2.4.1 查看历史发布版本2.…

Lodash学习进程

Lodash是一个流行的JavaScript工具库,提供了许多实用的函数和方法,用于简化JavaScript编程中的常见任务和操作。本文将介绍Lodash的基本用法和一些常见的应用场景。 javascript 安装和导入 可以通过npm或yarn安装Lodash,例如:npm …