Java项目--仿RabbitMQ的消息队列--虚拟主机设计

ops/2025/3/12 9:57:59/

目录

一、引言

二、VirtualHost类

1.准备工作

2.交换机操作

3.队列操作

4.绑定操作

三、完善Router类

四、测试Router类

五、完善VirtualHost类

六、完善ConsumerManager类

七、总结


一、引言

  本篇文章我们就消息队列中的虚拟主机进行设计,将内存和硬盘上的数据串起来,但此处,我们仅设计一个虚拟主机。

二、VirtualHost类

1.准备工作

public class VirtualHost {private String virtualHostName;DiskDataCenter diskDataCenter = new DiskDataCenter();MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// router用来实现转发规则private Router router = new Router();// consumerManager用来实现消息消费private ConsumerManager consumerManager = new ConsumerManager(this);private final Object exchangeLocker = new Object();private final Object queueLocker = new Object();public String getVirtualHostName(){return virtualHostName;}public MemoryDataCenter getMemoryDataCenter(){return memoryDataCenter;}public DiskDataCenter getDiskDataCenter(){return diskDataCenter;}public VirtualHost (String virtualHostName){this.virtualHostName = virtualHostName;diskDataCenter.init();try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException |MqException | ClassNotFoundException e) {e.printStackTrace();System.out.println("[VirtualHost] 恢复内存数据失败!");}}
}

2.交换机操作

/*创建一个交换机*/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String,Object> arguments){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange existExchange = memoryDataCenter.getExchange(exchangeName);if(existExchange!=null){System.out.println("[VirtualHost] 交换机已经存在!exchangeName="+exchangeName);return true;}Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);if(durable){diskDataCenter.insertExchange(exchange);}memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建完成!exchangeName="+exchangeName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 交换机创建失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}/*删除交换机*/public boolean exchangeDelete(String exchangeName){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if(toDelete==null){throw new MqException("[VirtualHost] 交换机不存在无法删除!");}if(toDelete.isDurable()){diskDataCenter.deleteExchange(exchangeName);}memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功!exchangeName="+exchangeName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 交换机删除失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}

3.队列操作

/*创建队列*/public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments){queueName = virtualHostName + queueName;try {synchronized (queueLocker){MsgQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue!=null){System.out.println("[VirtualHost]队列已经存在!queueName="+queueName);return true;}MsgQueue queue = new MsgQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);if(durable){diskDataCenter.insertQueue(queue);}memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost]队列创建成功!queueName="+queueName);}return true;}catch (Exception e){System.out.println("[VirtualHost]队列创建失败!queueName="+queueName);e.printStackTrace();return false;}}/*删除队列*/public boolean queueDelete(String queueName){queueName = virtualHostName + queueName;try {synchronized (queueLocker){MsgQueue toDelete = memoryDataCenter.getQueue(queueName);if(toDelete==null){throw new MqException("[VirtualHost]队列不存在,无法删除!");}if(toDelete.isDurable()){diskDataCenter.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost]队列删除成功!queueName="+queueName);}return true}catch (Exception e){System.out.println("[VirtualHost]队列删除失败!queueName="+queueName);e.printStackTrace();return false;}}

4.绑定操作

/*绑定操作*/public boolean QueueBind(String exchangeName,String queueName,String bindingKey){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker){synchronized (queueLocker){Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);if(existsBinding!=null){throw new MqException("[VirtualHost] binding已经存在!exchangeName="+exchangeName+",queueName="+queueName);}if(!router.checkBindingKey(bindingKey)){throw new MqException("[VirtualHost] bindingKey非法!bindingKey="+bindingKey);}Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost]交换机不存在!exchangeName="+exchangeName);}MsgQueue queue = memoryDataCenter.getQueue(queueName);if (queue==null){throw new MqException("[VirtualHost]队列不存在!queueName="+queueName);}if(exchange.isDurable() && queue.isDurable()){diskDataCenter.insertBinding(binding);}memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost]绑定创建成功!exchangeName="+exchangeName+",queueName="+queueName);}}return true;}catch (Exception e){System.out.println("[VirtualHost]绑定创建失败!exchangeName="+exchangeName+",queueName="+queueName);e.printStackTrace();return false;}}/*删除绑定*/public boolean queueUnbind(String exchangeName,String queueName){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker){synchronized (queueLocker){Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);if(existsBinding==null){throw new MqException("[VirtualHost]绑定不存在,无法删除!exchangeName="+exchangeName+",queueName="+queueName);}diskDataCenter.deleteBinding(existsBinding);memoryDataCenter.deleteBinding(existsBinding);System.out.println("[VirtualHost]删除绑定成功!");}}return true;}catch (Exception e){System.out.println("[VirtualHost]删除绑定失败!exchangeName="+exchangeName+",queueName="+queueName);e.printStackTrace();return false;}}

5.消息操作

public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body){try {exchangeName = virtualHostName + exchangeName;if(!router.checkRoutingKey(routingKey)){throw new MqException("[VirtualHost]routingKey非法!routingKey="+routingKey);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost]交换机不存在!exchangeName="+exchangeName);}if(exchange.getType()==ExchangeType.DIRECT){String queueName = virtualHostName + routingKey;Message message = Message.createMessageWithId(routingKey,basicProperties,body);MsgQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost]队列不存在!queueName="+queueName);}sendMessage(queue,message);}else{ConcurrentHashMap<String,Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for(Map.Entry<String,Binding> entry:bindingsMap.entrySet()){Binding binding = entry.getValue();MsgQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if(queue==null){System.out.println("[VirtualHost]上传消息的时候,发现队列不存在!queueName="+binding.getQueueName());continue;}Message message = Message.createMessageWithId(routingKey,basicProperties,body);if(!router.route(exchange.getType(),binding,message)){continue;}sendMessage(queue,message);}}return true;}catch (Exception e){System.out.println("[VirtualHost]发送消息失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}private void sendMessage(MsgQueue queue,Message message) throws IOException, MqException {int deliverMode = message.getDeliverMode();if(deliverMode==2){diskDataCenter.sendMessage(queue,message);}memoryDataCenter.sendMessage(queue,message);consumerManager.notifyConsume(queue.getName());}

三、完善Router类

public class Router {public boolean checkBindingKey(String bindingKey){if(bindingKey.length()==0){return true;}for(int i =0;i<bindingKey.length();i++){char ch = bindingKey.charAt(i);if(ch>='A' && ch<='Z'){continue;}if(ch>='a' && ch<='z'){continue;}if(ch>='0' && ch<='9'){continue;}if(ch=='_' || ch=='.' || ch=='*' || ch=='#'){continue;}return false;}String[] words = bindingKey.split("\\.");for(String word:words){if(word.length()>1 && (word.contains("#") || word.contains("*")) ){return false;}}// 通配符之间的相邻关系// 1.aaa.#.#.bbb  => 非法// 2.aaa.#.*.bbb  => 非法// 3.aaa.*.#.bbb  => 非法// 4.aaa.*.*.bbb  => 合法for(int i=0;i<words.length-1;i++){if(words[i].equals("#") && words[i+1].equals("#")){return false;}if(words[i].equals("#") && words[i+1].equals("*")){return false;}if(words[i].equals("*") && words[i+1].equals("#")) {return false;}}return true;}public boolean checkRoutingKey(String routingKey){if(routingKey.length()==0){return true;}for(int i=0;i<routingKey.length();i++){char ch = routingKey.charAt(i);if(ch>='A' && ch<='Z'){continue;}if(ch>='a' && ch<='z'){continue;}if(ch>='0' && ch<='9'){continue;}if(ch=='_' || ch=='.'){continue;}return false;}return true;}// 这个方法用来判定该消息是否可以转发给对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {if(exchangeType==ExchangeType.FANOUT){return true;}else if(exchangeType == ExchangeType.TOPIC){return routeTopic(binding,message);}else {throw new MqException("[Router] 交换机类型非法!exchangeType="+exchangeType);}}private boolean routeTopic(Binding binding,Message message){String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");int bindingIndex = 0;int routingIndex = 0;while(bindingIndex<bindingTokens.length && routingIndex<routingTokens.length){if(bindingTokens[bindingIndex].equals("*")){bindingIndex++;routingIndex++;continue;}else if(bindingTokens[bindingIndex].equals("#")) {bindingIndex++;if (bindingIndex == bindingTokens.length) {// 情况3 如果#后面没有其他内容了,直接返回truereturn true;}// 情况4 如果#后面还存在其他内容,直接找#后面的部分,在routingKey中是否存在routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {return false;}// 找到了就往后匹配bindingIndex++;routingIndex++;}else {// 情况1:普通字符串两边内容必须一致if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex++;routingIndex++;}}if(bindingIndex==bindingTokens.length && routingIndex==routingTokens.length){return true;}return false;}private int findNextMatch(String[] routingTokens,int routingIndex,String bindingToken){for (int i =routingIndex;i<routingTokens.length;i++){if(routingTokens[i].equals(bindingToken)){return i;}}return -1;}
}

四、测试Router类

@SpringBootTest
public class RouterTests {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp(){binding = new Binding();message = new Message();}@AfterEachpublic void tearDown(){binding = null;message = null;}@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest1]测试成功!");}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest2]测试成功!");}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest3]测试成功!");}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest4]测试成功!");}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest5]测试成功!");}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest6]测试成功!");}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest7]测试成功!");}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest8]测试成功!");}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest9]测试成功!");}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest10]测试成功!");}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes11]测试成功!");}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes12]测试成功!");}@Testpublic void test13() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes13]测试成功!");}
}

五、完善VirtualHost类

/*消费者订阅消息*/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume成功!queueName="+queueName);return true;}catch (Exception e){System.out.println("[VirtualHost] basicConsume失败!queueName="+queueName);e.printStackTrace();return false;}}public boolean basicAck(String queueName,String messageId){queueName = virtualHostName +queueName;try {Message message = memoryDataCenter.getMessage(messageId);if(message==null){throw new MqException("[VirtualHost]要确认的消息不存在!messageId="+messageId);}MsgQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost]要确认的队列不存在!queueName="+queueName);}if(message.getDeliverMode()==2){diskDataCenter.deleteMessage(queue,message);}memoryDataCenter.removeMessage(messageId);memoryDataCenter.removeMessageWaitAck(queueName,messageId);System.out.println("[VirtualHost]basicAck成功!消息被成功确认!queueName="+queueName+",messageId="+messageId);return true;}catch (Exception e){System.out.println("[VirtualHost]basicAck成功!消息被失败确认!queueName="+queueName+",messageId="+messageId);e.printStackTrace();return false;}}

六、完善ConsumerManager类

public class ConsumerManager {private VirtualHost parent;private ExecutorService workerPool = Executors.newFixedThreadPool(4);private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();private Thread scannerThread = null;public ConsumerManager(VirtualHost p){parent = p;scannerThread = new Thread(() ->{while (true){try {String queueName = tokenQueue.take();MsgQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager]取令牌后发现,该队列名不存在!queueName="+queueName);}synchronized (queue){consumeMessage(queue);}}catch (InterruptedException | MqException e){e.printStackTrace();}}});scannerThread.setDaemon(true);scannerThread.start();}public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列MsgQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager] 队列不存在!queueName="+queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);int n = parent.getMemoryDataCenter().getMessageCount(queueName);// 针对消息进行消费for(int i=0;i<n;i++){consumeMessage(queue);}}}private void consumeMessage(MsgQueue queue) {// 1.先找到一个消费者ConsumerEnv luckyDog = queue.chooseConsumer();if(luckyDog == null){return;}// 2.从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null){return;}// 3.把消息带入到消费者的回调方法中,丢给线程池执行workerPool.submit(() ->{try{// 1.把消息放入待确认集合中parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);// 2.真正执行回调// consumerEnv中有Consumer函数式接口,调用在Consumer函数式接口中的回调方法luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());// 3.如果当前是“自动应答”,直接把消息删除// 4.如果当前是“手动应答”,就先不处理if(luckyDog.isAutoAck()){// 1.删除硬盘上的消息if(message.getDeliverMode()==2){parent.getDiskDataCenter().deleteMessage(queue,message);}// 2.删除内存上的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3.删除内存中消息中心的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager]消息被消费成功!queueName="+queue.getName());}}catch (Exception e){e.printStackTrace();}});}
}

七、总结

  本篇文章就如何将内存以及硬盘上的数据进行串在一起设计了一个虚拟主机,以及相对应的方法,下一篇文章就网络通信设计进行代码编写,感谢观看!


http://www.ppmy.cn/ops/143924.html

相关文章

【漫话机器学习系列】011.Bagging方法 VS Dropout方法

Bagging 和 Dropout 是两种用于提高模型性能、减少过拟合的方法&#xff0c;但它们的工作原理和适用场景有所不同。以下是两者的详细对比&#xff1a; 1. 方法背景 Bagging 全称&#xff1a;Bootstrap Aggregating。背景&#xff1a;一种集成学习方法&#xff0c;用于提升基学…

基于Spring Boot的个人财务系统

一、系统背景与目的 随着全球经济的发展和人们生活水平的提高&#xff0c;个人财务管理变得越来越重要。传统的个人财务软件存在操作复杂、用户体验差、数据不安全等问题&#xff0c;无法满足用户的个性化需求。因此&#xff0c;开发一种基于Spring Boot的个人财务系统&#x…

基于Spring Boot的水果蔬菜商城系统

一、系统概述 该系统主要适用于实体店的线上销售&#xff0c;旨在打造线上线下一体化的销售模式&#xff0c;带动水果蔬菜的销售量&#xff0c;提高店铺的销售额。系统前台主要面向用户&#xff0c;提供登录注册、首页展示、分类搜索、购物车、地址信息、个人信息、订单信息等…

在Ubuntu下运行QEMU仿真FreeBSD riscv64系统

在Ubuntu下运行QEMU仿真FreeBSD riscv64系统 突发奇想&#xff0c;尝试在Ubuntu下运行QEMU仿真FreeBSD riscv64系统&#xff0c; 参考这篇文档&#xff1a;手把手教你在QEMU上运行RISC-V Linux_qemu 运行 .bin-CSDN博客 并参考FreeBSD的Wiki&#xff1a;riscv - FreeBSD Wik…

开源数字人系统源码短视频文案提取文案改写去水印小程序

应用场景 短视频去水印&#xff1a; 个人用户&#xff1a;在社交媒体上分享短视频时&#xff0c;去除原视频中的水印&#xff0c;以保护个人隐私或避免侵权问题。企业用户&#xff1a;在广告、宣传和营销活动中&#xff0c;使用无水印的短视频以提高品牌知名度和吸引力。 文案提…

Serverless核心组件、最佳实践及性能优化

接下来,我们将深入探讨Serverless架构的更多细节,包括其核心组件、最佳实践以及如何优化Serverless应用的性能。Serverless架构通常涉及以下几个核心组件: 函数即服务(FaaS):云服务提供商提供的计算服务,允许您运行代码而无需管理服务器。AWS Lambda、Azure Functions和…

PDF-Extract-Kit

环境安装 conda create -n pdf-extract-kit-1.0 python3.10 conda activate pdf-extract-kit-1.0 pip install -r requirements.txt网有点卡&#xff0c;每次下到一半就停了&#xff0c;回宿舍再试试 模型权重下载 官网 huggingface-cli download --resume-download openda…

JS中的innerHTML,innerText,value的区别

目录 Document 对象 主要用途 getElementById() 方法 innerHTML innerText value Document 对象 Document 对象&#xff0c;当 HTML 文档加载到 Web 浏览器中时&#xff0c;它就变成了一个文档对象。文档对象是 HTML 文档的根节点。文档对象是窗口对象的属性。 在JavaSc…