目录
一、引言
二、VirtualHost类
1.准备工作
2.交换机操作
3.队列操作
4.绑定操作
三、完善Router类
四、测试Router类
五、完善VirtualHost类
六、完善ConsumerManager类
七、总结
一、引言
本篇文章我们就消息队列中的虚拟主机进行设计,将内存和硬盘上的数据串起来,但此处,我们仅设计一个虚拟主机。
二、VirtualHost类
1.准备工作
public class VirtualHost {private String virtualHostName;DiskDataCenter diskDataCenter = new DiskDataCenter();MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// router用来实现转发规则private Router router = new Router();// consumerManager用来实现消息消费private ConsumerManager consumerManager = new ConsumerManager(this);private final Object exchangeLocker = new Object();private final Object queueLocker = new Object();public String getVirtualHostName(){return virtualHostName;}public MemoryDataCenter getMemoryDataCenter(){return memoryDataCenter;}public DiskDataCenter getDiskDataCenter(){return diskDataCenter;}public VirtualHost (String virtualHostName){this.virtualHostName = virtualHostName;diskDataCenter.init();try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException |MqException | ClassNotFoundException e) {e.printStackTrace();System.out.println("[VirtualHost] 恢复内存数据失败!");}}
}
2.交换机操作
/*创建一个交换机*/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String,Object> arguments){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange existExchange = memoryDataCenter.getExchange(exchangeName);if(existExchange!=null){System.out.println("[VirtualHost] 交换机已经存在!exchangeName="+exchangeName);return true;}Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);if(durable){diskDataCenter.insertExchange(exchange);}memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建完成!exchangeName="+exchangeName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 交换机创建失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}/*删除交换机*/public boolean exchangeDelete(String exchangeName){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if(toDelete==null){throw new MqException("[VirtualHost] 交换机不存在无法删除!");}if(toDelete.isDurable()){diskDataCenter.deleteExchange(exchangeName);}memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功!exchangeName="+exchangeName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 交换机删除失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}
3.队列操作
/*创建队列*/public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments){queueName = virtualHostName + queueName;try {synchronized (queueLocker){MsgQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue!=null){System.out.println("[VirtualHost]队列已经存在!queueName="+queueName);return true;}MsgQueue queue = new MsgQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);if(durable){diskDataCenter.insertQueue(queue);}memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost]队列创建成功!queueName="+queueName);}return true;}catch (Exception e){System.out.println("[VirtualHost]队列创建失败!queueName="+queueName);e.printStackTrace();return false;}}/*删除队列*/public boolean queueDelete(String queueName){queueName = virtualHostName + queueName;try {synchronized (queueLocker){MsgQueue toDelete = memoryDataCenter.getQueue(queueName);if(toDelete==null){throw new MqException("[VirtualHost]队列不存在,无法删除!");}if(toDelete.isDurable()){diskDataCenter.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost]队列删除成功!queueName="+queueName);}return true}catch (Exception e){System.out.println("[VirtualHost]队列删除失败!queueName="+queueName);e.printStackTrace();return false;}}
4.绑定操作
/*绑定操作*/public boolean QueueBind(String exchangeName,String queueName,String bindingKey){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker){synchronized (queueLocker){Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);if(existsBinding!=null){throw new MqException("[VirtualHost] binding已经存在!exchangeName="+exchangeName+",queueName="+queueName);}if(!router.checkBindingKey(bindingKey)){throw new MqException("[VirtualHost] bindingKey非法!bindingKey="+bindingKey);}Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost]交换机不存在!exchangeName="+exchangeName);}MsgQueue queue = memoryDataCenter.getQueue(queueName);if (queue==null){throw new MqException("[VirtualHost]队列不存在!queueName="+queueName);}if(exchange.isDurable() && queue.isDurable()){diskDataCenter.insertBinding(binding);}memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost]绑定创建成功!exchangeName="+exchangeName+",queueName="+queueName);}}return true;}catch (Exception e){System.out.println("[VirtualHost]绑定创建失败!exchangeName="+exchangeName+",queueName="+queueName);e.printStackTrace();return false;}}/*删除绑定*/public boolean queueUnbind(String exchangeName,String queueName){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker){synchronized (queueLocker){Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);if(existsBinding==null){throw new MqException("[VirtualHost]绑定不存在,无法删除!exchangeName="+exchangeName+",queueName="+queueName);}diskDataCenter.deleteBinding(existsBinding);memoryDataCenter.deleteBinding(existsBinding);System.out.println("[VirtualHost]删除绑定成功!");}}return true;}catch (Exception e){System.out.println("[VirtualHost]删除绑定失败!exchangeName="+exchangeName+",queueName="+queueName);e.printStackTrace();return false;}}
5.消息操作
public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body){try {exchangeName = virtualHostName + exchangeName;if(!router.checkRoutingKey(routingKey)){throw new MqException("[VirtualHost]routingKey非法!routingKey="+routingKey);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost]交换机不存在!exchangeName="+exchangeName);}if(exchange.getType()==ExchangeType.DIRECT){String queueName = virtualHostName + routingKey;Message message = Message.createMessageWithId(routingKey,basicProperties,body);MsgQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost]队列不存在!queueName="+queueName);}sendMessage(queue,message);}else{ConcurrentHashMap<String,Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for(Map.Entry<String,Binding> entry:bindingsMap.entrySet()){Binding binding = entry.getValue();MsgQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if(queue==null){System.out.println("[VirtualHost]上传消息的时候,发现队列不存在!queueName="+binding.getQueueName());continue;}Message message = Message.createMessageWithId(routingKey,basicProperties,body);if(!router.route(exchange.getType(),binding,message)){continue;}sendMessage(queue,message);}}return true;}catch (Exception e){System.out.println("[VirtualHost]发送消息失败!exchangeName="+exchangeName);e.printStackTrace();return false;}}private void sendMessage(MsgQueue queue,Message message) throws IOException, MqException {int deliverMode = message.getDeliverMode();if(deliverMode==2){diskDataCenter.sendMessage(queue,message);}memoryDataCenter.sendMessage(queue,message);consumerManager.notifyConsume(queue.getName());}
三、完善Router类
public class Router {public boolean checkBindingKey(String bindingKey){if(bindingKey.length()==0){return true;}for(int i =0;i<bindingKey.length();i++){char ch = bindingKey.charAt(i);if(ch>='A' && ch<='Z'){continue;}if(ch>='a' && ch<='z'){continue;}if(ch>='0' && ch<='9'){continue;}if(ch=='_' || ch=='.' || ch=='*' || ch=='#'){continue;}return false;}String[] words = bindingKey.split("\\.");for(String word:words){if(word.length()>1 && (word.contains("#") || word.contains("*")) ){return false;}}// 通配符之间的相邻关系// 1.aaa.#.#.bbb => 非法// 2.aaa.#.*.bbb => 非法// 3.aaa.*.#.bbb => 非法// 4.aaa.*.*.bbb => 合法for(int i=0;i<words.length-1;i++){if(words[i].equals("#") && words[i+1].equals("#")){return false;}if(words[i].equals("#") && words[i+1].equals("*")){return false;}if(words[i].equals("*") && words[i+1].equals("#")) {return false;}}return true;}public boolean checkRoutingKey(String routingKey){if(routingKey.length()==0){return true;}for(int i=0;i<routingKey.length();i++){char ch = routingKey.charAt(i);if(ch>='A' && ch<='Z'){continue;}if(ch>='a' && ch<='z'){continue;}if(ch>='0' && ch<='9'){continue;}if(ch=='_' || ch=='.'){continue;}return false;}return true;}// 这个方法用来判定该消息是否可以转发给对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {if(exchangeType==ExchangeType.FANOUT){return true;}else if(exchangeType == ExchangeType.TOPIC){return routeTopic(binding,message);}else {throw new MqException("[Router] 交换机类型非法!exchangeType="+exchangeType);}}private boolean routeTopic(Binding binding,Message message){String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");int bindingIndex = 0;int routingIndex = 0;while(bindingIndex<bindingTokens.length && routingIndex<routingTokens.length){if(bindingTokens[bindingIndex].equals("*")){bindingIndex++;routingIndex++;continue;}else if(bindingTokens[bindingIndex].equals("#")) {bindingIndex++;if (bindingIndex == bindingTokens.length) {// 情况3 如果#后面没有其他内容了,直接返回truereturn true;}// 情况4 如果#后面还存在其他内容,直接找#后面的部分,在routingKey中是否存在routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {return false;}// 找到了就往后匹配bindingIndex++;routingIndex++;}else {// 情况1:普通字符串两边内容必须一致if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex++;routingIndex++;}}if(bindingIndex==bindingTokens.length && routingIndex==routingTokens.length){return true;}return false;}private int findNextMatch(String[] routingTokens,int routingIndex,String bindingToken){for (int i =routingIndex;i<routingTokens.length;i++){if(routingTokens[i].equals(bindingToken)){return i;}}return -1;}
}
四、测试Router类
@SpringBootTest
public class RouterTests {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp(){binding = new Binding();message = new Message();}@AfterEachpublic void tearDown(){binding = null;message = null;}@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest1]测试成功!");}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest2]测试成功!");}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest3]测试成功!");}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest4]测试成功!");}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest5]测试成功!");}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest6]测试成功!");}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest7]测试成功!");}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest8]测试成功!");}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest9]测试成功!");}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTest10]测试成功!");}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes11]测试成功!");}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes12]测试成功!");}@Testpublic void test13() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));System.out.println("[RouterTes13]测试成功!");}
}
五、完善VirtualHost类
/*消费者订阅消息*/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume成功!queueName="+queueName);return true;}catch (Exception e){System.out.println("[VirtualHost] basicConsume失败!queueName="+queueName);e.printStackTrace();return false;}}public boolean basicAck(String queueName,String messageId){queueName = virtualHostName +queueName;try {Message message = memoryDataCenter.getMessage(messageId);if(message==null){throw new MqException("[VirtualHost]要确认的消息不存在!messageId="+messageId);}MsgQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost]要确认的队列不存在!queueName="+queueName);}if(message.getDeliverMode()==2){diskDataCenter.deleteMessage(queue,message);}memoryDataCenter.removeMessage(messageId);memoryDataCenter.removeMessageWaitAck(queueName,messageId);System.out.println("[VirtualHost]basicAck成功!消息被成功确认!queueName="+queueName+",messageId="+messageId);return true;}catch (Exception e){System.out.println("[VirtualHost]basicAck成功!消息被失败确认!queueName="+queueName+",messageId="+messageId);e.printStackTrace();return false;}}
六、完善ConsumerManager类
public class ConsumerManager {private VirtualHost parent;private ExecutorService workerPool = Executors.newFixedThreadPool(4);private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();private Thread scannerThread = null;public ConsumerManager(VirtualHost p){parent = p;scannerThread = new Thread(() ->{while (true){try {String queueName = tokenQueue.take();MsgQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager]取令牌后发现,该队列名不存在!queueName="+queueName);}synchronized (queue){consumeMessage(queue);}}catch (InterruptedException | MqException e){e.printStackTrace();}}});scannerThread.setDaemon(true);scannerThread.start();}public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列MsgQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager] 队列不存在!queueName="+queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);int n = parent.getMemoryDataCenter().getMessageCount(queueName);// 针对消息进行消费for(int i=0;i<n;i++){consumeMessage(queue);}}}private void consumeMessage(MsgQueue queue) {// 1.先找到一个消费者ConsumerEnv luckyDog = queue.chooseConsumer();if(luckyDog == null){return;}// 2.从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null){return;}// 3.把消息带入到消费者的回调方法中,丢给线程池执行workerPool.submit(() ->{try{// 1.把消息放入待确认集合中parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);// 2.真正执行回调// consumerEnv中有Consumer函数式接口,调用在Consumer函数式接口中的回调方法luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());// 3.如果当前是“自动应答”,直接把消息删除// 4.如果当前是“手动应答”,就先不处理if(luckyDog.isAutoAck()){// 1.删除硬盘上的消息if(message.getDeliverMode()==2){parent.getDiskDataCenter().deleteMessage(queue,message);}// 2.删除内存上的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3.删除内存中消息中心的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager]消息被消费成功!queueName="+queue.getName());}}catch (Exception e){e.printStackTrace();}});}
}
七、总结
本篇文章就如何将内存以及硬盘上的数据进行串在一起设计了一个虚拟主机,以及相对应的方法,下一篇文章就网络通信设计进行代码编写,感谢观看!