【自定义Source、Sink】Flink自定义Source、Sink对redis进行读写操作

news/2025/3/30 8:50:56/

使用ParameterTool读取配置文件

Flink读取参数的对象

  1. Commons-cli: Apache提供的,需要引入依赖
  2. ParameterTool:Flink内置

ParameterTool 比 Commons-cli 使用上简便;

ParameterTool能避免Jar包的依赖冲突

建议使用第二种

使用ParameterTool对象可以直接获取配置文件中的信息,需要如下依赖

        <!-- Flink基础依赖 【ParameterTool类 在该依赖中】 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId></dependency><!-- Flink流批处理依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId></dependency>

Java读取资源的方式

  1. Class.getResourceAsStream(Path):Path 必须以 “/”,表示从ClassPath的根路径读取资源
  2. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”,默认从ClassPath的根路径读取资源

推荐使用第2种,以类加载器的方式获取静态资源文件,不要通过ClassPath的相对路径查找

最基本的工具类

public class ParameterUtil {// 创建 ParameterTool 对象public static ParameterTool getParameters() {// 读取 resources 文件夹下 "flink.properties" 文件InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);try {return ParameterTool.fromPropertiesFile(inputStream);} catch (Exception e) {throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);}}
}

image-20231209095849541

可以通过 ParameterUtil.getParameters().get("redis.port") 直接读取key对应的value值

Flink写入Redis方式

  1. 继承RichSinkFunction (Flink-Stream)
  2. 使用第3方的包 (Apache-Bahir-Flink)

Apache-Bahir-Flink 的 Redis-Connector的缺点:

  1. 使用Jedis, 没有使用Lettuce
  2. 没有对 Flink Table/SQL Api 的支持

不少基于bahir二开的例子解决了上述问题

gitee地址:https://gitee.com/jeff-zou/flink-connector-redis?_from=gitee_search

github地址:https://github.com/apache/bahir-flink

bahir 集成了许多连接器,其中就包含Redis

image-20231209103659812

Flink官网上也可以看到bahir的影子

image-20231209104014483

方便起见,接下来就基于bahir,Flink写入Redis集群

基于巴希尔(Bahir)-Flink写入Redis集群

引入connector连接器依赖

        <!-- Flink-Connector-Redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_${scala.binary.version}</artifactId></dependency>

依赖版本定义在父模块中

image-20231209100449996

实现RedisMapper接口自定义Sink

首先实现RedisMapper接口并指定泛型——处理元素的类型

/*** 基于apache bachir flink的RedisSink,作用于Redis String数据类型*/
public class RedisSinkByBahirWithString implements RedisMapper<Tuple2<String, String>> {/*** 指定Redis的命令*/@Overridepublic RedisCommandDescription getCommandDescription() {/* ************************ 如果Redis的数据类型是 hash 或 z-Set* RedisCommandDescription 的构造方法必须传入 additionalKey* additionalKey就是Redis的键** *********************/return new RedisCommandDescription(RedisCommand.SET);}/*** 从数据流里获取Key值*/@Overridepublic String getKeyFromData(Tuple2<String, String> input) {return input.f0;}/*** 从数据流里获取Value值*/@Overridepublic String getValueFromData(Tuple2<String, String> input) {return input.f1;}
}

写入Redis工具类

public class RedisWriteUtil {/* ************************ FlinkJedisClusterConfig:集群模式* FlinkJedisPoolConfig:单机模式* FlinkJedisSentinelConfig:哨兵模式** *********************/// Jedis配置private static final FlinkJedisClusterConfig JEDIS_CONF;static {ParameterTool parameterTool = ParameterUtil.getParameters();String host = parameterTool.get("redis.host");String port = parameterTool.get("redis.port");/* ************************ InetSocketAddress 是Java的套接字** *********************/InetSocketAddress inetSocketAddress = new InetSocketAddress(host, Integer.parseInt(port));Set<InetSocketAddress> set = new HashSet<>();set.add(inetSocketAddress);JEDIS_CONF = new FlinkJedisClusterConfig.Builder().setNodes(set).build();}/*** 基于Bahir写入Redis,Redis的数据是String类型*/public static void writeByBahirWithString(DataStream<Tuple2<String, String>> input) {input.addSink(new RedisSink<>(JEDIS_CONF, new RedisSinkByBahirWithString()));}}

测试一下

class RedisWriteUtilTest {@DisplayName("测试基于Bahir写入Redis,Redis数据类型是String类型")@Testvoid writeByBahirWithString() throws Exception {LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();DataStreamSource<Tuple2<String, String>> dataStream = env.fromElements(Tuple2.of("k", "v"));RedisWriteUtil.writeByBahirWithString(dataStream);env.execute();}
}

非常完美!写入成功

image-20231209105850707

Flink读取Redis方式

  1. 继承RichSourceFunction (实现自定义Source)
  2. 继承RichParallelSourceFunction (实现自定义Source)【可以指定并行度】
  3. 实现SourceFunction接口 (实现自定义Source)

RichParallelSourceFunction 和 RichSourceFunction区别

RichParallelSourceFunction 可以设置并行度

RichParallelSourceFunction 和 RichSourceFunction 代码是可以互相套用

RichParallelSourceFunction 默认的并行度是cpu 的 核心数(core数)

RichSourceFunction 的并行度只能是1

继承RichSourceFunction类-Flink读取Redis集群

前置准备

定义枚举类

Redis数据类型枚举类

@Getter
public enum RedisDataType {STRING,HASH,LIST,SET,SORTED_SET,;RedisDataType() {}
}

定义Redis命令的枚举类,便于Source判断操作

@Getter
public enum RedisCommand {// get stringGET(RedisDataType.STRING);private final RedisDataType redisDataType;RedisCommand(RedisDataType redisDataType) {this.redisDataType = redisDataType;}
}

Jedis配置类

bahir依赖中自带jedis依赖一般不用,自行引入jedis,jedis依赖版本要与巴希尔中jedis版本保持一致

image-20231209111800457

public class JedisConf {public static JedisCluster getJedisCluster() throws IOException {ParameterTool parameterTool =ParameterUtil.getParameters();String host = parameterTool.get("redis.host");String port = parameterTool.get("redis.port");/* *********************** Jedis对象** JedisPool : 用于redis单机版* JedisCluster: 用于redis集群** JedisCluster对象能够自动发现正常的redis节点** *********************/HostAndPort hostAndPort = new HostAndPort(host,Integer.parseInt(port));Set<HostAndPort> nodes = new HashSet<>();nodes.add(hostAndPort);return new JedisCluster(nodes);}
}

封装Jedis对象的redis方法

封装Jedis对象的redis方法,方便统一调用和维护

public class JedisBuilder {private JedisCluster jedis = null;public JedisBuilder(JedisCluster jedisCluster) {this.jedis = jedisCluster;}public void close() {if (this.jedis != null) {this.jedis.close();}}/*** Redis的Get方法*/public String get(String key) {return jedis.get(key);}
}

自定义Source

Redis数据的映射对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RedisPO implements Serializable {private String data;}

Flink 自定义Redis Source读取Redis

/* *********************** 【富函数类】 比函数类提供了更多函数生命周期,提供了获取上下文的方法* 富函数类通常是抽象类* *********************/
public class RedisSource extends RichSourceFunction<RedisPO> {/*** Jedis对象*/private JedisBuilder jedisBuilder;/*** Redis命令枚举对象*/private final RedisCommand redisCommand;/*** redis key*/private final String key;public RedisSource(RedisCommand redisCommand, String key) {this.redisCommand = redisCommand;this.key = key;}/*** volatile 修饰的变量,它的更新都会通知其他线程.*/private volatile boolean isRunning = true;/*** Redis的连接初始化*/@Overridepublic void open(Configuration parameters) throws Exception {JedisCluster jedisCluster = JedisConf.getJedisCluster();jedisBuilder = new JedisBuilder(jedisCluster);}/*** Redis数据的读取*/@Overridepublic void run(SourceContext<RedisPO> output) throws Exception {/* ************************ 一直监听Redis数据的读取** *********************/String data = null;// while (isRunning) {switch (redisCommand.getRedisDataType()) {case STRING:data = jedisBuilder.get(key);}output.collect(new RedisPO(data));// }}@Overridepublic void cancel() {this.isRunning = false;}}

读取Redis工具类

public class RedisReadUtil {public static DataStream<RedisPO> read(StreamExecutionEnvironment env,RedisCommand redisCommand,String key) {return env.addSource(new RedisSource(redisCommand, key));}
}

测试一下

class RedisReadUtilTest {@DisplayName("测试自定义Source读取Redis,Redis数据类型是String类型")@Testvoid testReadByCustomSourceWithString() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<RedisPO> dataStream = RedisReadUtil.read(env,RedisCommand.GET,"k");dataStream.print();env.execute();}
}

测试成功!

image-20231209113539037

文章来源:https://blog.csdn.net/qq_43417581/article/details/134892879
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ppmy.cn/news/1264231.html

相关文章

ssh 公私钥登录

关于 SSH 公私钥登录 SSH 公私钥登录方式是一种基于非对称加密机制的身份验证方法&#xff0c;用于登录 SSH 服务器&#xff0c;可以安全、高效地完成身份验证。 在使用 SSH 公私钥登录方式时&#xff0c;用户需要生成一对公私钥&#xff0c;并将公钥放置在 SSH 服务器上的授…

openEuler学习05-kernel升级

周末没事&#xff0c;尝试下openEuler的kernel升级 [rootlocalhost ~]# more /etc/os-release NAME"openEuler" VERSION"20.03 (LTS-SP3)" ID"openEuler" VERSION_ID"20.03" PRETTY_NAME"openEuler 20.03 (LTS-SP3)" ANSI_…

AWS攻略——创建VPC

文章目录 创建一个可以外网访问的VPCCIDR主路由表DestinationTarget 主网络ACL入站规则出站规则 子网创建EC2测试连接创建互联网网关&#xff08;IGW&#xff09;编辑路由表 知识点参考资料 在 《AWS攻略——VPC初识》一文中&#xff0c;我们在AWS默认的VPC下部署了一台可以SS…

基于lambda简化设计模式

前言 虽说使用设计模式可以让复杂的业务代码变得清晰且易于维护&#xff0c;但是某些情况下&#xff0c;开发可能会遇到我为了简单的业务逻辑去适配设计模式的情况&#xff0c;本文笔者就以四种常见的设计模式为例&#xff0c;演示如何基于lambda来简化设计模式的实现。 策略…

【MySQL】MySQL 在 Centos 7环境安装教程

文章目录 1.卸载不要的环境2.检查系统安装包3.获取mysql官方yum源4.安装mysql yum 源&#xff0c;对比前后yum源5.安装mysql服务6.查看配置文件和数据存储位置7.启动服务和查看启动服务8.登录9.配置my.cnf 1.卸载不要的环境 先检查是否有mariadb存在 ps ajx |grep mariadb如果…

【trino权威指南】使用trino详解:trino client安装、查询sql、DBeaver连接trino、java通过JDBC连接trino

文章目录 一. Trino CLI1. 安装client2. 使用client执行sql 二. JDBC driver 连接Trino1. 通过DBeaver用户界面连接2. JDBC Driver in java2.1. 环境配置2.2. 注册和配置driver2.3. 连接参数2.4. 查询例子 一. Trino CLI 1. 安装client Trino CLI提供了一个基于终端的交互式s…

Zookeeper单机模式搭建

1、下载 ​wget https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz 2、解压 tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz 3、进入 apache-zookeeper-3.6.3-bin目录下&#xff0c;创建data cd apache-zookeeper-3.6.3-bin mkdir da…

NFC和蓝牙在物联网中有什么意义?如何选择?

#NFC物联网# #蓝牙物联网# 在物联网中&#xff0c;NFC和蓝牙有什么意义&#xff1f; NFC在物联网中代表近场通信技术。它是一种短距离、高频的无线通信技术&#xff0c;可以在近距离内实现设备间的数据传输和识别。NFC技术主要用于移动支付、电子票务、门禁、移动身份识别、防…