2024.2.23 模拟实现 RabbitMQ —— 实现消费消息逻辑

news/2025/1/3 6:18:23/

目录

引言

函数式接口

消费者订阅消息 实现思路

关于消息确认


引言

函数式接口

  • Lambda 表达式的本质是匿名函数
  • Java 函数无法脱离类而存在,所以 Java 通过引入函数式接口以支持 Lambda 表达式

特性:

  1. 函数式接口为一个 interface 类
  2. 该类中有且仅有一个方法
  3. 该类需加上 @FunctionalInterface 注解

注意:

  • 上述三点其实就是 Lambda 的本质,即底层实现

消费者订阅消息 实现思路

1、让 broker server 把有哪些消费者管理好

  • 消费者调用 basicConsume 方法就是订阅某个指定队列的消息

注意:

  • 消费者是以队列为纬度订阅的
  • 一个队列可以有多个消费者

  • 约定 消费者之间按照 轮询 的方式进行消费

代码编写:

  • 定义一个 ConsumerEnv 类,用来描述一个消费者
  • 该类中也会包含一些消费者消费过程中用到的数据
import lombok.Data;/*
* 表示一个消费者(完整的执行环境)
* */
@Data
public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;
//    通过这个回调来处理收到的消息private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}
  • 给每个队列对象(MSGQueue 对象)添加属性 List,用于存储该队列的 消费者对象
//    当前队列都有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//    记录当前取到了第几个消费者,方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);
//    添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}
//    订阅者的删除暂时不考虑
//    挑选一个订阅者,用来处理当前的消息 (按照轮询的方式)public ConsumerEnv chooseConsumer() {if(consumerEnvList.size() == 0) {
//            该队列没有人订阅return null;}
//        计算一下当前要取的元素的下标int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndDecrement();return consumerEnvList.get(index);}

2、消费者 订阅队列消息,并使用该消息完成明确好的业务逻辑

  • 所谓 消费者 消费消息,其实就是让线程池 执行对应消费者中的回调函数
  • 通过回调函数,将消息的内容通过参数传递
  • 回调函数中的内容由消费者编写,具体里面要干啥,取决于消费者自己的业务逻辑

代码编写:

  • 此处我们使用 函数式接口 的方式,让消费者在订阅消息时,明确使用该消息进行的业务逻辑是什么
import com.example.demo.mqserver.core.BasicProperties;/*
* 只是一个单纯的函数式接口(回调函数),收到消息之后要处理消息时调用的方法
* */
@FunctionalInterface
public interface Consumer {
//    Delivery 的意思是 "投递",这个方法预期是在每次服务器收到消息之后,来调用
//    通过这个方法把消息推送给对应的消费者
//    (注意!!这里的方法名和参数,也都是参考 RabbitMQ 展开的)void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body);
}
  • 在 VirtualHost 中实现消费者订阅某个队列的消息
//    订阅消息
//    添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者
//    consumerTag:表示消费者的身份标识
//    autoAck:消息被消费完成后,应答的方式,为 true 自动应答,为 false 手动应答
//    consumer:是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
//        构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 Consumer 对象添加到该队列中queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName = " + queueName);return true;}catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName = " + queueName);e.printStackTrace();return false;}}

3、队列收到消息,并将消息推送给订阅该队列的消费者

  • 为了能够让 线程池 知道要执行哪个回调函数及其参数中的 消息 来自于哪个队列
  • 我们定义一个单独的扫描线程,用于感知哪个队列收到了新消息

问题一:

  • 为啥搞了扫描线程,还要再搞个线程池呢?
  • 既让该扫描线程获取 消息和 消费者的回调函数,又让其执行回调函数不就行了?

回答:

  • 由于消费者编写的回调函数,具体是干啥的,我们并不知道
  • 如果是比较耗时的业务逻辑的话,此时仅由一个线程来完成上述这些操作,就可能周转不开了

问题二:

  • 当前有多个队列,但扫描线程就一个,扫描线程如何知道是哪个队列中 来了新消息呢?

方案一:

  • 直接让扫描线程不停的循环遍历所有对列,如果发现有新的元素就立即处理

方案二:

  • 引入一个阻塞队列,哪个队列新增了一个消息,就将哪个队列的名字放入 阻塞队列中
  • 此时 扫描线程 仅需要盯住这阻塞队列即可
  • 阻塞队列中队列名相当于 "令牌",扫描线程从阻塞队列中取队列名,进而再根据队列名,从对应的队列中取一个消息

回答:

  • 此处我们采用方案二!

代码编写:

  • 此处我们实现一个 ConsumerManager 类,用于实现消费消息的核心逻辑
import com.example.demo.common.Consumer;
import com.example.demo.common.ConsumerEnv;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.VirtualHost;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;/*
* 通过这个类来实现消费消息的核心逻辑
* */
public class ConsumerManager {
//    持有上层的 VirtualHost 对象的引用,用来操作数据private VirtualHost parent;
//    指定一个线程池,负责执行具体的回调任务private ExecutorService workerPool = Executors.newFixedThreadPool(4);
//    存放令牌的阻塞队列private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
//    扫描线程private Thread scannerThread = null;public ConsumerManager(VirtualHost p) {this.parent = p;scannerThread = new Thread(() -> {while (true) {try {
//                    1、拿到令牌String queueName = tokenQueue.take();
//                    2、根据令牌找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现,该队列名不存在!queueName = " + queueName);}
//                    3、从队列中消费消费一个消息synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});
//        把线程设为后台线程scannerThread.setDaemon(true);scannerThread.start();}//    这个方法的调用时机就是发送消息的时候public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
//        找到对应的队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName = " + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);
//           如果当前队列中已经有一些消息了,需要立即就消费掉int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0;i < n;i++) {
//               这个方法调用一次就消费一条消息consumeMessage(queue);}}}private void consumeMessage(MSGQueue queue) {
//        1、按照轮询的方式,找个消费者出来ConsumerEnv luckyDog = queue.chooseConsumer();if(luckyDog == null) {
//            当前队列没有消费者,暂时不消费,等后面有消费者出现再说return;}
//        2、从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null) {
//            当前队列中还没有消息,也不需要消费return;}
//        3、把消息带入到消费者的回调方法中,丢给线程池执行workerPool.submit(() -> {try {
//            1) 把消息放入待确认的集合中,这个操作必须要在执行回调之前parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
//            2) 真正执行回调函数luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());
//            3) 如果当前是 "自动应答",就可以直接把消息删除了
//               如果当前是 "手动应答",则先不处理,交给后续消费者调用 basicAck 方法来处理if(luckyDog.isAutoAck()) {
//                    a.删除硬盘上的消息if(message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue,message);}
//                    b.删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId());
//                    c.删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费 queueName = " + queue.getName());}}catch (Exception e) {e.printStackTrace();}});}
}

关于消息确认

  • 为了能够确保消息是被正确的消费掉了,我们需要引入 消息确认 机制
  • 即消费者的回调方法 顺利执行完(未抛异常啥的),那么这条消息的历史使命就算完成了,该消息也就可以被删除了
  • 消息确认机制 也就是为了保证 消息不丢失

具体思路

  1. 在真正执行回调之前,先将该消息放到 "待确认集合" 中,避免因为回调失败,导致消息的丢失
  2. 真正执行回调
  3. 当前消费者采取的是 autoAck=true 自动应答,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息了(硬盘、消息中心、待确认集合)
  4. 当前消费者采取的是 autoAck=false 手动应答,需要消费者这边,在自己的回调方法内部,显式调用 basicAck 这个核心 API

basicAck 代码编写:

public boolean basicAck(String queueName,String messageId) {queueName = virtualHostName + queueName;try {
//            1、获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if(message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在!messageId = " + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null) {throw new MqException("[VirtualHost] 要确认的消息不存在!queueName = " + queueName);}
//            2、删除硬盘上的数据if(message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue,message);}
//            3、删除消息中心的数据memoryDataCenter.removeMessage(messageId);
//            4、删除待确认的集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);System.out.println("[VirtualHost] basicAck 成功!消息被确认成功!queueName = " + queueName+ ", messageId = " + messageId);return true;}catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败!消息确认失败!queueName = " + queueName+ ", messageId = " + messageId);e.printStackTrace();return false;}}

问题一:

  • 执行回调方法的过程中抛异常了会产生什么影响?

回答:

  • 当回调方法抛异常,后续逻辑便会执行不到,此时该消息就会始终在 待确认的集合中
  • RabbitMQ 的做法是另外搞一个扫描线程(其实 RabbitMQ 里面不叫线程,人家是叫线程,但是这个进程不是操作系统的进程,而是 erlang 中的概念)
  • 由该线程负责关注 待确认集合中,每个待确认的消息呆多久了,如果呆的时间超出了范围就会把这个消息放到一个特定的队列 "死信队列"
  • 当然,死信对列 也是程序员手动配置的,但此处我们并未实现 死信队列逻辑

问题二:

  • 执行回调过程中,broker server 崩溃了,其中的内存数据全没了,此时有什么影响?

回答:

  • 此时硬盘数据还是在的!
  • 正在消费的这个消息,在硬盘中仍然存在
  • broker server 重启之后,这个消息就又被加载回内存了,就像从来没有消费过一样
  • 消费者就有机会重新消费到这个消息
  • 当然重复消费的问题,应该由消费者的业务代码负责保证,broker server 管不了!

http://www.ppmy.cn/news/1360881.html

相关文章

能力组队 | 求最多可以派出多少支团队(C 语言)

题目 用数组代表每个人的能力&#xff0c;一个比赛活动要求&#xff0c;参赛团队的最低能力值为N&#xff0c;每个团队可以由一人或者两人组成&#xff0c;且一个人只能参加一个团队&#xff0c;计算出最多可以派出多少只符合要求的队伍。 输入 第一行代表总人数&#xff0c…

GPT-SoVITS 快速声音克隆使用案例:webui、api接口

参考: https://github.com/RVC-Boss/GPT-SoVITS 环境: Python 3.10 PyTorch 2.1.2, CUDA 12.0 安装包: 1、使用: 1)下载项目 git clone https://github.com/RVC-Boss/GPT-SoVITS.git2)下载预训练模型 https://huggingface.co/lj1995/GPT-SoVITS 下载模型文件放到GPT…

redis 异步队列

//produceMessage.ts 模拟生产者 import Redis from ioredis; const redis new Redis(); // 生产者&#xff1a;将消息推送到队列 async function produceMessage(queueName:string, message:string) {try {await redis.rpush(queueName, message);console.log(Produced messa…

数据结构D4作业

1.实现单向循环链表的功能 loop.c #include "loop.h" loop_p create_loop() { loop_p H(loop_p)malloc(sizeof(loop)); if(HNULL) { printf("创建失败\n"); return NULL; } H->len0; H->nextH; ret…

创建一个基于Node.js的实时聊天应用

在当今数字化社会&#xff0c;实时通讯已成为人们生活中不可或缺的一部分。无论是在社交媒体平台上与朋友交流&#xff0c;还是在工作场合中与同事协作&#xff0c;实时聊天应用都扮演着重要角色。与此同时&#xff0c;Node.js作为一种流行的后端技术&#xff0c;为开发者提供了…

iOS调用系统已安装地图及内置地图实现

info.plist要添加scheme: 1.地图列表: NSArray *mapKeys=[[NSArray alloc] initWithObjects:@"com.autonavi.minimap",@"com.baidu.BaiduMap",@"com.google.android.apps.maps",@"com.tencent.map", nil]; NSArray *mapSchemes=[[NS…

微服务篇之分布式系统理论

一、CAP定理 1.什么是CAP 1998年&#xff0c;加州大学的计算机科学家 Eric Brewer 提出&#xff0c;分布式系统有三个指标&#xff1a; 1. Consistency&#xff08;一致性&#xff09;。 2. Availability&#xff08;可用性&#xff09;。 3. Partition tolerance &#xff0…

利用iSCSI服务部署IP SAN网络存储服务

一、配置环境&#xff08;Vmware WorkStation虚拟环境&#xff09; 服务端与客户端OS&#xff1a;openEuler 22.03-LTS CPU&#xff1a;1U1C 内存&#xff1a;2G 硬盘&#xff1a;5个SCSI磁盘&#xff0c;其中一个作为系统盘&#xff0c;另外四个配置为RAID5阵列 服务器IP…