22、Flink 背压下的 Checkpoint处理

devtools/2024/9/25 2:33:09/
1.概述

通常,对齐 Checkpoint 的时长主要受 Checkpointing 过程中的同步和异步两个部分的影响;但当 Flink 作业正运行在严重的背压下时,Checkpoint 端到端延迟的主要影响因子将会是传递 Checkpoint Barrier 到 所有的算子/子任务的时间;可以通过高 alignment time and start delay metrics 观察到,解决方案如下:

  • 解决背压问题。优化 Flink 作业,调整 Flink 或 JVM 参数,通过扩容。
  • 减少 Flink 作业中缓冲在 In-flight 数据的数据量。
  • 启用非对齐 Checkpoints。

上述选项并不是互斥的,可以组合使用。

2.缓冲区 Debloating

Flink 1.14 引入了缓冲区 Debloating 机制,用于自动控制在 Flink 算子/子任务之间缓冲的 In-flight 数据的数据量;可以通过将属性taskmanager.network.memory.buffer-debloat.enabled设置为true来启用。

此特性对对齐和非对齐 Checkpoint 都生效,并且在这两种情况下都能缩短 Checkpointing 的时间,不过 Debloating 的效果对于 对齐 Checkpoint 更明显;当在非对齐 Checkpoint 情况下使用缓冲区 Debloating 时,好处是 Checkpoint 大小会更小,并且恢复时间更快 (需要保存 和恢复的 In-flight 数据更少)。

3.非对齐 Checkpoints
a)概述

从 Flink 1.11开始,Checkpoint 可以是非对齐的;Unaligned checkpoints 包含 In-flight 数据(例如,存储在缓冲区中的数据)作为 Checkpoint State的一部分,允许 Checkpoint Barrier 跨越这些缓冲区,使 Checkpoint 时长变得与当前吞吐量无关,因为 Checkpoint Barrier 实际上已经不再嵌入到数据流当中。

如果 Checkpointing 由于背压导致周期非常的长,应该使用非对齐 Checkpoint,这样 Checkpointing 时间基本上就与 端到端延迟无关;但是非对齐 Checkpointing 会增加状态存储的 I/O,因此当状态存储的 I/O 是 整个 Checkpointing 过程当中真正的瓶颈时,不应当使用非对齐 Checkpointing。

启用非对齐 Checkpoint

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用非对齐 Checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();

或者在 flink-conf.yml 配置文件中增加配置

execution.checkpointing.unaligned: true
b)对齐 Checkpoint 超时

在启用非对齐 Checkpoint 后,依然可以通过编程的方式指定对齐 Checkpoint 的超时时间

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));

或是在 flink-conf.yml 配置文件中配置:

execution.checkpointing.aligned-checkpoint-timeout: 30 s

在启动时,每个 Checkpoint 仍然是 aligned checkpoint,但是当全局 Checkpoint 持续时间超过 aligned-checkpoint-timeout 时, 如果 aligned checkpoint 还没完成,那么 Checkpoint 将会转换为 Unaligned Checkpoint。

c)限制

并发 Checkpoint

Flink 当前并不支持并发的非对齐 Checkpoint;Savepoint 也不能与非对齐 Checkpoint 同时发生,因此它们将会花费更长的时间。

与 Watermark 的相互影响

非对齐 Checkpoint 在恢复的过程中改变了关于 Watermark 的一个隐式保证;目前,Flink 确保 Watermark 作为恢复的第一步, 而不是将最近的 Watermark 存放在 Operator 中,以方便扩缩容。

在非对齐 Checkpoint 中,当恢复时,Flink 会在恢复 In-flight 数据后再生成 Watermark,如果 Pipeline 中使用了对每条记录都应用最新的 Watermark 的算子将会相对于使用对齐 Checkpoint 产生不同的结果;如果 Operator 依赖于最新的 Watermark 始终可用,解决办法是将 Watermark 存放在 OperatorState 中;此时 Watermark 应该使用单键 group 存放在 UnionState 以方便扩缩容。

与长时间运行的记录处理交互

尽管有未对齐的检查点,但 barrier 能够越过队列中的其它记录;如果当前记录需要大量时间进行处理,则此 barrier 的处理仍可能延迟,例如在窗口操作中同时触发多个定时器时。

当系统处理单个输入记录时被阻止,在等待多个网络缓冲区可用时,可能会出现 Flink 不能中断对单个输入记录的处理,未对齐的检查点必须等待当前处理的记录被完全处理;

原因要么是由于不适合单个网络缓冲区的大型记录的串行化,要么是在 flatMap 操作中,为一个输入记录生成许多输出记录;此时背压可以阻止未对齐的检查点,直到处理单个输入记录所需的所有网络缓冲区可用;当单个记录的处理需要一段时间时,也可能发生在其它情况下,导致检查点的时间可能高于预期。

某些数据分布模式不是检查点式的

有一部分包含属性的的连接无法与 Channel 中的数据一样保存在 Checkpoint 中;为了保留这些特性并且确保没有状态冲突或 非预期的行为,非对齐 Checkpoint 对于这些类型的连接是禁用的,所有其他的交换仍然执行非对齐 Checkpoint。

点对点连接

目前没有任何对于点对点连接中有关数据有序性的强保证;由于数据已经被前置的 Source 或是 KeyBy 相同的方式隐式组织,一些用户会依靠这种特性在提供的有序性保证的同时将计算敏感型的任务划分为更小的块。

只要并行度不变,非对齐 Checkpoint(UC) 将会保留这些特性,但是如果加上 UC 的伸缩容,这些特性将会被改变。

针对如下任务

image-20240509104253843

如果想将并行度从 p=2 扩容到 p=3,那么需要根据 KeyGroup 将 KeyBy 的 Channel 中的数据划分到3个 Channel 中去;通过使用 Operator 的 KeyGroup 范围和确定记录属于某个 Key(group) 的方法可以实现;但对于 Forward 的 Channel,没有 KeyContext,Forward Channel 里也没有任何记录被分配了任何 KeyGroup;也无法计算它,因为无法保证 Key 仍然存在。

广播 Connections

广播 Connection 无法保证所有 Channel 中的记录都以相同的速率被消费,可能导致某些 Task 已经应用了与特定广播事件对应的状态变更,而其他任务则没有。

image-20240509104545856

广播分区通常用于实现广播状态,它应该跨所有 Operator 都相同;Flink 实现广播状态,通过仅 Checkpointing 有状态算子的 SubTask 0 中状态的单份副本。

在恢复时,将该份副本发往所有的 Operator,可能导致某个算子将很快从它的 Checkpointed Channel 消费数据并将修改应用于记录来获得状态。

4.故障排除

in-flight 中的数据损坏。

注意:以下描述的操作是最后采取的手段,因为它们将会导致数据的丢失。

为了防止 In-flight 数据损坏,或者由于其他原因导致作业应该在没有 In-flight 数据的情况下恢复,可以使用 recover-without-channel-state.checkpoint-id 属性。

该属性需要指定一个 Checkpoint Id,对它来说 In-flight 中的数据将会被忽略;除非已经持久化的 In-flight 数据内部的损坏导致无法恢复的情况,否则不要设置该属性。

只有在重新部署作业后该属性才会生效,只有启用 externalized checkpoint 时,此操作才有意义。


http://www.ppmy.cn/devtools/38983.html

相关文章

89C52单片机+ESP8266做的物联网+反馈 e4a手机客户端源程序

资料下载地址:89C52单片机ESP8266做的物联网反馈 e4a手机客户端源程序 MCU是89C52单片机 WiFi模块是ESP8266 其他 8路继电器 电源模块 使用贝壳物联做服务器 还有客户端。 也可以用花生壳做内网穿透,8266做服务器,也可以实现物联以及反馈&a…

替换spring-boot中的组件版本

spring-boot是一个用于简化开发的框架,引入spring-boot后会自动包含spring框架,通过引入xxx-start来完成指定组件的功能。比如: spring-boot-starter-web(嵌入 Tomcat 和 web 开发需要的 servlet 和 jsp 支持)spring-boot-starter-data-jpa(…

17、Flink 的 Checkpointing 配置详解

Checkpointing 1.概述 Flink 中的每个方法或算子都能够是有状态的,状态化的方法在处理单个 元素/事件 的时候存储数据,为了让状态容错,Flink 需要为状态添加 checkpoint(检查点)。 2.开启与配置 Checkpoint 默认 c…

最简单的自动化爬虫工具--Playwright

Playwright Playwright是一款强大的自动化库,提供了一种简便易用、高性能的网页自动化解决方案。它支持同步和异步两种操作方式,用户无需为不同浏览器单独下载驱动程序,因为Playwright内置了对Chrome、Firefox、Safari等多种浏览器的支持。此…

等保测评—Linux-CentOS标准范例截图

密码输入错误无法登录 用户账户情况包含root、guanli、shenji 查看审计用户权限 身份鉴别: cat /etc/passwd,核查用户名和 UID,是否存在同样的用户名和 UID cat /etc/shadow,查看文件中各用户名状态 , 核查密码一栏为…

搜索算法系列之三(插值查找)

前言 插值查找仅适用于有序数据、有序数组,和二分查找类似,更讲究数据有序均匀分布。 算法原理 插值查找(interpolation search)是一种查找算法,它与二分查找类似,但在寻找元素时更加智能化。这种算法假设数据集是等距的或者有…

用 C 语言进行大模型推理:探索 llama2.c 仓库(二)

文章目录 前提如何构建一个Transformer Model模型定义模型初始化 如何构建tokenzier 和 sampler如何进行推理总结 前提 上一节我们介绍了llama2.c中如何对hugging face的权重进行处理,拿到了llama2.c想要的权重格式和tokenizer.bin格式。这一节我们分析下在llama2.…

华为eNSP学习—IP编址

IP编址 IP编址子网划分例题展示第一步:机房1的子网划分第二步:机房2的子网划分第三步:机房3的子网划分IP编址 明确:IPv4地址长度32bit,点分十进制的形式 ip地址构成=网络位+主机位 子网掩码区分网络位和主机位 学此篇基础: ①学会十进制与二进制转换 ②学会区分网络位和…