Flink checkpoint 源码分析

news/2024/9/23 18:27:43/

序言

最近因为工作需要在阅读flink checkpoint处理机制,学习的过程中记录下来,并分享给大家。也算是学习并记录。

目前公司使用的flink版本为1.11。因此以下的分析都是基于1.11版本来的。

在分享前可以简单对flink checkpoint机制做一个大致的了解。

Flink checkpoint 机制介绍

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

[1] 引用:Flink Checkpoint原理解析 - 知乎

代码分析

Flink checkpoint 的触发是通过CheckpointCoordinator 的定时线程完后。

	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {return timer.scheduleAtFixedRate(new ScheduledTrigger(),initDelay, baseInterval, TimeUnit.MILLISECONDS);}

之后通过snapshotTaskState RPC的调用来实现触发checkpoint的

代码中遍历executions 来触发checkpoint,那么executions是什么东西呢?

Flink 代码中维护了一个叫tasksToTrigger的数组。

这个地方向前追溯,可以一直到jobgrap的生成。从名字和代码就可以看出,这个里面存的是没有inputchannel的节点,source节点没有inputchannel,所以回答上面的问题,executions 中是source节点,也就是做checkpoint 时 checkpointcoordinate 会给source节点发送rpc。

通过一个很长亮度的调用,最后到了SubtaskCheckpointCoordinatorImpl 中的

public void checkpointState(CheckpointMetaData metadata,CheckpointOptions options,CheckpointMetricsBuilder metrics,OperatorChain<?, ?> operatorChain,Supplier<Boolean> isCanceled) throws Exception {checkNotNull(options);checkNotNull(metrics);// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignmentsif (lastCheckpointId >= metadata.getCheckpointId()) {LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());channelStateWriter.abort(metadata.getCheckpointId(),new CancellationException("checkpoint aborted via notification"),true);checkAndClearAbortedStatus(metadata.getCheckpointId());return;}// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.lastCheckpointId = metadata.getCheckpointId();if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());return;}// if checkpoint has been previously unaligned, but was forced to be aligned (pointwise// connection), revert it here so that it can jump over output dataif (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {options = options.withUnalignedSupported();initInputsCheckpoint(metadata.getCheckpointId(), options);}// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.//           The pre-barrier work should be nothing or minimal in the common case.operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());// Step (2): Send the checkpoint barrier downstreamLOG.debug("Task {} broadcastEvent at {}, triggerTime {}, passed time {}",taskName,System.currentTimeMillis(),metadata.getTimestamp(),System.currentTimeMillis() - metadata.getTimestamp());CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());// Step (3): Register alignment timer to timeout aligned barrier to unaligned barrierregisterAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);// Step (4): Prepare to spill the in-flight buffers for input and outputif (options.needsChannelState()) {// output data already written while broadcasting eventchannelStateWriter.finishOutput(metadata.getCheckpointId());}// Step (5): Take the state snapshot. This should be largely asynchronous, to not impact// progress of the// streaming topologyMap<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());try {if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {finishAndReportAsync(snapshotFutures, metadata, metrics, options);} else {cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));}} catch (Exception ex) {cleanup(snapshotFutures, metadata, metrics, ex);throw ex;}}


http://www.ppmy.cn/news/1444874.html

相关文章

构建嵌入空间

1、如何构建嵌入空间 嵌入空间的核心思想正是将高维数据映射到一个低维的连续空间中&#xff0c;同时尽可能保留数据的重要特征和结构。这一过程通常涉及以下几个关键步骤和考虑因素&#xff1a; 选择映射函数&#xff1a;嵌入空间的构建需要一个映射函数&#xff0c;它将原始的…

若依前后端分离版 集成 腾讯云 OS

原因&#xff1a; 最近在根据一个若依二开的项目继续进行开发&#xff0c;当添加到轮播图模块的时候&#xff0c;涉及到了图片上传&#xff0c;由于公司以前一直使用的是腾讯云COS&#xff08;不是阿里云OSS&#xff09;&#xff0c;在网上搜索一番后&#xff0c;没有找到 若依…

DevOps(十四)怎么实现Gitlab更新后Jenkins自动发布

目录 1、在 Jenkins 中安装 GitLab 插件 2、在 GitLab 中创建一个访问令牌(Access Token) 3、在 Jenkins 中配置 GitLab 连接 4、在 Jenkins 中创建一个新的任务(Job) 5、在 GitLab 中配置 Webhook 6、以下是一些补充说明和建议 持续集成的一个特点就是开发可以随时提交&…

Vue的SetUp函数

在Vue 3中&#xff0c;引入了一个名为 setup 的新函数&#xff0c;它是使用组合式API时的一个主要功能。setup 函数是组件的入口点&#xff0c;它在组件创建之前执行&#xff0c;允许你定义组件的响应式状态、计算属性、侦听器和其他函数。这标志着Vue对于更具函数风格编程的支…

使用ULID而不是UUID

什么是ULID&#xff1f; ULID是一种通用唯一字典排序标识符&#xff0c;它比UUID的优势在于可排序性和性能。 ULID&#xff08;Universally Unique Lexicographically Sortable Identifier&#xff09;是一种新型的唯一标识符&#xff0c;由Alizain Feerasta在2016年提出。不…

环状串的字典序

【题目描述】 长度为n的环状串有n种表示法&#xff0c;分别为从某个位置开始顺时针得到。例如&#xff0c;图3-4的环状串有10种表示&#xff1a; CGAGTCAGCT&#xff0c;GAGTCAGCTC&#xff0c;AGTCAGCTCG等。在这些表示法中&#xff0c;字典序最小的称为"最小表示"…

Github Action Bot 开发教程

Github Action Bot 开发教程 在使用 Github 时&#xff0c;你可能在一些著名的开源项目&#xff0c;例如 Kubernetes&#xff0c;Istio 中看到如下的一些评论&#xff1a; /lgtm /retest /area bug /assign xxxx ...等等&#xff0c;诸如此类的一些功能性评论。在这些评论出现…

python学习笔记----函数(五)

一、函数介绍 在 Python 中&#xff0c;函数是一个组织好的、可重用的代码块&#xff0c;用来执行一个单一的、相关的动作。函数提供了代码的模块化和代码复用的能力。它可以接受输入参数&#xff0c;并可以返回一个结果。函数在 Python 编程中是基本的构建块之一。 二、函数…