Spring Boot集成Redis实现Key事件监听 | Spring Cloud 33

news/2024/11/24 2:38:40/

一、前言

在前面我们通过以下章节对Redis使用有了基础的了解:

Spring Boot实现Redis同数据源动态切换DB | Spring Cloud 31

Spring Boot实现Redis多数据源动态切换 | Spring Cloud 32

此章节基于spring-boot-starter-data-redis模块,实现对Key事件监听。

二、应用场景

Redis 2.8.0之后版本提供允许客户发布订阅Pub/Sub功能。

2.1 使用场景举例

  • 订单在发货后15天自动确认收货
  • 设备以恒定时间发送心跳保持在线状态,超出恒定时间更新设备状态为离线

需要考虑消息丢失,重复监听/消费等问题

2.2 事件类型

  • keyspace为前缀的频道为键空间通知

    格式:__keyspace@<db>__:<key>

    针对指定key发生的一切改动,推送给订阅的客户端,侧重于针对指定key的操作。

    针对键空间通知后续单独整理一篇文章进行详细叙述。

  • keyevent为前缀的频道为键事件通知(本章重点

    格式:__keyevent@<db>__:<event>

    针对指定event发生的一切改动,推送给订阅的客户端,侧重于指定的event变化,如:expired(过期事件)、DEL(删除键事件)等,针对所有的event,不针对指定的event,如果要筛选event要通过代码完成。

三、客户端验证

此客户端验证只针对键事件通知

3.1 运行redis客户端

redis-cli

3.2 开启监听

 psubscribe __keyevent@*__:*

3.3 执行SETEX操作

新开启一个 redis客户端

SETEX mykey 10 redis

SETEX 为指定的 key 设置值及其过期时间。如果 key 已经存在, SETEX 命令将会替换旧的值。

在这里插入图片描述

3.4 查看监听

切换至开启监听的redis客户端

在这里插入图片描述

  • 发生set事件
  • 发生expire事件
  • 发生expired时间

四、集成Redis实现Key事件监听

4.1 实现原理

4.1.1 Redis发布订阅

  • 发布消息

    Redis 采用 publish 命令发送消息,其返回值为接收到该消息的订阅者的数量。

  • 订阅频道

    Redis 采用 subscribe命令订阅某个频道,其返回值包括客户端订阅的频道、目前已订阅的频道数量、以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。

  • 模式匹配

    模式匹配功能允许客户端订阅符合某个模式的频道,Redis采用pSubscribe订阅符合某个模式所有频道,用“ * ”表示模式,“ * ”可以被任意值代替。

    假设客户端同时订阅了某种模式和符合该模式的某个频道,那么发送给这个频道的消息将被客户端接收到两次,只不过这两条消息的类型不同,一个是 message 类型,一个是 pmessage 类型,但其内容相同。

  • 取消订阅

    Redis采用 unsubscribepUnsubscribe命令取消订阅,其返回值与订阅类似。由于Redis的订阅操作是阻塞式的,因此一旦客户端订阅了某个频道或模式,就将会一直处于订阅状态直到退出。在 unsubscribepUnsubscribeUNSUBSCRIBEPUNSUBSCRIBE 命令中,其返回值都包含了该客户端当前订阅的频道和模式的数量,当这个数量变为0时,该客户端会自动退出订阅状态。

    如果使用 redis-cli 来运行 subscribe 命令之后就直接阻塞了进程,也无法再输入其他的命令将,所以unsubscribepUnsubscribe命令对于 redis-cli 来说其实是没有什么实质上的意义,只要在JedisLettuce 这类的 Redis 客户端连接才有其用处。

4.1.2 Spring Data Redis发布订阅实现

在消息(或事件)接收端(消费端),Spring Data Redis 可以通过直接命名或使用模式匹配订阅一个或多个频道Channel)。模式匹配方式非常有用,因为它不仅允许使用一个命令创建多个订阅,还可以侦听订阅时尚未创建的频道(只要它们与模式匹配)

Spring Data Redis在底层,RedisConnection 提供了 subscribepSubscribe 方法,分别映射 Redis 按频道订阅 subscribe 命令,按模式订阅 psubscribe 命令。它们语法如下:

subscribe channel [channel ...]
psubscribe pattern [pattern ...]

其中:channel 表示频道名称,pattern 表示模式字符串.

请注意,可以将多个频道(Channel)或多个匹配模式用作参数。为了更改连接的订阅或查询它是否正在侦听RedisConnection 提供了getSubscription()isSubscribed() 方法:

@Autowired
RedisMessageListenerContainer container;final Map<String, RedisConnection> map = new HashMap<>();@RequestMapping("/dosub")
public String doSubscribe(String channel) {RedisConnection connection = container.getConnectionFactory().getConnection();map.put(channel, connection);connection.subscribe(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {log.info("接收到消息");System.out.println(new String(message.getBody()));}}, channel.getBytes(StandardCharsets.UTF_8));return "订阅" + channel;
}@RequestMapping("/getsub")
public Subscription getSubscription(String channel) {RedisConnection connection = map.get(channel);Subscription subscription = connection.getSubscription();log.info("是否存在订阅状态:{}", subscription);return subscription;
}@RequestMapping("/dounsub")
public String doUnSubscribe(String channel) {RedisConnection connection = map.get(channel);if (connection.isSubscribed()) {Subscription subscription = connection.getSubscription();log.info("是否存在订阅状态:{}", subscription);subscription.subscribe(channel.getBytes(StandardCharsets.UTF_8));}return "取消订阅" + channel;
}

Spring Data Redis 中的订阅命令是阻塞模式。也就是说,在连接上调用 subscribe 会导致当前线程阻塞,等待消息。只有当订阅被取消时,线程才会被释放,你可以在另一个线程上,对订阅消息的连接调用 unsubscribepUnsubscribe 方法释放。

如前所述在Spring Data Redis 中,一旦订阅,连接就会开始等待消息。此时,连接仅允许添加新的订阅、修改现有订阅、取消现有订阅的命令。如果调用 subscribepSubscribeunsubscribepUnsubscribe 之外的任何命令都会引发异常。

此处指的连接为JedisLettuce 这类的 Redis 客户端连接,并不包括 redis-cli

为了订阅消息,基于Spring Data Redis,需要实现 MessageListener 回调。每次新消息到达时,都会调用回调,并通过 onMessage 方法执行用户业务代码。该接口不仅可以访问实际消息,还可以访问接收消息的频道(Channel),以及订阅时用于匹配频道(Channel)的模式。此信息使被调用者不仅可以通过内容区分各种消息,还可以检查其他详细信息。

MessageListener 接口源码如下:

package org.springframework.data.redis.connection;import org.springframework.lang.Nullable;/*** Listener of messages published in Redis.* 在 Redis 上发布的消息的监听者。* @author Costin Leau* @author Christoph Strobl*/
public interface MessageListener {/*** Callback for processing received objects through Redis.* 通过 Redis 处理接收到的对象的回调。* @param message message must not be {@literal null}.*                消息对象,不能为 null* @param pattern pattern matching the channel (if specified) - can be {@literal null}.*                与通道匹配的模式(如果指定)-可以为空。*/void onMessage(Message message, @Nullable byte[] pattern);
}

由于其阻塞特性,低级订阅不具吸引力,因为它需要对每个侦听器进行连接和线程管理。为了缓解这个问题,Spring Data Redis提供了RedisMessageListenerContainer 类,它可以完成所有繁重的工作。

RedisMessageListenerContainer 充当消息侦听器容器。它用于接收来自 Redis 频道(Channel)的消息,并驱动注入其中的MessageListener 实例。

侦听器容器负责消息接收的所有线程,并将消息发送到侦听器进行处理。消息侦听器容器是 MDP 和消息传递提供者之间的中介,负责注册以接收消息、资源获取和释放、异常转换等。这让您作为应用程序开发人员可以编写与接收消息相关的业务逻辑。

MessageListener 还可以实现 SubscriptionListener,以便在确认订阅/取消订阅时接收通知。同步调用时,侦听订阅通知非常有用。

此外,为了最小化应用程序占用空间,RedisMessageListenerContainer 允许多个侦听器共享一个连接和一个线程,即使它们不共享频道(Channel)。因此,无论应用程序跟踪多少侦听器或频道(Channel),运行时成本在其整个生命周期中都保持不变。

此外,容器允许运行时更改配置,以便您可以在应用程序运行时添加或删除侦听器,而无需重新启动。容器使用延迟订阅方法,仅在需要时使用RedisConnection。如果所有侦听器都被取消订阅,则会自动执行清理,并释放线程。

所以基于Spring Data Redis订阅键事件通知实现方式共有两种方式:

  • 实现MessageListener监听器,重写OnMessage方法
  • 继承KeyspaceEventMessageListener监听器,重写doHandleMessage方法

以下分别讲述详细实现方式。

4.2 实现MessageListener监听器方式

4.2.1 自定义监听器RedisListener

com/gm/key/listen/listener/RedisListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RedisListener implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {String key = message.toString();RedisSerializer<?> serializer = redisTemplate.getValueSerializer();String channel = String.valueOf(serializer.deserialize(message.getChannel()));log.info("redis key: {} , channel: {}", key, channel);}
}

4.2.2 开启监听及监听范围及事件类型

com/gm/key/listen/config/RedisListenerConfig.java

package com.gm.key.listen.config;import com.gm.key.listen.listener.RedisListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;@Configuration
public class RedisListenerConfig {@AutowiredRedisListener redisListener;private static final Topic TOPIC_EXPIRED_KEYEVENTS = new PatternTopic("__keyevent@*__:expired");private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);container.addMessageListener(redisListener, TOPIC_ALL_KEYEVENTS);return container;}
}

4.3 继承KeyspaceEventMessageListener监听器方式

4.3.1 自定义监听器RedisKeyAllEventListener

KeyspaceEventMessageListener默认监听范围:__keyevent@*

com/gm/key/listen/listener/RedisKeyAllEventListener.java

@Slf4j
@Component
public class RedisKeyAllEventListener extends KeyspaceEventMessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public RedisKeyAllEventListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overrideprotected void doHandleMessage(Message message) {String key = message.toString();RedisSerializer<?> serializer = this.redisTemplate.getKeySerializer();String channel = String.valueOf(serializer.deserialize(message.getChannel()));log.info("redis key: {} , channel: {}", key, channel);}
}

此方式不需要在RedisMessageListenerContainer中重复进行监听配置,通过@Compont注解,让IOC容器发现即可。

4.3.2 自定义监听器RedisKeyExpiredListener

KeyspaceEventMessageListener默认监听范围:__keyevent@*__:expired,只针对expired(过期事件)进行监听

com/gm/key/listen/listener/RedisKeyExpiredListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RedisKeyExpiredListener extends KeyExpirationEventMessageListener {public RedisKeyExpiredListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overridepublic void onMessage(Message message, byte[] pattern) {log.info("redis expired key: {}", message.toString());}
}

此方式不需要在RedisMessageListenerContainer中重复进行监听配置,通过@Compont注解,让IOC容器发现即可。


http://www.ppmy.cn/news/37923.html

相关文章

LeetCode222. 完全二叉树的节点个数(二分查找+二进制表示路径法)

一、题目描述 给你一棵 完全二叉树 的根节点 root &#xff0c;求出该树的节点个数。 完全二叉树 的定义如下&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能没填满外&#xff0c;其余每层节点数都达到最大值&#xff0c;并且最下面一层的节点都集中在该层最左边的…

【vim进阶】vim编辑器的分屏操作(分屏显示文件,关闭分屏,分屏间光标的移动,移动分屏)

一、分屏显示文件 VIM 可以实现分屏操作&#xff0c;一个屏幕被多个文件给分占&#xff0c;有左右和上下两种分屏的方式。 方法一&#xff1a;启动分屏 左右分屏如下操作&#xff1a; vim -On file1 file2 ... filenn是数字&#xff0c;表示分屏的数量,n要大于等于文件个数…

设计模式---装饰模式

目录 介绍 实现 优缺点 装饰模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其结构。这种类型的设计模式属于结构型模式&#xff0c;它是作为现有类的一个包装。这种模式创建了一个装饰类&#xff0c;用来包装原有…

【新2023Q2押题JAVA】华为OD机试 - 找字符

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为od机试,独家整理 已参加机试人员的实战技巧本篇题解:找字符 题目 给定两个字符串…

目标检测算法——农业作物开源数据集汇总(收藏)

&#x1f384;&#x1f384;近期&#xff0c;小海带在空闲之余收集整理了一批农业作物开源数据集资源供大家参考。 整理不易&#xff0c;小伙伴们记得一键三连喔&#xff01;&#xff01;&#xff01;&#x1f388;&#x1f388; 一、农作物图像分类&#xff08;小麦、睡到、甘…

第02章_MySQL环境搭建

第02章_MySQL环境搭建 &#x1f3e0;个人主页&#xff1a;shark-Gao &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是shark-Gao&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f389;目前状况&#xff1a;23届毕业生&#xff0c;目前…

jsp823科研项目申报管理网站cc94程序mysql+java

具体功能 1&#xff0e;系统登录&#xff1a;系统登录是用户访问系统的路口&#xff0c;设计了系统登录界面&#xff0c;包括用户名、密码和验证码&#xff0c;然后对登录进来的用户判断身份信息&#xff0c;判断是管理员用户还是普通用户。 2&#xff0e;科研项目申请&#xf…

怎么把pdf转换成高清图片

怎么把pdf转换成高清图片&#xff1f;可以使用以下两种方法&#xff1a; 方法一&#xff1a;使用Adobe Acrobat Pro DC 1、打开需要转换的PDF文件&#xff0c;点击“文件”菜单中的“导出为”&#xff0c;在弹出的菜单中选择“图像”&#xff0c;然后选择“JPEG”。 2、在“…