Flink学习连载文章10--CheckPoint和维表Join

devtools/2024/11/29 23:05:54/

一、Checkpoint

1、State Vs Checkpoint

State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。

一句话概括: Checkpoint就是State的快照。

2、设置Checkpoint

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 09:18:30**/
public class _01CheckPointDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。System.setProperty("HADOOP_USER_NAME", "root");// 在这个基础之上,添加快照// 第一句:开启快照,每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句:设置快照保存的位置env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");return Tuple2.of(arr[0], Integer.valueOf(arr[1]));}});//3. transformation-数据处理转换SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);result.print();//4. sink-数据输出//5. execute-执行env.execute();}
}

测试代码效果:启动本地的nc, 启动hdfs服务。

启动代码,发现有权限问题:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x

解决方案:

System.setProperty("HADOOP_USER_NAME", "root");在设置检查点之前,设置一句这样带权限的语句,如果是集群运行,不存在该问题。可以不设置!!!

查看快照情况:

运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

启动HDFS、Flink

[root@hadoop10 app]#start-dfs.sh
[root@hadoop10 app]#start-cluster.sh

数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

第一次运行的时候

在本地先clean, 再package ,再Wagon一下:

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jarflink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar记得,先启动nc ,再启动任务,否则报错!

通过nc -lk 9999 输入以下内容:

想查看运行结果,可以通过使用的slot数量判断一下:

取消flink job的运行

查看一下这次的单词统计到哪个数字了:

第二次运行的时候

flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

观察数据:输入一个hello,1 得到新的结果hello,8

3、重启策略

重启策略的意义:流式数据是不可能停止的,假如有一条错误数据导致程序直接退出,后面的大量数据是会丢失的,对公司来讲,意义是重大的,损失是惨重的。

重启策略是一个单独的策略,如果你配置了 checkpoint 含有重启策略的,如果你没有 checkpoint 也可以自行配置重启策略,总之重启策略和 checkpoint 没有必然联系。

就是一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:

进行wordcount时,输入了一个bug,1 人为触发异常。

注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

程序中添加log4J的代码:

# Global logging configuration
#  Debug   info   warn  error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

那为什么开启检查点之后,报错了程序还在运行?因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)

//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);env.execute("checkpoint自动重启");   //最后一句execute可以设置jobName,显示在8081界面

程序如果上传至服务器端运行,可以看到重启状态

完整代码如下:

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.concurrent.TimeUnit;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-17 09:13:31**/
public class Demo02 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的// 通过如下方式可将重试机制关掉// env.setRestartStrategy(RestartStrategies.noRestart());//// 两种办法// 第一种办法:重试3次,每一次间隔10S//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 第二种写法:在2分钟内,重启3次,每次间隔10senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");String word = arr[0];if(word.equals("bug")){throw new Exception("有异常,服务会挂掉.....");}// 将一个字符串变为int类型int num = Integer.valueOf(arr[1]);// 第二种将字符串变为数字的方法System.out.println(Integer.parseInt(arr[1]));Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);// 还有什么方法? 第二种创建tuple的方法Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);return tuple2;}}).keyBy(tuple->tuple.f0).sum(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

在本地测试,是没有办法看到重试机制的现象的,需要打包上传至集群,特别注意:使用的类名到底是哪一个。

4、savePoint

checkpoint自动完成state快照、savePoint是手动的完成快照。

如果程序在没有设置checkpoint的情况,可以通过savePoint设置state快照

1.提交一个flink job --提交的还是重启策略的代码打成的jar包

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar

2.输入一些数据,观察单词对应的数字的变化

3.执行savepoint操作

以下是 -->  停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41后面的序号为Job 的ID以下是 -->  不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint

备注:如何正确停止一个 flink 的任务

flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)

4.查看最近完成的flink job对应的savepoint

5.根据之前的savepoint路径,重新启动flink job

flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar
Job has been submitted with JobID 3766ec9ff6f34b46376493a04b50a1f4

再次输入单词,可以看到在之前的基础上累加

另外,在集群中运行我们的程序,默认并行度为1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml

web-ui 界面提交作业:

这个图形化界面,跟我们使用如下命令是一个效果:

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

二、维表join 【重要】

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果,那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。维表一般的特点是变化比较慢。

需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。

期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京

在MySQL创建城市表:

create table city(city_id varchar(50) primary key,city_name varchar(50) 
);
insert into city values('1001','北京'),('1002','上海'),('1003','郑州') ;

1、 预加载维表

通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在kafka流map()方法中与维表数据进行关联。

RichMapFunction中open方法里加载维表数据到内存的方式特点如下:

  • 优点:实现简单
  • 缺点:因为数据存于内存,所以只适合小数据量并且维表数据更新频率不高的情况下。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况。另外,维表是变化慢,不是一直不变的,只是变化比较缓慢而已。
package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 15:12:45**/
public class _04PreLoadDataDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换dataStreamSource.map(new RichMapFunction<String, Tuple3<String,Integer,String>>() {Map<Integer,String> cityMap = new HashMap<Integer,String>();Connection connection;PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {// 将mysql的数据加载到map中connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1","root","123456");statement = connection.prepareStatement("select * from city");ResultSet resultSet = statement.executeQuery();while(resultSet.next()){int cityId =  resultSet.getInt("city_id");String cityName =  resultSet.getString("city_name");cityMap.put(cityId,cityName);}}@Overridepublic void close() throws Exception {statement.close();connection.close();}// zhangsan,1001@Overridepublic Tuple3<String, Integer, String> map(String s) throws Exception {String[] arr = s.split(",");System.out.println("+++++++++++++++" +cityMap);String cityName = cityMap.get(Integer.valueOf(arr[1]));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),cityName);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
在黑窗口输入:
张三,1001
李四,1001
王五,1002

那如果数据多了怎么办,数据更新了怎么办?可以进行查询,代码示例如下:

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 15:12:45**/
public class _05SelectDBDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换dataStreamSource.map(new RichMapFunction<String, Tuple3<String,Integer,String>>() {Connection connection;PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {// 将mysql的数据加载到map中connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1","root","123456");statement = connection.prepareStatement("select city_name from city where city_id = ? ");}@Overridepublic void close() throws Exception {statement.close();connection.close();}// zhangsan,1001@Overridepublic Tuple3<String, Integer, String> map(String s) throws Exception {String[] arr = s.split(",");statement.setInt(1,Integer.valueOf(arr[1]));ResultSet resultSet = statement.executeQuery();String cityName = null;if(resultSet.next()){cityName = resultSet.getString("city_name");}return Tuple3.of(arr[0],Integer.valueOf(arr[1]),cityName);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

以上做法成功解决了我们以前的两个问题:数据更新怎么办,数据多了怎么办。

但是缺点是每次都得查询数据库,非常不方便。

以上两个版本使用的是socket进行演示的,以下随堂代码是使用kafka演示的,不太一样。

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-17 11:40:56**/
public class Demo03 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  每一次都从数据库中查询一下:*  优点是:假如数据库中的数据有更新,每次都可以查询到最新的数据*  缺点是:每次都查询数据库,假如kafka中的数据特别多,就会查询数据库多次,效率低*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city where city_id = ? ");}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];statement.setString(1,cityCode);ResultSet resultSet = statement.executeQuery();String cityName = "";if(resultSet.next()){cityName = resultSet.getString("city_name");}return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-17 11:40:56**/
public class Demo04 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  使用hashmap*  将数据库中的数据只查询一次,放入map集合,map号称本地缓存*  优点:查询数据库只查询一次,每次数据都可以直接从map中获取,效率高*  缺点:假如数据库中的数据更新了,map缓存的数据是没有办法更新的,而且假如数据库中的数据特别多,全部加载到map中会导致堆内存爆炸 OOM*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;Map<String,String> hashMap = new HashMap<String,String>();@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city");ResultSet resultSet = statement.executeQuery();while(resultSet.next()){String cityCode = resultSet.getString("city_id");String cityName = resultSet.getString("city_name");hashMap.put(cityCode,cityName);}}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];String cityName = hashMap.get(cityCode);return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

2、 热存储维表

以前的方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:

  • 优点:维度数据量不受内存限制,可以存储很大的数据量。
  • 缺点:因为维表数据在外部存储中,读取速度受制于外部存储的读取速度;另外维表的同步也有延迟。

(1) 使用cache来减轻访问压力

可以使用缓存来存储一部分常访问的维表数据,以减少访问外部系统的次数,比如使用Guava Cache。

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 15:12:45**/public class _06GuavaCacheDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将程序的并行度设置为1,能够更好的展示缓存效果env.setParallelism(1);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换dataStreamSource.map(new RichMapFunction<String, Tuple3<String,Integer,String>>() {Connection connection;PreparedStatement statement;// 定义一个CacheLoadingCache<Integer, String> cache;@Overridepublic void open(Configuration parameters) throws Exception {cache = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。.expireAfterWrite(10, TimeUnit.SECONDS)//指定移除通知.removalListener(new RemovalListener<Integer, String>() {@Overridepublic void onRemoval(RemovalNotification<Integer, String> removalNotification) {System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}}).build(//指定加载缓存的逻辑new CacheLoader<Integer, String>() {// 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中@Overridepublic String load(Integer cityId) throws Exception {System.out.println("进入数据库查询啦。。。。。。。");statement.setInt(1,cityId);ResultSet resultSet = statement.executeQuery();String cityName = null;if(resultSet.next()){System.out.println("进入到了if中.....");cityName = resultSet.getString("city_name");}return cityName;}});// 将mysql的数据加载到map中connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1","root","123456");statement = connection.prepareStatement("select city_name from city where city_id = ? ");}@Overridepublic void close() throws Exception {statement.close();connection.close();}// zhangsan,1001@Overridepublic Tuple3<String, Integer, String> map(String s) throws Exception {String[] arr = s.split(",");String cityName = "" ;if (cache.get(Integer.valueOf(arr[1])) != null){cityName = cache.get(Integer.valueOf(arr[1]));}return Tuple3.of(arr[0],Integer.valueOf(arr[1]),cityName);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

设置的guawa缓存是每一个分区都有一个缓存,多个分区之间缓存不共享。所以你需要把并行度设置为1,方便查看效果。

随堂代码演示:终极版本:

package com.bigdata.day06;import avro.shaded.com.google.common.cache.*;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-17 11:40:56**/
public class Demo05 {/*** 需求:kafka输入的数据格式: 姓名,城市编号 例如 zhangsan,1001。*  期望输出的数据: 姓名,城市编号,城市名称 例如 zhangsan,1001,北京**  第三个版本:使用guawaCache [google的技术]*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g4");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("kafka-01",new SimpleStringSchema(),properties);//3. transformation-数据处理转换DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);System.out.println(streamSource.getParallelism());streamSource.map(new RichMapFunction<String, String>() {Connection connection= null;PreparedStatement statement =null;// 定义一个CacheLoadingCache<String, String> cache;@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kettle", "root", "root");statement = connection.prepareStatement("select * from city where city_id = ?");cache = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。.expireAfterWrite(100, TimeUnit.SECONDS)//指定移除通知/*.removalListener(new RemovalListener<Integer, String>() {@Overridepublic void onRemoval(RemovalNotification<Integer, String> removalNotification) {// 在这个需求中,我们没有要删除的数据,所以这个代码可以删除掉,没有意义,但是为了学习,保留了。System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}})*/.build(//指定加载缓存的逻辑new CacheLoader<String, String>() {// 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中@Overridepublic String load(String cityId) throws Exception {System.out.println("进入数据库查询啦。。。。。。。");statement.setString(1,cityId);ResultSet resultSet = statement.executeQuery();String cityName = null;if(resultSet.next()){System.out.println("进入到了if中.....");cityName = resultSet.getString("city_name");}return cityName;}});}@Overridepublic void close() throws Exception {if(statement != null){statement.close();}if(connection != null) {connection.close();}}@Overridepublic String map(String value) throws Exception {// zhangsan,1001String[] arr = value.split(",");String name = arr[0];String cityCode = arr[1];String cityName = cache.get(cityCode);return name+","+cityCode+","+cityName;}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

http://www.ppmy.cn/devtools/138035.html

相关文章

TCP/IP学习笔记

TCP\IP从实际应用的五层结构开始&#xff0c;自顶而下的去分析每一层。 TCP/IP五层架构概述 学术上面是TCP/IP四层架构&#xff0c;OSI/ISO是七层架构&#xff0c;实际中使用的是TCP/IP五层架构。 数据链路层 ICMP数据包分析 Wireshark抓包分析ICMP协议_wireshark抓ping包分析…

Linux——基础命令(2) 文件内容操作

目录 ​编辑 文件内容操作 1.Vim &#xff08;1&#xff09;移动光标 &#xff08;2&#xff09;复制 &#xff08;3&#xff09;剪切 &#xff08;4&#xff09;删除 &#xff08;5&#xff09;粘贴 &#xff08;6&#xff09;替换,撤销,查找 &#xff08;7&#xff…

数据结构--链表实现栈和队列

引入 在数据结构--数组实现栈和队列-CSDN博客这篇文章中我们已经用数组实现了栈和队列&#xff0c;那么我们再练习一下链表实现吧&#xff01;o(*&#xffe3;▽&#xffe3;*)ブ 既然是链表&#xff0c;那么前提给出一个创建节点的封装类&#xff1a; public class Node {int…

hadoop_zookeeper详解

Zookeeper秒懂 工作机制特点数据结构应用场景安装选举机制初始化启动无法和Leader保持连接 节点类型监听器原理写数据流程Paxos算法算法流程 客户端命令 Zookeeper 是一个开源的分布式的&#xff0c;为分布式框架提供协调服务的 Apache 项目。 工作机制 Zookeeper是一个基于观察…

uniapp配置全局消息提醒

1.H5使用根标签插入dom的方式实现。 2.app端使用plus.nativeObj.View的方式绘制实现 H5端app端 H5端 创建组件orderAlert.vue <template><div class"view"><div class"content" v-if"visible"><div class"message&q…

DVWA靶场——File Inclusion

File Inclusion&#xff08;文件包含&#xff09;漏洞 指攻击者通过恶意构造输入&#xff0c;利用应用程序错误的文件包含机制&#xff0c;导致程序包含并执行未经授权的本地或远程文件。这类漏洞广泛存在于Web应用程序中&#xff0c;尤其是在那些允许用户提供文件路径或URL的地…

详解 PyTorch 中的 DataLoader:功能、实现及应用示例

详解 PyTorch 中的 DataLoader&#xff1a;功能、实现及应用示例 在 PyTorch 框架中&#xff0c;Dataloader 是一个非常重要的类&#xff0c;用于高效地加载和处理来自 Dataset 的数据。Dataloader 允许批量加载数据&#xff0c;支持多线程/多进程加载&#xff0c;并可进行数据…

11.28深度学习_bp算法

七、BP算法 多层神经网络的学习能力比单层网络强得多。想要训练多层网络&#xff0c;需要更强大的学习算法。误差反向传播算法&#xff08;Back Propagation&#xff09;是其中最杰出的代表&#xff0c;它是目前最成功的神经网络学习算法。现实任务使用神经网络时&#xff0c;…