04. Flink的状态管理与容错机制

embedded/2025/2/8 17:53:49/

本文主要介绍下Flink中的状态种类,Flink的检查点机制,状态后端,端到端的一致性保证还有Flink的任务重启和恢复策略

1.Flink中的状态分类

flink的状态主要分为,KeyedState,OperatorState,BroadcastState三类,其中KeyedState和BroadcastState是比较常用的,

  • keyedState: 又分为ValueState,ListState,ReducingState,AggregatingState,MapState这些state看名字就知道它们的作用这里就不多说了,需要注意下的是ReducinngState和AggregationState这两个state都是聚合数据的state而且都是通过add方法添加数据,数据的计算逻辑 按照定义的方法计算,不同的是AggregationState输入和输出可以是不同的类型,如图介绍下它的具体使用方法

  • 在open()里定义state的描述器并初始化它

  • 在processElement()这样的富函数中根据数据流中的每个元素使用和添加新元素到状态中
    在这里插入图片描述

  • BroadcastState: 它一般用于小规模数据集需要实时更新的应用场景,它的具体使用方法如下图所示,通过DataStream流调用broadcast方法,这就定义好了广播流,然后通过继承BroadcastProcessFunction抽象类,实现对应的processElement()和processBroadcastElement()方法即可
    在这里插入图片描述

2. Flink CheckPoint

checkpoint机制是flink实现容错和故障恢复的基石,它是通过间歇性的往正常的数据流中插入一种叫barrier的特殊数据流,barrier会随着正常数据流往各个算子中流转,当barrier流转到当前算子的时候,此算子就会把它自己运行时所有的状态保存到本地,就这样一直到barrier流转到最后一个算子并且它也做好了相应的状态保存,此时所有算子对应的TM就会通知JM此次分布式快照已经完成,JM就会做全局的checkpoint操作。就这样如果发生故障需要恢复,只需把所有算子的状态恢复到最近一次成功的checkpoint中记录的状态即可。
在这里插入图片描述

2.1 多并行度下的checkpoint机制

上面讲的是单个并行度的情况,在多并行度的情况下checkpoint机制是不一样的,需要注意的是,barrier往下游流转时是按照一个一个算子往下流的。所以就会出现上游流转到下游并行度变大这种情况,此时上游是通过广播流的形式把上游barrier广播到下游的所有分区中完成barrier到下个算子的流转,如果是上游多个分区往下游少数分区中发,此时就需要对下游的barrier进行对齐才行

  • barrier对齐: 在多并行度的情况下,各个并行度的性能不一样,到达下游的时间肯定会有差异,barrier对齐机制就要求,先到达的需要阻塞住自己的正常数据流,等最慢的并行度也到达这个算子才可以做算子状态的本地化存储操作。如果慢的并行度比较慢的话,这就会导致数据流整体的流速减慢,导致数据的反压,也会导致checkpoint时间延长等等恶性循环。
  • barrier不对齐: 针对上面出现的问题,flink引入了barrier不对齐机制,它有两种实现方式
    • exactly Once(精准一次)语义: 假设有3个并行度,往map算子里流,其中2号并行度优先达到map算子,此时map算子立马进行本地快照,同时记录2号并行度在map中的状态,然后记录1号和3号并行度barrier和map之间的所有元素(inputbuffers),还会记录此时经过map计算往下游输出的元素(output buffers),最后把1号和3号barrier停掉,到此为止map算子的一次性快照完成。
      在这里插入图片描述
    • at least Once(至少一次)语义:假设也是3个并行度往map算子里流,也是2号算子先到达map算子,此时barrier不阻塞,也不做本地快照,2号并行度barrier继续往下流转,一直到3个并行度中最慢的那个并行度也到了map之后才开始做本地快照,此时本地快照记录的是各个并行度在map中的状态,但是此刻快的并行度处理的数据肯定比它本身barrier到时刻处理多一些元素,这些元素在状态重启的时就会出现重复消费的情况,这也是符合至少一次消费的语义。

2.2 checkpoint的设置参数

  • checkpoint存储: CK可以主要的存储地方是内存和文件系统中,具体参考状态后端章节
env.getCheckpintConfig().setCheckpointStorage("hdfs://mycluster/flink/checkpoints");
  • checkpoint模式设置: CK的模式中常用的有如下几种,exactly-once 语义保证了端到端的一致,在数据要求比较高的场景(不能丢数据),但是它的性能相对较弱,两一种是at-least-once语义更适合时延和吞吐要求非常高但数据一致性要求不高的场景
env.getCheckpointConfig().setCheckpointMode(CheckpointMode.AT_LEAST_ONCE);
  • 其他重要参数:
    • 指定完成CK的超时时间:env.getCheckpointConfig().setCheckpointTimeout(10601000)默认10分钟
    • 指定两个CK之间的时间间隔:env.getCheckpointConfig().setCheckpointPauseBetweenCheckpoint(500);默认是0
    • CK的最大并行度:env.getCheckpointConfig().setMaxConcurrentCheckpoints(1),默认并行度是1
    • 可容忍CK的失败次数:env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0)没有默认值
    • CK不对齐设置:env.getCheckpointConfig().enableUnalignedCheckpoints()

3 状态后端

Checkpoint的的保存过程都是由状态后端协调和管理,具体来说状态后端有两个主要职责:

  • TaskManager本地状态管理: 状态后端负责任务在每个TM上的本地状态,并提供了状态的存储和访问接口
  • 检查点写入远程持久化存储: 它负责把Checkpoint的数据存储到指定的远程持久化存储介质中

3.1 状态后端分类

Flink提供了HashMapStateBackend,FSStateBackend,RockDBStateBackend三种状态后端

  • HashMapStateBackend: 基于Flink的堆内存存储状态数据,特点是高效,但是容易内存溢出和状态数据丢失等问题
  • FSStateBackend: 基于文件系统存储状态,可以是本地文件系统也可以是HDFS这样的分布式系统,特点是适合存储任务状态非常大的场景,但是效率相对低下
  • RocksDBStateBackend: 基于RockDB这样一个存储系统,也是把数据存储在本地,同时它比单纯的文件系统在性能上要好,从性能上来说是介于FSStateBackend和RockDBStateBackend之间,同时它是采用增量方式对快照进行存储的,基于以上几点推荐在生产环境中使用

4 Flink端到端一致性保证

4.1 数据处理的语义

分布式系统从严格程度会有以下三种语义,分别是,exactly-once,at-least-once,at-most-once

  • exactly-once: 它要求每笔数据都是精准一次的被处理
  • at-least-once: 要求数据至少被消费一次
  • at-most-once: 数据至多被消费一次

4.2 端到端的数据处理一致性

Flink中数据的一致性指数据的输入,Flink系统内部和数据的输出整体的一致性,系统内部和数据输入的一致性实现相对简单,数据输出的一致性当对于复杂

  • 输入端的一致性: 只需要保证数据源能按照不变的顺序重放数据即可,典型的就是文件系统是可以的
  • Flink系统内部处理的一致性: 通过checkpoint机制就可以保证
  • 输出端数据一致性: 由于数据输出到外部系统之后,成功与否本程序是无法保证的,所有就要求输出端必须具备幂等或支持事务。幂等是指重复写入相同数据只影响一次结果,如Redis写入HBase写入,然而幂等写入的场景比较受限,所以使用事务方式保证输出端数据一致性是主要的思路,Flink中通过两阶段提交的方式实现

5 Flink的两阶段提交

两阶段提交主要是针对支持事务的外部系统实现,输出端一致性的机制,顾名思义它主要是分为两个阶段,分别是pre-commit和commit阶段,Flink在具体实现上是通过结合checkpoint机制一起实现的。
总的来说它是把外部的事务当成是系统内部状态,然后通过checkpoint机制实现,具体实现步骤如下:

  • pre-commit阶段: 当checkpoint机制往数据流中插入barrier的时候pre-commit阶段开始,直到barrier流经整个job的最后一个算子,并且所有的算子都做好本地的快照,此时预提交阶段结束
  • commit阶段: 在JobManager收到所有算子都完成对应的本地快照之后,它会进行全局的checkpoint操作,当全局的checkpoint完成,此时commit阶段开始,结合外部系统,进行事务提交操作
    不管是在pre-commit或是commit阶段的任何一个节点没有成功,程序都会回滚到上一次成功的checkpoint的状态

5.1 两阶段提交代码实现

自定义实现两阶段提交逻辑,主要的步骤是继承TwoPhaseCommitSinkFunction这个抽象类,然后分别实现如下五个方法

  • beginTransaction: 开启事务,这个方法中一般实现外部系统的一些初始化操作
  • invoke: 每笔数据来都会调用的操作,一般实现数据的处理和写入操作
  • preCommit: 预提交阶段,当barrier到达后,会调用该方法并调用beginTransaction方法开启一个新事务
  • commit: 真正执行事务提交的方法,一般实现外部系统事务提交的方法
  • abort: job一般报错或是出现异常调用的方法,一般实现外部系统事务回滚和事务终止操作

http://www.ppmy.cn/embedded/160589.html

相关文章

ios应用想要下载到手机上只能苹果签名吗

ios APP想要下载到手机上有好几种方式,但是还是需要苹果签名的,苹果签名为ios应用的下载提供了最后一步的帮助,可以说苹果签名就是APP的终点站,是APP的核心。 第一种方式可以采取越狱的方式,但是我们也是知道的&#x…

Maven 依赖管理基础

Maven 是一个强大的构建工具,依赖管理是其核心功能之一。通过 Maven,开发者可以轻松管理项目中的外部库和框架的依赖,自动化地下载、更新和处理这些依赖。在本篇博客中,我们将深入探讨如何在 Maven 中管理项目的依赖,包…

tolua[一]框架搭建,运行example

一.安装tolua https://github.com/topameng/tolua 下载LuaFramework_UGUI的zip 将Assets目录拷贝到项目根目录下 提示确认注册,遇到这个对话框点确定即可 生成如下目录 二.LuaFramework->Build Windows Resource 接下来的目标是将这个main场景跑起来 需要先执行…

【开源免费】基于SpringBoot+Vue.JS健身房管理系统(JAVA毕业设计)

本文项目编号 T 180 ,文末自助获取源码 \color{red}{T180,文末自助获取源码} T180,文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

【大数据技术】本机DataGrip远程连接虚拟机MySQL/Hive

本机DataGrip远程连接虚拟机MySQL/Hive datagrip-2024.3.4VMware Workstation Pro 16CentOS-Stream-10-latest-x86_64-dvd1.iso写在前面 本文主要介绍如何使用本机的DataGrip连接虚拟机的MySQL数据库和Hive数据库,提高编程效率。 安装DataGrip 请按照以下步骤安装DataGrip软…

async-http-client使用示例

文章目录 概要整体架构流程技术名词解释技术细节小结 概要 async-http-client是一个用于 Java 平台的高性能、非阻塞 HTTP 客户端库,它允许开发者以异步的方式发送 HTTP 请求并处理响应,从而提高应用程序的性能和响应性。 主要特点 异步处理&#xff…

蓝耘智算平台使用DeepSeek教程

目录 一.平台架构与技术特点 二、DeepSeek R1模型介绍与优势 DeepSeek R1 模型简介 DeepSeek R1 模型优势 三.蓝耘智算平台使用DeepSeek教程 展望未来 耘元生代智算云是蓝耘科技推出的一款智算云平台有着以下特点: 一.平台架构与技术特点 基于 Kubernetes 原…

Linux之安装docker

一、检查版本和内核是否合格 Docker支持64位版本的CentOS 7和CentOS 8及更高版本,它要求Linux内核版本不低于3.10。 检查版本 cat /etc/redhat-release检查内核 uname -r二、Docker的安装 1、自动安装 Docker官方和国内daocloud都提供了一键安装的脚本&#x…