一、前言
在前面我们通过以下章节对Redis
使用有了基础的了解:
Spring Boot实现Redis同数据源动态切换DB | Spring Cloud 31
Spring Boot实现Redis多数据源动态切换 | Spring Cloud 32
此章节基于spring-boot-starter-data-redis
模块,实现对Key
事件监听。
二、应用场景
Redis 2.8.0
之后版本提供允许客户发布订阅Pub/Sub
功能。
2.1 使用场景举例
- 订单在发货后15天自动确认收货
- 设备以恒定时间发送心跳保持在线状态,超出恒定时间更新设备状态为离线
需要考虑消息丢失,重复监听/消费等问题
2.2 事件类型
-
以
keyspace
为前缀的频道为键空间通知格式:
__keyspace@<db>__:<key>
针对指定
key
发生的一切改动,推送给订阅的客户端,侧重于针对指定key
的操作。针对键空间通知后续单独整理一篇文章进行详细叙述。
-
以
keyevent
为前缀的频道为键事件通知(本章重点)格式:
__keyevent@<db>__:<event>
针对指定
event
发生的一切改动,推送给订阅的客户端,侧重于指定的event
变化,如:expired
(过期事件)、DEL
(删除键事件)等,针对所有的event
,不针对指定的event
,如果要筛选event
要通过代码完成。
三、客户端验证
此客户端验证只针对键事件通知
3.1 运行redis客户端
redis-cli
3.2 开启监听
psubscribe __keyevent@*__:*
3.3 执行SETEX操作
新开启一个
redis
客户端
SETEX mykey 10 redis
SETEX
为指定的key
设置值及其过期时间。如果key
已经存在,SETEX
命令将会替换旧的值。
3.4 查看监听
切换至开启监听的
redis
客户端
- 发生
set
事件 - 发生
expire
事件 - 发生
expired
时间
四、集成Redis实现Key事件监听
4.1 实现原理
4.1.1 Redis发布订阅
-
发布消息
Redis
采用publish
命令发送消息,其返回值为接收到该消息的订阅者的数量。 -
订阅频道
Redis
采用subscribe
命令订阅某个频道,其返回值包括客户端订阅的频道、目前已订阅的频道数量、以及接收到的消息,其中subscribe
表示已经成功订阅了某个频道。 -
模式匹配
模式匹配功能允许客户端订阅符合某个模式的频道,
Redis
采用pSubscribe
订阅符合某个模式所有频道,用“ * ”
表示模式,“ * ”
可以被任意值代替。假设客户端同时订阅了某种模式和符合该模式的某个频道,那么发送给这个频道的消息将被客户端接收到两次,只不过这两条消息的类型不同,一个是
message
类型,一个是pmessage
类型,但其内容相同。 -
取消订阅
Redis
采用unsubscribe
和pUnsubscribe
命令取消订阅,其返回值与订阅类似。由于Redis
的订阅操作是阻塞式的,因此一旦客户端订阅了某个频道或模式,就将会一直处于订阅状态直到退出。在unsubscribe
,pUnsubscribe
,UNSUBSCRIBE
和PUNSUBSCRIBE
命令中,其返回值都包含了该客户端当前订阅的频道和模式的数量,当这个数量变为0
时,该客户端会自动退出订阅状态。如果使用
redis-cli
来运行subscribe
命令之后就直接阻塞了进程,也无法再输入其他的命令将,所以unsubscribe
和pUnsubscribe
命令对于redis-cli
来说其实是没有什么实质上的意义,只要在Jedis
、Lettuce
这类的Redis
客户端连接才有其用处。
4.1.2 Spring Data Redis发布订阅实现
在消息(或事件)接收端(消费端),Spring Data Redis
可以通过直接命名或使用模式匹配订阅一个或多个频道(Channel
)。模式匹配方式非常有用,因为它不仅允许使用一个命令创建多个订阅,还可以侦听订阅时尚未创建的频道(只要它们与模式匹配)
Spring Data Redis
在底层,RedisConnection
提供了 subscribe
和 pSubscribe
方法,分别映射 Redis
按频道订阅 subscribe
命令,按模式订阅 psubscribe
命令。它们语法如下:
subscribe channel [channel ...]
psubscribe pattern [pattern ...]
其中:channel
表示频道名称,pattern
表示模式字符串.
请注意,可以将多个频道(Channel
)或多个匹配模式用作参数。为了更改连接的订阅或查询它是否正在侦听,RedisConnection
提供了getSubscription()
和 isSubscribed()
方法:
@Autowired
RedisMessageListenerContainer container;final Map<String, RedisConnection> map = new HashMap<>();@RequestMapping("/dosub")
public String doSubscribe(String channel) {RedisConnection connection = container.getConnectionFactory().getConnection();map.put(channel, connection);connection.subscribe(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {log.info("接收到消息");System.out.println(new String(message.getBody()));}}, channel.getBytes(StandardCharsets.UTF_8));return "订阅" + channel;
}@RequestMapping("/getsub")
public Subscription getSubscription(String channel) {RedisConnection connection = map.get(channel);Subscription subscription = connection.getSubscription();log.info("是否存在订阅状态:{}", subscription);return subscription;
}@RequestMapping("/dounsub")
public String doUnSubscribe(String channel) {RedisConnection connection = map.get(channel);if (connection.isSubscribed()) {Subscription subscription = connection.getSubscription();log.info("是否存在订阅状态:{}", subscription);subscription.subscribe(channel.getBytes(StandardCharsets.UTF_8));}return "取消订阅" + channel;
}
Spring Data Redis
中的订阅命令是阻塞模式。也就是说,在连接上调用 subscribe
会导致当前线程阻塞,等待消息。只有当订阅被取消时,线程才会被释放,你可以在另一个线程上,对订阅消息的连接调用 unsubscribe
或 pUnsubscribe
方法释放。
如前所述在Spring Data Redis
中,一旦订阅,连接就会开始等待消息。此时,连接仅允许添加新的订阅、修改现有订阅、取消现有订阅的命令。如果调用 subscribe
、pSubscribe
、unsubscribe
或 pUnsubscribe
之外的任何命令都会引发异常。
此处指的连接为
Jedis
、Lettuce
这类的Redis
客户端连接,并不包括redis-cli
。
为了订阅消息,基于Spring Data Redis
,需要实现 MessageListener
回调。每次新消息到达时,都会调用回调,并通过 onMessage
方法执行用户业务代码。该接口不仅可以访问实际消息,还可以访问接收消息的频道(Channel
),以及订阅时用于匹配频道(Channel
)的模式。此信息使被调用者不仅可以通过内容区分各种消息,还可以检查其他详细信息。
MessageListener
接口源码如下:
package org.springframework.data.redis.connection;import org.springframework.lang.Nullable;/*** Listener of messages published in Redis.* 在 Redis 上发布的消息的监听者。* @author Costin Leau* @author Christoph Strobl*/
public interface MessageListener {/*** Callback for processing received objects through Redis.* 通过 Redis 处理接收到的对象的回调。* @param message message must not be {@literal null}.* 消息对象,不能为 null* @param pattern pattern matching the channel (if specified) - can be {@literal null}.* 与通道匹配的模式(如果指定)-可以为空。*/void onMessage(Message message, @Nullable byte[] pattern);
}
由于其阻塞特性,低级订阅不具吸引力,因为它需要对每个侦听器进行连接和线程管理。为了缓解这个问题,Spring Data Redis
提供了RedisMessageListenerContainer
类,它可以完成所有繁重的工作。
RedisMessageListenerContainer
充当消息侦听器容器。它用于接收来自 Redis
频道(Channel
)的消息,并驱动注入其中的MessageListener
实例。
侦听器容器负责消息接收的所有线程,并将消息发送到侦听器进行处理。消息侦听器容器是 MDP
和消息传递提供者之间的中介,负责注册以接收消息、资源获取和释放、异常转换等。这让您作为应用程序开发人员可以编写与接收消息相关的业务逻辑。
MessageListener
还可以实现 SubscriptionListener
,以便在确认订阅/取消订阅时接收通知。同步调用时,侦听订阅通知非常有用。
此外,为了最小化应用程序占用空间,RedisMessageListenerContainer
允许多个侦听器共享一个连接和一个线程,即使它们不共享频道(Channel
)。因此,无论应用程序跟踪多少侦听器或频道(Channel
),运行时成本在其整个生命周期中都保持不变。
此外,容器允许运行时更改配置,以便您可以在应用程序运行时添加或删除侦听器,而无需重新启动。容器使用延迟订阅方法,仅在需要时使用RedisConnection
。如果所有侦听器都被取消订阅,则会自动执行清理,并释放线程。
所以基于Spring Data Redis
订阅键事件通知实现方式共有两种方式:
- 实现
MessageListener
监听器,重写OnMessage
方法 - 继承
KeyspaceEventMessageListener
监听器,重写doHandleMessage
方法
以下分别讲述详细实现方式。
4.2 实现MessageListener监听器方式
4.2.1 自定义监听器RedisListener
com/gm/key/listen/listener/RedisListener.java
:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RedisListener implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {String key = message.toString();RedisSerializer<?> serializer = redisTemplate.getValueSerializer();String channel = String.valueOf(serializer.deserialize(message.getChannel()));log.info("redis key: {} , channel: {}", key, channel);}
}
4.2.2 开启监听及监听范围及事件类型
com/gm/key/listen/config/RedisListenerConfig.java
:
package com.gm.key.listen.config;import com.gm.key.listen.listener.RedisListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;@Configuration
public class RedisListenerConfig {@AutowiredRedisListener redisListener;private static final Topic TOPIC_EXPIRED_KEYEVENTS = new PatternTopic("__keyevent@*__:expired");private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);container.addMessageListener(redisListener, TOPIC_ALL_KEYEVENTS);return container;}
}
4.3 继承KeyspaceEventMessageListener监听器方式
4.3.1 自定义监听器RedisKeyAllEventListener
KeyspaceEventMessageListener
默认监听范围:__keyevent@*
com/gm/key/listen/listener/RedisKeyAllEventListener.java
:
@Slf4j
@Component
public class RedisKeyAllEventListener extends KeyspaceEventMessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public RedisKeyAllEventListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overrideprotected void doHandleMessage(Message message) {String key = message.toString();RedisSerializer<?> serializer = this.redisTemplate.getKeySerializer();String channel = String.valueOf(serializer.deserialize(message.getChannel()));log.info("redis key: {} , channel: {}", key, channel);}
}
此方式不需要在
RedisMessageListenerContainer
中重复进行监听配置,通过@Compont
注解,让IOC
容器发现即可。
4.3.2 自定义监听器RedisKeyExpiredListener
KeyspaceEventMessageListener
默认监听范围:__keyevent@*__:expired
,只针对expired
(过期事件)进行监听
com/gm/key/listen/listener/RedisKeyExpiredListener.java
:
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RedisKeyExpiredListener extends KeyExpirationEventMessageListener {public RedisKeyExpiredListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overridepublic void onMessage(Message message, byte[] pattern) {log.info("redis expired key: {}", message.toString());}
}
此方式不需要在
RedisMessageListenerContainer
中重复进行监听配置,通过@Compont
注解,让IOC
容器发现即可。