flink 批量压缩redis集群 sink

server/2024/9/23 18:06:22/

idea maven依赖

<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>


import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import redis.clients.jedis.*
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStreamclass RedisSink : RichSinkFunction<Data>() {private lateinit var jedisCluster: JedisClusteroverride fun open(parameters: Configuration) {super.open(parameters)val jedisPoolConf = JedisPoolConfig()jedisPoolConf.maxTotal = 128// 最大连接数jedisPoolConf.maxIdle = 50// 最大空闲连接数jedisPoolConf.testOnBorrow = true // 当调用 borrow Object方法时,是否进行有效性检查// 集群模式val nodes = HashSet<HostAndPort>()val hostAndPort1 = HostAndPort("h1", port)val hostAndPort2 = HostAndPort("h2", port)val hostAndPort3 = HostAndPort("h3", port)nodes.add(hostAndPort1)nodes.add(hostAndPort2)nodes.add(hostAndPort3)jedisCluster = JedisCluster(nodes, 100000, 100000, 2, "password", jedisPoolConf)}override fun invoke(value: Data, context: SinkFunction.Context<*>?) {val key = "data" + value.x+ value.yval outputBts = ByteArrayOutputStream()val gzip = GZIPOutputStream(outputBts)gzip.write(value.toByteArray())gzip.flush()gzip.finish()jedisCluster.set(key.toByteArray(), outputBts.toByteArray())jedisCluster.expire(key.toByteArray(),15552000)}override fun close() {super.close()jedisCluster.close()}}


http://www.ppmy.cn/server/120903.html

相关文章

Flink Task 日志文件隔离

Flink Task 日志文件隔离 任务在启动时会先通过 MdcUtils 启动一个 slf4j 的 MDC 环境&#xff0c;然后将 jobId 添加到 slf4j 的 MDC 容器中&#xff0c;随后任务输出的日志都将附带 joid。 MDC 介绍如下&#xff1a; MDC ( Mapped Diagnostic Contexts )&#xff0c;它是一个…

提升晶振电路抗扰性:优化方案解析

在现代电子设备中&#xff0c;晶振作为提供稳定时钟信号的核心组件&#xff0c;其稳定性对整个系统的运行至关重要。然而&#xff0c;电路抗扰性不良往往会导致晶振失效&#xff0c;进而影响设备的整体性能。晶发电子针对这一问题&#xff0c;提出了以下关于晶振电路抗扰性及优…

7.搭建个人金融数据库之快速获取股票列表和基本信息!

前边我们提过&#xff0c;免费的数据一般来自于爬虫&#xff0c;获取难度和维护成本都比较高&#xff0c;其实不太适合小白用户。所以非必要情况下&#xff0c;我们尽量不用这种方式来获取数据。 我自己用的比较多的是tushare&#xff0c;一般来说有它也就够了&#xff0c;大…

ABAP 学习t-code DWDM

ABAP 学习t-code DWDM &#xff0c;里面有很多例子展示&#xff0c;且能看到源代码

prometheus概念

一、Prometheus概述 1.prometheus概念&#xff1a;开源的系统监控和告警系统&#xff0c;在k8s分布式的容器化管理系统当中&#xff0c;一般都是搭配promethuse来进行监控&#xff1b;是一个服务监控系统&#xff0c;同时也可以监控主机&#xff0c;自带数据库&#xff0c;名字…

基于微信小程序的美食外卖管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目…

006——队列

目录 队列&#xff1a; 单端队列&#xff1a; 存储结构&#xff1a; 顺序队列 思路1&#xff1a;r指针指向尾元素的下一个位置 思路2&#xff1a;r指针指向真正的尾元素 如何解决假溢出的问题&#xff1f; 链式队列 双端队列 存储方式&#xff1a; 顺式存储 代码案例…

LeetCode41. 缺失的第一个正数(2024秋季每日一题 20)

给你一个未排序的整数数组 nums &#xff0c;请你找出其中没有出现的最小的正整数。 请你实现时间复杂度为 O(n) 并且只使用常数级别额外空间的解决方案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,0] 输出&#xff1a;3 解释&#xff1a;范围 [1,2] 中的数字都在数组…