17、Flink 的 Checkpointing 配置详解

devtools/2024/9/25 2:31:17/
Checkpointing
1.概述

Flink 中的每个方法或算子都能够是有状态的,状态化的方法在处理单个 元素/事件 的时候存储数据,为了让状态容错,Flink 需要为状态添加 checkpoint(检查点)

2.开启与配置 Checkpoint

默认 checkpoint 是禁用的,通过调用 StreamExecutionEnvironmentenableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

checkpoint 其它属性包括

  • checkpoint 存储: 可以设置检查点快照的持久化位置,默认使用 JobManager 堆内存,建议在生产中使用持久性文件系统。

  • 精确一次(exactly-once)对比至少一次(at-least-once):可以在 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入 checkpoint 模式,推荐精确一次,至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。

  • checkpoint 超时:如果 checkpoint 执行的时间超过了配置的阈值,进行中的 checkpoint 会被抛弃。

  • checkpoints 之间的最小时间:在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展,如果值设为 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。

    使用 checkpoints 之间的最小时间,在 checkpoint 的执行时间超过平均值时不会受到影响(例如目标存储系统忽然变得很慢)这个值也意味着并发 checkpoint 的数目是一

  • checkpoint 可容忍连续失败次数:可容忍多少次连续的 checkpoint 失败,超过阈值之后会触发作业错误 fail over,默认次数为 0,不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over;可容忍的 checkpoint 失败情形:Job Manager的IOException,TaskManager 做 checkpoint 时异步部分的失败, checkpoint 超时等;TaskManager 做 checkpoint 时同步部分的失败会直接触发作业fail over;其它的 checkpoint 失败(如一个 checkpoint 被另一个 checkpoint 包含)会被忽略掉。

  • 并发 checkpoint 的数目: 默认在上一个 checkpoint 未完成(失败或成功)的情况下,系统不会触发另一个 checkpoint,以便拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程;允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法调用比较耗时的外部服务),该选项不能和 “checkpoints 间的最小时间” 同时使用。

  • externalized checkpoints: 配置周期存储 checkpoint 到外部系统中,Externalized checkpoints 将元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。

  • 非对齐 checkpoints: 启用非对齐 checkpoints 以便在背压时大大减少创建 checkpoint 的时间,仅适用于精确一次(exactly-once)checkpoints 并且只有一个并发检查点。

  • 部分任务结束的 checkpoints: 默认,即使 DAG 的部分已经处理完它们的所有记录,Flink也会继续执行 checkpoints。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);// 高级选项:// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 开启 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
3.相关配置选项
KeyDefaultTypeDescription
state.backend.incrementalfalseBoolean配置状态后端是否应创建增量检查点(如果可能);对于增量检查点,只存储与前一个检查点的diff,而不是完整的检查点状态,启用后,web UI中显示的或从其余API获取的状态大小仅表示增量检查点大小,而不是完整检查点大小,某些状态后端可能不支持增量检查点并忽略此选项。
state.backend.local-recoveryfalseBoolean配置状态后端的本地恢复,默认禁用,本地恢复目前只支持keyed state backends(包括EmbeddedRocksDBStateBackend和HashMapStateBackend)。
state.checkpoint-storage(none)String配置检查点存储,可通过名称指定 [“jobmanager” 或“filesystem”],也可以通过“CheckpointStorageFactory”的类名指定。
state.checkpoint.cleaner.parallel-modetrueBoolean是否使用传递到 cleaner 中的 ExecutorService 并行丢弃检查点的状态
state.checkpoints.create-subdirtrueBoolean是否在“state.checkpoints.dir”下创建以作业id命名的子目录,以存储检查点的数据文件和元数据,默认为true。
state.checkpoints.dir(none)String默认目录,用于在Flink支持的文件系统中存储检查点的数据文件和元数据,存储路径必须可从所有参与的进程/节点(即所有TaskManager和JobManager)访问。
state.checkpoints.num-retained1Integer要保留的已完成检查点的最大数量。
state.savepoints.dir(none)String保存点的默认目录,由将存储点写入文件系统的状态后端(HashMapStateBackend、EmbeddedRocksDBStateBackend)使用。
state.storage.fs.memory-threshold20 kbMemorySize状态数据文件的最小大小,所有小于的状态块都内联存储在根检查点元数据文件中,此配置的最大内存阈值为1MB。
state.storage.fs.write-buffer-size4096Integer写入文件系统的检查点流的写入缓冲区的默认大小,实际写入缓冲区大小被确定为此选项和选项“state.storage.fs.memory threshold”的最大值。
taskmanager.state.local.root-dirs(none)Stringconfig 参数定义根目录,存储用于本地恢复的基于文件的状态;本地恢复目前只覆盖keyed state backends,如果未配置,它将默认为<WORKING_DIR>/localState,<WORKING_DIR>可以通过process.taskmanager.WORKING-dir进行配置
4.配置 State Backend

Flink 的 checkpointing 机制会将 timer 以及 stateful 的 operator 进行快照,然后存储下来, 包括连接器(connectors),窗口(windows)以及用户自定义的状态。

Checkpoint 存储在哪里取决于配置的 State Backend(比如 JobManager memory、 file system、 database)。

默认情况下,状态是保存在 TaskManagers 的内存中,checkpoint 保存在 JobManager 的内存中,为了持久化大体量状态, Flink 支持存储 checkpoint 状态到其他的 state backends 上。

Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
env.configure(config);
5.迭代作业中的状态和 checkpoint

Flink 现在为没有迭代(iterations)的作业提供一致性的处理保证,在迭代作业上开启 checkpoint 会导致异常。

为了在迭代程序中强制进行 checkpoint,用户需要在开启 checkpoint 时设置一个特殊的标志:

 env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true);

请注意在环形边上游走的记录(以及与之相关的状态变化)在故障时会丢失。

6.部分任务结束后的 Checkpoint
a)概述

从 1.14 版本开始 Flink 支持在部分任务结束后继续进行Checkpoint,如果一部分数据源是有限数据集,那么就可以。

从 1.15 版本开始,这一特性被默认打开,如果想要关闭这一功能,可以执行:

Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

此时,结束的任务不会参与 Checkpoint 的过程,在实现自定义的算子或者 UDF (用户自定义函数)时需要考虑这一点。

为了支持部分任务结束后的 Checkpoint 操作,调整了 任务的生命周期 并且引入了 StreamOperator#finish 方法,在这一方法中,用户需要写出所有缓冲区中的数据。

在 finish 方法调用后的 checkpoint 中,这一任务不能再有缓冲区中的数据,因为在 finish() 后没有办法输出这些数据,大部分情况下,finish() 后这一任务的状态为空,唯一的例外是如果其中某些算子中包含外部系统事务的句柄(例如为了实现恰好一次语义), 在这种情况下,在 finish() 后进行的 checkpoint 操作应该保留这些句柄,并且在结束 checkpoint(即任务退出前所等待的 checkpoint)时提交。

b)对 operator state 的影响

在部分 Task 结束后的 checkpoint 中,Flink 对 UnionListState 进行了特殊的处理, UnionListState 一般用于实现对外部系统读取位置的一个全局视图(例如,用于记录所有 Kafka 分区的读取偏移)。

如果在算子的某个并发调用 close() 方法后丢弃它的状态,就会丢失它所分配的分区的偏移量信息,为了解决这一问题,对于使用 UnionListState 的算子,只允许在它的并发都在运行或都已结束的时候才能进行 checkpoint 操作。

ListState 一般不会用于类似的场景,仍然需要注意在调用 close() 方法后进行的 checkpoint 会丢弃算子的状态并且 这些状态在算子重启后不可用。

任何支持并发修改操作的算子也可以支持部分并发实例结束后的恢复操作,从这种类型的快照中恢复等价于将算子的并发改为正在运行的并发实例数。

c)任务结束前等待最后一次 Checkpoint

为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 finish() 方法后等待下一次 checkpoint 成功后退出。

注意:这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显,极端情况下,如果 checkpoint 的周期被设置为 Long.MAX_VALUE,那么任务永远不会结束,因为下一次 checkpoint 不会进行。


http://www.ppmy.cn/devtools/38980.html

相关文章

最简单的自动化爬虫工具--Playwright

Playwright Playwright是一款强大的自动化库&#xff0c;提供了一种简便易用、高性能的网页自动化解决方案。它支持同步和异步两种操作方式&#xff0c;用户无需为不同浏览器单独下载驱动程序&#xff0c;因为Playwright内置了对Chrome、Firefox、Safari等多种浏览器的支持。此…

等保测评—Linux-CentOS标准范例截图

密码输入错误无法登录 用户账户情况包含root、guanli、shenji 查看审计用户权限 身份鉴别&#xff1a; cat /etc/passwd&#xff0c;核查用户名和 UID&#xff0c;是否存在同样的用户名和 UID cat /etc/shadow&#xff0c;查看文件中各用户名状态 &#xff0c; 核查密码一栏为…

搜索算法系列之三(插值查找)

前言 插值查找仅适用于有序数据、有序数组&#xff0c;和二分查找类似&#xff0c;更讲究数据有序均匀分布。 算法原理 插值查找(interpolation search)是一种查找算法&#xff0c;它与二分查找类似&#xff0c;但在寻找元素时更加智能化。这种算法假设数据集是等距的或者有…

用 C 语言进行大模型推理:探索 llama2.c 仓库(二)

文章目录 前提如何构建一个Transformer Model模型定义模型初始化 如何构建tokenzier 和 sampler如何进行推理总结 前提 上一节我们介绍了llama2.c中如何对hugging face的权重进行处理&#xff0c;拿到了llama2.c想要的权重格式和tokenizer.bin格式。这一节我们分析下在llama2.…

华为eNSP学习—IP编址

IP编址 IP编址子网划分例题展示第一步:机房1的子网划分第二步:机房2的子网划分第三步:机房3的子网划分IP编址 明确:IPv4地址长度32bit,点分十进制的形式 ip地址构成=网络位+主机位 子网掩码区分网络位和主机位 学此篇基础: ①学会十进制与二进制转换 ②学会区分网络位和…

elementUi中的el-table合计行添加点击事件

elementUi 文档中&#xff0c;合计行并没有点击事件&#xff0c;这里自己实现了合计行的点击事件。 created() {this.propertyList [{ property: order, label: 序号 },{ property: deptName, label: 单位名称 },{ property: contentPublishQuantity, label: 文章数量 },{ pro…

Nanopc T4 使用OpenCV

识别长方形&#xff1a; import cv2 import cv2 as cv import time import platform import os# 获取操作系统类型 os_type platform.system() if os_type "Windows":# Windows系统cap cv.VideoCapture(0) # 使用第零个摄像头 elif os_type "Linux"…

数据结构之单链表之环形链表

1.题目 题目&#xff1a;给定一个链表的头节点 head &#xff0c;返回链表开始入环的第一个节点。 如果链表无环&#xff0c;则返回 null。 2.分析 首先&#xff0c;我们应该判断链表是否有环&#xff0c;这个可以根据我的上一篇文章的快慢指针来判断。 bool hasCycle(stru…