Flink Connector Write to Iceberg: A Source Code Walkthrough

The sink is assembled by chaining three operators: a parallel writer, a single-parallelism committer, and a dummy sink.

```java
// Add the Writer operator (runs with configurable parallelism)
SingleOutputStreamOperator<WriteResult> writerStream =
    appendWriter(distributeStream, flinkRowType, equalityFieldIds);

// Add the Commit operator (parallelism is fixed to 1)
SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);

// Add the dummy sink
return appendDummySink(committerStream);
}
```
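
For context, the chain above is what the FlinkSink builder produces when a job wires a `RowData` stream into an Iceberg table. A minimal usage sketch (the warehouse path, parallelism, and way of loading the table are placeholders, not part of the source being analyzed):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergSinkExample {

  // Attaches the Iceberg sink (writer -> committer -> dummy sink) to an existing RowData stream.
  static void attachIcebergSink(DataStream<RowData> rows) {
    // Placeholder warehouse path; any catalog-backed TableLoader works the same way.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");

    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .writeParallelism(2) // optional; otherwise appendWriter falls back to the input parallelism
        .append();
  }
}
```

Note that the committer only commits when a checkpoint completes (see IcebergFilesCommitter below), so checkpointing must be enabled on the job.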

### The appendWriter method

```java
private SingleOutputStreamOperator<WriteResult> appendWriter(
    DataStream<RowData> input, RowType flinkRowType, List<Integer> equalityFieldIds) {

  ....

  if (flinkWriteConf.upsertMode()) {
    if (!table.spec().isUnpartitioned()) {
      // In upsert mode, every partition source field must be included in equalityFieldIds
      for (PartitionField partitionField : table.spec().fields()) {
        Preconditions.checkState(
            equalityFieldIds.contains(partitionField.sourceId()),
            "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
            partitionField,
            equalityFieldColumns);
      }
    }
  }

  // Create the stream writer
  IcebergStreamWriter<RowData> streamWriter =
      createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);

  // Determine parallelism: fall back to the input stream's parallelism if none was configured
  int parallelism =
      flinkWriteConf.writeParallelism() == null
          ? input.getParallelism()
          : flinkWriteConf.writeParallelism();

  ....

  return writerStream;
}
```
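
Seen from the user side, the check above fires when upsert mode is enabled on the builder. A hedged sketch of that configuration, reusing `rows` and `tableLoader` from the earlier example (the column names are made up): on a table partitioned by `dt`, the partition column must appear in the equality fields, otherwise the `Preconditions.checkState` above fails.

```java
FlinkSink.forRowData(rows)
    .tableLoader(tableLoader)
    .upsert(true)                                              // flinkWriteConf.upsertMode() becomes true
    .equalityFieldColumns(java.util.Arrays.asList("id", "dt")) // "dt" covers the partition field of the table
    .append();
```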

### The createStreamWriter method

```java
static IcebergStreamWriter<RowData> createStreamWriter(
    Table table,
    FlinkWriteConf flinkWriteConf,
    RowType flinkRowType,
    List<Integer> equalityFieldIds) {

  Table serializableTable = SerializableTable.copyOf(table);
  FileFormat format = flinkWriteConf.dataFileFormat();

  // Create the TaskWriterFactory, which builds writers matching the table schema
  TaskWriterFactory<RowData> taskWriterFactory =
      new RowDataTaskWriterFactory(
          serializableTable,
          flinkRowType,
          flinkWriteConf.targetDataFileSize(),
          format,
          writeProperties(table, format, flinkWriteConf),
          equalityFieldIds,
          flinkWriteConf.upsertMode());

  // Create the IcebergStreamWriter
  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
```
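
The file format and target file size passed to RowDataTaskWriterFactory come from FlinkWriteConf, which falls back to the table's own properties when no per-job write option is given. A hedged sketch of setting those table properties (the values are just examples):

```java
table.updateProperties()
    .set("write.format.default", "parquet")           // read back via flinkWriteConf.dataFileFormat()
    .set("write.target-file-size-bytes", "134217728") // 128 MB, read back via targetDataFileSize()
    .commit();
```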


### The IcebergStreamWriter class

IcebergStreamWriter is a Flink OneInputStreamOperator, so it has the usual Flink operator lifecycle, state, and metrics facilities.

```java
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
    implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {

  @Override
  public void open() {

    // Initialize the writer metrics
    this.writerMetrics = new IcebergStreamWriterMetrics(super.metrics, fullTableName);

    // Initialize the taskWriterFactory used to create writers
    this.taskWriterFactory.initialize(subTaskId, attemptId);

    // Create the writer. There are four kinds, chosen by whether the Iceberg table
    // is partitioned and whether upsert mode is enabled:
    //   UnpartitionedWriter            : unpartitioned, insert only
    //   RowDataPartitionedFanoutWriter : partitioned, insert only
    //   UnpartitionedDeltaWriter       : unpartitioned, upsert
    //   PartitionedDeltaWriter         : partitioned, upsert
    this.writer = taskWriterFactory.create();
  }

  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
    // Write the incoming record
    writer.write(element.getValue());
  }

  // Flush the data files written so far and emit them downstream for the committer to commit
  private void flush() throws IOException {
    if (writer == null) {
      return;
    }

    long startNano = System.nanoTime();
    WriteResult result = writer.complete();
    writerMetrics.updateFlushResult(result);
    output.collect(new StreamRecord<>(result));
    writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));

    // Set writer to null so a fresh one is created before the next write
    writer = null;
  }
}
```
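
The four writer types listed in open() are chosen by RowDataTaskWriterFactory.create() based on the partition spec and the equality fields. The helper below is only an illustration of that decision table; it names the writer type rather than constructing one:

```java
import java.util.List;
import org.apache.iceberg.PartitionSpec;

class WriterTypeIllustration {
  // Simplified mirror of the factory's choice: equality fields switch to the delta (upsert) writers,
  // the partition spec switches between the partitioned and unpartitioned variants.
  static String writerTypeFor(PartitionSpec spec, List<Integer> equalityFieldIds) {
    boolean insertOnly = equalityFieldIds == null || equalityFieldIds.isEmpty();
    if (insertOnly) {
      return spec.isUnpartitioned() ? "UnpartitionedWriter" : "RowDataPartitionedFanoutWriter";
    }
    // Equality fields present: writers that emit data files plus equality-delete files
    return spec.isUnpartitioned() ? "UnpartitionedDeltaWriter" : "PartitionedDeltaWriter";
  }
}
```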


### The IcebergFilesCommitter class

```java
class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {

  @Override
  public void initializeState(StateInitializationContext context) throws Exception {

    // Maximum number of consecutive empty commits.
    // A commit (and therefore a snapshot) is only produced after this many consecutive
    // checkpoints without data, which avoids generating a flood of empty snapshots.
    maxContinuousEmptyCommits =
        PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);

    // Create the manifest OutputFileFactory
    this.manifestOutputFileFactory =
        FlinkManifestUtil.createOutputFileFactory(
            table, flinkJobId, operatorUniqueId, subTaskId, attemptId);

    if (context.isRestored()) {

      // Restore the uncommitted data files from state
      NavigableMap<Long, byte[]> uncommittedDataFiles =
          Maps.newTreeMap(checkpointsState.get().iterator().next())
              .tailMap(maxCommittedCheckpointId, false);
      if (!uncommittedDataFiles.isEmpty()) {
        // Commit all uncommitted data files from the old Flink job to the Iceberg table.
        long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
        commitUpToCheckpoint(
            uncommittedDataFiles, restoredFlinkJobId, operatorUniqueId, maxUncommittedCheckpointId);
      }
    }
  }
```
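
MAX_CONTINUOUS_EMPTY_COMMITS is read from a table property (with a default of 10 here). Assuming the key is `flink.max-continuous-empty-commits`, tuning it would look like this:

```java
// Assumed property key, matching the MAX_CONTINUOUS_EMPTY_COMMITS constant read above.
table.updateProperties()
    .set("flink.max-continuous-empty-commits", "50") // at most one empty snapshot per 50 empty checkpoints
    .commit();
```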

```java
  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {

    // Turn the data files completed for this checkpointId into manifest files
    // and record them under the checkpoint id in dataFilesPerCheckpoint
    dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));

    // Reset the snapshot state to the latest state.
    checkpointsState.clear();
    checkpointsState.add(dataFilesPerCheckpoint);

    jobIdState.clear();
    jobIdState.add(flinkJobId);

    // Clear the local buffer for the current checkpoint.
    writeResultsOfCurrentCkpt.clear();

    committerMetrics.checkpointDuration(
        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
  }
```
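
checkpointsState and jobIdState are ordinary Flink operator list state. The sketch below shows, with a made-up descriptor name and a generic type hint, how such state is typically declared and obtained; the actual class uses its own descriptors and serializers:

```java
import java.util.SortedMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.StateInitializationContext;

class CommitterStateSketch {
  // checkpoint-id -> serialized manifest list, kept across restarts in operator state
  private transient ListState<SortedMap<Long, byte[]>> checkpointsState;

  void setUpState(StateInitializationContext context) throws Exception {
    ListStateDescriptor<SortedMap<Long, byte[]>> descriptor =
        new ListStateDescriptor<>(
            "iceberg-files-committer-state", // illustrative name only
            TypeInformation.of(new TypeHint<SortedMap<Long, byte[]>>() {}));
    checkpointsState = context.getOperatorStateStore().getListState(descriptor);
  }
}
```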

```java
  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {

    if (checkpointId > maxCommittedCheckpointId) {
      LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
      // The checkpoint is complete, so commit its data to the table
      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
      this.maxCommittedCheckpointId = checkpointId;
    } else {
      LOG.info(
          "Skipping committing checkpoint {}. {} is already committed.",
          checkpointId,
          maxCommittedCheckpointId);
    }
  }
```
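
Since notifyCheckpointComplete is the only place a commit is triggered, the sink's end-to-end commit latency is essentially the checkpoint interval. A minimal sketch of the relevant job settings (the interval values are arbitrary examples):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class CheckpointConfigSketch {
  static StreamExecutionEnvironment configure() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // One completed checkpoint corresponds to at most one Iceberg snapshot,
    // so this interval also bounds the snapshot frequency.
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
    return env;
  }
}
```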

```java
  private void commitUpToCheckpoint(
      NavigableMap<Long, byte[]> deltaManifestsMap,
      String newFlinkJobId,
      String operatorId,
      long checkpointId)
      throws IOException {

    // Collect the pending data files for all checkpoints up to and including checkpointId
    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);

    List<ManifestFile> manifests = Lists.newArrayList();
    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
      // Skip checkpoints that produced no data files
      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
        continue;
      }

      DeltaManifests deltaManifests =
          SimpleVersionedSerialization.readVersionAndDeSerialize(
              DeltaManifestsSerializer.INSTANCE, e.getValue());
      pendingResults.put(
          e.getKey(),
          FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
      manifests.addAll(deltaManifests.manifests());
    }

    // Summarize the pending commit: record counts and data file sizes
    CommitSummary summary = new CommitSummary(pendingResults);
    // Commit the data
    commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
    committerMetrics.updateCommitSummary(summary);
    pendingMap.clear();
    // Clean up the manifests that have now been committed
    deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
  }
```

```java
  private void commitPendingResult(
      NavigableMap<Long, WriteResult> pendingResults,
      CommitSummary summary,
      String newFlinkJobId,
      String operatorId,
      long checkpointId) {

    // Total number of pending files (data files plus delete files) in this commit
    long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
    continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;

    // Commit when there are files to commit, or when the number of consecutive empty
    // checkpoints has reached the configured threshold
    if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
      if (replacePartitions) {
        // Overwrite commit, via newReplacePartitions()
        replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
      } else {
        // Regular commit, via newAppend() / newRowDelta()
        commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
      }
      continuousEmptyCheckpoints = 0;
    }
  }
```

```java
  private void replacePartitions(
      NavigableMap<Long, WriteResult> pendingResults,
      CommitSummary summary,
      String newFlinkJobId,
      String operatorId,
      long checkpointId) {
    Preconditions.checkState(
        summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files.");

    // Commit via newReplacePartitions(): dynamically overwrite the partitions touched by the new data files
    ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
    for (WriteResult result : pendingResults.values()) {
      Preconditions.checkState(
          result.referencedDataFiles().length == 0, "Should have no referenced data files.");
      Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
    }

    commitOperation(
        dynamicOverwrite, summary, "dynamic partition overwrite", newFlinkJobId, operatorId, checkpointId);
  }
```
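
Which branch commitPendingResult takes is decided by the replacePartitions flag, which corresponds to the overwrite option on the sink builder. A hedged sketch, again reusing `rows` and `tableLoader` from the first example:

```java
FlinkSink.forRowData(rows)
    .tableLoader(tableLoader)
    .overwrite(true) // replacePartitions = true: each checkpoint dynamically overwrites the partitions it touched
    .append();
```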

```java
  private void commitDeltaTxn(
      NavigableMap<Long, WriteResult> pendingResults,
      CommitSummary summary,
      String newFlinkJobId,
      String operatorId,
      long checkpointId) {
    if (summary.deleteFilesCount() == 0) {
      // To be compatible with iceberg format V1.
      AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
      for (WriteResult result : pendingResults.values()) {
        Preconditions.checkState(
            result.referencedDataFiles().length == 0,
            "Should have no referenced data files for append.");
        Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
      }
      commitOperation(appendFiles, summary, "append", newFlinkJobId, operatorId, checkpointId);
    } else {
      // To be compatible with iceberg format V2.
      for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
        // We don't commit the merged result into a single transaction because for the sequential
        // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
        // to data files from txn1. Committing the merged one will lead to the incorrect delete
        // semantic.
        WriteResult result = e.getValue();

        // Add the data files and the delete files to the RowDelta separately
        RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
        Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
        Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
        commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
      }
    }
  }
```
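
The row-delta branch (data files plus delete files) only applies to Iceberg format v2 tables; a v1 table always takes the append branch because it cannot carry delete files. Upgrading an existing table goes through the format-version table property; a hedged sketch:

```java
// "format-version" is the reserved Iceberg table property that controls the spec version;
// v2 is required before equality/position delete files can be committed.
table.updateProperties()
    .set("format-version", "2")
    .commit();
```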

```java
  private void commitOperation(
      SnapshotUpdate<?> operation,
      CommitSummary summary,
      String description,
      String newFlinkJobId,
      String operatorId,
      long checkpointId) {

    ....

    // Perform the commit
    operation.commit(); // abort is automatically called if this fails.

    committerMetrics.commitDuration(durationMs);
  }
```
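
After operation.commit() succeeds, the committer records the Flink job id, the operator id, and the committed checkpoint id in the new snapshot's summary, which is how maxCommittedCheckpointId can be recovered on restart. A hedged sketch of inspecting that metadata; the summary keys are assumptions mirroring the fields passed into commitOperation:

```java
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class CommitMetadataSketch {
  static void printCommitMetadata(Table table) {
    Snapshot current = table.currentSnapshot();
    if (current == null) {
      return; // the table has no snapshot yet
    }
    Map<String, String> summary = current.summary();
    // Assumed key names, mirroring flinkJobId / operatorId / checkpointId above.
    System.out.println("flink.job-id = " + summary.get("flink.job-id"));
    System.out.println("flink.operator-id = " + summary.get("flink.operator-id"));
    System.out.println("flink.max-committed-checkpoint-id = " + summary.get("flink.max-committed-checkpoint-id"));
  }
}
```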

```java
  @Override
  public void processElement(StreamRecord<WriteResult> element) throws Exception {
    final WriteResult value = element.getValue();
    if ("DDL".equalsIgnoreCase(value.getType())) {
      // A DDL marker is buffered and then committed immediately
      this.writeResultsOfCurrentCkpt.add(value);
      doCommit();
    } else {
      // Normal write results are only buffered here; they are committed on checkpoint completion
      this.writeResultsOfCurrentCkpt.add(value);
    }
  }
}
```


