A First Look at Apache Hudi (9) (Integration with Spark) -- Non-bulk_insert Mode


Background

Everything discussed so far assumed 'hoodie.datasource.write.operation': 'bulk_insert'. In that mode no JSON files are involved; the partition directory simply ends up with files like the following:

/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet

Because a bulk insert has no deduplication requirement, it goes straight through Spark's native write path.
Below we look at the path that does not use Spark's native writer, i.e. the regular (non-bulk_insert) write operations.
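
To make the setting concrete, here is a minimal sketch of a Spark write that takes this non-bulk_insert path. It is only an illustration under my own assumptions: the table name, input path and the id/ts/dt columns are made up, while the option keys are standard Hudi datasource options.

   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SaveMode;
   import org.apache.spark.sql.SparkSession;

   public class HudiUpsertExample {
     public static void main(String[] args) {
       SparkSession spark = SparkSession.builder()
           .appName("hudi-upsert-example")
           .master("local[*]")
           // Hudi relies on Kryo serialization on the Spark side
           .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
           .getOrCreate();

       // hypothetical input with columns id, ts, dt
       Dataset<Row> df = spark.read().json("/tmp/input_json");

       df.write().format("hudi")
           // 'upsert' (instead of 'bulk_insert') takes the code path walked through below
           .option("hoodie.datasource.write.operation", "upsert")
           // MERGE_ON_READ matches the HoodieSparkMergeOnReadTable seen in the walkthrough
           .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
           .option("hoodie.table.name", "demo_table")
           .option("hoodie.datasource.write.recordkey.field", "id")
           .option("hoodie.datasource.write.precombine.field", "ts")
           .option("hoodie.datasource.write.partitionpath.field", "dt")
           .mode(SaveMode.Append)
           .save("/tmp/hudi/demo_table");

       spark.stop();
     }
   }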

Digging in

Continuing from A First Look at Apache Hudi (8) (Integration with Spark) -- Non-bulk_insert Mode,
the remaining code is:

   val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
   val (writeSuccessful, compactionInstant, clusteringInstant) =
     commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
       writeResult, parameters, writeClient, tableConfig, jsc,
       TableInstantInfo(basePath, instantTime, commitActionType, operation))
  • doWriteOperation ultimately calls the corresponding method on SparkRDDWriteClient, such as bulkInsert/insert/upsert/insertOverwrite. Here we take upsert as the example:

     public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
       HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
           initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
       table.validateUpsertSchema();
       preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
       HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
       HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
       if (result.getIndexLookupDuration().isPresent()) {
         metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
       }
       return postWrite(resultRDD, instantTime, table);
     }
    
     • initTable creates/fetches a HoodieSparkMergeOnReadTable

     • validateUpsertSchema checks schema compatibility

     • preWrite performs the pre-write steps; these were covered earlier, see A First Look at Apache Hudi (5) (Integration with Spark)

     • table.upsert is the operation that actually writes the data.
       It ends up in SparkUpsertDeltaCommitActionExecutor<>().execute(), which in turn calls HoodieWriteHelper.write:

        public HoodieWriteMetadata<O> write(String instantTime, I inputRecords, HoodieEngineContext context,
                                            HoodieTable<T, I, K, O> table, boolean shouldCombine, int shuffleParallelism,
                                            BaseCommitActionExecutor<T, I, K, O, R> executor, WriteOperationType operationType) {
          try {
            // De-dupe/merge if needed
            I dedupedRecords =
                combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

            Instant lookupBegin = Instant.now();
            I taggedRecords = dedupedRecords;
            if (table.getIndex().requiresTagging(operationType)) {
              // perform index loop up to get existing location of records
              context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
              taggedRecords = tag(dedupedRecords, context, table);
            }
            Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

            HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
            result.setIndexLookupDuration(indexLookupDuration);
            return result;
          } catch (Throwable e) {
            if (e instanceof HoodieUpsertException) {
              throw (HoodieUpsertException) e;
            }
            throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
          }
        }
      
       • combineOnCondition deduplicates the incoming data.
         It ultimately calls HoodieRecordPayload.preCombine (the default is OverwriteWithLatestAvroPayload.preCombine); a conceptual sketch of this step is included after the walkthrough.
       • taggedRecords = tag(dedupedRecords, context, table): since the default index is HoodieSimpleIndex, the tagging step is invoked here.
         It ultimately calls HoodieSimpleIndex.tagLocationInternal, which yields records carrying their existing location (if a record is not found in the index, its location stays null); a tagging sketch is also included after the walkthrough.
       • executor.execute(taggedRecords) eventually lands in BaseSparkCommitActionExecutor.execute:
         public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
           // Cache the tagged records, so we don't end up computing both
           // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
           JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
           if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
             inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
           } else {
             LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
           }

           WorkloadProfile workloadProfile = null;
           if (isWorkloadProfileNeeded()) {
             context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + config.getTableName());
             workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
             LOG.info("Input workload profile :" + workloadProfile);
           }

           // partition using the insert partitioner
           final Partitioner partitioner = getPartitioner(workloadProfile);
           if (isWorkloadProfileNeeded()) {
             saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
           }

           // handle records update with clustering
           Set<HoodieFileGroupId> fileGroupsInPendingClustering =
               table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
           HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty()
               ? inputRecords
               : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);

           context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
           HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
           HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
           updateIndexAndCommitIfNeeded(writeStatuses, result);
           return result;
         }
        
         • inputRDD.persist caches the current RDD, since it will be reused several times; this speeds things up

         • workloadProfile = new WorkloadProfile(buildProfile(inputRecords), ...)
           builds a workload snapshot that mainly records how many records are inserts and how many are updates; internally it is a Map keyed by fileId with Pair<instantTime, count> as the value

         • final Partitioner partitioner = getPartitioner(workloadProfile): for an upsert this returns an UpsertPartitioner (because hoodie.storage.layout.type defaults to DEFAULT).
           The UpsertPartitioner constructor does some extra work, assignUpdates and assignInserts (skipped here for now); it is mainly responsible for partitioning the data and for small-file handling

         • mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner)
           Depending on the value of hoodie.table.base.file.format (the default is parquet): if it is hfile, sorting is required; otherwise the data is simply repartitioned by the partitioner.
           The records are then inserted or updated in handleUpsertPartition, which decides insert vs. update based on the partitioner information built earlier (the details inside are fairly involved)

         • updateIndexAndCommitIfNeeded(writeStatuses, result)
           This first updates the index; for HoodieSimpleIndex it is a no-op (that index is rebuilt from the parquet files on every lookup).
           Then, if hoodie.auto.commit is true (the default), the metadata commit is performed. These commit steps are similar to those in A First Look at Apache Hudi (6) (Integration with Spark) and involve compaction, which can be analyzed later

     • postWrite: does the postCommit here duplicate work already done in table.upsert?

  • commitAndPerformPostOperations
    This mainly handles asynchronous compaction and clustering as well as syncing the Hive metastore, similar to A First Look at Apache Hudi (7) (Integration with Spark)
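
As a side note to the combineOnCondition step above, the following is a conceptual sketch (plain Java, not the Hudi API) of what the default OverwriteWithLatestAvroPayload.preCombine boils down to: among records that share the same record key, keep the one with the largest precombine (ordering) value. All class and field names here are invented for illustration.

   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.Map;

   public class PreCombineSketch {

     // Stand-in for a HoodieRecord: just a key plus the precombine field value.
     static final class Record {
       final String key;
       final long orderingVal;

       Record(String key, long orderingVal) {
         this.key = key;
         this.orderingVal = orderingVal;
       }
     }

     static Map<String, Record> dedupe(Iterable<Record> records) {
       Map<String, Record> latestByKey = new HashMap<>();
       for (Record r : records) {
         // keep the incoming record only if its ordering value is at least as large
         latestByKey.merge(r.key, r,
             (oldRec, newRec) -> newRec.orderingVal >= oldRec.orderingVal ? newRec : oldRec);
       }
       return latestByKey;
     }

     public static void main(String[] args) {
       Map<String, Record> result = dedupe(Arrays.asList(
           new Record("id-1", 100L),
           new Record("id-1", 200L),   // same key, newer ordering value wins
           new Record("id-2", 50L)));
       result.forEach((k, v) -> System.out.println(k + " -> " + v.orderingVal));
     }
   }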
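
Similarly, here is a conceptual sketch (again plain Java, not the Hudi API) of what the tagging step achieves: records whose key is found in the existing key-to-file mapping are treated as updates of that file, the rest as inserts. Names are invented for illustration; HoodieSimpleIndex actually builds this mapping by reading the parquet base files on every write.

   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class TaggingSketch {

     static final class TaggedRecord {
       final String key;
       final String fileId;   // null = not found by the index -> treated as an insert

       TaggedRecord(String key, String fileId) {
         this.key = key;
         this.fileId = fileId;
       }

       boolean isUpdate() {
         return fileId != null;
       }
     }

     // existingIndex plays the role of the key -> file-location mapping the index provides.
     static List<TaggedRecord> tag(List<String> incomingKeys, Map<String, String> existingIndex) {
       List<TaggedRecord> tagged = new ArrayList<>();
       for (String key : incomingKeys) {
         tagged.add(new TaggedRecord(key, existingIndex.get(key)));
       }
       return tagged;
     }

     public static void main(String[] args) {
       Map<String, String> index = new HashMap<>();
       index.put("id-1", "2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0");
       for (TaggedRecord r : tag(Arrays.asList("id-1", "id-3"), index)) {
         System.out.println(r.key + " -> " + (r.isUpdate() ? "update of " + r.fileId : "insert"));
       }
     }
   }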

