Flink学习连载文章10--CheckPoint和维表Join

news/2024/11/30 9:33:52/

一、Checkpoint

1、State Vs Checkpoint

State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。

一句话概括: Checkpoint就是State的快照。

2、设置Checkpoint

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 09:18:30**/
public class _01CheckPointDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。System.setProperty("HADOOP_USER_NAME", "root");// 在这个基础之上,添加快照// 第一句:开启快照,每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句:设置快照保存的位置env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");return Tuple2.of(arr[0], Integer.valueOf(arr[1]));}});//3. transformation-数据处理转换SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);result.print();//4. sink-数据输出//5. execute-执行env.execute();}
}

测试代码效果:启动本地的nc, 启动hdfs服务。

启动代码,发现有权限问题:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x

解决方案:

System.setProperty("HADOOP_USER_NAME", "root");在设置检查点之前,设置一句这样带权限的语句,如果是集群运行,不存在该问题。可以不设置!!!

查看快照情况:

运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

启动HDFS、Flink

[root@hadoop10 app]#start-dfs.sh
[root@hadoop10 app]#start-cluster.sh

数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

第一次运行的时候

在本地先clean, 再package ,再Wagon一下:

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jarflink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar记得,先启动nc ,再启动任务,否则报错!

通过nc -lk 9999 输入以下内容:

想查看运行结果,可以通过使用的slot数量判断一下:

取消flink job的运行

查看一下这次的单词统计到哪个数字了:

第二次运行的时候

flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

观察数据:输入一个hello,1 得到新的结果hello,8

3、重启策略

重启策略的意义:流式数据是不可能停止的,假如有一条错误数据导致程序直接退出,后面的大量数据是会丢失的,对公司来讲,意义是重大的,损失是惨重的。

重启策略是一个单独的策略,如果你配置了 checkpoint 含有重启策略的,如果你没有 checkpoint 也可以自行配置重启策略,总之重启策略和 checkpoint 没有必然联系。

就是一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:

进行wordcount时,输入了一个bug,1 人为触发异常。

注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

程序中添加log4J的代码:

# Global logging configuration
#  Debug   info   warn  error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

那为什么开启检查点之后,报错了程序还在运行?因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)

//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);env.execute("checkpoint自动重启");   //最后一句execute可以设置jobName,显示在8081界面

程序如果上传至服务器端运行,可以看到重启状态

完整代码如下:

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.concurrent.TimeUnit;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-17 09:13:31**/
public class Demo02 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的// 通过如下方式可将重试机制关掉// env.setRestartStrategy(RestartStrategies.noRestart());//// 两种办法// 第一种办法:重试3次,每一次间隔10S//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 第二种写法:在2分钟内,重启3次,每次间隔10senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");String word = arr[0];if(word.equals("bug")){throw new Exception("有异常,服务会挂掉.....");}// 将一个字符串变为int类型int num = Integer.valueOf(arr[1]);// 第二种将字符串变为数字的方法System.out.println(Integer.parseInt(arr[1]));Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);// 还有什么方法? 第二种创建tuple的方法Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);return tuple2;}}).keyBy(tuple->tuple.f0).sum(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

在本地测试,是没有办法看到重试机制的现象的,需要打包上传至集群,特别注意:使用的类名到底是哪一个。

4、savePoint

checkpoint自动完成state快照、savePoint是手动的完成快照。

如果程序在没有设置checkpoint的情况,可以通过savePoint设置state快照

1.提交一个flink job --提交的还是重启策略的代码打成的jar包

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar

2.输入一些数据,观察单词对应的数字的变化

3.执行savepoint操作

以下是 -->  停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41后面的序号为Job 的ID以下是 -->  不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint

备注:如何正确停止一个 flink 的任务

flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)

4.查看最近完成的flink job对应的savepoint

5.根据之前的savepoint路径,重新启动flink job

flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar
Job has been submitted with JobID 3766ec9ff6f34b46376493a04b50a1f4

再次输入单词,可以看到在之前的基础上累加

另外,在集群中运行我们的程序,默认并行度为1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml

web-ui 界面提交作业:

这个图形化界面,跟我们使用如下命令是一个效果:

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

二、维表join 【重要】

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果,那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。维表一般的特点是变化比较慢。

需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。

期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京

在MySQL创建城市表:

create table city(city_id varchar(50) primary key,city_name varchar(50) 
);
insert into city values('1001','北京'),('1002','上海'),('1003','郑州') ;

1、 预加载维表

通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在kafka流map()方法中与维表数据进行关联。

RichMapFunction中open方法里加载维表数据到内存的方式特点如下:

  • 优点:实现简单
  • 缺点:因为数据存于内存,所以只适合小数据量并且维表数据更新频率不高的情况下。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况。另外,维表是变化慢,不是一直不变的,只是变化比较缓慢而已。
package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 15:12:45**/
public class _04PreLoadDataDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换dataStreamSource.map(new RichMapFunction<String, Tuple3<String,Integer,String>>() {Map<Integer,String> cityMap = new HashMap<Integer,String>();Connection connection;PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {// 将mysql的数据加载到map中connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1","root","123456");statement = connection.prepareStatement("select * from city");ResultSet resultSet = statement.executeQuery();while(resultSet.next()){int cityId =  resultSet.getInt("city_id");String cityName =  resultSet.getString("city_name");cityMap.put(cityId,cityName);}}@Overridepublic void close() throws Exception {statement.close();connection.close();}// zhangsan,1001@Overridepublic Tuple3<String, Integer, String> map(String s) throws Exception {String[] arr = s.split(",");System.out.println("+++++++++++++++" +cityMap);String cityName = cityMap.get(Integer.valueOf(arr[1]));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),cityName);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
在黑窗口输入:
张三,1001
李四,1001
王五,1002

那如果数据多了怎么办,数据更新了怎么办?可以进行查询,代码示例如下:

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 15:12:45**/
public class _05SelectDBDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换dataStreamSource.map(new RichMapFunction<String, Tuple3<String,Integer,String>>() {Connection connection;PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {// 将mysql的数据加载到map中connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1","root","123456");statement = connection.prepareStatement("select city_name from city where city_id = ? ");}@Overridepublic void close() throws Exception {statement.close();connection.close();}// zhangsan,1001@Overridepublic Tuple3<String, Integer, String> map(String s) throws Exception {String[] arr = s.split(",");statement.setInt(1,Integer.valueOf(arr[1]));ResultSet resultSet = statement.executeQuery();String cityName = null;if(resultSet.next()){cityName = resultSet.getString("city_name");}return Tuple3.of(arr[0],Integer.valueOf(arr[1]),cityName);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

以上做法成功解决了我们以前的两个问题:数据更新怎么办,数据多了怎么办。

但是缺点是每次都得查询数据库,非常不方便。

以上两个版本使用的是socket进行演示的,以下随堂代码是使用kafka演示的,不太一样。

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-17 11:40:56**/
public class Demo03 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  每一次都从数据库中查询一下:*  优点是:假如数据库中的数据有更新,每次都可以查询到最新的数据*  缺点是:每次都查询数据库,假如kafka中的数据特别多,就会查询数据库多次,效率低*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city where city_id = ? ");}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];statement.setString(1,cityCode);ResultSet resultSet = statement.executeQuery();String cityName = "";if(resultSet.next()){cityName = resultSet.getString("city_name");}return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-17 11:40:56**/
public class Demo04 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  使用hashmap*  将数据库中的数据只查询一次,放入map集合,map号称本地缓存*  优点:查询数据库只查询一次,每次数据都可以直接从map中获取,效率高*  缺点:假如数据库中的数据更新了,map缓存的数据是没有办法更新的,而且假如数据库中的数据特别多,全部加载到map中会导致堆内存爆炸 OOM*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;Map<String,String> hashMap = new HashMap<String,String>();@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city");ResultSet resultSet = statement.executeQuery();while(resultSet.next()){String cityCode = resultSet.getString("city_id");String cityName = resultSet.getString("city_name");hashMap.put(cityCode,cityName);}}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];String cityName = hashMap.get(cityCode);return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

2、 热存储维表

以前的方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:

  • 优点:维度数据量不受内存限制,可以存储很大的数据量。
  • 缺点:因为维表数据在外部存储中,读取速度受制于外部存储的读取速度;另外维表的同步也有延迟。

(1) 使用cache来减轻访问压力

可以使用缓存来存储一部分常访问的维表数据,以减少访问外部系统的次数,比如使用Guava Cache。

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 15:12:45**/public class _06GuavaCacheDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将程序的并行度设置为1,能够更好的展示缓存效果env.setParallelism(1);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换dataStreamSource.map(new RichMapFunction<String, Tuple3<String,Integer,String>>() {Connection connection;PreparedStatement statement;// 定义一个CacheLoadingCache<Integer, String> cache;@Overridepublic void open(Configuration parameters) throws Exception {cache = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。.expireAfterWrite(10, TimeUnit.SECONDS)//指定移除通知.removalListener(new RemovalListener<Integer, String>() {@Overridepublic void onRemoval(RemovalNotification<Integer, String> removalNotification) {System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}}).build(//指定加载缓存的逻辑new CacheLoader<Integer, String>() {// 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中@Overridepublic String load(Integer cityId) throws Exception {System.out.println("进入数据库查询啦。。。。。。。");statement.setInt(1,cityId);ResultSet resultSet = statement.executeQuery();String cityName = null;if(resultSet.next()){System.out.println("进入到了if中.....");cityName = resultSet.getString("city_name");}return cityName;}});// 将mysql的数据加载到map中connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1","root","123456");statement = connection.prepareStatement("select city_name from city where city_id = ? ");}@Overridepublic void close() throws Exception {statement.close();connection.close();}// zhangsan,1001@Overridepublic Tuple3<String, Integer, String> map(String s) throws Exception {String[] arr = s.split(",");String cityName = "" ;if (cache.get(Integer.valueOf(arr[1])) != null){cityName = cache.get(Integer.valueOf(arr[1]));}return Tuple3.of(arr[0],Integer.valueOf(arr[1]),cityName);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

设置的guawa缓存是每一个分区都有一个缓存,多个分区之间缓存不共享。所以你需要把并行度设置为1,方便查看效果。

随堂代码演示:终极版本:

package com.bigdata.day06;import avro.shaded.com.google.common.cache.*;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-17 11:40:56**/
public class Demo05 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  第三个版本:使用guawaCache [google的技术]*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);System.out.println(streamSource.getParallelism());streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;// 定义一个CacheLoadingCache<String, String> cache;@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city where city_id = ?");cache = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。.expireAfterWrite(100, TimeUnit.SECONDS)//指定移除通知/*.removalListener(new RemovalListener<Integer, String>() {@Overridepublic void onRemoval(RemovalNotification<Integer, String> removalNotification) {// 在这个需求中,我们没有要删除的数据,所以这个代码可以删除掉,没有意义,但是为了学习,保留了。System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}})*/.build(//指定加载缓存的逻辑new CacheLoader<String, String>() {// 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中@Overridepublic String load(String cityId) throws Exception {System.out.println("进入数据库查询啦。。。。。。。");statement.setString(1,cityId);ResultSet resultSet = statement.executeQuery();String cityName = null;if(resultSet.next()){System.out.println("进入到了if中.....");cityName = resultSet.getString("city_name");}return cityName;}});}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];String cityName = cache.get(cityCode);return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

http://www.ppmy.cn/news/1551157.html

相关文章

auto与decltype

auto: 1.定义&#xff1a; 在C中&#xff0c; auto 是一个类型说明符&#xff0c;它让编译器在编译阶段自动推导变量的类型&#xff0c;其类型取决于初始化表达式的类型。auto 在声明变量时使用&#xff0c;编译器会根据变量初始化表达式自动推断类型。 #include<iostrea…

IvorySQL与pg_failover_slot插件:如何实现逻辑复制槽的高可用主备同步

前言 在数据库高可用架构中&#xff0c;逻辑复制是实现数据同步和扩展的重要机制之一。通过逻辑复制&#xff0c;数据库管理员可以选择性地复制特定表的数据&#xff0c;而不必像物理复制那样进行全量数据库实例的复制。然而&#xff0c;逻辑复制槽的局限性在于它仅存在于主节…

QT的exec函数

在Qt框架中&#xff0c;exec()方法是QDialog类&#xff08;及其子类&#xff09;的一个成员函数&#xff0c;用于以模态&#xff08;modal&#xff09;方式显示对话框。当exec()被调用时&#xff0c;它会启动一个局部的事件循环&#xff0c;这个循环会阻塞对对话框之外的其他窗…

实时数据开发 | Flink反压机制原因、影响及解决方案

今天是很忙碌的一天哦&#xff0c;有两个业务在催着验收&#xff0c;终于21&#xff1a;45卡点交上去了。 明早再修修补补一下应该就可以开始做实时方面的需求了&#xff0c;小紧张&#xff0c; 今天同事在同步会上讲这块业务的数据流时就提到了checkpoint和savepoint还有流处理…

深入讲解Spring Boot和Spring Cloud,外加图书管理系统实战!

很抱歉&#xff0c;我的疏忽&#xff0c;说了这么久还没有给大家详细讲解过Spring Boot和Spring Cloud,那今天给大家详细讲解一下。 大家可以和下面这三篇博客一起看&#xff1a; 1、Spring Boot 和 Spring Cloud 微服务开发实践详解https://blog.csdn.net/speaking_me/artic…

Vite 6 正式发布

11 月 26 日&#xff0c;Vite 6.0 正式发布。自一年前 Vite 5 发布以来&#xff0c;Vite 的采用率持续增长&#xff0c;每周 npm 下载量已从 750 万次跃升至 1700 万次。与此同时&#xff0c;Vitest 不仅越来越受到用户的青睐&#xff0c;并开始逐步建立起属于自己的生态系统。…

MySQL隐式转换造成索引失效

一、什么是 MySQL 的隐式转换&#xff1f; MySQL 在执行查询语句时&#xff0c;有时候会自动帮我们进行数据类型的转换&#xff0c;这个过程就是隐式转换。比如说&#xff0c;我们在一个 INT 类型的字段上进行查询&#xff0c;但是传入的查询条件却是字符串类型的值&#xff0c…

pageoffice最新版本浏览器点击没反应解决办法

一、问题现象 最新版本的谷歌、火狐浏览器&#xff0c;调用pageoffice时&#xff0c;点击后没反应&#xff08;旧的谷歌浏览器不受影响&#xff09;。 二、产生原因 服务器返回pageOffice的客户端唤起链接格式为&#xff1a; PageOffice://|http://192.168.1.120:8080/xxx …