Redis发布-订阅模式之Channel的发布订阅

news/2024/10/22 5:00:05/

文章目录

  • 一、简介
  • 二、通过频道(Channel)实现
  • 三、代码示例
    • (1)发布者发送消息
    • (2)订阅者订阅频道

一、简介

Redis 发布订阅(Pus/Sub)是一种消息通信模式:发送者通过 publish发布消息,订阅者通过 subcribe订阅接收消息或通过unsubcribe取消订阅。

主要包含三个部分组成:发布者、订阅者、Channel。

发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。

二、通过频道(Channel)实现

  • 订阅者订阅频道
  • 发布者向「频道」发布消息
  • 所有订阅「频道」的订阅者收到消息

三、代码示例

(1)发布者发送消息

其实就是定义一个channel管道,填上合适的topic,后续订阅者根据这个topic来订阅消息。

String channel = new ChannelTopic(MATCH_LOG_REDIS_TOPIC).getTopic();
jedis.publish(channel, message);

(2)订阅者订阅频道

@Beanpublic MessageListenerAdapter listenerAdapter(MatchLogMsgHandler matchLogMsgHandler) {MessageListenerAdapter adapter = new MessageListenerAdapter(matchLogMsgHandler, "onMessage");return adapter;}
    @Beanpublic RedisMessageListenerContainer container(@Qualifier("jedisConnectionFactory4Search") RedisConnectionFactory connectionFactory, Executor redisMqAsyncExecutor,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setTaskExecutor(redisMqAsyncExecutor);container.addMessageListener(listenerAdapter, new PatternTopic(MATCH_LOG_REDIS_TOPIC));return container;}
@Component
@Slf4j
public class MatchLogMsgHandler {public void onMessage(String message, String pattern) {log.info("消费match日志:{}", message);try {//将从channel拿到的message进行json转换为对象Map<String, String> map = JSONObject.parseObject(message, Map.class);String userName = map.get("userName");String data = map.get("data");WebSocketServer.sendInfo(data, userName);} catch (Exception e) {log.error("MatchLogMsgHandler处理对码日志消息失败", e);}}
}

这三片代码的看着似乎有点复杂,其实很简单,听我娓娓道来…
其实这三块代码都被springboot的注解变成了bean对象,并交给了springboot来托管。

那么订阅者是怎么订阅到发布者发布的消息呢?

  • 先看第二块代码,这个container其实是在bean加载的流程中,将new PatternTopic(MATCH_LOG_REDIS_TOPIC)这个topic和listenerAdapter的监听器绑定到一块,代表着listenerAdapter的这个监听器只监听这个topic
  • 再看第一块代码,就是声明了一个listenerAdapter监听器,这个监听器监听到消息之 - 后,通过反射调用执行matchLogMsgHandler类中的onMessage方法
  • 最后一块代码,就是创建了matchLogMsgHandler类,在onMessage方法中来具体的处理我们的逻辑。

------------------------------完结撒花------------------------------


http://www.ppmy.cn/news/1442577.html

相关文章

【C++】:拷贝构造函数和赋值运算符重载

目录 一&#xff0c;拷贝构造函数1. 什么是拷贝构造函数2. 拷贝构造函数的特性3. 实践总结 二&#xff0c;赋值运算符重载2.1 运算符重载2.2 赋值运算符重载 一&#xff0c;拷贝构造函数 1. 什么是拷贝构造函数 拷贝构造函数是特殊的构造函数。是用一个已经存在的对象&#x…

06 华三防火墙的如何进入web页面?

1 AI 思路 要进入华三防火墙的Web页面,你需要按照以下步骤操作: 确定防火墙的IP地址:首先,你需要知道你的华三防火墙的IP地址。通常,你可以从网络管理员或者设备本身获取这个信息。 打开浏览器:在你的电脑上打开一个网页浏览器,例如Chrome、Firefox或者Edge等。 输入UR…

Spring Boot 如何实现缓存预热

Spring Boot 实现缓存预热 1、使用启动监听事件实现缓存预热。2、使用 PostConstruct 注解实现缓存预热。3、使用 CommandLineRunner 或 ApplicationRunner 实现缓存预热。4、通过实现 InitializingBean 接口&#xff0c;并重写 afterPropertiesSet 方法实现缓存预热。 1、使用…

基础SQL 函数

在MySQL中内置了很多函数&#xff0c;我们可以通过一段程序或者代码直接调用这个函数 一、字符串函数 下面通过例子来验证这些函数 -- 字符串函数-- concat函数 select concat("hello ","world");-- lower函数 select lower("HELLO");-- upper函…

【Flink入门修炼】2-3 Flink Checkpoint 原理机制

如果让你来做一个有状态流式应用的故障恢复&#xff0c;你会如何来做呢&#xff1f; 单机和多机会遇到什么不同的问题&#xff1f; Flink Checkpoint 是做什么用的&#xff1f;原理是什么&#xff1f; 一、什么是 Checkpoint&#xff1f; Checkpoint 是对当前运行状态的完整记…

功能测试_分类_用例_方法

总结 测试分类 按阶段分类 是否查看源代码分类 是否运行分类 是否自动化 其他分类 软件质量模型 开发模型-瀑布模型 测试过程模型 v w 测试用例八大要素 用例编号 用例标题 …

C#设计树形程序界面的方法:创建特殊窗体

目录 1.TreeView控件 2.实例 &#xff08;1&#xff09;Resources.Designer.cs &#xff08;2&#xff09;Form1.Designer.cs &#xff08;3&#xff09;Form1.cs &#xff08;4&#xff09;生成效果 以树形来显示程序的菜单&#xff0c;可以更直观、更快捷地对窗体进行…

mysql事故复盘: 单行字节最大阈值65535字节(原创)

背景 记得还在银行做开发&#xff0c;投产上线时&#xff0c;项目发版前&#xff0c;要提DDL的sql工单&#xff0c;mysql加1个字段&#xff0c;因为这张表为下游数据入湖入仓用的&#xff0c;长度较大。在测试库加字段没问题&#xff0c;但生产库字段加不上。 先说结论 投产…