redis 过期消息订阅实现(java实现)

news/2024/12/15 14:45:27/

一、开启redis消息通知功能

方法1: 修改conf文件

编辑/etc/redis/redis.conf文件,添加或启用以下内容(key过期通知):

notify-keyspace-events Ex

方法2: 使用命令

  1. 登陆redis-cli
  2. 输入下列命令
config set notify-keyspace-events Ex

关键字介绍:

上面Ex就是其中的关键字之一
  • K:keyspace事件,事件以__keyspace@__为前缀进行发布
  • E:keyevent事件,事件以__keyevent@__为前缀进行发布
  • g:一般性的,非特定类型的命令,比如del,expire,rename等
  • $:字符串特定命令
  • l:列表特定命令
  • s:集合特定命令
  • h:哈希特定命令
  • z:有序集合特定命令
  • x:过期事件,当某个键过期并删除时会产生该事件
  • e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件
  • A:g$lshzxe的别名,因此AKE意味着所有事件

订阅者介绍

  • onMessage: 收到消息回调
  • onSubscribe: 订阅频道(channel)
  • onUnsubscribe: 取消订阅频道(channel)
  • onPMessage: 收到消息回调-p模式
  • onPSubscribe: 订阅频道(channel)p模式
  • onPUnsubscribe: 取消订阅频道(channel)p模式

带P的就是可以在订阅的时候支持表达式, 一次性订阅多个频道,

例如:

__keyevent@*__:expired ```其中的*标识订阅所有db的key过期事件

二、在pom文件中引入需要的redis依赖

        <!--添加redis依赖--><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>1.8.4.RELEASE</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>

三、编写基本配置文件 redis.properties

redis.hostName=192.168.6.138
redis.port=6379
redis.password=123321
# 连接超时时间
redis.timeout=10000#最大空闲数
redis.maxIdle=300
#控制一个pool可分配多少个jedis实例,用来替换上面的redis.maxActive,如果是jedis 2.4以后用该属性
redis.maxTotal=1000
#最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
redis.maxWaitMillis=1000
#连接的最小空闲时间 默认1800000毫秒(30分钟)
redis.minEvictableIdleTimeMillis=300000
#每次释放连接的最大数目,默认3
redis.numTestsPerEvictionRun=1024
#逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
redis.timeBetweenEvictionRunsMillis=30000
#是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个
redis.testOnBorrow=true
#在空闲时检查有效性, 默认false
redis.testWhileIdle=true

四、编写配置类 RedisConfig

package com.lanqiaobei.ssm.yjk.config;import com.lanqiaobei.ssm.yjk.util.RedisKeyExpirationListener;
//import com.liuyanzhao.ssm.blog.util.RedisLockUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.jcache.config.JCacheConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.nio.charset.StandardCharsets;@Configuration
@PropertySource("classpath:redis.properties")
public class RedisConfig extends JCacheConfigurerSupport {@Autowiredprivate Environment environment;@Beanpublic RedisConnectionFactory redisConnectionFactory() {JedisConnectionFactory fac = new JedisConnectionFactory();fac.setHostName(environment.getProperty("redis.hostName"));fac.setPort(Integer.parseInt(environment.getProperty("redis.port")));fac.setPassword(environment.getProperty("redis.password"));fac.setTimeout(Integer.parseInt(environment.getProperty("redis.timeout")));fac.getPoolConfig().setMaxIdle(Integer.parseInt(environment.getProperty("redis.maxIdle")));fac.getPoolConfig().setMaxTotal(Integer.parseInt(environment.getProperty("redis.maxTotal")));fac.getPoolConfig().setMaxWaitMillis(Integer.parseInt(environment.getProperty("redis.maxWaitMillis")));fac.getPoolConfig().setMinEvictableIdleTimeMillis(Integer.parseInt(environment.getProperty("redis.minEvictableIdleTimeMillis")));fac.getPoolConfig().setNumTestsPerEvictionRun(Integer.parseInt(environment.getProperty("redis.numTestsPerEvictionRun")));fac.getPoolConfig().setTimeBetweenEvictionRunsMillis(Integer.parseInt(environment.getProperty("redis.timeBetweenEvictionRunsMillis")));fac.getPoolConfig().setTestOnBorrow(Boolean.parseBoolean(environment.getProperty("redis.testOnBorrow")));fac.getPoolConfig().setTestWhileIdle(Boolean.parseBoolean(environment.getProperty("redis.testWhileIdle")));return fac;}@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){// 创建RedisTemplate对象RedisTemplate<String, Object> template = new RedisTemplate<>();// 设置连接工厂template.setConnectionFactory(connectionFactory);// 创建JSON序列化工具GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();// 设置Key的序列化StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(StandardCharsets.UTF_8);template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);// 设置Value的序列化template.setValueSerializer(jsonRedisSerializer);template.setHashValueSerializer(jsonRedisSerializer);template.setDefaultSerializer(stringRedisSerializer);// 返回return template;}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,RedisKeyExpirationListener redisKeyExpirationListener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisConnectionFactory);// 监听 __keyevent@0__:expired 频道,这里的0指数据库编号为 0;container.addMessageListener(redisKeyExpirationListener,new PatternTopic("__keyevent@0__:expired"));return container;}@Beanpublic RedisKeyExpirationListener redisKeyExpirationListener() {return new RedisKeyExpirationListener();}// 其他 Bean 定义
}

五、实现监听类 RedisKeyExpirationListener

package com.lanqiaobei.ssm.yjk.util;import cn.hutool.core.util.StrUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;import java.util.Collections;
import java.util.concurrent.TimeUnit;import static cn.hutool.core.util.IdUtil.randomUUID;
@Component
public class RedisKeyExpirationListener implements MessageListener {@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Autowiredprivate PublisherMQ publisherMQ;//分布式锁过期时间 s  可以根据业务自己调节private static final Long LOCK_REDIS_TIMEOUT = 2000L;@Overridepublic void onMessage(Message message, byte[] pattern) {// 获取过期的 Key,需要利用byte[]录入和接收,不然会出现中文乱码byte[] body = message.getBody();String allKey = redisTemplate.getStringSerializer().deserialize(body);String expiredKey = StrUtil.removePrefix(allKey, "todo:");//hutool工具里面去掉首位字符
//        System.out.println(expiredKey);// 处理相应的业务逻辑
//————————————————————————————————————————————————————————String key = "todolock:"+expiredKey;String value = randomUUID();//redis尝试获取锁,加锁Boolean getLock = getLock(key,value);if(getLock){publisherMQ.sendMessage("lqb.direct","queueLQBKey",expiredKey);releaseLock(key,value);}}/***  加锁**/public Boolean getLock(String key,String value){Boolean lockStatus = redisTemplate.opsForValue().setIfAbsent(key,value);if (lockStatus) {System.out.println("Set key-value successfully!");redisTemplate.expire(key, LOCK_REDIS_TIMEOUT, TimeUnit.MILLISECONDS);//毫秒级} else {System.out.println("Key already exists!");}return lockStatus;}/***  释放锁**/public Long releaseLock(String key,String value){String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript,Long.class);Long releaseStatus = (Long)this.redisTemplate.execute(redisScript, Collections.singletonList(key),value);return releaseStatus;}
}

监听redis过期消息提醒,同一个数据(键)过期会有多次通知提醒。原因是:可能是由于 Redis 的主从复制或者分片集群等机制导致的。在主从复制或者分片集群中,可能会发生多个节点同时订阅了相同的键空间通知,从而导致同一个键空间事件被多次触发。
我的解决方法是:给键过期后提醒的回调函数加锁,收到多个通知提醒,回调函数加锁后最终只会有一个执行,其他没有获得锁的回调不会执行,这样就避免了重复执行任务代码。

这里的实现方法在另一个文章:
https://blog.csdn.net/m0_46652188/article/details/130394484


http://www.ppmy.cn/news/57571.html

相关文章

【Java笔试强训 17】

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点! 欢迎志同道合的朋友一起加油喔&#x1f93a;&#x1f93a;&#x1f93a; 目录 一、选择题 二、编程题 &#x1f525;杨辉三角…

【算法】DFS计算四连通块个数

题目&#xff1a; 输入一个二维数组的row和col, 再输入这个二维数组&#xff0c;输出这个数组中包含的四连通块数量 四连通块是指一个数组数据, 其上下左右中有一个数字和它相等, 则这两个数连通, 属于一个连通块 示例输入&#xff1a; 7 7 0 0 0 0 0 0 0 0 1 1 1 1 2 0 0 3 …

作为一名程序员,如何写出一手让同事膜拜的漂亮代码?

整洁的代码 有意义的命名 函数命名 变量命名 函数的定义 注释的规范 代码的长度 代码的对齐 我写代码已经有好几年了&#xff0c;最近看了一本书叫做《代码整洁之道》。我发现这本书中介绍的一些内容对我来说非常有启发性。书中提到的一些方法和技巧让我重新审视了自己的…

浙大的SAMTrack,自动分割和跟踪视频中的任何内容

Meta发布的SAM之后&#xff0c;Meta的Segment Anything模型(可以分割任何对象)体验过感觉很棒&#xff0c;既然能够在图片上面使用&#xff0c;那肯定能够在视频中应用&#xff0c;毕竟视频就是一帧一帧的图片的组合。 果不其然浙江大学就发布了这个SAMTrack&#xff0c;就是在…

如何构建数据血缘系统

1、明确需求&#xff0c;确定边界 在进行血缘系统构建之前&#xff0c;需要进行需求调研&#xff0c;明确血缘系统的主要功能&#xff0c;从而确定血缘系统的最细节点粒度&#xff0c;实体边界范围。 例如节点粒度是否需要精确到字段级&#xff0c;或是表级。一般来说&#x…

[Pandas] 构建DataFrame数据框

DataFrame是二维数据结构&#xff0c;数据以行和列的形式排列 构建DataFrame最基本的定义格式如下 df pd.DataFrame(dataNone, indexNone, columnsNone) 参数说明 data: 具体数据 index: 行索引&#xff0c;如果没有指定&#xff0c;会自动生成RangeIndex(0,1,2,...,n) colu…

项目沟通管理和干系人管理

与沟通相关的概念、如何制定沟通管理计划、管理和控制沟通、项目干系人管理等 1、沟通的基本概念 沟通&#xff1a;包括信息的生成、传递、接收、理解和检查 沟通渠道的数量和参与者之间成正比关系 沟通渠道的计算公式&#xff1a; M n&#xff08;n - 1&#xff09;/ 2 …

10个最流行的向量数据库【AI】

矢量数据库是一种将数据存储为高维向量的数据库&#xff0c;高维向量是特征或属性的数学表示。 每个向量都有一定数量的维度&#xff0c;范围从几十到几千不等&#xff0c;具体取决于数据的复杂性和粒度。 推荐&#xff1a;用 NSDT场景设计器 快速搭建3D场景。 矢量数据库&…