IDEA Maven 依赖（添加到项目的 pom.xml 中）：
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import redis.clients.jedis.*
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

/**
 * Flink sink that writes each [Data] record to a Redis cluster.
 *
 * Every record is serialized with `value.toByteArray()`, gzip-compressed, and stored under the
 * key `"data" + x + y` with a 180-day time-to-live. The cluster client is created in [open] and
 * released in [close], so one client exists per sink instance.
 */
class RedisSink : RichSinkFunction<Data>() {

    /** Cluster client; assigned in [open], released in [close]. */
    private lateinit var jedisCluster: JedisCluster

    /**
     * Builds the connection pool configuration and connects to the Redis cluster.
     *
     * @param parameters Flink configuration, forwarded to the superclass.
     */
    override fun open(parameters: Configuration) {
        super.open(parameters)

        val jedisPoolConf = JedisPoolConfig().apply {
            maxTotal = 128       // maximum number of connections
            maxIdle = 50         // maximum number of idle connections
            testOnBorrow = true  // validate a connection when it is borrowed from the pool
        }

        // Cluster mode: seed nodes used to discover the cluster.
        // NOTE(review): `port` is not defined in this snippet — presumably declared elsewhere; confirm.
        val nodes = hashSetOf(
            HostAndPort("h1", port),
            HostAndPort("h2", port),
            HostAndPort("h3", port),
        )

        // Positional arguments, per the Jedis JedisCluster constructor: seed nodes,
        // connection timeout (ms), socket timeout (ms), max attempts, password, pool config.
        jedisCluster = JedisCluster(nodes, 100000, 100000, 2, "password", jedisPoolConf)
    }

    /**
     * Compresses [value] with gzip and stores it in Redis with an expiry.
     *
     * @param value record to persist; its `x` and `y` members form the key suffix.
     * @param context Flink sink context (unused).
     */
    override fun invoke(value: Data, context: SinkFunction.Context<*>?) {
        // Encode the key once instead of once per Redis call.
        val keyBytes = "data${value.x}${value.y}".toByteArray()

        val compressed = ByteArrayOutputStream()
        // `use` closes the stream, which finishes the gzip trailer and releases the
        // Deflater's native memory even if the write throws.
        GZIPOutputStream(compressed).use { gzip ->
            gzip.write(value.toByteArray())
        }

        // SETEX writes the value and its TTL atomically in one round-trip, so a failure can
        // never leave a key without an expiry. 15552000 s = 180 days.
        jedisCluster.setex(keyBytes, 15552000, compressed.toByteArray())
    }

    /** Releases the Redis cluster client, if [open] managed to create it. */
    override fun close() {
        try {
            super.close()
        } finally {
            // open() may have failed before the client was assigned; touching the lateinit
            // property unguarded would throw and mask the original error.
            if (::jedisCluster.isInitialized) {
                jedisCluster.close()
            }
        }
    }
}