一、基本介绍
Redis 发布/订阅是一种消息传模式,其中发送者发送消息,而接收者(订阅者)接收消息。传递消息的通道称为channel。
例如下图的工流程,当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端。
二、对比
有人可能之前使用过RabbitMQ做消息队列,此时难免会产生疑惑,它与RabbitMQ有什么异同呢?它们的使用场景又都是什么呢?
Redis发布订阅(Pub/Sub)
- Redis的发布订阅功能提供了一种一对多的消息通信模式,其中发布者发送消息,订阅者接收消息。
- 它是轻量级的,主要适用于简单的消息广播场景。
- 它的速度非常非常非常快,但是不能持久化。
它适用于实时性要求高、但对消息可靠性要求不高的场景,如实时通知、缓存更新、一、二级别缓存同步等。
RabbitMQ消息队列
- RabbitMQ是一个实现了高级消息队列协议(AMQP)的消息队列服务,它支持多种消息传递模式,包括点对点、发布订阅和路由等。
- 它是一个功能齐全的消息代理,提供了可靠的消息传递、消息持久化、消息确认和灵活的路由机制。
它适用于需要确保消息可靠送达、处理复杂业务逻辑和分布式系统的场景,速度上要慢于redis发布订阅,并且集成时比较重量级。
对比总结:如果需要可靠的消息传递和复杂的消息路由,RabbitMQ是更好的选择。如果只需要简单的消息广播,并且对性能有较高的要求,Redis的发布订阅可能就足够了。
三、实战
1、首先在你的springboot中引入redis,并能确保使用RedisTemplate这一步就不多做赘述了。
2、创建一个你要发送的消息实体,例如我创建了一个CaffeineCacheMessage消息类。注意因为需要网络流转,所以一定要实现序列化Serializable接口。
public class CaffeineCacheMessage implements Serializable {private String cacheName;private Object key;private Object value;private Integer type;public String getCacheName() {return cacheName;}public void setCacheName(String cacheName) {this.cacheName = cacheName;}public Object getKey() {return key;}public void setKey(Object key) {this.key = key;}public Object getValue() {return value;}public void setValue(Object value) {this.value = value;}public Integer getType() {return type;}public void setType(Integer type) {this.type = type;}
3、创建一个接收到消息的处理类RedisReceiver,用来走收到消息后的逻辑处理。
//收到消息进行逻辑处理
@Component
public class RedisReceiver{private static Logger log = LoggerFactory.getLogger(RedisReceiver.class);/**接收消息的方法*/public void receiveMessage(String message){log.info("接收到了消息:{}",message);// ...你自己的业务逻辑}}
4、关键点来了,创建一个自动配置类RdMessageListenerConfiguration,来设定和封装消息发送通道,消息接收类和接收方法。
注意我这里的MessageListenerAdapter listenerAdapter(RedisReceiver receiver)
方法,它一定要与第三步的方法能对应上,不然监听器使用代理对象调用方法时就会找不到方法,报空指针异常。
/*** 用Redis的发布/订阅模式,来同步消息* @author: chenggh*/
@Configuration
public class RdMessageListenerConfiguration {@Value("${xxxx.topic.id}")String topicId;@AutowiredStringRedisTemplate redisTemplate;private static Logger log = LoggerFactory.getLogger(RdMessageListenerConfiguration.class);/*** redis消息监听器容器* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理* @param connectionFactory* @param listenerAdapter* @return*/@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅了一个叫chat 的通道container.addMessageListener(listenerAdapter, new PatternTopic(topicId));//这个container 可以添加多个 messageListenerreturn container;}/*** 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法* @param receiver* @return*/@BeanMessageListenerAdapter listenerAdapter(RedisReceiver receiver) {//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”//也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看return new MessageListenerAdapter(receiver, "receiveMessage");}public void publishUpdateMessage(Object key, Object value, String cacheName, Integer type) {CaffeineCacheMessage cacheMessage = new CaffeineCacheMessage();cacheMessage.setKey(key);cacheMessage.setValue(value);cacheMessage.setType(type);cacheMessage.setCacheName(cacheName);redisTemplate.convertAndSend(topicId, JSON.toJSONString(cacheMessage));}}
至此,redis的发布订阅实战就结束了。代码不是很多,但是设计和实现的很精巧,每一步的逻辑都清晰的写在了注释里,可以拿去即用。★,°:.☆( ̄▽ ̄)/$:.°★ 。