Java项目--仿RabbitMQ的消息队列--虚拟主机设计

devtools/2024/12/22 9:32:56/

目录

一、引言

二、VirtualHost类

1.准备工作

2.交换机操作

3.队列操作

4.绑定操作

三、完善Router类

四、测试Router类

五、完善VirtualHost类

六、完善ConsumerManager类

七、总结


一、引言

  本篇文章我们就消息队列中的虚拟主机进行设计,将内存和硬盘上的数据串起来,但此处,我们仅设计一个虚拟主机。

二、VirtualHost类

1.准备工作

public class VirtualHost {private String virtualHostName;DiskDataCenter diskDataCenter = new DiskDataCenter();MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// router用来实现转发规则private Router router = new Router();// consumerManager用来实现消息消费private ConsumerManager consumerManager = new ConsumerManager(this);private final Object exchangeLocker = new Object();private final Object queueLocker = new Object();public String getVirtualHostName(){return virtualHostName;}public MemoryDataCenter getMemoryDataCenter(){return memoryDataCenter;}public DiskDataCenter getDiskDataCenter(){return diskDataCenter;}public VirtualHost (String virtualHostName){this.virtualHostName = virtualHostName;diskDataCenter.init();try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException |MqException | ClassNotFoundException e) {e.printStackTrace();System.out.println("[VirtualHost] 恢复内存数据失败!");}}
}

2.交换机操作

/*创建一个交换机*/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String,Object> arguments){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange existExchange = memoryDataCenter.getExchange(exchangeName);if(existExchange!=null){System.out.println("[VirtualHost] 交换机已经存在!exchangeName="+exchangeName);return true;}Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);if(durable){diskDataCenter.insertExchange(exchange);}memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建完成!exchangeName="+exchangeName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 交换机创建失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}/*删除交换机*/public boolean exchangeDelete(String exchangeName){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if(toDelete==null){throw new MqException("[VirtualHost] 交换机不存在无法删除!");}if(toDelete.isDurable()){diskDataCenter.deleteExchange(exchangeName);}memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功!exchangeName="+exchangeName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 交换机删除失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}

3.队列操作

/*创建队列*/public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments){queueName = virtualHostName + queueName;try {synchronized (queueLocker){MsgQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue!=null){System.out.println("[VirtualHost]队列已经存在!queueName="+queueName);return true;}MsgQueue queue = new MsgQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);if(durable){diskDataCenter.insertQueue(queue);}memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost]队列创建成功!queueName="+queueName);}return true;}catch (Exception e){System.out.println("[VirtualHost]队列创建失败!queueName="+queueName);e.printStackTrace();return false;}}/*删除队列*/public boolean queueDelete(String queueName){queueName = virtualHostName + queueName;try {synchronized (queueLocker){MsgQueue toDelete = memoryDataCenter.getQueue(queueName);if(toDelete==null){throw new MqException("[VirtualHost]队列不存在,无法删除!");}if(toDelete.isDurable()){diskDataCenter.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost]队列删除成功!queueName="+queueName);}return true}catch (Exception e){System.out.println("[VirtualHost]队列删除失败!queueName="+queueName);e.printStackTrace();return false;}}

4.绑定操作

/*绑定操作*/public boolean QueueBind(String exchangeName,String queueName,String bindingKey){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker){synchronized (queueLocker){Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);if(existsBinding!=null){throw new MqException("[VirtualHost] binding已经存在!exchangeName="+exchangeName+",queueName="+queueName);}if(!router.checkBindingKey(bindingKey)){throw new MqException("[VirtualHost] bindingKey非法!bindingKey="+bindingKey);}Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost]交换机不存在!exchangeName="+exchangeName);}MsgQueue queue = memoryDataCenter.getQueue(queueName);if (queue==null){throw new MqException("[VirtualHost]队列不存在!queueName="+queueName);}if(exchange.isDurable() && queue.isDurable()){diskDataCenter.insertBinding(binding);}memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost]绑定创建成功!exchangeName="+exchangeName+",queueName="+queueName);}}return true;}catch (Exception e){System.out.println("[VirtualHost]绑定创建失败!exchangeName="+exchangeName+",queueName="+queueName);e.printStackTrace();return false;}}/*删除绑定*/public boolean queueUnbind(String exchangeName,String queueName){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker){synchronized (queueLocker){Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);if(existsBinding==null){throw new MqException("[VirtualHost]绑定不存在,无法删除!exchangeName="+exchangeName+",queueName="+queueName);}diskDataCenter.deleteBinding(existsBinding);memoryDataCenter.deleteBinding(existsBinding);System.out.println("[VirtualHost]删除绑定成功!");}}return true;}catch (Exception e){System.out.println("[VirtualHost]删除绑定失败!exchangeName="+exchangeName+",queueName="+queueName);e.printStackTrace();return false;}}

5.消息操作

public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body){try {exchangeName = virtualHostName + exchangeName;if(!router.checkRoutingKey(routingKey)){throw new MqException("[VirtualHost]routingKey非法!routingKey="+routingKey);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost]交换机不存在!exchangeName="+exchangeName);}if(exchange.getType()==ExchangeType.DIRECT){String queueName = virtualHostName + routingKey;Message message = Message.createMessageWithId(routingKey,basicProperties,body);MsgQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost]队列不存在!queueName="+queueName);}sendMessage(queue,message);}else{ConcurrentHashMap<String,Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for(Map.Entry<String,Binding> entry:bindingsMap.entrySet()){Binding binding = entry.getValue();MsgQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if(queue==null){System.out.println("[VirtualHost]上传消息的时候,发现队列不存在!queueName="+binding.getQueueName());continue;}Message message = Message.createMessageWithId(routingKey,basicProperties,body);if(!router.route(exchange.getType(),binding,message)){continue;}sendMessage(queue,message);}}return true;}catch (Exception e){System.out.println("[VirtualHost]发送消息失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}private void sendMessage(MsgQueue queue,Message message) throws IOException, MqException {int deliverMode = message.getDeliverMode();if(deliverMode==2){diskDataCenter.sendMessage(queue,message);}memoryDataCenter.sendMessage(queue,message);consumerManager.notifyConsume(queue.getName());}

三、完善Router类

public class Router {public boolean checkBindingKey(String bindingKey){if(bindingKey.length()==0){return true;}for(int i =0;i<bindingKey.length();i++){char ch = bindingKey.charAt(i);if(ch>='A' && ch<='Z'){continue;}if(ch>='a' && ch<='z'){continue;}if(ch>='0' && ch<='9'){continue;}if(ch=='_' || ch=='.' || ch=='*' || ch=='#'){continue;}return false;}String[] words = bindingKey.split("\\.");for(String word:words){if(word.length()>1 && (word.contains("#") || word.contains("*")) ){return false;}}// 通配符之间的相邻关系// 1.aaa.#.#.bbb  => 非法// 2.aaa.#.*.bbb  => 非法// 3.aaa.*.#.bbb  => 非法// 4.aaa.*.*.bbb  => 合法for(int i=0;i<words.length-1;i++){if(words[i].equals("#") && words[i+1].equals("#")){return false;}if(words[i].equals("#") && words[i+1].equals("*")){return false;}if(words[i].equals("*") && words[i+1].equals("#")) {return false;}}return true;}public boolean checkRoutingKey(String routingKey){if(routingKey.length()==0){return true;}for(int i=0;i<routingKey.length();i++){char ch = routingKey.charAt(i);if(ch>='A' && ch<='Z'){continue;}if(ch>='a' && ch<='z'){continue;}if(ch>='0' && ch<='9'){continue;}if(ch=='_' || ch=='.'){continue;}return false;}return true;}// 这个方法用来判定该消息是否可以转发给对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {if(exchangeType==ExchangeType.FANOUT){return true;}else if(exchangeType == ExchangeType.TOPIC){return routeTopic(binding,message);}else {throw new MqException("[Router] 交换机类型非法!exchangeType="+exchangeType);}}private boolean routeTopic(Binding binding,Message message){String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");int bindingIndex = 0;int routingIndex = 0;while(bindingIndex<bindingTokens.length && routingIndex<routingTokens.length){if(bindingTokens[bindingIndex].equals("*")){bindingIndex++;routingIndex++;continue;}else if(bindingTokens[bindingIndex].equals("#")) {bindingIndex++;if (bindingIndex == bindingTokens.length) {// 情况3 如果#后面没有其他内容了,直接返回truereturn true;}// 情况4 如果#后面还存在其他内容,直接找#后面的部分,在routingKey中是否存在routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {return false;}// 找到了就往后匹配bindingIndex++;routingIndex++;}else {// 情况1:普通字符串两边内容必须一致if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex++;routingIndex++;}}if(bindingIndex==bindingTokens.length && routingIndex==routingTokens.length){return true;}return false;}private int findNextMatch(String[] routingTokens,int routingIndex,String bindingToken){for (int i =routingIndex;i<routingTokens.length;i++){if(routingTokens[i].equals(bindingToken)){return i;}}return -1;}
}

四、测试Router类

@SpringBootTest
public class RouterTests {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp(){binding = new Binding();message = new Message();}@AfterEachpublic void tearDown(){binding = null;message = null;}@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest1]测试成功!");}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest2]测试成功!");}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest3]测试成功!");}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest4]测试成功!");}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest5]测试成功!");}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest6]测试成功!");}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest7]测试成功!");}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest8]测试成功!");}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest9]测试成功!");}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest10]测试成功!");}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes11]测试成功!");}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes12]测试成功!");}@Testpublic void test13() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes13]测试成功!");}
}

五、完善VirtualHost类

/*消费者订阅消息*/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume成功!queueName="+queueName);return true;}catch (Exception e){System.out.println("[VirtualHost] basicConsume失败!queueName="+queueName);e.printStackTrace();return false;}}public boolean basicAck(String queueName,String messageId){queueName = virtualHostName +queueName;try {Message message = memoryDataCenter.getMessage(messageId);if(message==null){throw new MqException("[VirtualHost]要确认的消息不存在!messageId="+messageId);}MsgQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost]要确认的队列不存在!queueName="+queueName);}if(message.getDeliverMode()==2){diskDataCenter.deleteMessage(queue,message);}memoryDataCenter.removeMessage(messageId);memoryDataCenter.removeMessageWaitAck(queueName,messageId);System.out.println("[VirtualHost]basicAck成功!消息被成功确认!queueName="+queueName+",messageId="+messageId);return true;}catch (Exception e){System.out.println("[VirtualHost]basicAck成功!消息被失败确认!queueName="+queueName+",messageId="+messageId);e.printStackTrace();return false;}}

六、完善ConsumerManager类

public class ConsumerManager {private VirtualHost parent;private ExecutorService workerPool = Executors.newFixedThreadPool(4);private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();private Thread scannerThread = null;public ConsumerManager(VirtualHost p){parent = p;scannerThread = new Thread(() ->{while (true){try {String queueName = tokenQueue.take();MsgQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager]取令牌后发现,该队列名不存在!queueName="+queueName);}synchronized (queue){consumeMessage(queue);}}catch (InterruptedException | MqException e){e.printStackTrace();}}});scannerThread.setDaemon(true);scannerThread.start();}public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列MsgQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager] 队列不存在!queueName="+queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);int n = parent.getMemoryDataCenter().getMessageCount(queueName);// 针对消息进行消费for(int i=0;i<n;i++){consumeMessage(queue);}}}private void consumeMessage(MsgQueue queue) {// 1.先找到一个消费者ConsumerEnv luckyDog = queue.chooseConsumer();if(luckyDog == null){return;}// 2.从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null){return;}// 3.把消息带入到消费者的回调方法中,丢给线程池执行workerPool.submit(() ->{try{// 1.把消息放入待确认集合中parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);// 2.真正执行回调// consumerEnv中有Consumer函数式接口,调用在Consumer函数式接口中的回调方法luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());// 3.如果当前是“自动应答”,直接把消息删除// 4.如果当前是“手动应答”,就先不处理if(luckyDog.isAutoAck()){// 1.删除硬盘上的消息if(message.getDeliverMode()==2){parent.getDiskDataCenter().deleteMessage(queue,message);}// 2.删除内存上的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3.删除内存中消息中心的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager]消息被消费成功!queueName="+queue.getName());}}catch (Exception e){e.printStackTrace();}});}
}

七、总结

  本篇文章就如何将内存以及硬盘上的数据进行串在一起设计了一个虚拟主机,以及相对应的方法,下一篇文章就网络通信设计进行代码编写,感谢观看!


http://www.ppmy.cn/devtools/144330.html

相关文章

云手机有哪些用途?云手机选择推荐

云手机的使用范围日益扩大&#xff0c;无论是个人使用&#xff0c;还是各种规模的中小型工作室需要进行养号、挂机、参加活动甚至完成各种测试需求&#xff0c;都已经开始大量采用云手机。以前&#xff0c;许多公司或工作室都自行建设手机批量控制的设备&#xff0c;但需要自行…

嵌入式硬件面试题

1、请问什么是通孔、盲孔和埋孔&#xff1f;孔径多大可以做机械孔&#xff0c;孔径多小必须做激光孔&#xff1f;请问激光微型孔可以直接打在元件焊盘上吗&#xff0c;为什么&#xff1f; 通孔是贯穿整个PCB的过孔&#xff0c;盲孔是从PCB表层连接到内层的过孔&#xff0c;埋孔…

Spring Boot 集成 MyBatis 全面讲解

Spring Boot 集成 MyBatis 全面讲解 MyBatis 是一款优秀的持久层框架&#xff0c;与 Spring Boot 集成后可以大大简化开发流程。本文将全面讲解如何在 Spring Boot 中集成 MyBatis&#xff0c;包括环境配置、基础操作、高级功能和最佳实践。 一、MyBatis 简介 1. SqlSession …

mybatisservlet报错:Request processing failed; Parameter ‘meeting_id‘ not found

消息 Request processing failed; nested exception is org.mybatis.spring.MyBatisSystemException: nested exception is org.apache.ibatis.binding.BindingException: Parameter meeting_id not found. Available parameters are [arg2, arg1, arg0, param3, param1, param…

关于开发C# WinForms 应用程序的方法介绍

前提&#xff1a;首先默认大家已经有了一个完整的项目&#xff0c;并且现在所需的是进行C#界面的开发设计。 本文介绍的项目是使用C与C#联合开发的客户端软件 1 首先定义一个窗体类&#xff1a; 定义了一个窗体类 Formxxxxxxxx&#xff0c;该窗体类继承自 EF.EFFormMain。该…

SQL 插入数据详解

本文介绍如何利用 SQL 的 INSERT 语句将数据插入表中。 1. 数据插入 顾名思义&#xff0c;INSERT 用来将行插入&#xff08;或添加&#xff09;到数据库表。插入有几种方式&#xff1a; 插入完整的行&#xff1b;插入行的一部分&#xff1b;插入某些查询的结果。 下面逐一介…

QT实战经验总结 连载中

QT实战经验总结 在看书系统学习后&#xff0c;就开始实战了&#xff0c;会遇到很多问题1.信号和槽的思考2.在python 或 C 代码中&#xff0c;对 QML 代码中控件的调用3.关于在一个窗口上不断打开新窗口 在看书系统学习后&#xff0c;就开始实战了&#xff0c;会遇到很多问题 p…

Apache 如何监听多个端口 ?

Apache 是一个广泛使用的 web 服务器&#xff0c;可以配置为侦听多个端口。这对于托管多个网站、运行不同类型的服务或改进服务器的可访问性特别有用。在本文中&#xff0c;我们将探讨配置 Apache 以侦听多个端口的步骤。 Step 1: Access Apache Configuration File 找到并打…