Flink 学习七 Flink 状态(flink state)

news/2025/1/31 0:46:45/

Flink 学习七 Flink 状态(flink state)

1.状态简介

流式计算逻辑中,比如sum,max; 需要记录和后面计算使用到一些历史的累计数据,

状态就是:用户在程序逻辑中用于记录信息的变量

在Flink 中 ,状态state 不仅仅是要记录状态;在程序运行中如果失败,是需要重新恢复,所以这个状态也是需要持久化;一遍后续程序继续运行

1.1 row state

我们自定义变量来保存数据

public class _01_status_row {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);DataStream<String> dataStream = dataStreamSource.map(new MapFunction<String, String>() {//自己定义的 变量来保存中间值:这里就无法有效的持久化和恢复//状态: raw state  状态String oldString = "";//如何让flink 来托管我们的状态变量,完成持久化和恢复??@Overridepublic String map(String value) throws Exception {oldString = oldString + value;return oldString;}});dataStream.print();env.execute();}
}

1.2 flink state 托管状态

flink 提供了内置的状态数据管理机制,也叫状态机制: 状态一致性维护,状态数据的访问和存储;

1.3 恢复

Flink 任务是一个JOB .JOB 范围很多Task ,Task 对应示例subtask

是subtask 出错的时候,flink 底层会自动的从帮我们恢复task 的运行

如果是Job失败了 从 flink state 恢复,需要在特殊指定一些参数

2.状态分类

算子状态:

  • 每个subtask 自己持有一份独立的状态数据
  • 算子函数实现CheckpointFunction 后,既可使用算子状态
  • 算子状态: 一般是用于source算子中, 其他场景下建议使用keyedState (键控状态)

键控状态 Keyed State

  • 键控状态,只能使用于KeyedStream 的算子中
  • 算子为每一个key绑定一份独立的状态数据

更多的使用场景是键控状态 Keyed State

3.算子状态 Operator State

每个subtask 自己持有一份独立的状态数据;算子状态,在逻辑上,由算子 task下所有subtask共享;

如何理解:正常运行时,subtask自己读写自己的状态数据;而一旦job重启且带状态算子发生了并行度的变化,则之前的状态数据将在新的一批subtask 间均匀分配

在这里插入图片描述

public class _02_operator_flink_status {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//=============配置 ===============//需要开启 Checkpoint 机制env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);//需要开启持久化的路径  可选hdfs 本地env.getCheckpointConfig().setCheckpointStorage("file:///D:/Resource/FrameMiddleware/FlinkNew/sinkout2/");//task级别的failover//一个task 失败 job 失败 ,有很多重启策略//env.setRestartStrategy(RestartStrategies.noRestart());//task 失败 重启最多3次 , 失败后1秒重启env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));//=============配置 ===============DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);DataStream<String> dataStream = dataStreamSource.map(new StateMapFunction());dataStream.print();env.execute();}
}class StateMapFunction implements MapFunction<String,String> , CheckpointedFunction {ListState<String> listState;//正常的处理逻辑@Overridepublic String map(String value) throws Exception {listState.add(value);Iterable<String> strings = listState.get();StringBuilder sb = new StringBuilder();for (String string : strings) {sb.append(string);}//写一个异常if(value.length()==5){int a = 1/ 0;}return sb.toString();}//持久化之前会调用的方法@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {long checkpointId = context.getCheckpointId();System.out.println("执行快照!!!!!"+ checkpointId);}//算子的任务在启动之前,会调用下面的方法,为用户的状态初始化@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {//context 获取状态存储器OperatorStateStore operatorStateStore = context.getOperatorStateStore();//定义一个昨天存储结构的描述器ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("保存字符串", String.class);//获取状态存储器 中获取容器来存储器//getListState 方法还会加载之前存储的状态数据listState = operatorStateStore.getListState(listStateDescriptor);}
}

3.键控状态 Keyed State

3.1 基础概念

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-981L9koP-1687272668448)(flink7手绘/state_partitioning.svg)]1

不同点:

算子状态中,一个算子有一个状态存储空间

Keyed State:每个Key 都是有自己的状态存储空间

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wq7IOvvT-1687272668448)(flink7手绘/state_keyed.png)]

3.2 示例

public class _03_keyed_flink_status {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//需要开启 Checkpoint 机制env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);//需要开启持久化的路径  可选hdfs 本地env.getCheckpointConfig().setCheckpointStorage("file:///D:/Resource/FrameMiddleware/FlinkNew/sinkout4/");//task级别的failover//一个task 失败 job 失败env.setRestartStrategy(RestartStrategies.noRestart());//task 失败 重启最多3次 , 失败后1秒重启env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);DataStream<String> dataStream = dataStreamSource.keyBy(x -> x).map(new KeyedStateMapFunction()).setParallelism(2);dataStream.print("===>").setParallelism(3);env.execute();}
}//flink 状态管理 算子需要实现CheckpointedFunction
class KeyedStateMapFunction extends RichMapFunction<String, String>{ListState<String> listState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);RuntimeContext runtimeContext = getRuntimeContext();ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("保存字符串", String.class);listState = runtimeContext.getListState(listStateDescriptor);}//正常的处理逻辑@Overridepublic String map(String value) throws Exception {listState.add(value);Iterable<String> strings = listState.get();StringBuilder sb = new StringBuilder();for (String string : strings) {sb.append(string);}//写一个异常if(value.length()==5){int a = 1/ 0;}return sb.toString();}
}//======
[root@localhost ~]# nc -lk 9000
a
a
a
b
b
b
c
c
c
c
d
d
d控制台数据输出为
===>:2> a
===>:3> aa
===>:1> aaa
===>:1> b
===>:2> bb
===>:3> bbb
===>:1> c
===>:2> cc
===>:3> ccc
===>:1> cccc    ========> 每个key 都有一个自己的ListState<String> listState;

3.3 状态API 使用

class KeyedStateMapFunction_2 extends RichMapFunction<String, String>{ValueState<String> valueState;ListState<String> listState;MapState<String, String> mapState;ReducingState<Integer> reducingState;AggregatingState<Integer, Double> aggState;@Overridepublic void open(Configuration parameters) throws Exception {RuntimeContext runtimeContext = getRuntimeContext();//单值状态存储器valueState = runtimeContext.getState(new ValueStateDescriptor<String>("string", String.class));//列表状态存储器listState = runtimeContext.getListState(new ListStateDescriptor<>("list", String.class));//map 状态存储器mapState = runtimeContext.getMapState(new MapStateDescriptor<String, String>("map", String.class, String.class));//做累加 reducereducingState = runtimeContext.getReducingState(new ReducingStateDescriptor<Integer>("reduce", new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1+value2;}}, Integer.class));//记录聚合状态  --> 平均值AggregatingState<Integer, Double> aggState = runtimeContext.getAggregatingState(new AggregatingStateDescriptor<>("aggState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return Tuple2.of(0, 0);}@Overridepublic Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);}@Overridepublic Double getResult(Tuple2<Integer, Integer> accumulator) {return Double.valueOf(accumulator.f1 / accumulator.f0);}//批处理会使用@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {return Tuple2.of(a.f0 + b.f0, b.f0 + b.f1);}}, TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})));}//正常的处理逻辑@Overridepublic String map(String value) throws Exception {//valueStatevalueState.update("new value");//更新值String value1 = valueState.value();//q取值//listStatelistState.add(value); //添加一个数据listState.addAll(Arrays.asList("1","2")); //添加多个数据listState.update(Arrays.asList("1","2")); //替换原有数据//mapStateIterable<String> keys = mapState.keys(); boolean contains = mapState.contains("1");mapState.put("1","2");  //添加数据Map<String,String> map = new HashMap<>();map.put("1","2");mapState.putAll(map);//批量添加数据//reducingState//做累加reducingState.add(Integer.valueOf(value));Integer integer = reducingState.get(); //取值//计算平均值aggState.add(Integer.valueOf(value));Double aDouble = aggState.get();//取值return value1;}
}

3.4 状态的TTL 管理

        RuntimeContext runtimeContext = getRuntimeContext();//单值状态存储器ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("string", String.class);//存活时间和过期 参考StateTtlConfig build = StateTtlConfig.newBuilder(Time.milliseconds(5000))  //数据存活时间.setTtl(Time.milliseconds(5000)) //数据存活时间 和上面效果一样.updateTtlOnCreateAndWrite() //插入和更新时 TTL 重新计算存活时间.updateTtlOnReadAndWrite()  //读或者写 TTL 重新计算存活时间  //比如List 是单条数据  Map 则是一个Key value 是一个单独的TTL.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //返回已经过期的数据.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) //没清楚可以返回过期数据.setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)//TTL处理时间语义.useProcessingTime() //效果同上.cleanupFullSnapshot()//清理过期状态数据 在checkpoint 的时候.cleanupInRocksdbCompactFilter(1000) //只对rocksdb 生效 在rockdb Compact机制在Compact 时过期时间清理.build();valueStateDescriptor.enableTimeToLive(build);valueState = runtimeContext.getState(valueStateDescriptor);

4.状态后端

4.1 基础概念

状态数据的存储管理的实现,状态数据的本地读写,远端快照数据存储

状态后端是可插拔替换的,它对上层屏蔽了底层的差异,因为在更换状态后端时,用户的代码不需要做任何更改

4.2 可用的状态后端

  • HashMapStateBacked

    • heap 堆内存,溢出的话就是本地磁盘,对象的形式存在
    • 大规模数据内存不够会溢出到磁盘
    • 支持大规模数据状态,若有溢出到磁盘,则效率会明显降低
  • EmbeddedRocksDBStateBackend

    • 数据状态交给RocksDb 管理和存储
    • 数据是序列化的KV 字节存储 ,
    • RocksDb 中的数据,会存在内存缓存和磁盘
    • RocksDb 对磁盘数据读取较快,性能不会有较大印象

    两种状态后端策略 生成快照checkpoint 文件是一样的 ,重启后改变StateBacked 可以兼容运行;程序在重启后改变状态后端的方式不影响程序运行;

4.3设置状态后端

// HashMapStateBacked    
env.setStateBackend(new HashMapStateBackend());//EmbeddedRocksDBStateBackend  
env.setStateBackend(new EmbeddedRocksDBStateBackend());

5.广播状态 broadcast state

前面章节说的流的join 的时候 广播就使用到了 broadcast state

Flink 学习三 Flink 流&process function API ==> 1.7.broadcast

new BroadcastProcessFunction();  

状态后端的方式不影响程序运行;**

4.3设置状态后端

// HashMapStateBacked    
env.setStateBackend(new HashMapStateBackend());//EmbeddedRocksDBStateBackend  
env.setStateBackend(new EmbeddedRocksDBStateBackend());

5.广播状态 broadcast state

前面章节说的流的join 的时候 广播就使用到了 broadcast state

Flink 学习三 Flink 流&process function API ==> 1.7.broadcast

new BroadcastProcessFunction();  

http://www.ppmy.cn/news/490755.html

相关文章

PHP 使用 QueryList + Redis 批量下载壁纸

完整代码&#xff1a;https://github.com/her-cat/wallpaper_crawler 页面分析 壁纸站地址&#xff1a;http://www.huanse.net/ 打开网站首页&#xff0c;通过审查元素找到详情页面和略缩图的地址&#xff08;下图红框部分&#xff09;。 - 详情页面地址&#xff1a; ./wal…

利用站点抓取功能批量下载壁纸

IDM是一款简单、易上手的下载器&#xff0c;&#xff0c;使用IDM下载器可以进行文档、图片、网页等等类型文件的下载&#xff0c;十分方便快捷&#xff0c;可以极大地便利我们的生活与工作。电脑壁纸可以美化桌面&#xff0c;优化我们的使用体验。那么&#xff0c;当你有多张壁…

支持向量机(Support Vector Machine, SVM)从线性分类到核函数扩展

目录 1. 线性分类与最大间隔2. 支持向量3. 软间隔与惩罚因子4. 核函数扩展5. SVM的优缺点6. SVM代码示例 支持向量机&#xff08;Support Vector Machine&#xff0c;简称SVM&#xff09;是一种强大而广泛应用的监督学习算法&#xff0c;用于分类和回归任务。本文将深入解析SVM…

CMD命令查看IP地址

想要查看你的IP地址&#xff0c;很简单&#xff0c;只需要输入一个命令即可。打开C:\Windows\System32\cmd.exe&#xff0c;输入以下命令&#xff1a; ipconfig&#xff08;下面的保密&#xff0c;是隐私&#xff0c;不要偷看&#xff01;&#xff09;如果想要看全的&#xff…

如何使用cmd查看本机IP地址

1、打开Command Prompt&#xff08;命令提示符界面&#xff09; 方法一&#xff1a;在开始菜单搜索框输入cmd→Command Prompt&#xff08;命令提示符&#xff09; 方法二&#xff1a;按【WinR】快捷键→输入cmd→OK 2、输入查询命令ipconfig查看

CMD查找域名对应的IP地址

进入cmd终端 在终端输入nslookup Address就会显示你的ip 以百度为例

cmd查看IP地址指令

ipconfig /all ipconfig /all

cmd下查询公网ip地址

telnet cip.cc 另&#xff1a;tracert http://www.baidu.com 你可以看到 你公网地址的网关但看不到你当前的公网地址 nslookup命令可以查看DNS配置