Flink有状态计算

news/2024/10/17 14:31:25/

前言

状态是什么?状态就是数据,准确点说,状态是指 Flink 作业计算时依赖的历史数据或中间数据。如果一个 Flink 作业计算依赖状态,那它就是有状态计算的作业,反之就是无状态计算的作业。

举个例子,服务端应用为了方便扩缩容,一般会设计成无状态的,但是对外服务的接口又是有状态的,这是因为服务端应用本身不存储数据,数据存储在关系型或非关系型数据库中,此时的“状态”就从服务端迁移到数据库中了。

Flink 同理,一个稍微复杂一点的作业,基本都会使用到状态。Flink 作为一款强大的开源流处理框架,以其卓越的性能和丰富的功能备受瞩目,如何实现状态的高效访问和容错恢复,是 Flink 不得不解决的问题。

Flink有状态计算方案

Flink 是不是也可以效仿服务端应用,把状态数据存储在数据库中呢?这么做当然可以,但是会存在以下几个问题:

  • 数据库种类这么多,Flink 难以适配所有数据库,且容错恢复的成本很高
  • 开发者使用状态,必须了解状态存储的细节,使用门槛较高
  • 状态访问难以形成统一的接口,徒增使用门槛
  • 数据库的访问性能会增加 Flink 作业的延迟

以上这些问题里,最最重要的是性能问题。在大数据流处理场景中,处理的数据量是非常庞大的,单单是动辄几十万甚至百万的TPS,就不是传统数据库能承受的,况且还要考虑到Flink和数据库交互产生网络IO的额外开销。

基于这些问题,Flink 自己实现了一套状态的访问和存储方案:

  • 状态本地化 如果Flink通过网络去访问状态,必然会导致较高的延迟和低吞吐问题。Flink 直接状态本地化,将状态存储在subTask本地内存或磁盘上,这样就可以将状态的访问耗时从毫秒级直接优化到微妙甚至纳秒级,实现状态的极致访问速度。
  • 一致性快照实现容错 传统的有状态计算方案,为了实现异常容错时的数据处理和状态结果满足精准一次的一致性要求,往往会使用事务机制,大大增加用户的开发成本。Flink 自身实现了状态一致性的异常容错的逻辑,用户无需参与。Flink 以 Chandy-Lamport 分布式系统快照算法作为理论基础,实现了名为 Checkpoint 的分布式轻量级异步快照,保证了精确一次的数据处理和一致性状态,数据既不会多算,也不会少算。
  • 统一的状态访问接口 Flink提供了一套统一的状态访问接口,用户基于这套接口,不但能享受状态本地化带来的极致的访问速度,还够得到状态持久化和一致性快照带来的异常容错场景下精确一次的数据处理保证。

状态接口

Flink 状态的顶层接口是org.apache.flink.api.common.state.State,基于此派生出五个常用的子接口。

画板

  • ValueState 用于存储单个值的状态接口
  • MapState 用于存储键值对的状态接口
  • ListState 用于存储列表值的状态接口
  • ReducingState 用于存储归约状态的接口,添加进去的状态会先经过ReduceFunction和旧值进行归约计算并保存
  • AggregatingState 用于存储归约状态的接口,添加进去的状态会先经过AggregateFunction和旧值进行归约计算并保存,和ReducingState的区别是中间数据可以和输入数据类型不一致

键值状态和算子状态

Flink 将状态是否要根据Key分组,将状态划分为 **键值状态(Keyed State)算子状态(Operator State)**两类。

键值状态只能在 KeyedStream 上使用,数据先经过 keyBy 分组,相同key的数据共享同一个键值状态。算子状态的作用范围是当前subTask,同一个subTask共享同一个算子状态。另外,键值状态支持的状态类型更丰富,算子状态只支持 ListState 状态类型,这主要是为了算子并行度发生变化时方便状态的重分配。

要想使用算子状态,只需要在 KeyedStream 上应用 ProcessFunction,通过RuntimeContext 获取状态对象来访问状态即可。

keyedStream.process(new ProcessFunction<Integer, Integer>() {ValueState<Integer> sumState;@Overridepublic void open(Configuration parameters) throws Exception {// 获取状态对象sumState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("sum", Integer.class));}@Overridepublic void processElement(Integer value, ProcessFunction<Integer, Integer>.Context context, Collector<Integer> collector) throws Exception {sumState.value();// 访问状态sumState.update();// 更新状态}
})

算子状态可以在任意算子中使用,但是被限制只能用 ListState 状态类型。要使用算子状态,要实现 CheckpointedFunction 接口,通过重写 initializeState() 来恢复算子状态,重写 snapshotState() 在执行快照时存储状态。

public class MyProcess implements CheckpointedFunction {ListState<Integer> listState;@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {// 执行快照,存储状态listState.add();}@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {listState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<Integer>("elements", Integer.class));// 从异常中恢复状态if (functionInitializationContext.isRestored()) {Iterator<Integer> iterator = listState.get().iterator();while (iterator.hasNext()) {iterator.next();}}}
}

状态后端

Flink 状态本地化后,状态直接存储在subTask内存或本地磁盘中,避免了通过网络来访问状态,实现了极致的访问速度。但是随之而来的问题就是,subTask 崩溃后的数据容错和恢复。Flink 基于 Chandy-Lamport 分布式系统快照算法实现了名为 Checkpoint 的分布式轻量级异步快照,Flink 会周期性的触发 Checkpoint 操作,将subTask本地的状态数据持久化到远程分布式文件系统中,这个部分被 Flink 设计成可插拔的组件:后端组件(State Backend)。

下面是 Flink 支持的几种常用 State Backend:

  • HashMapStateBackend 底层使用哈希表将状态数据存储在subTask内存中,状态的访问效率特别高,但是受限于机器自身的内存限制,存储的状态数据量有限。
  • EmbeddedRocksDBStateBackend 将状态数据存储到内嵌的 RocksDB 数据库中,RocksDB是Facebook基于levelDB使用C编写的嵌入式K-V存储引擎,因为数据是保存在磁盘上的,它的状态访问性能虽然不如HashMapStateBackend,但它的存储能力是惊人的,甚至可以达到TB级别,非常适合处理大状态、长窗口的有状态计算作业,Checkpoint 时将数据快照写入远程分布式文件系统。
  • FsStateBackend 基于文件系统的状态后端,subTask将数据存储在内存中,Checkpoint 时将数据快照写入远程分布式文件系统。

以 FsStateBackend 为例,在作业中指定状态后端的示例代码如下:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint频率
environment.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 状态后端配置
environment.setStateBackend(new FsStateBackend("file:///Users/panchanghe/temp/flink/state"));

尾巴

Flink 提供了一套统一且易用的状态接口API,基于这套接口开发者可以方便地开发出一个精准处理一次的有状态计算作业。Flink 通过将状态本地化,实现了极致的状态访问速度,避免了通过网络访问状态数据导致的高延时和低吞吐的问题。为了实现数据的精准一次处理,保证数据的不多算也不少算,Flink 实现了 Checkpoint 轻量级分布式快照算法,通过定时把subTask本地的状态数据持久化到远程的分布式文件系统来实现异常容错恢复。


http://www.ppmy.cn/news/1539727.html

相关文章

Springboot连接多数据库

需求&#xff1a;springboot后台需要访问其他数据库的数据&#xff0c;同时操作多个数据库的数据。 pom.xml文件配置&#xff1a; <dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId&…

PostgreSQL技术内幕14:从插件来看PG扩展性-FDW插件

文章目录 0.简介1.FDW介绍2.使用方式2.1 创建过程2.1.1 创建插件2.1.2 创建 Foreign Server2.1.3 创建 User Mapping(外部服务器映射&#xff0c;本地文件可以不需要&#xff09;2.1.4 创建外部表 2.2 查询流程 3.源码分析3.1 扩展接口分析3.2 和其他部分关联3.2.1 和计划的关联…

020_FEM_Meshing_in_Matlab工具箱PDE之网格划分

Matlab FEM系列 PDE工具箱的网格数据 PDE工具箱对2D几何体&#xff0c;采用三角形网格&#xff0c;对于3D几何体采取四面体网格。 在这两种情况下&#xff0c;网格单元的可以采取二次单元也可以采用一次单元&#xff08;线性&#xff09;。这两个概念在有限元中间指的都是插值…

客户端、PC端、移动端集成发票真伪验真API接口返回值说明

客户端、pc端、移动端集成翔云发票查验接口&#xff0c;在现如今财务管理喝税务合规越来越严的背景下&#xff0c;集成发票真伪验真接口有助于提升财务工作效率&#xff0c;防止税务风险&#xff0c;简化发票管理流程&#xff0c;提升发票管理与报销效率&#xff0c;合规管理税…

Python办公自动化案例:实现word表格转换成Excel表格

案例:通过Python实现word表格转换成Excel表格。 准备工作:一份word文件,里面存放word表格。 Python提供了多种方法来实现Word表格的批量转换成Excel表格,这通常涉及到读取Word文档中的表格数据,然后将这些数据写入到Excel文件中。要通过Python实现Word表格批量转换成Exce…

【牛客刷题】笔记1

目录 1、数组中两个字符串的最小距离 2、dd爱框框 3、除2&#xff01; 1、数组中两个字符串的最小距离 数组中两个字符串的最小距离_牛客题霸_牛客网 (nowcoder.com) 我们可以使用一个i来遍历数组&#xff0c;并用prev1来记录字符串1在上一次出现的下标&#xff0c;prev2记…

统一修改UI库样式的几种方式

统一修改element组件库样式的几种方式。主题 | Element Plus 通过css变量设置 【CSS扩展】VUE如何使用或修改element plus中自带的CSS全局变量来定义样式:root {--hc-text-color-placeholder: #5f84a2;--hc-text-color-regular: #fff;--hc-text-color-primary: #fff;--hc-bg-c…

开源视觉大模型的部署与应用测试实验

继去年的大模型对外提供的三种应用服务blog基础上,根据对文本生成大模型的研究,基本上实现了本地部署,应用服务设计实现,文本大模型微调等工作。最近结合实际需求,开展了图像识别,特别是图像生成文本,图像整体描述,图像要素描述,图像属性描述,多张图像对比和图像文字…