Flink算子状态为何只能用ListState?

news/2024/10/21 22:44:33/

前言

Flink 将状态是否要按照 key 进行分类,将状态分为键值状态(Keyed State)和算子状态(Operator State)两种,两者除了状态本身的作用域不同外,其中算子状态的状态类型更是被 Flink 限制为 ListState,这是为什么呢?

使用算子状态

算子状态的作用域为当前 subTask,使用算子状态,Flink 算子的每个subTask只能访问当前subTask的数据,不能夸subTask访问。典型的应用场景就是 FlinkKafkaConsumer 使用算子状态保存 Kafka Topic 中的每个分区的消费偏移量。

在Flink中,要想使用算子状态,可以选择实现 CheckpointedFunction 接口

public interface CheckpointedFunction {void snapshotState(FunctionSnapshotContext var1) throws Exception;void initializeState(FunctionInitializationContext var1) throws Exception;
}
  • snapshotState Flink作业执行快照时调用该方法,开发者可以控制往ListState写入哪些数据
  • initializeState Flink作业启动或者异常容错从快照恢复时调用这个方法

Flink作业启动或异常恢复时会调用 CheckpointedFunction#initializeState,通过入参 FunctionInitializationContext 来获取算子状态。

要想获取算子状态,首先得先定义状态描述符,因为算子状态被强制限定为列表状态,所以只能用 ListStateDescriptor。然后通过入参 FunctionInitializationContext#getOperatorStateStore 对象来获取 ListState。

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {this.elementsState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<>("elements", Integer.class));
}

算子状态实战

算子状态在业务场景中并不常用,除了 FlinkKafkaConsumer 使用算子状态保存 Kafka Topic 中分区的消费偏移量外,Sink 算子使用算子状态作为写出数据的缓冲区也是一个较为常用的场景。

MySQL 是常用的关系型数据库,在流计算场景中,它也是一种常用的数据汇存储引擎,用来保存流计算的结果。但是MySQL的写入TPS通常不高,一般在几百甚至几千,上万已经是很夸张了。但是Flink作为一款高性能的流计算引擎,动辄十万百万的TPS数据流入,如果计算结果每次都写入MySQL,势必会压垮MySQL。此时可以在 Sink 算子上使用算子状态作为缓冲区,先缓存一部分数据,最后再一次性批量写MySQL,以此来减轻MySQL的压力。

举个例子,现在有一个数据源,会源源不断的产生一批数字,现在要开发一个 Flink 作业,计算这些数字的和,然后把结果写入到 MySQL,为了减轻MySQL的写入压力,要求 Sink 算子可以缓冲一部分数据再批量写。

如下代码所示,SumResultBufferingSink 实现了CheckpointedFunction 接口,元素到达时会先写入缓冲区 elements,缓冲区满才会累计求和后写入MySQL。同时,在执行快照时,也会把elements缓冲区的数据写入到elementsState,异常恢复时,再将elementsState数据恢复到缓冲区。

public class OperatorStateFuature {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.fromElements(1, 2, 3, 4, 5, 6).keyBy(i -> "all").sum(0).addSink(new SumResultBufferingSink(3));environment.execute();}public static class SumResultBufferingSink implements SinkFunction<Integer>, CheckpointedFunction {private final int bufferSize;private final List<Integer> elements;private ListState<Integer> elementsState;public SumResultBufferingSink(int bufferSize) {this.bufferSize = bufferSize;this.elements = new ArrayList<>(bufferSize);}@Overridepublic void invoke(Integer value, Context context) throws Exception {elements.add(value);if (elements.size() >= bufferSize) {int sum = elements.stream().mapToInt(Integer::intValue).sum();System.err.println("write to db : sum=" + sum);elements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.err.println("---snapshotState start---");elementsState.clear();elementsState.addAll(elements);System.err.println("---snapshotState end---");}@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {System.err.println("---initializeState start---");this.elementsState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<>("elements", Integer.class));// 是否从故障中恢复if (functionInitializationContext.isRestored()) {Iterator<Integer> iterator = elementsState.get().iterator();while (iterator.hasNext()) {elements.add(iterator.next());}}System.err.println("---initializeState end---");}}
}

使用 SumResultBufferingSink 后,缓冲区大小为3,六个元素只会写两次DB。

ListState和UnionListState

OperatorStateStore 提供了两个方法获取 ListState

public interface OperatorStateStore {<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
}

ListState和UnionListState 有什么区别呢?

两者的区别在于,快照恢复或者算子并行度发生改变时,算子状态值的分配方式是不同的。

  • ListState 采用平均分割分配,状态重新分配时,所有subTask的ListState会先合并到一起,再采用 Round-Robin 策略将列表中的状态分配到各个subTask
  • UnionListState 采用合并分配,状态重新分配时,所有subTask的ListState合并到一起得到一个完整的列表,再将这个完整的列表发给每个subTask。

Tips:UnionListState要慎用,当列表中的元素非常多时,有内存溢出的风险。

算子状态为什么限制ListState

回到开篇提出的问题,为什么Flink要限制算子状态只能使用 ListState 类型?

本质上,是Flink异常恢复,或者算子并行度发生变化时,算子状态数据如何分配的问题。最简单公平的分配算法就是平均分配,那么除了 ListState 这种列表类型,其它如 ValueState,MapState 等数据结构实在是不方便数据划分啊,所以Flink才限制算子状态必须是 ListState 类型。当然,Flink 也给了开发者两种选择,一是 ListState 的平均分配,二是 UnionListState 给你全量的状态,程序自己来分配,更加灵活。


http://www.ppmy.cn/news/1540918.html

相关文章

算法训练(leetcode)二刷第五天 | 242. 有效的字母异位词、349. 两个数组的交集、202. 快乐数、1. 两数之和

刷题记录 242. 有效的字母异位词349. 两个数组的交集202. 快乐数1. 两数之和 242. 有效的字母异位词 leetcode题目地址 简单题&#xff0c;哈希表。数组长度为常量&#xff0c;因此空间复杂度为O(1)。 时间复杂度&#xff1a; O ( n ) O(n) O(n) 空间复杂度&#xff1a; O…

# Excel 操作大全

Excel 操作大全 文章目录 Excel 操作大全单元格文本换行计算SUM 单元格 文本换行 设置自动换行&#xff0c;在文本前面使用 AltEnter键即可换行文本前面可以输入空格实现段前缩进的效果 计算SUM 求和函数

Linux——shell 编程基础

基本介绍 shell 变量 环境变量&#xff08;也叫全局变量&#xff09; 位置参数变量 预定义变量 运算符 条件判断 流程控制 if 单分支&多分支 case 语句 for循环 while 循环 read 读取控制台输入 函数 系统函数 basename 获取文件名 dirname 获取目录路径 自定义函数 综…

WebSocket Secure (WSS)

使用代理浏览器时&#xff0c;WebSocket Secure (WSS) 链接失败可能由以下原因引起&#xff1a; 代理设置问题&#xff1a; 确保代理配置正确&#xff0c;包括代理地址和端口。有些代理服务器不支持WebSocket连接&#xff0c;您需要确认您的代理服务是否支持WSS。 SSL/TLS 问题…

用动态IP软件改变IP地址:探索原理与实用指南‌

在数字时代&#xff0c;网络的普及让我们的生活与工作更加便捷&#xff0c;但同时也带来了一系列新的挑战。地域限制、反爬虫机制等问题逐渐凸显&#xff0c;成为了许多网络用户和企业在享受网络便利时必须面对的难题。为了解决这些问题&#xff0c;动态IP软件应运而生&#xf…

数据库权限提升GetShell

数据库提权总结 - 随风kali - 博客园 (cnblogs.com) MySQL 漏洞利用与提权 | 国光 (sqlsec.com) sql注入getshell的几种方式 第99天&#xff1a;权限提升-数据库提权&口令获取&MYSQL&MSSQL&Oracle&MSF SQL注入拿shell的方式应该是通用的得到连接数据库…

每日一题——第一百一十八题

题目&#xff1a;进制转换合集 #pragma once #include<stdio.h> #include<ctype.h> #include<stdbool.h> #include<string>/// <summary> /// 将字符串表示的任意进制数转为十进制 /// </summary> /// <param name"str">…

全面了解 NGINX 的负载均衡算法

NGINX 提供多种负载均衡方法&#xff0c;以应对不同的流量分发需求。常用的算法包括&#xff1a;最少连接、最短时间、通用哈希、随机算法和 IP 哈希。这些负载均衡算法都通过独立指令来定义&#xff0c;每种算法都有其独特的应用场景。 以下负载均衡方法&#xff08;IP 哈希除…