1、KeyedState 用例
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class _01_KeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);source.keyBy(e -> e).process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {private ValueState<Tuple2<String, Integer>> valueState;private ListState<String> listState;private ReducingState<Integer> reducingState;private AggregatingState<Integer, String> aggregatingState;private MapState<String, Integer> mapState;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<Tuple2<String, Integer>>("valueState", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("listState", String.class);ReducingStateDescriptor<Integer> reducingStateDescriptor = new ReducingStateDescriptor<Integer>("reduceingState", new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}}, Integer.class);AggregatingStateDescriptor<Integer, Integer, String> aggregatingStateDescriptor = new AggregatingStateDescriptor<>("aggregatingState", new AggregateFunction<Integer, Integer, String>() {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(Integer value, Integer accumulator) {return value + accumulator;}@Overridepublic String getResult(Integer accumulator) {return "res=>" + accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return a + b;}}, Integer.class);MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Integer.class);valueState = getRuntimeContext().getState(valueStateDescriptor);listState = 
getRuntimeContext().getListState(listStateDescriptor);reducingState = getRuntimeContext().getReducingState(reducingStateDescriptor);aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);mapState = getRuntimeContext().getMapState(mapStateDescriptor);}@Overridepublic void processElement(String input, KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {Tuple2<String, Integer> res = valueState.value();if (res == null) {res = new Tuple2<>(input, 1);valueState.update(res);} else {res.f1 += 1;valueState.update(res);}listState.add(input);reducingState.add(1);aggregatingState.add(1);if (mapState.contains(input)) {Integer beforeNum = mapState.get(input);mapState.put(input, beforeNum + 1);} else {mapState.put(input, 1);}}@Overridepublic void close() throws Exception {System.out.println("valueState=>"+valueState.value());valueState.clear();System.out.println("listState=>"+listState.get());listState.clear();System.out.println("reduceState=>"+reducingState.get());reducingState.clear();System.out.println("aggregatingState=>"+aggregatingState.get());aggregatingState.clear();System.out.println("mapState=>"+mapState.entries().toString());mapState.clear();}});env.execute();}
}
输入与输出:
依次输入:a,b,c,a,b,c
预期输出结果:
  每个key的数据明细 listState:[a,a],[b,b],[c,c]
  每个key的数量(带key) valueState:[a,2],[b,2],[c,2]
  每个key的数量(不带key) reduceState:2,2,2
  每个key的数量(不带key,且输入和输出数据类型不同) aggregatingState:res=>2,res=>2,res=>2
  每个key的数量(带key) mapState:[a,2],[b,2],[c,2]
实际输出结果:
  valueState=>(a,2) valueState=>(b,2) valueState=>(c,2)
  listState=>[a, a] listState=>[b, b] listState=>[c, c]
  reduceState=>2 reduceState=>2 reduceState=>2
  aggregatingState=>res=>2 aggregatingState=>res=>2 aggregatingState=>res=>2
  mapState=>[a=2] mapState=>[b=2] mapState=>[c=2]
2、KeyedStateTTL
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class _02_KeyedStateTTL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);source.keyBy(e -> e).process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {private ValueState<Tuple2<String, Integer>> valueState;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<Tuple2<String, Integer>>("valueState", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));// 只支持 KeyedState 的 TTL// 只支持 processing time 的 TTLStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(5))// TTL 的更新策略// StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新// StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// 状态的可见性// StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据// StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// 状态的清理策略,默认过期数据会在读取的时候被删除// cleanupFullSnapshot() 全量快照时进行清理// cleanupIncrementally(10, true) 增量数据清理// cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1)) RocksDB 压缩过滤器.cleanupFullSnapshot().build();valueStateDescriptor.enableTimeToLive(stateTtlConfig);valueState = getRuntimeContext().getState(valueStateDescriptor);}@Overridepublic void processElement(String input, KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {Tuple2<String, Integer> res = valueState.value();if (res == null) {res = new Tuple2<>(input, 1);valueState.update(res);out.collect(res);} else {res.f1 += 
1;valueState.update(res);out.collect(res);}}@Overridepublic void close() throws Exception {System.out.println("valueState=>" + valueState.value());valueState.clear();}}).print("res=>");env.execute();}
}