12、Flink 的 Keyed State 代码示例

server/2024/9/20 7:22:56/ 标签: flink, 大数据

1、KeyedState 用例

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class _01_KeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);source.keyBy(e -> e).process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {private ValueState<Tuple2<String, Integer>> valueState;private ListState<String> listState;private ReducingState<Integer> reducingState;private AggregatingState<Integer, String> aggregatingState;private MapState<String, Integer> mapState;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<Tuple2<String, Integer>>("valueState", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("listState", String.class);ReducingStateDescriptor<Integer> reducingStateDescriptor = new ReducingStateDescriptor<Integer>("reduceingState", new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}}, Integer.class);AggregatingStateDescriptor<Integer, Integer, String> aggregatingStateDescriptor = new AggregatingStateDescriptor<>("aggregatingState", new AggregateFunction<Integer, Integer, String>() {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(Integer value, Integer accumulator) {return value + accumulator;}@Overridepublic String getResult(Integer accumulator) {return "res=>" + accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return a + b;}}, Integer.class);MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Integer.class);valueState = getRuntimeContext().getState(valueStateDescriptor);listState = getRuntimeContext().getListState(listStateDescriptor);reducingState = getRuntimeContext().getReducingState(reducingStateDescriptor);aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);mapState = getRuntimeContext().getMapState(mapStateDescriptor);}@Overridepublic void processElement(String input, KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {Tuple2<String, Integer> res = valueState.value();if (res == null) {res = new Tuple2<>(input, 1);valueState.update(res);} else {res.f1 += 1;valueState.update(res);}listState.add(input);reducingState.add(1);aggregatingState.add(1);if (mapState.contains(input)) {Integer beforeNum = mapState.get(input);mapState.put(input, beforeNum + 1);} else {mapState.put(input, 1);}}@Overridepublic void close() throws Exception {System.out.println("valueState=>"+valueState.value());valueState.clear();System.out.println("listState=>"+listState.get());listState.clear();System.out.println("reduceState=>"+reducingState.get());reducingState.clear();System.out.println("aggregatingState=>"+aggregatingState.get());aggregatingState.clear();System.out.println("mapState=>"+mapState.entries().toString());mapState.clear();}});env.execute();}
}

输入与输出

 依次输出:a,b,c,a,b,c预期输出结果:每个key的数据明细 listState:[a,a],[b,b],[c,c]每个key的数量带key valueState:[a,2],[b,2],[c,2]每个key的数量不带key reduceState:2,2,2每个key的数量不带key,且输入和输出数据类型不同 aggregatingState:res=>2,res=>2,res=>2每个key的数量带key mapState:[a,2],[b,2],[c,2]实际输出结果:valueState=>(a,2)valueState=>(b,2)valueState=>(c,2)listState=>[a, a]listState=>[b, b]listState=>[c, c]reduceState=>2reduceState=>2reduceState=>2aggregatingState=>res=>2aggregatingState=>res=>2aggregatingState=>res=>2mapState=>[a=2]mapState=>[b=2]mapState=>[c=2]

2、KeyedStateTTL

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class _02_KeyedStateTTL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);source.keyBy(e -> e).process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {private ValueState<Tuple2<String, Integer>> valueState;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<Tuple2<String, Integer>>("valueState", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));// 只支持 KeyedState 的 TTL// 只支持 processing time 的 TTLStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(5))// TTL 的更新策略// StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新// StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// 状态的可见性// StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据// StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// 状态的清理策略,默认过期数据会在读取的时候被删除// cleanupFullSnapshot() 全量快照时进行清理// cleanupIncrementally(10, true) 增量数据清理// cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1)) RocksDB 压缩过滤器.cleanupFullSnapshot().build();valueStateDescriptor.enableTimeToLive(stateTtlConfig);valueState = getRuntimeContext().getState(valueStateDescriptor);}@Overridepublic void processElement(String input, KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {Tuple2<String, Integer> res = valueState.value();if (res == null) {res = new Tuple2<>(input, 1);valueState.update(res);out.collect(res);} else {res.f1 += 1;valueState.update(res);out.collect(res);}}@Overridepublic void close() throws Exception {System.out.println("valueState=>" + valueState.value());valueState.clear();}}).print("res=>");env.execute();}
}

http://www.ppmy.cn/server/27716.html

相关文章

Java中的模版方法设计模式详解

Java中的模版方法设计模式详解 在Java编程中&#xff0c;设计模式是一种解决常见问题的最佳实践。其中&#xff0c;模版方法设计模式是一种行为设计模式&#xff0c;它定义了一个操作中的算法骨架&#xff0c;而将一些步骤延迟到子类中。这样可以使子类在不改变算法结构的情况…

社交媒体数据恢复:Soul

Soul数据恢复方法 在Soul这款社交软件中&#xff0c;如果您的聊天记录不小心被删除&#xff0c;是否还能恢复呢&#xff1f;以下是根据搜索结果整理出的Soul数据恢复方法。 方法一&#xff1a;使用Soul的备份和恢复功能 开启备份功能&#xff1a;首先&#xff0c;您需要确保…

PS证件照

证件照尺寸 小一寸&#xff1a;2.2cm*3.3cm 一寸&#xff1a;2.5cm*3.5cm 像素413*295 &#xff08;分辨率为300像素/英寸&#xff09; 比例5&#xff1a;7 二寸&#xff1a;3.5cm*4.9cm 二寸照相比例是4&#xff1a;3&#xff0c;像素是626*413 蓝底&#xff1a;R&a…

Java面试八股之简述Java中assert的作用

简述Java中assert的作用 Java中的assert关键字用于在代码中插入断言&#xff08;Assertion&#xff09;&#xff0c;断言是一种在开发和测试阶段用于验证程序内部状态或假设的机制。其主要作用包括&#xff1a; 条件检查&#xff1a; assert语句用于在特定代码点上检查一个布…

【prometheus】监控MySQL并实现可视化

目录 一、概述 1.1下载解压mysqld_exporter 1.2创建MySQL授权用户 1.3配置my.cnf 1.4启动mysqld_exporter 1.5prometheus配置修改 二、Grafana展示 【Prometheus】概念和工作原理介绍_prometheus工作原理 【Prometheus】k8s集群部署node-exporter 【prometheus】k8s集…

OneFlow快速上手:深度学习初学者必备【AI写作】

首先&#xff0c;这篇文章是基于笔尖AI写作进行文章创作的&#xff0c;喜欢的宝子&#xff0c;也可以去体验下&#xff0c;解放双手&#xff0c;上班直接摸鱼~ 按照惯例&#xff0c;先介绍下这款笔尖AI写作&#xff0c;宝子也可以直接下滑跳过看正文~ 笔尖Ai写作&#xff1a;…

Mysql优化之参数调优

前言 MySQL参数优化是针对数据库配置参数的调整和优化&#xff0c;以提高数据库系统的性能、稳定性和可用性。它和Mysql的应用环境例如项目的用户量在线情况、访问情况、存储资源量等以及服务硬件配置都有关系&#xff0c;优化也不可能一次性完成&#xff0c;需要不断的观察以…

Flink 实时数仓(一)【实时数仓离线数仓对比】

前言 昨天技术面的时候&#xff0c;面试官说人家公司现在用的都是最新的技术&#xff0c;比如 Doris 等一些最新的工具&#xff0c;确实这些课是学校永远不会开设的&#xff0c;好在他说去了会带着我做一做。可是 ...... 学院这边确实不允许放人&#xff0c;唉&#xff0c;可惜…

qt对话框功能介绍

1、颜色对话框 //方式一QColor color QColorDialog::getColor(Qt::red, this, QString::fromLocal8Bit("颜色对话框"),QColorDialog::ShowAlphaChannel);qDebug() <<"color:" <<color;//方式二QColorDialog dialog(Qt::red, this); // 创建对…

3.9设计模式——Strategy 策略模式(行为型)

意图 定义一系列的算法&#xff0c;把它们一个个封装起来&#xff0c;并且使他们可以相互替换此模式使得算法可以独立于使用它们的客户而变化 结构 Strategy&#xff08;策略&#xff09;定义所有支持的算法的公共入口。Context使用这个接口来调用某ConcreteStrategy定义的方…

口袋实验室--使用AD2高效调试IIC、UART、SPI等低速接口

目录 1. 简介 2. 调试过程 2.1 简要步骤 2.2 Si5338 寄存器配置流程 2.3 AD2的基本配置 2.4 检查Si5338状态 2.5 配置Si5338寄存器 2.6 保存Si5338寄存器 3. 总结 1. 简介 使用Digilent Analog Discovery 2进行调试不仅提升了工作效率&#xff0c;而且极大地简化了常…

AI大模型探索之路-训练篇9:大语言模型Transformer库-Pipeline组件实践

系列篇章&#x1f4a5; AI大模型探索之路-训练篇1&#xff1a;大语言模型微调基础认知 AI大模型探索之路-训练篇2&#xff1a;大语言模型预训练基础认知 AI大模型探索之路-训练篇3&#xff1a;大语言模型全景解读 AI大模型探索之路-训练篇4&#xff1a;大语言模型训练数据集概…

Gson打印按照想要的key顺序

默认大家都知道这个吧&#xff1f; val gson GsonBuilder().setPrettyPrinting().create() log(gson.toJson(bean))它是用于将对象bean&#xff0c;转成json以后&#xff0c;能够比较漂亮的打印出json的结构。我常用的是如下4个函数。 //就是jsonStr&#xff0c;使用该函数来…

Android 在AMS中拦截某个指定Activity的启动

文章目录 Android在AMS中拦截某个具体Activity的启动方案一&#xff08;推荐&#xff09;&#xff1a;在ActivityTaskManagerService.startActivityAsUser方法中去作拦截方案二&#xff1a;在Dialog.show()方法中直接对这个包名所创建的Dialog做限制 Android在AMS中拦截某个具体…

python 注释符

4、注释 1、单行注释 单行注释用于解释代码中的一行或一小段代码。 在Python中&#xff0c;单行注释以#开头&#xff0c;后面的内容都是注释。 单行注释可以放在代码的任何位置&#xff0c;但通常放在代码行的上方或旁边。 单行注释不会影响代码的执行&#xff0c;Python解…

C++中的右值引用和移动语义

1 左值引用和右值引用 传统的C语法中就有引用的语法&#xff0c;而C11中新增了的右值引用语法特性&#xff0c;所以从现在开始我们之前学习的引用就叫做左值引用。无论左值引用还是右值引用&#xff0c;都是给对象取别名。 什么是左值&#xff1f;什么是左值引用&#xff1f; 左…

内网端口转发与代理

思路&#xff1a;渗透的前提是双方能够建立通信。目前无法和win7建立通信&#xff0c;但是拿到了windows2003的权限&#xff0c;所以可以在Windows2003主机上面建立节点&#xff0c;作为跳板机去访问到内网。 目前状态&#xff1a;控制win2003&#xff08;IP&#xff1a;192.1…

上位机图像处理和嵌入式模块部署(树莓派4b利用驱动实现进程数据共享)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面我们讨论过&#xff0c;目前在linux系统上面有很多办法可以实现多进程数据共享。这里面比如说管道&#xff0c;比如说共享内存&#xff0c;比如…

react 笔记

vscode bookmarks react源码 v18.2.0 jsx语法转换成reactelement reactapi npx creatte-react-app 项目名称&#xff08;myreactapp&#xff09; cd 项目名称&#xff08;myreactapp&#xff09; npm start启动项目 看index.js文件 React ReactDom App 看App.js文件 react有…

【Unity动画系统】详解Root Motion动画在Unity中的应用(二)

Root Motion遇到Blend Tree 如果Root Motion动画片段的速度是1.8&#xff0c;那么阈值就要设置为1.8&#xff0c;那么在代码中的参数就可以直接反映出Root Motion的最终移动速度。 Compute Thresholds&#xff1a;根据Root Motion中某些数值自动计算这里的阈值。 Velocity X/…