flink 批量压缩redis集群 sink

ops/2024/11/13 8:55:13/

idea maven依赖

<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>


import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import redis.clients.jedis.*
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStreamclass RedisSink : RichSinkFunction<Data>() {private lateinit var jedisCluster: JedisClusteroverride fun open(parameters: Configuration) {super.open(parameters)val jedisPoolConf = JedisPoolConfig()jedisPoolConf.maxTotal = 128// 最大连接数jedisPoolConf.maxIdle = 50// 最大空闲连接数jedisPoolConf.testOnBorrow = true // 当调用 borrow Object方法时,是否进行有效性检查// 集群模式val nodes = HashSet<HostAndPort>()val hostAndPort1 = HostAndPort("h1", port)val hostAndPort2 = HostAndPort("h2", port)val hostAndPort3 = HostAndPort("h3", port)nodes.add(hostAndPort1)nodes.add(hostAndPort2)nodes.add(hostAndPort3)jedisCluster = JedisCluster(nodes, 100000, 100000, 2, "password", jedisPoolConf)}override fun invoke(value: Data, context: SinkFunction.Context<*>?) {val key = "data" + value.x+ value.yval outputBts = ByteArrayOutputStream()val gzip = GZIPOutputStream(outputBts)gzip.write(value.toByteArray())gzip.flush()gzip.finish()jedisCluster.set(key.toByteArray(), outputBts.toByteArray())jedisCluster.expire(key.toByteArray(),15552000)}override fun close() {super.close()jedisCluster.close()}}


http://www.ppmy.cn/ops/113273.html

相关文章

【重学 MySQL】二十九、函数的理解

【重学 MySQL】二十九、函数的理解 什么是函数不同 DBMS 函数的差异函数名称和参数功能实现数据类型支持性能和优化兼容性和可移植性 MySQL 的内置函数及分类单行函数多行函数&#xff08;聚合函数&#xff09;使用注意事项 什么是函数 函数&#xff08;Function&#xff09;在…

服务器连接不上怎么办?

服务器连接不上怎么办&#xff1f;服务器连接问题对于依赖网络服务的企业来说可能是一场灾难&#xff0c;因为它可能导致业务中断和数据访问困难。当遇到服务器连接不上的情况时&#xff0c;迅速而准确地诊断问题并采取相应措施至关重要。聚名网将介绍一些常见的故障排查步骤和…

Acwing 并查集

并查集 并查集结构能够支持快速进行如下的操作&#xff1a; 将两个集合合并&#xff1b;询问两个元素是否在一个集合当中 并查集可以在近乎 O ( 1 ) O(1) O(1)的时间复杂度下吗&#xff0c;完成上述2个操作 基本原理 用树的形式来维护一个集合。用树的根节点来代表这个集合…

基于YOLOv8+LSTM的商超扶梯场景下行人安全行为姿态检测识别

基于YOLOv8LSTM的商超扶梯场景下行人安全行为姿态检测识别 手扶电梯 行为识别 可检测有人正常行走&#xff0c;有人 跌倒&#xff0c;有人逆行三种行为 跌倒检测 电梯跌倒 扶梯跌倒 人体行为检测 YOLOv8LSTM。 基于YOLOv8LSTM的商超扶梯场景下行人安全行为姿态检测识别&#xf…

maxcompute使用篇

文章目录 maxcompute使用篇1.mongoDB与maxcompute 进行数据同步1.1 基本类型的数据1.2部分复杂类型的数据 2.maxcompute中复杂数据类型解析2.1 get_json_object2.2 json_tuple2.3 处理json几种失效的情况:2.4 STR_TO_MAP、MAP_KEYS2.5 regexp_replace2.6 FROM_JSON2.7 nvl2.8 t…

分布式中间件-Pika一个高效的分布式缓存组件

文章目录 Pika简介Pika特性Pika解决的问题及应用场景Pika架构之存储引擎部署模式1、主从模式2、分布式集群模式 Pika快速上手1、二进制包方式2、源码编译方式2.1 支持的平台2.2 依赖的库软件2.3 编译过程2.4 启动 Pika2.5 清空已编译的结果2.6 Pika 的开发调试 3、容器化3.1 使…

幂函数的积分型函数

数学上&#xff0c;把形如的函数称为幂函数。幂函数的规律在博文[1]中已作说明。简单地说&#xff0c;前提下&#xff0c;当时幂函数下凸递增&#xff0c;时线性递增&#xff0c;时上凸递增&#xff0c;时为常值函数&#xff0c;时递减&#xff0c;与坐标系的轴和轴的正方向无限…

clip论文阅读(Learning Transferable Visual Models From Natural Language Supervision)

目录 摘要训练pre-train model的过程将pre-train model应用于下游任务应用&#xff08;待更新&#xff09; 论文/项目地址&#xff1a;https://github.com/OpenAI/CLIP 提供了clip的pre-trained model的权重&#xff0c;也可安装使用pre-trained model 摘要 使用标签标注的图…