一、开启redis消息通知功能
方法1: 修改conf文件
编辑/etc/redis/redis.conf文件,添加或启用以下内容(key过期通知):
notify-keyspace-events Ex
方法2: 使用命令
- 登陆redis-cli
- 输入下列命令
config set notify-keyspace-events Ex
关键字介绍:
上面Ex就是其中的关键字之一
- K:keyspace事件,事件以__keyspace@__为前缀进行发布
- E:keyevent事件,事件以__keyevent@__为前缀进行发布
- g:一般性的,非特定类型的命令,比如del,expire,rename等
- $:字符串特定命令
- l:列表特定命令
- s:集合特定命令
- h:哈希特定命令
- z:有序集合特定命令
- x:过期事件,当某个键过期并删除时会产生该事件
- e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件
- A:g$lshzxe的别名,因此AKE意味着所有事件
订阅者介绍
- onMessage: 收到消息回调
- onSubscribe: 订阅频道(channel)
- onUnsubscribe: 取消订阅频道(channel)
- onPMessage: 收到消息回调-p模式
- onPSubscribe: 订阅频道(channel)p模式
- onPUnsubscribe: 取消订阅频道(channel)p模式
带P的就是可以在订阅的时候支持表达式, 一次性订阅多个频道,
例如:
__keyevent@*__:expired ```其中的*标识订阅所有db的key过期事件
二、在pom文件中引入需要的redis依赖
<!--添加redis依赖--><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>1.8.4.RELEASE</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>
三、编写基本配置文件 redis.properties
redis.hostName=192.168.6.138
redis.port=6379
redis.password=123321
# 连接超时时间
redis.timeout=10000#最大空闲数
redis.maxIdle=300
#控制一个pool可分配多少个jedis实例,用来替换上面的redis.maxActive,如果是jedis 2.4以后用该属性
redis.maxTotal=1000
#最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
redis.maxWaitMillis=1000
#连接的最小空闲时间 默认1800000毫秒(30分钟)
redis.minEvictableIdleTimeMillis=300000
#每次释放连接的最大数目,默认3
redis.numTestsPerEvictionRun=1024
#逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
redis.timeBetweenEvictionRunsMillis=30000
#是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个
redis.testOnBorrow=true
#在空闲时检查有效性, 默认false
redis.testWhileIdle=true
四、编写配置类 RedisConfig
package com.lanqiaobei.ssm.yjk.config;import com.lanqiaobei.ssm.yjk.util.RedisKeyExpirationListener;
//import com.liuyanzhao.ssm.blog.util.RedisLockUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.jcache.config.JCacheConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.nio.charset.StandardCharsets;@Configuration
@PropertySource("classpath:redis.properties")
public class RedisConfig extends JCacheConfigurerSupport {@Autowiredprivate Environment environment;@Beanpublic RedisConnectionFactory redisConnectionFactory() {JedisConnectionFactory fac = new JedisConnectionFactory();fac.setHostName(environment.getProperty("redis.hostName"));fac.setPort(Integer.parseInt(environment.getProperty("redis.port")));fac.setPassword(environment.getProperty("redis.password"));fac.setTimeout(Integer.parseInt(environment.getProperty("redis.timeout")));fac.getPoolConfig().setMaxIdle(Integer.parseInt(environment.getProperty("redis.maxIdle")));fac.getPoolConfig().setMaxTotal(Integer.parseInt(environment.getProperty("redis.maxTotal")));fac.getPoolConfig().setMaxWaitMillis(Integer.parseInt(environment.getProperty("redis.maxWaitMillis")));fac.getPoolConfig().setMinEvictableIdleTimeMillis(Integer.parseInt(environment.getProperty("redis.minEvictableIdleTimeMillis")));fac.getPoolConfig().setNumTestsPerEvictionRun(Integer.parseInt(environment.getProperty("redis.numTestsPerEvictionRun")));fac.getPoolConfig().setTimeBetweenEvictionRunsMillis(Integer.parseInt(environment.getProperty("redis.timeBetweenEvictionRunsMillis")));fac.getPoolConfig().setTestOnBorrow(Boolean.parseBoolean(environment.getProperty("redis.testOnBorrow")));fac.getPoolConfig().setTestWhileIdle(Boolean.parseBoolean(environment.getProperty("redis.testWhileIdle")));return fac;}@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){// 创建RedisTemplate对象RedisTemplate<String, Object> template = new RedisTemplate<>();// 设置连接工厂template.setConnectionFactory(connectionFactory);// 创建JSON序列化工具GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();// 设置Key的序列化StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(StandardCharsets.UTF_8);template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);// 设置Value的序列化template.setValueSerializer(jsonRedisSerializer);template.setHashValueSerializer(jsonRedisSerializer);template.setDefaultSerializer(stringRedisSerializer);// 返回return template;}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,RedisKeyExpirationListener redisKeyExpirationListener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisConnectionFactory);// 监听 __keyevent@0__:expired 频道,这里的0指数据库编号为 0;container.addMessageListener(redisKeyExpirationListener,new PatternTopic("__keyevent@0__:expired"));return container;}@Beanpublic RedisKeyExpirationListener redisKeyExpirationListener() {return new RedisKeyExpirationListener();}// 其他 Bean 定义
}
五、实现监听类 RedisKeyExpirationListener
package com.lanqiaobei.ssm.yjk.util;import cn.hutool.core.util.StrUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;import java.util.Collections;
import java.util.concurrent.TimeUnit;import static cn.hutool.core.util.IdUtil.randomUUID;
@Component
public class RedisKeyExpirationListener implements MessageListener {@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Autowiredprivate PublisherMQ publisherMQ;//分布式锁过期时间 s 可以根据业务自己调节private static final Long LOCK_REDIS_TIMEOUT = 2000L;@Overridepublic void onMessage(Message message, byte[] pattern) {// 获取过期的 Key,需要利用byte[]录入和接收,不然会出现中文乱码byte[] body = message.getBody();String allKey = redisTemplate.getStringSerializer().deserialize(body);String expiredKey = StrUtil.removePrefix(allKey, "todo:");//hutool工具里面去掉首位字符
// System.out.println(expiredKey);// 处理相应的业务逻辑
//————————————————————————————————————————————————————————String key = "todolock:"+expiredKey;String value = randomUUID();//redis尝试获取锁,加锁Boolean getLock = getLock(key,value);if(getLock){publisherMQ.sendMessage("lqb.direct","queueLQBKey",expiredKey);releaseLock(key,value);}}/*** 加锁**/public Boolean getLock(String key,String value){Boolean lockStatus = redisTemplate.opsForValue().setIfAbsent(key,value);if (lockStatus) {System.out.println("Set key-value successfully!");redisTemplate.expire(key, LOCK_REDIS_TIMEOUT, TimeUnit.MILLISECONDS);//毫秒级} else {System.out.println("Key already exists!");}return lockStatus;}/*** 释放锁**/public Long releaseLock(String key,String value){String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript,Long.class);Long releaseStatus = (Long)this.redisTemplate.execute(redisScript, Collections.singletonList(key),value);return releaseStatus;}
}
监听redis过期消息提醒,同一个数据(键)过期会有多次通知提醒。原因是:可能是由于 Redis 的主从复制或者分片集群等机制导致的。在主从复制或者分片集群中,可能会发生多个节点同时订阅了相同的键空间通知,从而导致同一个键空间事件被多次触发。
我的解决方法是:给键过期后提醒的回调函数加锁,收到多个通知提醒,回调函数加锁后最终只会有一个执行,其他没有获得锁的回调不会执行,这样就避免了重复执行任务代码。
这里的实现方法在另一个文章:
https://blog.csdn.net/m0_46652188/article/details/130394484