Background
Everything discussed so far assumed 'hoodie.datasource.write.operation' = 'bulk_insert'. In this mode no JSON files are produced, and the resulting files look like this:
/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet
Because bulk insert does not need deduplication, it uses Spark's native write path directly.
Below we discuss the non-Spark-native path.
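The base file names in the listing above follow the pattern fileId_writeToken_instantTime.parquet: a file-group ID (a UUID plus a "-0" suffix), a write token made of three Spark task identifiers joined by "-", and the commit instant time. The minimal sketch below splits one such name into those parts. BaseFileNameDemo and BaseFileName are hypothetical names, and the parser assumes, as holds for every name listed, that no field contains an underscore.

```java
// Hypothetical helper, not a Hudi API: splits a base file name of the form
// <fileId>_<writeToken>_<instantTime>.<extension> into its parts.
class BaseFileNameDemo {

  record BaseFileName(String fileId, String writeToken, String instantTime, String extension) {}

  static BaseFileName parse(String fileName) {
    int dot = fileName.lastIndexOf('.');
    if (dot < 0) {
      throw new IllegalArgumentException("No extension: " + fileName);
    }
    String stem = fileName.substring(0, dot);
    String extension = fileName.substring(dot + 1);
    // The fileId contains '-' but no '_', so '_' cleanly separates the three fields
    String[] parts = stem.split("_");
    if (parts.length != 3) {
      throw new IllegalArgumentException("Unexpected base file name: " + fileName);
    }
    return new BaseFileName(parts[0], parts[1], parts[2], extension);
  }

  public static void main(String[] args) {
    // File name taken from the dt=1 listing (the partition path is stripped first)
    BaseFileName f = parse("2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet");
    System.out.println(f.fileId());      // 2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0
    System.out.println(f.writeToken());  // 0-41263-0
    System.out.println(f.instantTime()); // 20230528233336713
  }
}
```

Parsed this way, the listing shows that one bulk_insert commit (instant 20230528231643200) produced several files, each with its own fileId.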
Discussion
This continues Apache Hudi First Look (8) (Integration with Spark), now covering non-bulk_insert mode.
The remaining code:
```scala
val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)

val (writeSuccessful, compactionInstant, clusteringInstant) =
  commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
    writeResult, parameters, writeClient, tableConfig, jsc,
    TableInstantInfo(basePath, instantTime, commitActionType, operation))
```
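The doWriteOperation call above hands the records to the client method that matches the configured operation. The routing can be pictured as a switch on the operation type. This is a minimal sketch, not Hudi's code: Operation, WriteClient, and LoggingClient are hypothetical stand-ins for WriteOperationType and SparkRDDWriteClient, and records are plain strings.

```java
import java.util.List;

// Hypothetical sketch of routing a write to the client method matching the operation.
// Operation, WriteClient and LoggingClient are illustrative types, not Hudi classes.
class DispatchDemo {

  enum Operation { BULK_INSERT, INSERT, UPSERT, INSERT_OVERWRITE }

  interface WriteClient {
    String bulkInsert(List<String> records, String instantTime);
    String insert(List<String> records, String instantTime);
    String upsert(List<String> records, String instantTime);
    String insertOverwrite(List<String> records, String instantTime);
  }

  static String doWriteOperation(WriteClient client, List<String> records,
                                 String instantTime, Operation operation) {
    // Exhaustive switch expression: a new enum constant will not compile without a branch
    return switch (operation) {
      case BULK_INSERT -> client.bulkInsert(records, instantTime);
      case INSERT -> client.insert(records, instantTime);
      case UPSERT -> client.upsert(records, instantTime);
      case INSERT_OVERWRITE -> client.insertOverwrite(records, instantTime);
    };
  }

  // Demo client that only reports which method was called
  static class LoggingClient implements WriteClient {
    private static String describe(String method, List<String> records, String instantTime) {
      return method + " " + records.size() + " @ " + instantTime;
    }
    public String bulkInsert(List<String> r, String t) { return describe("bulkInsert", r, t); }
    public String insert(List<String> r, String t) { return describe("insert", r, t); }
    public String upsert(List<String> r, String t) { return describe("upsert", r, t); }
    public String insertOverwrite(List<String> r, String t) { return describe("insertOverwrite", r, t); }
  }

  public static void main(String[] args) {
    String result = doWriteOperation(new LoggingClient(), List.of("r1", "r2"),
        "20230528231643200", Operation.UPSERT);
    System.out.println(result);  // upsert 2 @ 20230528231643200
  }
}
```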
- doWriteOperation ultimately calls the corresponding SparkRDDWriteClient method, such as bulkInsert/insert/upsert/insertOverwrite. Here we take upsert as an example:
```java
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
      initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
  HoodieWriteMetadata<HoodieData<WriteStatus>> result =
      table.upsert(context, instantTime, HoodieJavaRDD.of(records));
  HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
      result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(resultRDD, instantTime, table);
}
```
- initTable creates (or obtains) a HoodieSparkMergeOnReadTable
- table.validateUpsertSchema() checks schema compatibility
- preWrite performs the pre-write steps, which were covered earlier; see Apache Hudi First Look (5) (Integration with Spark)
- table.upsert performs the actual data write.
For this Merge-On-Read table it goes through SparkUpsertDeltaCommitActionExecutor<>().execute() and finally reaches HoodieWriteHelper.write:
```java
public HoodieWriteMetadata<O> write(String instantTime,
                                    I inputRecords,
                                    HoodieEngineContext context,
                                    HoodieTable<T, I, K, O> table,
                                    boolean shouldCombine,
                                    int shuffleParallelism,
                                    BaseCommitActionExecutor<T, I, K, O, R> executor,
                                    WriteOperationType operationType) {
  try {
    // De-dupe/merge if needed
    I dedupedRecords =
        combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

    Instant lookupBegin = Instant.now();
    I taggedRecords = dedupedRecords;
    if (table.getIndex().requiresTagging(operationType)) {
      // perform index loop up to get existing location of records
      context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
      taggedRecords = tag(dedupedRecords, context, table);
    }
    Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

    HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
    result.setIndexLookupDuration(indexLookupDuration);
    return result;
  } catch (Throwable e) {
    if (e instanceof HoodieUpsertException) {
      throw (HoodieUpsertException) e;
    }
    throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
  }
}
```
- combineOnCondition deduplicates the records when shouldCombine is true.
It ultimately calls HoodieRecordPayload.preCombine (OverwriteWithLatestAvroPayload.preCombine by default).
- taggedRecords = tag(dedupedRecords, context, table): because the default index is HoodieSimpleIndex, the tagging step runs here.
It ultimately calls HoodieSimpleIndex.tagLocationInternal, which returns the records with their locations attached (a record that is not found in the index has a null location).
- executor.execute(taggedRecords) ultimately calls BaseSparkCommitActionExecutor.execute:
```java
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
  // Cache the tagged records, so we don't end up computing both
  // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
  JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
  if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
    inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
  } else {
    LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
  }

  WorkloadProfile workloadProfile = null;
  if (isWorkloadProfileNeeded()) {
    context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + config.getTableName());
    workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
    LOG.info("Input workload profile :" + workloadProfile);
  }

  // partition using the insert partitioner
  final Partitioner partitioner = getPartitioner(workloadProfile);
  if (isWorkloadProfileNeeded()) {
    saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
  }

  // handle records update with clustering
  Set<HoodieFileGroupId> fileGroupsInPendingClustering =
      table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
  HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty()
      ? inputRecords
      : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);

  context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
  HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
  HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
  updateIndexAndCommitIfNeeded(writeStatuses, result);
  return result;
}
```
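- inputRDD.persist caches the current RDD (with MEMORY_AND_DISK_SER, if it is not already persisted), because the RDD is used several times and caching avoids recomputing it.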
- workloadProfile = new WorkloadProfile(buildProfile(inputRecords), ...) builds profile information that mainly records the number of inserted and updated records.
Its core is a Map keyed by fileId with *Pair<instantTime, count>* values.
- final Partitioner partitioner = getPartitioner(workloadProfile): for upsert this returns an UpsertPartitioner (because hoodie.storage.layout.type defaults to DEFAULT).
The UpsertPartitioner constructor also runs assignUpdates and assignInserts (skipped for now), which assign the records to output partitions and take care of small files.
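- mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner) checks hoodie.table.base.file.format (parquet by default): if it is HFILE, the records must be sorted; otherwise they are only repartitioned with the partitioner.
The records are then inserted or updated by handleUpsertPartition, which uses the partitioner's information to decide between insert and update for each partition (the details are fairly involved).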
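- updateIndexAndCommitIfNeeded(writeStatuses, result) first updates the index. For HoodieSimpleIndex this is a no-op, because this index rebuilds its lookup by reading the parquet files on every write.
Then, if hoodie.auto.commit is true (the default), it commits the metadata. This commit is similar to the one described in Apache Hudi First Look (6) (Integration with Spark) and involves compaction, which can be analyzed later.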
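The dedupe and tagging steps inside write(), plus the insert/update counts gathered by the workload profile in execute(), can be simulated on in-memory data. This is a simplified single-JVM sketch under stated assumptions, not Hudi's implementation: Rec, Tagged, and Profile are hypothetical types; dedupe keeps the record with the greatest ordering value, in the spirit of OverwriteWithLatestAvroPayload.preCombine (on ties this sketch keeps the later record); tagging is a lookup of each key in a key-to-fileId map, standing in for the simple index; and the profile ignores partition paths and keeps plain counts instead of Pair<instantTime, count>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of dedupe -> tag -> profile; not Hudi classes.
class UpsertPipelineDemo {

  record Rec(String key, long orderingVal, String payload) {}

  // fileId == null means the key was not found in the index (a new record)
  record Tagged(Rec rec, String fileId) {}

  record Profile(long inserts, Map<String, Long> updatesPerFileId) {}

  // Step 1: keep one record per key, preferring the greatest ordering value
  static List<Rec> dedupe(List<Rec> input) {
    Map<String, Rec> byKey = new LinkedHashMap<>();
    for (Rec incoming : input) {
      // Keep the existing record only if its ordering value is strictly greater
      byKey.merge(incoming.key(), incoming,
          (old, neu) -> old.orderingVal() > neu.orderingVal() ? old : neu);
    }
    return new ArrayList<>(byKey.values());
  }

  // Step 2: attach the fileId that already holds each key (left-outer-join semantics)
  static List<Tagged> tag(List<Rec> records, Map<String, String> existingKeyToFileId) {
    List<Tagged> out = new ArrayList<>();
    for (Rec r : records) {
      out.add(new Tagged(r, existingKeyToFileId.get(r.key())));
    }
    return out;
  }

  // Step 3: count inserts (no location) and updates per target fileId
  static Profile profile(List<Tagged> tagged) {
    long inserts = 0;
    Map<String, Long> updates = new HashMap<>();
    for (Tagged t : tagged) {
      if (t.fileId() == null) {
        inserts++;
      } else {
        updates.merge(t.fileId(), 1L, Long::sum);
      }
    }
    return new Profile(inserts, updates);
  }

  public static void main(String[] args) {
    List<Rec> input = List.of(
        new Rec("k1", 1, "v1-old"),
        new Rec("k1", 2, "v1-new"),
        new Rec("k2", 5, "v2"),
        new Rec("k3", 1, "v3"));
    // Keys assumed to already exist in base file "file-A"
    Map<String, String> index = Map.of("k1", "file-A", "k2", "file-A");

    List<Rec> deduped = dedupe(input);
    Profile p = profile(tag(deduped, index));
    System.out.println(deduped.size());        // 3
    System.out.println(p.inserts());           // 1
    System.out.println(p.updatesPerFileId());  // {file-A=2}
  }
}
```

In the example, k1 appears twice and collapses to its newer version, k1 and k2 are found in file-A and count as updates, and k3 has no location, so it counts as an insert.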
- postWrite: does the postCommit here overlap with what table.upsert already did?
- commitAndPerformPostOperations
This mainly runs asynchronous compaction and clustering and syncs Hive metadata, similar to Apache Hudi First Look (7) (Integration with Spark).