flink 批量压缩redis集群 sink

devtools/2024/9/24 6:35:29/

idea maven依赖

<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>


import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import redis.clients.jedis.*
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStreamclass RedisSink : RichSinkFunction<Data>() {private lateinit var jedisCluster: JedisClusteroverride fun open(parameters: Configuration) {super.open(parameters)val jedisPoolConf = JedisPoolConfig()jedisPoolConf.maxTotal = 128// 最大连接数jedisPoolConf.maxIdle = 50// 最大空闲连接数jedisPoolConf.testOnBorrow = true // 当调用 borrow Object方法时,是否进行有效性检查// 集群模式val nodes = HashSet<HostAndPort>()val hostAndPort1 = HostAndPort("h1", port)val hostAndPort2 = HostAndPort("h2", port)val hostAndPort3 = HostAndPort("h3", port)nodes.add(hostAndPort1)nodes.add(hostAndPort2)nodes.add(hostAndPort3)jedisCluster = JedisCluster(nodes, 100000, 100000, 2, "password", jedisPoolConf)}override fun invoke(value: Data, context: SinkFunction.Context<*>?) {val key = "data" + value.x+ value.yval outputBts = ByteArrayOutputStream()val gzip = GZIPOutputStream(outputBts)gzip.write(value.toByteArray())gzip.flush()gzip.finish()jedisCluster.set(key.toByteArray(), outputBts.toByteArray())jedisCluster.expire(key.toByteArray(),15552000)}override fun close() {super.close()jedisCluster.close()}}


http://www.ppmy.cn/devtools/116374.html

相关文章

JDK7u21 HashMap版

今天在搞ROME HotSwappableTargetSource链的时候突然发现&#xff0c;JDK7U21反序列化链不仅HashMap.put触发了key.equals putForCreate也调用了 而且HashMap.readObject直接调用了putForCreate来还原 what?直接向HashMap两个put不就完了&#xff0c;还搞什么HashSet 开弄&am…

鸿蒙​​​​​​保障应用开发安全的技术措施

应用开发安全是指在开发过程中嵌入安全能力&#xff0c;使应用程序从源头上安全可靠。 开发者是应用程序的创作者&#xff0c;合法的开发者是创作出安全、可靠应用的前提条件;为了保证应用开发者身份真实可信&#xff0c;鸿蒙通过开发者证书对应用进行签名&#xff0c;保证应用…

MySQL增删面试题

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云/阿里云/华为云/51CTO&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互…

深度学习:(七)梯度下降法在神经网络中的应用

梯度下降法在神经网络中的应用 事先规定&#xff1a; 用 n n n 表示个数&#xff08;维度&#xff09;: n [ 0 ] n x n^{[0]}n_x n[0]nx​ &#xff0c;表示单个训练样本 x x x 的元素个数&#xff1b; n [ 1 ] n^{[1]} n[1] 表示隐藏层 1 1 1 的单元&#xff08;节点&am…

甩锅笔记:好好的服务端应用突然起不来,经定位是无法访问外网了?测试又说没改网络配置,该如何定位?

在工作中、团队协作时&#xff0c;可能遇到的问题&#xff0c;如集成测试等场景。但是作为偏前端的全栈&#xff0c;锅从天上来&#xff0c;不是你想甩就能甩&#xff0c;尤其面对测试等比较强势的团体&#xff08;bug创造者&#xff09;&#xff0c;你必须有强大的心理承受能力…

Flask-JWT-Extended登录验证, 不用自定义

"""安装:pip install Flask-JWT-Extended创建对象 初始化与app绑定jwt JWTManager(app) # 初始化JWTManager设置 Cookie 的选项:除了设置 cookie 的名称和值之外&#xff0c;你还可以指定其他的选项&#xff0c;例如&#xff1a;过期时间 (max_age)&#xff1…

DANN GRL

域自适应是指在目标域与源域的数据分布不同但任务相同下的迁移学习&#xff0c;从而将模型在源域上的良好性能迁移到目标域上&#xff0c;极大地缓解目标域标签缺失严重导致模型性能受损的问题。 介绍一篇经典工作 DANN &#xff1a; 模型结构 在训练阶段需要预测如下两个任务…

流行的微前端框架有哪些,适应场景是什么

以下是一些流行的微前端框架&#xff0c;各自的适用场景 1. Single-SPA 适用场景&#xff1a; 适用于需要将大型应用拆分为多个小型、独立应用的场景。支持多种框架的集成。 GitHub地址&#xff1a; Single-SPA Star数量&#xff1a; 约6.8k 2. qiankun 适用场景&#x…