20250120 深入了解 Apache Flink 的 Checkpointing

devtools/2025/1/23 1:03:09/

Apache Flink 是一种用于实时流处理和批处理的分布式计算框架。在实时流处理任务中,保证数据的一致性和任务的容错性是至关重要的,而 Flink 的 Checkpointing 机制正是实现这一目标的核心技术。

本文将详细介绍 Flink 的 Checkpointing,包括其概念、原理、配置和实际应用。


什么是 Checkpointing?

Checkpointing 是 Flink 提供的一种用于容错的机制。它会在流处理任务运行过程中,定期将作业的状态流的处理进度保存到外部持久化存储(例如 HDFS 或 S3)中。当任务因故障而中断时,Flink 可以从最近一次成功的 Checkpoint 恢复,继续任务执行,而无需重新处理已经完成的数据。

Checkpointing 的核心功能
  1. 状态保存
    • 保存任务中所有算子的状态,例如窗口聚合、累加器或其他操作的中间结果。
  2. 进度保存
    • 保存流处理中数据源的消费位置(如 Kafka 的偏移量)。
  3. 故障恢复
    • 任务失败时,从最近的 Checkpoint 恢复状态和进度,保证作业的一致性。

Checkpointing 的原理

Flink 的 Checkpointing 采用 两阶段提交协议(Two-Phase Commit Protocol) 来确保状态的一致性。这一过程分为以下几个阶段:

1. 触发 Checkpoint
  • JobManager 定期触发 Checkpoint(由 enableCheckpointing 配置间隔时间),向所有并行任务发送 Checkpoint 触发信号。
2. 保存状态
  • 每个算子将其当前状态保存到本地或远程存储(如 HDFS、S3)。
  • 数据源(如 Kafka)会记录当前消费的偏移量。
3. 提交 Checkpoint
  • 当所有算子成功完成状态保存后,JobManager 将 Checkpoint 标记为成功。
  • 任务的恢复点会更新为该 Checkpoint。
4. 故障恢复
  • 如果任务失败,Flink 会从最近一次成功的 Checkpoint 恢复作业状态和数据流进度,确保任务继续执行。

如何启用 Checkpointing

在 Flink 程序中,启用 Checkpoint 非常简单,只需在执行环境中调用 enableCheckpointing 方法:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class CheckpointExample {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启 Checkpoint,每隔 5000 毫秒触发一次env.enableCheckpointing(5000);// 配置 Checkpoint 的额外参数env.getCheckpointConfig().setCheckpointTimeout(60000); // 设置超时时间env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同时只允许一个 Checkpointenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 两次 Checkpoint 之间的最小间隔// 添加数据源和作业逻辑env.fromElements("hello", "flink", "checkpointing").map(String::toUpperCase).print();// 执行作业env.execute("Flink Checkpoint Example");}
}

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class CheckpointExample { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启 Checkpoint,每隔 5000 毫秒触发一次 env.enableCheckpointing(5000); // 配置 Checkpoint 的额外参数 env.getCheckpointConfig().setCheckpointTimeout(60000); // 设置超时时间 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同时只允许一个 Checkpoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 两次 Checkpoint 之间的最小间隔 // 添加数据源和作业逻辑 env.fromElements("hello", "flink", "checkpointing") .map(String::toUpperCase) .print(); // 执行作业 env.execute("Flink Checkpoint Example"); } }

重要配置
  • enableCheckpointing(interval):设置 Checkpoint 的触发间隔,单位为毫秒。
  • setCheckpointTimeout(timeout):设置单个 Checkpoint 的最大超时时间。
  • setMaxConcurrentCheckpoints(n):设置同时允许进行的最大 Checkpoint 数量。
  • setMinPauseBetweenCheckpoints(milliseconds):两次 Checkpoint 之间的最小间隔时间。

Checkpointing 的应用场景

1. Kafka 数据消费

在使用 Kafka 作为数据源时,Checkpoint 会保存 Kafka 的偏移量。当任务重启时,Flink 会从最近的偏移量开始重新消费数据,确保数据不会丢失或重复处理。

2. 窗口操作

对于基于窗口的聚合操作(如实时统计点击量),Checkpoint 保存中间结果。当任务失败后,中间结果可以恢复,不需要重新计算。

3. 用户状态管理

用户自定义的状态(例如计数器、缓存)也可以通过 Checkpoint 保存。通过恢复这些状态,确保任务逻辑的一致性。


Checkpointing 与 Savepoint 的区别

特性CheckpointSavepoint
触发方式自动触发(定期执行)手动触发
用途故障恢复程序升级、迁移、测试
存储生命周期短期(任务失败后自动清理)长期(由用户管理,手动删除)
操作复杂度无需手动操作需要用户显式触发

Checkpointing 的注意事项

1. 存储路径
  • Checkpoint 的数据通常会存储在外部持久化存储中,如 HDFS、S3 或本地文件系统。
  • 配置存储路径:
    env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:port/flink/checkpoints");
    

2. 性能影响
  • Checkpoint 是一个重量级操作,频率过高可能会影响任务性能。
  • 通常设置为每 5-10 秒触发一次,根据任务需求进行调整。
3. 容错机制
  • Checkpoint 默认提供 精确一次(Exactly Once) 的语义。如果对性能要求较高,可以选择 至少一次(At Least Once)

总结

Flink 的 Checkpointing 是流处理容错的核心技术,具备以下特点:

  1. 定期保存任务的状态和进度,确保数据一致性。
  2. 支持任务的快速恢复,避免重新处理已完成的数据。
  3. 与外部存储(如 HDFS、S3)的集成,为分布式任务提供强大的容错能力。

在实际使用中,Checkpointing 是实现 高可用性数据一致性 的基础。通过合理配置 Checkpoint,可以确保 Flink 作业在高负载和分布式环境下的可靠运行。

如果你正在使用 Flink 进行实时流处理任务,Checkpoint 是你必须深入了解和掌握的关键机制! 😊


http://www.ppmy.cn/devtools/152733.html

相关文章

Dockerfile另一种使用普通用户启动的方式

基础镜像的Dockerfile # 使用 Debian 11.9 的最小化版本作为基础镜像 FROM debian:11.11# 维护者信息 LABEL maintainer"caibingsen" # 复制自定义的 sources.list 文件(如果有的话) COPY sources.list /etc/apt/sources.list # 创建…

【深度学习】神经网络之Softmax

Softmax 函数是神经网络中常用的一种激活函数,尤其在分类问题中广泛应用。它将一个实数向量转换为概率分布,使得每个输出值都位于 [0, 1] 之间,并且所有输出值的和为 1。这样,Softmax 可以用来表示各类别的预测概率。 Softmax 函…

Docker:基于自制openjdk8镜像 or 官方openjdk8镜像,制作tomcat镜像

一、制作openjdk8基础镜像【基于自定义alpine-3.18.0:v1 】 docker pull maven:3.5.0-jdk-8-alpine 78.56 MB https://hub.docker.com/_/maven/tagspage8&namealpine openjdk二进制下载地址 https://blog.csdn.net/fenglllle/article/details/124786948 https://adoptope…

商汤善惠获金沙江创投领投A轮融资,聚焦零售AI业务

1月20日,商汤善惠宣布完成A轮融资,本轮融资由金沙江创投数千万元领投,微木资本、嘉实基金和金弘基金等知名资管平台和产业资本数千万元跟投,鞍羽资本担任长期财务顾问。 此次融资将重点投向零售AI算法研发创新、海外市场拓展战略…

机器学习皮马印第安人糖尿病数据集预测报告

目录 1.项目选题与设计方案 1.1项目选题 1.2设计方案 2.功能实现 2.1 主要功能介绍 2.2 开发环境及平台介绍 2.3 实现过程 2.3.1数据分析 2.3.2算法设计 2.3.3 python代码 3.个人心得体会 1.项目选题与设计方案 1.1项目选题 我国的糖尿病患者初诊时约8&a…

【计算机视觉】人脸识别

一、简介 人脸识别是将图像或者视频帧中的人脸与数据库中的人脸进行对比,判断输入人脸是否与数据库中的某一张人脸匹配,即判断输入人脸是谁或者判断输入人脸是否是数据库中的某个人。 人脸识别属于1:N的比对,输入人脸身份是1&…

【深度学习】3.损失函数的作用

损失函数的作用 假设把猫这张图片分成四个像素点,分别为:56、231、24、2(实际应该是三维的,因为还有颜色通道的维度,这里简化成二维)。 像素点拿到以后,进行三分类,粉红色为第一组W…

kafka学习笔记2 —— 筑梦之路

KRaft模式 Kafka的KRaft模式是一种新的元数据管理方式,旨在去除对ZooKeeper的依赖,使Kafka成为一个完全自包含的系统。在Kafka的传统模式下,元数据管理依赖于ZooKeeper,这增加了部署和运维的复杂性。为了解决这个问题,…