仿RabbitMQ实现消息队列

devtools/2024/9/19 19:27:12/ 标签: rabbitmq

前言:本项目是仿照RabbitMQ并基于SpringBoot + Mybatis + SQLite3实现的消息队列,该项目实现了MQ的核心功能:生产者、消费者、中间人、发布、订阅等。

源码链接:仿Rabbit MQ实现消息队列

目录

前言:本项目是仿照RabbitMQ并基于SpringBoot + Mybatis + SQLite3实现的消息队列,该项目实现了MQ的核心功能:生产者、消费者、中间人、发布、订阅等。

一、核心概念

二、模块划分 

三、创建核心实体类 

3.1 创建交换机(Exchange)

3.2 创建队列实体类(MSGQueue)

 3.3 创建绑定实体类(Binding)

 3.4 创建消息实体类(Message)

四、数据库操作

 五、封装对数据库的操作

 六、消息的存储设计

6.1 设计思路及设定

 6.2 设定存储消息的格式

6.3 实现消息序列化工具 

七、实现文件管理消息

7.1 读取消息统计文件

 7.2 写消息统计文件

7.3 创建队列对应的文件和目录

7.4 删除队列对应的文件和目录

 7.5 判断队列的目录和消息文件是否存在

7.6 新的消息写入到文件中 

 7.7 删除队列对应的消息数据文件中的消息

7.8 从文件中读取消息到内存中

 7.9 判断垃圾回收时机

7.10 获取新消息数据文件的路径

 7.11 消息的垃圾回收

八、统一硬盘处理

九、内存管理

 9.1 交换机相关操作的API

 9.2 队列相关操作的API

9.3 绑定相关操作的API

 9.4 消息相关操作的API

 十、虚拟机 VirtualHost

10.1 创建交换机

 10.2 删除交换机

 10.3 创建队列

10.4 删除队列

 10.5 创建绑定

10.6 删除绑定

 10.7 发送消息

10.8 订阅消息

10.9 消息确认

十一、网络通信协议设计

11.1 设计应用层协议 

 11.2 定义Request/Response

 11.3 定义参数父类

11.4 定义返回值父类

 11.5 定义其他参数类

 十二、实现 BrokerServer 类

 12.1 启动/停止服务器

 12.2 实现处理连接

 12.3 实现 readRequest / writeResponse

 12.4 实现处理请求

12.5 实现清理过期的会话

 十三、实现客户端

13.1 创建 ConnectionFactory

 13.2 Connection 和 Channel 定义

十四、样例演示


一、核心概念

关于消息队列,有几个重要的核心概念:

  • 生产者(Producer) :负责将应用程序产生的数据转换成消息,并将这些消息推送到消息队列服务器上,以便消费者(Consumer)可以接收并处理这些消息。
  • 消费者(Consumer):它的主要职责是监听特定的队列或主题,并对到达的消息执行必要的业务逻辑。
  • 中间人(Broker):作为生产者(Producer)和消费者(Consumer)之间的中介,负责管理和协调消息的传递过程。
  • 发布(Publish):生产者向中间人投递消息的过程。
  • 订阅(Subscribe):哪些消费者要从这个中间人获取数据,这个注册的过程称为订阅。

在中间人(Broker)模块,又有以下几个概念:

  • 虚拟机 (VirtualHost):类似于 MySQL 的 "database", 是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
  • 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue。
  • 队列 (Queue): 真正⽤来存储消息的部分,每个消费者决定⾃⼰从哪个 Queue 上读取消息。
  • 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多" 关系,使⽤⼀个关联表就可以把这两个概念联系起来。

二、模块划分 

明确需要做的工作:

  • 实现生产者、消费者、Broker Server这三个部分。

  • 针对生产者、消费者,主要实现的是客户端和服务器的网络通信部分。

  • 给客户端提供一组 API,让客户端的业务代码来调用,通过网络通信的方式远程调用Broker Server上的方法。

  • 实现Broker Server 内部的一些基本概念和API(虚拟主机、交换机、队列、绑定、消息)。

  • 持久化(考虑到 SQLite 相比 MySQL 来说比较轻量,因此存储交换机、队列等这些实体用 SQLite,消息的存储使用文件进行管理)。

针对于上述所需要实现的模块,进行划分:

 

三、创建核心实体类 

3.1 创建交换机(Exchange)

nametypedurableautoDeletearguments
交换机身份标识交换机类型是否持久化是否自动删除

额外参数选项

@Data
public class Exchange {//交换机的身份标识(唯一)private String name;//交换机类型 direct fanout topicprivate ExchangeType type = ExchangeType.DIRECT;//表示该交换机是否要持久化存储. true 表示需要持久化. false 表示不需要持久化private boolean durable = false;//如果当前交换机,无客户端使用,就自动删除private boolean autoDelete = false;//表示创建交换机时指定的一些额外的参数选项private Map<String,Object> arguments = new HashMap<>();
}

此处省略 arguments 存储数据库时的Json转换,只需要使用 ObjectMapper即可实现。关于交换机的类型,此次主要实现了以下三种:DIRECT、FANOUT、TOPIC。并使用枚举类定义:

public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type){this.type = type;}public int getType() {return type;}
}

3.2 创建队列实体类(MSGQueue)

namedurableexclusiveautoDeleteargumentsconsumerEnvListconsumerSeq
队列标识是否持久化是否独占是否自动删除额外参数选项当前订阅的消费者列表记录当前取到第几个消费者
@Data
public class MSGQueue {//表示队列的身份标识private String name;//表示队列是否持久化 true:需要持久化 false:不需要持久化private boolean durable = false;//表示是否独占,true:独占 false:都可以使用private boolean exclusive = false;//表示无客户端使用是,是否自动删除private boolean autoDelete = false;//表示扩展参数private Map<String,Object> arguments = new HashMap<>();//当前队列都有哪些消费者订阅了.private List<ConsumerEnv> consumerEnvList = new ArrayList<>();//记录当前取到的第几个消费者,方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);
}

 3.3 创建绑定实体类(Binding)

exchangeNamequeueNamebindingKey
交换机名字队列名字绑定(和 routingKey匹配)
@Data
public class Binding {// 交换机名字private String exchangeName; //队列名字private String queueName;// 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配private String bindingKey;
}

 3.4 创建消息实体类(Message)

消息存储为二进制形式,因此对消息的存储需要进行序列化处理,所以 Message 类要实现Serializable 接口。

basicProperties bodyoffsetBegoffsetEndisValid
消息的属性消息的正文消息开头在文件中的偏移量消息末尾在文件中的偏移量是否有效
@Data
public class Message implements Serializable {private BasicProperties basicProperties = new BasicProperties();private byte[] body;//辅助属性,后续消息要存储在文件中//一个文件存储很多消息 [offsetBeg, offsetEnd)private transient long offsetBeg = 0;//消息数据的开头距离文件开头的位置偏移(字节)private transient long offsetEnd = 0;//消息数据的结尾距离文件开头的位置偏移(字节)private byte isValid = 0x1;//表示该消息在文件中是否是有效的消息,逻辑删除, 0x1:有效 0x0:无效//创建一个工厂方法,让工厂方法去封装一下创建 Message 对象的过程//这个方法创建的 Message 会自动生成一个唯一的 MessageIdpublic static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){Message message = new Message();if(basicProperties != null){message.setBasicProperties(basicProperties);}//此处生成的 MessageId 以 "M-" 为前缀, 方便区分message.setMessageId("M-" + UUID.randomUUID());message.setRoutingKey(routingKey);message.body = body;return message;}public String getMessageId(){return basicProperties.getMessageId();}public void  setMessageId(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliverMode(){return basicProperties.getDeliverMode();}public void setDeliverMode(int deliverMode){basicProperties.setDeliverMode(deliverMode);}
}

 对于消息的属性,使用一个实体类去表示:

messageIdroutingKeydeliverMode
消息的唯一身份标识和(bindingKey匹配)是否持久化
@Data
public class BasicProperties implements Serializable {//消息的唯一身份标识,使用 UUID 作为 messageIdprivate String messageId;/*** 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名* 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用)* 如果当前的交换机类型是 TOPIC,  此时 routingKey 就表示和 bindingKey 进行匹配*/private String routingKey;//表示消息是否持久化, 1: 不持久化; 2: 持久化private int deliverMode = 1;
}

四、数据库操作

对于 Exchange, MSGQueue, Binding, 需要使⽤数据库进⾏持久化保存,这里使用 SQLite 进行存储,直接去 Maven 中央仓库复制依赖到项目的POM文件,再配置数据库文件即可。

SQLite 只是把数据单纯的存储到⼀个⽂件中,因此在这里设定存储到 “./data/meta.db”文件。

 实现创建表以及数据库操作(这里不再展示具体的SQL语句)

@Mapper
public interface MetaMapper {//三个核心建表方法void createExchangeTable();void createQueueTable();void createBindingTable();//针对上述三个基本概念进行插入和删除void insertExchange(Exchange exchange);List<Exchange> selectAllExchanges();void deleteExchange(@Param("exchangeName") String exchangeName);void insertQueue(MSGQueue queue);List<MSGQueue> selectAllQueues();void deleteQueue(@Param("queueName") String queueName);void insertBinding(Binding binding);List<Binding> selectAllBindings();void deleteBinding(Binding binding);}

 五、封装对数据库的操作

创建 DataBaseManager 类,通过这个类来封装针对数据库的操作。
public class DataBaseManager {//数据库初始化public void init(){...}//删除数据库public void deleteDB(){...}//判断数据库是否存在private boolean checkDBExists(){...}//建表操作private void createTable(){...}//创建默认数据,RabbitMQ 里默认也带有一个 匿名 的交换机,类型是 DIRECTprivate void createDefaultData(){...}//交换机的数据库操作:增删查public void insertExchange(Exchange exchange){...}public void deleteExchange(String exchangeName){...}public List<Exchange> selectAllExchanges(){...}//队列的数据库操作:增删查public void insertQueue(MSGQueue queue){...}public void deleteQueue(String queueName){...}public List<MSGQueue> selectAllQueues(){...}//Binding的数据库操作:增删查public void insertBinding(Binding binding){...}public void deleteBinding(Binding binding){...}public List<Binding> selectAllBindings(){...}
}

 六、消息的存储设计

6.1 设计思路及设定

设计思路:消息需要在硬盘上存储,考虑到对于消息的操作并不需要复杂的增删改查,而⽂件的操作效率比数据库会高很多,因此这里设定,用文件来管理消息。

同时,因为队列用来存储消息,因此这里约定:

  1. 给每个队列分配⼀个目录,目录的名字为 data + 队列名,形如 :./data/queueName
  2. 该目录中包含两个固定名字的⽂件:

 queue_data.txt 消息数据⽂件, 用来保存消息内容。

 queue_stat.txt 消息统计⽂件, 用来保存消息统计信息。(消息总个数/t有效消息数),这样设计主要时考虑到后续进行垃圾回收,方便判断进行GC的时机。

 6.2 设定存储消息的格式

消息数据文件以二进制的形式存储在 queue_data.txt 文件中 ,为了方便进行消息的读取,这里进行这样的设定:

          每个消息分成两个部分:前四个字节, 表示 Message 对象的长度(字节数),后面若干字节,表示 Message 内容,消息和消息之间首尾相连。同时每个 Message 基于 Java 标准库进行序列化。 Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置。

6.3 实现消息序列化工具 

对于实现消息序列化,首先 Message 实体类要实现  Serializable 接口,接下来需要借助ByteArrayOutputStream 和 ObjectOutputStream 实现消息的序列化和反序列化:

public class BinaryTool {/*** 把对象序列化成一个字节数组* @param object* @return*/public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数组,// 可以把 object 对象序列化的数据逐步写入导 byteArrayOutputStream 中,// 再统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){// 此处的 writeObject 就会把 object 进行序列化,生成的字节数据就会写入到 objectOutputStream// objectOutputStream 又关联到了 byteArrayOutputStream,最终结果写入到 byteArrayOutputStream 里objectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}/*** 把一个字节数组反序列化成一个对象* @param data* @return*/public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){object = objectInputStream.readObject();}}return object;}
}

七、实现文件管理消息

 创建 MessageFileManager 类,这个类主要去实现消息统计文件的读写、消息数据文件的读写、创建存储消息的文件、消息的垃圾回收等等。

下面这段代码是 MessageFileManager 的基础代码,实现文件的读写和垃圾回收都需要调用下面的方法:

public class MessageFileManager {//定义一个内部类,来表示该队列的统计信息static public class Stat{public int totalCount; // 总消息数量public int validCount; // 有效消息数量}public void init(){}//约定消息文件所在的目录和文件名// 所在路径及文件名: ./data/队列名/queue_data.txt(消息数据文件)//                ./data/队列名/queue_stat.txt(消息统计文件)/*** 这个方法用来获取指定队列对应的消息文件所在路径* @param queueName* @return*/private String getQueueDir(String queueName){return "./data/" + queueName;}/*** 这个方法用来获取该队列的消息数据文件路径* @param queueName* @return*/private String getQueueDataPath(String queueName){return getQueueDir(queueName) + "/queue_data.txt";}/*** 这个方法用来获取指定队列的消息统计文件的路径* @param queueName* @return*/private String getQueueStatPath(String queueName){return getQueueDir(queueName) + "/queue_stat.txt";}

7.1 读取消息统计文件

private Stat readStat(String queueName){Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}

 7.2 写消息统计文件

    private void writeStat(String queueName, Stat stat){// 使用 PrintWriter 写文件try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}

7.3 创建队列对应的文件和目录

public void createQueueFiles(String queueName) throws IOException {//1.创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if(!baseDir.exists()){boolean success =  baseDir.mkdirs();if(!success){throw new IOException("创建目录失败! baseDir :" + baseDir.getAbsolutePath());}}//2.创建消息数据文件File queueDataFile = new File(getQueueDataPath(queueName));if(!queueDataFile.exists()){boolean success = queueDataFile.createNewFile();if(!success){throw new IOException("创建消息数据文件失败! queueDataFile: " + queueDataFile.getAbsolutePath());}}//3.创建消息统计文件File queueStatFile = new File(getQueueStatPath(queueName));if(!queueStatFile.exists()){boolean success = queueStatFile.createNewFile();if(!success){throw new IOException("创建消息统计文件失败! queueStatFile: " + queueStatFile.getAbsolutePath());}}//4.给消息统计文件设置初始值Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);}

7.4 删除队列对应的文件和目录

public void destroyQueueFiles(String queueName) throws IOException {//先删除文件,再删除目录File queueDataFile = new File(getQueueDataPath(queueName));boolean success1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean success2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean success3 = baseDir.delete();if(!success1 || !success2 || !success3){//删除失败throw new IOException("删除队列目录和消息文件失败! baseDir: " + baseDir.getAbsolutePath());}}

 7.5 判断队列的目录和消息文件是否存在

    public boolean checkFileExists(String queueName){//判断队列的 消息数据文件 和 消息统计文件 是否都存在File queueDataFile = new File(getQueueDataPath(queueName));File queueStatFile = new File(getQueueStatPath(queueName));if(queueDataFile.exists() && queueStatFile.exists()){return true;}return false;}

7.6 新的消息写入到文件中 

步骤:

  • 检查要写入的队列对应的文件是否存在
  • 对 Message 对象进行序列化
  • 获取当前消息数据文件的长度,由此来设置当前要写入的消息的 offsetBeg 和 offsetEnd。

offsetBeg = 消息数据文件长度 + 4

offsetEnd = 消息数据文件长度 + 4 + 该消息序列化后的 byte 数组的长度

  • 写入消息数据文件,更新消息统计文件 
    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 检查当前要写入的队列对应的文件是否存在if(!checkFileExists(queue.getName())){throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName: " + queue.getName());}// 把 Message 对象进行序列化,转成二进制的字节数组byte[] messageBinary = BinaryTool.toBytes(message);// 避免出现线程安全问题,即多个消息同时都往一个消息队列里面写消息synchronized (queue){// 获取到队列数据文件的长度,计算出该 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据写入到数据文件的末尾, 此时 Message 对象的 offsetBeg, 就是当前文件长度 + 4// offsetEnd 就是当前文件长度 + 4 + message长度File queueDateFile = new File(getQueueDataPath(queue.getName()));message.setOffsetBeg(queueDateFile.length() + 4);message.setOffsetEnd(queueDateFile.length() + 4 + messageBinary.length);// 写入消息数据文件, 此处是追加写try (OutputStream outputStream = new FileOutputStream(queueDateFile, true)){try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){// 先写消息长度,占据 4 个字节dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(),stat);}}

 7.7 删除队列对应的消息数据文件中的消息

这里的删除采用逻辑删除,即把 Message 对象从文件中读取出来之后,把 valid 属性设置成 0 ,再重新写入并更新消息统计文件。

    public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue){try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){// 读取对应的 Message 数据.byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 转换为 Message 对象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);diskMessage.setIsValid((byte) 0x0);// 重新写入文件byte[] bufferDest = BinaryTool.toBytes(diskMessage);randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);}// 更新统计文件Stat stat = readStat(queue.getName());if(stat.validCount > 0){stat.validCount -= 1;}writeStat(queue.getName(),stat);}}

7.8 从文件中读取消息到内存中

    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();long currentOffset = 0; // 使用这个变量记录光标的位置try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){try (DataInputStream dataInputStream = new DataInputStream(inputStream)){while (true){//读取一条消息长度int messageSize =  dataInputStream.readInt();// 按照长度读取消息内容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if(messageSize != actualSize){//不匹配说明文件有问题throw new MqException("[MessageFileManager] 文件格式错误! queueName: " + queueName);}//反序列化Message message = (Message) BinaryTool.fromBytes(buffer);//判断是否是无效数据if (message.getIsValid() != 0x1){currentOffset += (4 + messageSize);continue;}//有效数据,加入到链表中message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}}catch (EOFException e){System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}return messages;}}

 7.9 判断垃圾回收时机

这里的数字是拍脑门写的,当消息总数大于 2000 并且 消息的有效个数小于 50% 时,进行垃圾回收。

public boolean checkGC(String queueName){// 读取消息统计文件的数据Stat stat = readStat(queueName);if(stat.totalCount > 2000 && (double)stat.validCount / stat.totalCount < 0.5){return true;}return false;}

7.10 获取新消息数据文件的路径

    public String getQueueDataNewPath(String queueName){return getQueueDir(queueName) + "/queue_data_new.txt";}

 7.11 消息的垃圾回收

 这里我采用了复制算法进行垃圾回收,具体实现步骤:

  • 创建一个新的文件,命名为 queue_data_new.txt
  • 把之前消息数据文件的有效消息读取并写到新的文件中
  • 删除旧的消息数据文件,进行文件重命名,同时记录消息统计文件
    public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {synchronized (queue){long gcBeg = System.currentTimeMillis();//创建一个新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if(queueDataNewFile.exists()){throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new.txt 已经存在! queueName: " + queue.getName());}boolean success = queueDataNewFile.createNewFile();if(!success){throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile: " + queueDataNewFile.getAbsolutePath());}// 从旧的消息文件中读取所有的有效数据文件LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 把有效的消息全部写入到新的文件中try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)){try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for (Message message : messages){byte[] buffer = BinaryTool.toBytes(message);//先写四个字节消息的长度dataOutputStream.writeInt(buffer.length);// 再写消息的内容dataOutputStream.write(buffer);}}}// 删除旧的数据文件,进行文件重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));success = queueDataOldFile.delete();if(!success){throw new MqException("[MessageFileManager] 旧的数据文件删除失败! queueDataOldFile: " + queueDataOldFile.getAbsolutePath());}success = queueDataNewFile.renameTo(queueDataOldFile);if(!success){throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile: " + queueDataNewFile.getAbsolutePath() +", queueDataOldFile: " + queueDataOldFile.getAbsolutePath());}// 更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName: " + queue.getName() +", time: " + (gcEnd - gcBeg) + "ms");}}

八、统一硬盘处理

消息存储在文件中,交换机、绑定、队列存储在数据库中,对此进行统一处理。也就是说使用一个类管理所有硬盘上的数据。

public class DiskDataCenter {//这个实例用来管理数据库的数据private DataBaseManager dataBaseManager = new DataBaseManager();//这个实例用来管理文件中的数据private MessageFileManager messageFileManager = new MessageFileManager();public void init(){dataBaseManager.init();messageFileManager.init();}/*** 封装交换机、绑定、队列操作* @param exchange*/public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges(){return dataBaseManager.selectAllExchanges();}//封装队列操作public void insertQueue(MSGQueue queue) throws IOException {messageFileManager.createQueueFiles(queue.getName());dataBaseManager.insertQueue(queue);}public void deleteQueue(String queueName) throws IOException {messageFileManager.destroyQueueFiles(queueName);dataBaseManager.deleteQueue(queueName);}public List<MSGQueue> selectAllQueues(){return dataBaseManager.selectAllQueues();}//封装绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings(){return dataBaseManager.selectAllBindings();}/*** 封装消息操作*/public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if(messageFileManager.checkGC(queue.getName())){messageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
}

九、内存管理

这里主要使用了线程安全的哈希表保存内存中的消息、交换机、绑定、队列等。

public class MemoryDataCenter {// key 是 exchangeName value 是 Exchange 对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();// key 是 queueName, value 是 MSGQueue 对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();// 第一个 key 是 exchangeName, 第二个 key 是 queueNameprivate ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();// key 是 messageId, value 是 Message 对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();// key 是 queueName, value 是 Message 的链表private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();// 第一个 key 是 queueName 第二个 key 是 messageIdprivate ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
}

 9.1 交换机相关操作的API

  • 新增交换机
public void insertExchange(Exchange exchange){exchangeMap.put(exchange.getName(), exchange);System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName: " + exchange.getName());}
  • 查询交换机
    public Exchange getExchange(String exchangeName){return exchangeMap.get(exchangeName);}
  •  删除交换机
    public void deleteExchange(String exchangeName){exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 删除交换机成功! exchangeName: " + exchangeName);}

 9.2 队列相关操作的API

  • 新增队列 
    public void insertQueue(MSGQueue queue){queueMap.put(queue.getName(), queue);System.out.println("[MemoryDataCenter] 新队列添加成功! queueName: " + queue.getName());}
  •  查询队列
    public MSGQueue getQueue(String queueName){return queueMap.get(queueName);}
  •  删除队列
    public void deleteQueue(String queueName){queueMap.remove(queueName);System.out.println("[MemoryDataCenter] 删除队列成功成功! queueName: " + queueName);}

9.3 绑定相关操作的API

  • 新增绑定 
    public void insertBinding(Binding binding) throws MqException {//先使用 exchangeName 查一下对应的哈希表是否存在ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());synchronized (bindingMap){//再根据 queueName 查一下,如果已经存在,就抛出异常,不存在才能插入if(bindingMap.get(binding.getQueueName()) != null){throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName: " +binding.getExchangeName() + ", queueName: " + binding.getQueueName());}bindingMap.put(binding.getQueueName(),binding);}System.out.println("[MemoryDataCenter] 新绑定添加成功! queueName: " + binding.getQueueName() + ", exchangeName: "+ binding.getExchangeName());}
  •  根据交换机名和队列名查询绑定
    public Binding getBinding(String exchangeName,String queueName){ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);if(bindingMap == null){return null;}return bindingMap.get(queueName);}
  •  查询交换机绑定的所有队列
    public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){return bindingsMap.get(exchangeName);}
  •  删除绑定
    public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap == null){//无法删除throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName: " + binding.getExchangeName() +", queueName: " + binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功! queueName: " + binding.getQueueName() + ", exchangeName: "+ binding.getExchangeName());}

 9.4 消息相关操作的API

  • 添加消息 
    public void addMessage(Message message){messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 新消息添加成功! messageId: " + message.getMessageId());}
  •  查询消息
    public Message getMessage(String messageId){return messageMap.get(messageId);}
  •  删除消息
    public void removeMessage(String messageId){messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息成功移除! messageId: " + messageId);}
  •  发送消息到指定队列
    public void sendMessage(MSGQueue queue, Message message){//把消息放到指定的数据结构中LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());synchronized (messages){messages.add(message);}addMessage(message);System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId: " + message.getMessageId());}
  •  从队列中获取消息
    public Message pollMessage(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null){return null;}synchronized (messages){if(messages.size() == 0){return null;}//取头元素Message curMessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId: " + curMessage.getMessageId());return curMessage;}}
  •  获取指定队列中的消息个数
    public int getMessageCount(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null){return 0;}synchronized (messages){return messages.size();}}
  •  添加未确认的消息
    public void addMessageWaitAck(String queueName,Message message){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());messageHashMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId: " + message.getMessageId());}
  •  删除已经确认的消息
    public void removeMessageWaitAck(String queueName, String messageId){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap == null){return;}messageHashMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId: " + messageId);}
  •  获取未确认的消息
    public Message getMessageWaitAck(String queueName, String messageId){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap == null){return null;}return messageHashMap.get(messageId);}
  •  从硬盘读取数据恢复到内存中
    public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {// 1. 恢复所有的交换机数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges){exchangeMap.put(exchange.getName(), exchange);}// 2. 恢复所有的队列数据List<MSGQueue> queues = diskDataCenter.selectAllQueues();for(MSGQueue queue : queues){queueMap.put(queue.getName(), queue);}// 3. 恢复所有的绑定数据List<Binding> bindings = diskDataCenter.selectAllBindings();for(Binding binding : bindings){ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(),binding);}// 4. 恢复所有的消息数据//    遍历所有的队列 根据每个队列的名字获取到所有的消息for(MSGQueue queue : queues){LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());queueMessageMap.put(queue.getName(), messages);for(Message message : messages){messageMap.put(message.getMessageId(), message);}}// 针对未确认的消息不需要从硬盘上恢复,一旦在等待 ack 的过程中服务器重启,此时被恢复成未被取走的消息}

 十、虚拟机 VirtualHost

每个虚拟机下都管理着自己的交换机、队列、绑定、消息这些数据,同时供上层API进行调用,本项目目前只支持单个交换机。

public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();private DiskDataCenter diskDataCenter = new DiskDataCenter();private Router router = new Router();private ConsumerManager consumerManager = new ConsumerManager(this);private final Object exchangeLocker = new Object();// 操作交换机的锁对象private final Object queueLocker = new Object(); // 操作队列的锁对象public VirtualHost(String name){this.virtualHostName = name;//对于 MemoryDataCenter 来说, 不需要初始化//针对 DiskDataCenter 来说,需要进行初始化操作,建库建表和初始数据的设定//此外,针对硬盘的数据,恢复到内存中diskDataCenter.init();try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException  | MqException  | ClassNotFoundException e) {e.printStackTrace();System.out.println("[VirtualHost] 恢复内存数据失败!");}}public String getVirtualHostName() {return virtualHostName;}public MemoryDataCenter getMemoryDataCenter() {return memoryDataCenter;}public DiskDataCenter getDiskDataCenter() {return diskDataCenter;}

 其中,Router 类规定了交换机转发的规则:

public class Router {/*** bindingKey 的构造规则* 1.数字、字母、下划线* 2.使用 . 分割成若干部分* 3.允许存在 * 和 # 作为通配符,但是通配符只能作为一个独立的分段* @param bindingKey* @return*/public  boolean checkBindingKey(String bindingKey){}/*** routingKey 的构造规则:* 1.数字、字母、下划线* 2.使用 . 分割成若干部分*/public boolean checkRoutingKey(String routingKey){}/*** 判定该消息是否可以转发给这个绑定对应的队列* @param exchangeType* @param binding* @param message* @return*/public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException{}/*** 校验 bindingKey 和 routingKey 是否匹配* @param binding* @param message* @return*/private boolean routeTopic(Binding binding,Message message){}
}

10.1 创建交换机

    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String,Object> arguments){//把交换机的名字加上虚拟主机的名字exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){//判定该交换机是否已经存在,直接通过内存查询Exchange existsExchange =  memoryDataCenter.getExchange(exchangeName);if(existsExchange != null){//该交换机已经存在System.out.println("[VirtualHost] 交换机已经存在! exchangeName: " + exchangeName);return true;}Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);if(durable){diskDataCenter.insertExchange(exchange);}memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建完成! exchangeName: " + exchangeName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 交换机创建失败! exchangeName: " + exchangeName);e.printStackTrace();return false;}}

 10.2 删除交换机

    public boolean exchangeDelete(String exchangeName){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if(toDelete == null){throw new MqException("[VirtualHost] 交换机不存在,无法删除!");}if(toDelete.isDurable()){diskDataCenter.deleteExchange(exchangeName);}memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName: " + exchangeName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 交换机删除失败! exchangeName: " + exchangeName);e.printStackTrace();return false;}}

 10.3 创建队列

    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String,Object> arguments){queueName = virtualHostName + queueName;try {synchronized (queueLocker){//1.判断队列是否存在MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue != null){System.out.println("[VirtualHost] 队列已经存在! queueName: " + queueName);return false;}//2.构造队列对象MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);//3.插入硬盘if(queue.isDurable()){diskDataCenter.insertQueue(queue);}//4.插入内存memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName: " + queueName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 创建队列失败! queueName: " + queueName);e.printStackTrace();return false;}}

10.4 删除队列

    public boolean queueDelete(String queueName){queueName = virtualHostName + queueName;try {synchronized (queueLocker){MSGQueue toDelete = memoryDataCenter.getQueue(queueName);if (toDelete == null){throw new MqException("[VirtualHost] 队列不存在,无法删除!");}if(toDelete.isDurable()){diskDataCenter.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost] 队列删除成功! queueName: " + queueName);}return true;}catch (Exception e){System.out.println("[VirtualHost] 队列删除失败! queueName: " + queueName);e.printStackTrace();return false;}}

 10.5 创建绑定

    public boolean queueBind(String queueName,String exchangeName,String bindingKey){queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){synchronized (queueLocker){//1.判断绑定是否存在Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);if(existsBinding != null){throw new MqException("[VirtualHost] 绑定已经存在! queueName: " + queueName + ", exchangeName: "+ exchangeName);}//2.判断绑定是否合法if(!router.checkBindingKey(bindingKey)){throw new MqException("[VirtualHost] bindingKey 不合法! bindingKey: " + bindingKey);}//3.创建 Binding 对象Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);//4.获取到对应的交换机和队列,如果对应的交换机和队列不存在,这样的绑定是无法创建的MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null){throw new MqException("[VirtualHost] 该绑定对应的队列不存在! queueName: " + queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange == null){throw new MqException("[VirtualHost] 该绑定对应的交换机不存在! exchangeName: " + exchangeName);}if(queue.isDurable() && exchange.isDurable()){diskDataCenter.insertBinding(binding);}memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 创建绑定成功! queueName: " + queueName + ", exchangeName: "+ exchangeName);}}return true;}catch (Exception e){System.out.println("[VirtualHost] 绑定创建失败! queueName: " + queueName + ", exchangeName: "+ exchangeName);e.printStackTrace();return false;}}

10.6 删除绑定

    public boolean queueUnbind(String queueName, String exchangeName){queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){synchronized (queueLocker){//1.判断绑定是否存在Binding toDelete = memoryDataCenter.getBinding(exchangeName,queueName);if(toDelete == null){throw new MqException("[VirtualHost] 绑定不存在,无法删除! queueName: " + queueName+ ", exchangeName: " + exchangeName);}//2.无论绑定是否持久化,都尝试在硬盘上删一下,就算不存在,这个删除操作也没有副作用diskDataCenter.deleteBinding(toDelete);memoryDataCenter.deleteBinding(toDelete);System.out.println("[VirtualHost] 删除绑定成功! queueName: " + queueName + ", exchangeName: " + exchangeName);}}return true;}catch (Exception e){System.out.println("[VirtualHost] 删除绑定失败!");return false;}}

 10.7 发送消息

流程图:

实现步骤:

  • 检查 routingKey 是否合法
  • 判断交换机是否存在
  • 判断交换机的类型,根据不同的类型决定如何进行后续转发
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body){try {//1.转换交换机的名字exchangeName = virtualHostName + exchangeName;//2.检查 routingKey 是否合法if(!router.checkRoutingKey(routingKey)){throw new MqException("[virtualHost] routingKey 非法! routingKey: " + routingKey);}//3.查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange == null){throw new MqException("[virtualHost] 交换机不存在! exchangeName: " + exchangeName);}//4.判定交换机的类型if(exchange.getType() == ExchangeType.DIRECT){//按照直接交换机的方式转发消息//以 routingKey 作为队列的名字,直接把消息写入指定的队列中//此时,可以无视绑定关系String queueName = virtualHostName + routingKey;//5.构造消息对象Message message = Message.createMessageWithId(routingKey,basicProperties,body);//6.查找队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null){throw new MqException("[virtualHost] 队列不存在! queueName: " + queueName);}//7.队列存在,给队列中写入消息sendMessage(queue,message);}else{//按照 fanout 和 topic 的方式来转发//5.找到该交换机关联的所有绑定ConcurrentHashMap<String,Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for(Map.Entry<String,Binding> entry : bindingsMap.entrySet()){// 1) 获取到绑定队列,判定对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if(queue == null){System.out.println("[virtualHost] basicPublish 发送消息时,发现队列不存在! queueName: " + binding.getQueueName());continue;}// 2) 构造消息对象Message message = Message.createMessageWithId(routingKey,basicProperties,body);// 3) 判定这个消息是否能转发给该队列// 如果是 fanout, 所有绑定的队列都要进行转发// 如果是 topic, 需要判定 bindingKey 和 routingKey 是否匹配if(!router.route(exchange.getType(),binding,message)){continue;}// 4) 真正转发消息给队列sendMessage(queue,message);}}return true;}catch (Exception e){System.out.println("[virtualHost] 消息发送失败!");e.printStackTrace();return false;}}

 上述过程中涉及到的被调用的 API:

    private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {//此处发送消息,就是调用之前封装好的 api, 写到内容和硬盘上int deliverMode = message.getDeliverMode();//deliverMode 为 1 不持久化//deliverMode 为 2 要持久化if(deliverMode == 2){diskDataCenter.sendMessage(queue,message);}//写入内存memoryDataCenter.sendMessage(queue,message);//通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());}

10.8 订阅消息

流程图:

    /*** 订阅消息* 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者* @param consumerTag 消费者的身份标识* @param queueName* @param autoAck 消息被消费完成后,应答的方式 true:自动应答; false:手动应答* @param consumer 是一个回调函数,此处类型设定为函数式接口,后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 表达式* @return*/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){//构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 ConsumerEnv 对象添加到该队列中queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[virtualHost] basicConsume 成功! queueName: " + queueName);return true;}catch (Exception e){System.out.println("[virtualHost] basicConsume 失败! queueName: " + queueName);e.printStackTrace();return false;}}

 上述过程涉及到了 ConsumerManager 类:

public class ConsumerManager {//持有上层 VirtualHost 对象的引用,用来操作数据private VirtualHost parent;//指定一个线程池,执行具体的回调任务private ExecutorService workerPool = Executors.newFixedThreadPool(4);//存放令牌的队列private BlockingDeque<String> tokenQueue = new LinkedBlockingDeque<>();//扫描线程private Thread scannerThread = null;public ConsumerManager(VirtualHost p){this.parent = p;scannerThread = new Thread(() ->{while (true){try {//1.拿到令牌String queueName = tokenQueue.take();//2.根据令牌找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null){throw new MqException("[ConsumerManager] 取令牌时队列名不存在! queueName: " + queueName);}//3.从队列中消费一个消息synchronized (queue){consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});//线程设为后台线程scannerThread.setDaemon(true);scannerThread.start();}/*** 调用时机:发送消息的时候* @param queueName* @throws InterruptedException*/public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {//找到对应的队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null){throw new MqException("[ConsumerManager] 队列不存在! queueName: " + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);int n = parent.getMemoryDataCenter().getMessageCount(queueName);for(int i = 0; i < n; i++){//调用一次就消费一条消息consumeMessage(queue);}}}/*** 消费一个消息* @param queue*/private void consumeMessage(MSGQueue queue) {// 1.按照轮询的方式找个消费者出来ConsumerEnv lucyDog = queue.chooseConsumer();if(lucyDog == null){return;//当前队列没有消费者,暂时不消费}//2.从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null){return;//当前队列中没有消息,也不需要消费}//3.把消息带入到消费者的回调方法中,让线程池去执行workerPool.submit(() ->{try {// 1) 先把消息放到待确认的集合中parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);// 2) 执行回调lucyDog.getConsumer().handleDelivery(lucyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());// 3) 如果当前是 自动应答,直接把消息删除//    如果当前是手动应答,交给后续消费者调用 basicAck 来处理if(lucyDog.isAutoAck()){//删除硬盘上的消息if(message.getDeliverMode() == 2){//当前这个消息是持久化存储,需要删除硬盘上的消息parent.getDiskDataCenter().deleteMessage(queue,message);}//删除等待应答的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());//删除内存中的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName: " + queue.getName());}}catch (Exception e){e.printStackTrace();}});}
}

ConsumeMessage方法流程图:

 

10.9 消息确认

该方法只是手动应答的时候才会使用,应答成功, 则把消息删除掉。
public boolean basicAck(String queueName, String messageId){queueName = virtualHostName + queueName;try {Message message = memoryDataCenter.getMessage(messageId);if(message == null){//要确认的消息不存在throw new MqException("[VirtualHost] 要确认的消息不存在! messageId: " + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null){throw new MqException("[VirtualHost] 要确认的队列不存在! queueName: " + queueName);}//1.删除硬盘上的数据if(message.getDeliverMode() == 2){diskDataCenter.deleteMessage(queue,message);}//2.删除消息中心中的数据memoryDataCenter.removeMessage(messageId);//3.删除待确认集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName: " + queueName + ", messageId: " +messageId);return true;}catch (Exception e){System.out.println("[VirtualHost] basicAck 失败! 消息被确认失败! queueName: " + queueName + ", messageId: " +messageId);e.printStackTrace();return false;}}

十一、网络通信协议设计

生产者和消费者都是客户端程序,并且需要通过网络远程调用 BrokerServer 提供的 API, 这里,我使用 TCP 作为底层协议,在这个基础上自定义应用层协议,简单来说就是约定一下生产者以及消费者和 BrokerServer 之间交互的规范或者是传输数据的格式。

客户端要调用的功能有以下几个部分:

  • 创建 channel
  • 关闭 channel
  • 创建 exchange
  • 删除 exchange
  • 创建 queue
  • 删除 queue
  • 创建 binding
  • 删除 binding
  • 发送 message
  • 订阅 message
  • 发送 ack
  • 返回 message (服务器 -> 客户端)

11.1 设计应用层协议 

因为 Message 本身就是二进制数据,因此这里同样使用二进制的方式设定协议。

 其中 type 表示请求响应不同的功能,取值如下:

0x1

创建 channel
0x2关闭 channel
0x3
创建 exchange
0x4
销毁 exchange
0x5
创建 queue
0x6
销毁 queue
0x7
创建 binding
0x8
销毁 binding
0x9
发送 message
0xa
订阅 message
0xb
返回 ack
0xc
服务器给客户端推送的消息
 payload 部分, 会根据不同的 type, 存在不同的格式:
  • 对于请求来说, payload 表示这次方法调用的各种参数信息
  • 对于响应来说, payload 表示这次方法调用的返回值

 11.2 定义Request/Response

/*** @description:表示一个请求对象,按照自定义协议的格式展开* @created by 清风 on 2024/7/30 21:21*/
@Data
public class Request {private int type;private int length;private byte[] payload;
}

/*** @description:这是一个响应,也是根据自定义应用层协议来的* @created by 清风 on 2024/7/30 21:22*/
@Data
public class Response {private int type;private int length;private byte[] payload;
}

 11.3 定义参数父类

/*** @description:使用这个类来表示方法的公共参数/辅助的字段* 后续每个方法就会有不同的参数,不同的参数使用不同的子类表示* @created by 清风 on 2024/7/30 21:24*/
@Data
public class BasicArguments implements Serializable{// 表示一次请求和一次响应的身份标识,可以把请求和响应对上protected String rid;//本次通信使用的 channel 的身份标识protected String channelId;
}

11.4 定义返回值父类

/*** @description:表示各个远程的调用的方法的返回值的公共信息* @created by 清风 on 2024/7/30 21:28*/
@Data
public class BasicReturns implements Serializable {//用来标识唯一的请求和响应protected String rid;//用来标识一个 channelprotected String channelId;//表示远程调用方法的返回值protected boolean ok;
}

 11.5 定义其他参数类

针对每个 VirtualHost 提供的方法, 都需要有⼀个类表⽰对应的参数。这里写一个创建交换机的请求参数类,其他的相关请求参数都大同小异,想看的可以去源码链接看。
/*** @description:这个类是创建交换机的请求参数的类* @created by 清风 on 2024/7/30 21:33*/
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private Map<String,Object> arguments;
}
⼀个创建交换机的请求, 形如:
  • 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图片的结构
  • 按照 length 长度读取出 payload, 就可以把读到的⼆进制数据转换成 ExchangeDeclareArguments 对象

 十二、实现 BrokerServer 类

public class BrokerServer {private ServerSocket serverSocket = null;//一个 BrokerServer 上只有一个虚拟主机private VirtualHost virtualHost = new VirtualHost("default");//存储当前的所有会话,也就是有哪些客户端正在和服务器进行通信//此处的 key 是 channelId; value 为对应的 socket 对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();//引入线程池来处理多个客户端的请求private ExecutorService executorService = null;//引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable = true;
}

 12.1 启动/停止服务器

    public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(port);}public void start() throws IOException {System.out.println("[BrokerServer] 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable){Socket clientSocket = serverSocket.accept();//把处理连接的逻辑丢给线程池executorService.submit(() ->{processConnection(clientSocket);});}}catch (SocketException e){System.out.println("[BrokerServer] 服务器停止运行!");}}/*** 停止服务器*/public void stop() throws IOException {runnable = false;executorService.shutdownNow();//把线程池中的任务都放弃,让线程都销毁serverSocket.close();}

 12.2 实现处理连接

    /*** 通过这个方法来处理一个客户端的连接* 在这个连接中,可能会涉及到多个请求和响应* @param clientSocket*/private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()){//这里需要按照特定格式读取并解析,此时需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){while (true){//1.读取请求并解析Request request = readRequest(dataInputStream);//2.根据请求计算响应Response response = process(request,clientSocket);//3.把响应写回给客户端writeResponse(dataOutputStream,response);}}catch (EOFException | SocketException e){//对于当前代码, DataInputStream 如果读到 EOF, 就会抛出一个 EOFException 异常//需要借助这个异常来结束循环System.out.println("[BrokerServer] 连接关闭! 客户端地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());}}catch (IOException | ClassNotFoundException | MqException e){System.out.println("[BrokerServer] Connection 出现异常!");e.printStackTrace();}finally {try {//关闭 socketclientSocket.close();//一个 TCP 连接中,可能包含多个 channel, 需要把当前这个 socket 对应的所有 channel 清理掉clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}

 12.3 实现 readRequest writeResponse

    private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if(n != request.getLength()){throw new IOException("读取请求格式出错!");}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());dataOutputStream.flush();}

 12.4 实现处理请求

  • 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
  • 再根据不同的 type, 分别处理不同的逻辑,(主要是调⽤ virtualHost 中不同的方法)
  • 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客户端
  • 最后构造成统⼀的响应
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//1.把 request 中的 payload 做一个初步的解析BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println("[Request] rid: " + basicArguments.getRid() + ", channelId: " + basicArguments.getChannelId()+ ", type: " + request.getType() + ", length: " + request.getLength());//2.根据 type 的值来进一步区分接下来这次请求要干什么boolean ok = true;if(request.getType() == 0x1){//创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 创建 channel 完成! channelId: " + basicArguments.getChannelId());}else if(request.getType() == 0x2){//销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 销毁 channel 完成! channelId: " + basicArguments.getChannelId());}else if(request.getType() == 0x3){//创建一个交换机//此时 payload 就是 ExchangeDeclareArguments 对象ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());}else if(request.getType() == 0x4){//销毁交换机ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {//创建队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(),arguments.isAutoDelete(),arguments.getArguments());}else if(request.getType() == 0x6){//销毁队列QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete(arguments.getQueueName());}else if(request.getType() == 0x7){//创建 BindingQueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());}else if(request.getType() == 0x8){//删除 BindingQueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());}else if(request.getType() == 0x9){//发送消息BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(),arguments.getBody());}else if(request.getType() == 0xa){//订阅消息BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//这个回调函数要做的工作:把服务器收到的消息直接推送给对应的客户端//先知道当前这个收到的消息要发给哪个客户端//此处 consumerTag 其实是 channelId//1.根据 channel 找到 Socket 对象Socket clientSocket = sessions.get(consumerTag);if(clientSocket == null || clientSocket.isClosed()){throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}//2.构造响应数据//此处 response 的 payload 就是 SubScribeReturnsSubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid("");subScribeReturns.setOk(true);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();response.setType(0xc);//0xc 表示服务器给消费者客户端推送的数据response.setLength(payload.length);response.setPayload(payload);//3.把数据写回给客户端DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream,response);}});}else if(request.getType() == 0xb){//调用 basicAck 来确认消息BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());}else {//当前的 type 是非法的throw new MqException("[BrokerServer] 未知的 type: " + request.getType());}//3.构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid: " + basicReturns.getRid() + ", channelId: "+ basicReturns.getChannelId() + ", type: "+ response.getType() + ", length: " + response.getLength());return response;}

12.5 实现清理过期的会话

    /*** 清理过期的会话* @param clientSocket*/private void clearClosedSession(Socket clientSocket) {//遍历 sessions 哈希表List<String> toDeleteChannelId = new ArrayList<>();for(Map.Entry<String,Socket> entry : sessions.entrySet()){if(entry.getValue() == clientSocket){toDeleteChannelId.add(entry.getKey());}}for(String channelId : toDeleteChannelId){sessions.remove(channelId);}System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId: " + toDeleteChannelId);}

 十三、实现客户端

13.1 创建 ConnectionFactory

用来创建连接的工厂类:

@Data
public class ConnectionFactory {// broker server 的 ip 地址private String host;// broker server 的端口号private int port;// 访问 broker server 的哪个虚拟主机.// 下列几个属性暂时不搞了.
//    private String virtualHostName;
//    private String username;
//    private String password;public Connection newConnection() throws IOException {Connection connection = new Connection(host, port);return connection;}
}

 13.2 Connection 和 Channel 定义

  • 一个客户端可以创建多个 Connection
  • 一个 Connection 对应一个 Socket,一个TCP 连接
  • 一个 Connection 可以包含多个 Channel

Connection 定义: (这个类中其他的方法在我的项目源码中自行观看,主要包括处理响应、统一封装写请求和读取响应以及创建 Channel)

@Data
public class Connection {private Socket socket = null;// 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来.private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;private ExecutorService callbackPool = null;
}
  • Socket 是客户端持有的套接字
  • InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接口
  • channelMap 用来管理该连接中所有的 Channel
  • callbackPool 是用来在客户端这边执行用户回调的线程池

 Channel 定义(这个类中主要包括的方法就是构造请求参数,和服务器交互进行相关的操作,也就是远程调用服务器提供的 API,可在项目源码自行观看)

@Data
public class Channel {private String channelId;// 当前这个 channel 属于哪个连接.private Connection connection;// 用来存储后续客户端收到的服务器的响应.private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.// 此处约定一个 Channel 中只能有一个回调.private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}
}

十四、样例演示

生产者: 

/*** @description:这个类用来表示一个生产者* 通常这是一个单独的服务器程序* @created by 清风 on 2024/8/3 19:38*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);//创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投递完成! ok :" + ok);Thread.sleep(500);channel.close();connection.close();}
}

消费者:

/*** @description:这个类表示一个消费者* 通常这个类也应该是在一个独立的服务器中被执行* @created by 清风 on 2024/8/3 19:39*/
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag: " + consumerTag);System.out.println("basicProperties: " + basicProperties);String bodyString = new String(body,0, body.length);System.out.println("body: " + bodyString);System.out.println("[消费数据] 结束!");}});//模拟一直等待消费while (true){Thread.sleep(500);}}
}

启动项目之后,再先后启动消费者和生产者: 

  • 启动项目,建库建表:

  • 启动消费者:

  • 启动生产者: 

  •  查看消费者端控制台: 


至此,一个简易版本的MQ实现。文章篇幅太长,可能过于繁琐,还请各位读者有不满意的地方多多指教! 


http://www.ppmy.cn/devtools/95679.html

相关文章

MySQL 常用 SQL 语句大全

1. 基本查询 查询所有记录和字段 SELECT * FROM table_name; 查询特定字段 SELECT column1, column2 FROM table_name; 查询并限制结果 SELECT column1, column2 FROM table_name LIMIT 10; 条件查询 SELECT column1, column2 FROM table_name WHERE condition; 模糊匹…

版本控制基础理论

一、本地版本控制 在本地记录文件每次的更新&#xff0c;可以对每个版本做一个快照&#xff0c;或是记录补丁文件&#xff0c;适合个人使用&#xff0c;如RCS. 二、集中式版本控制&#xff08;代表SVN&#xff09; 所有的版本数据都保存在服务器上&#xff0c;协同开发者从…

HarmonyOS应用开发学习-ArkUI-容器组件

ArkUI-容器组件 1 ROW组件 沿水平方向布局容器。可以包含子组件。 Row(value?:{space?:string | number}) 参数&#xff1a; 参数名 参数类型 必填 默认值 参数描述 space string | number 否 0 横向布局元素间距 属性&#xff1a; 名称 参数类型 默认值 描…

JavaScript 基础(四)

五、DOM编程 1.常用事件 onload 页面加载后触发事件 onscroll 滚动时触发 onresize 尺寸变化时 onclick 鼠标点击 onmouseover 鼠标悬停 onmouseout 鼠标移出 onmousemove 鼠标移动&#xff0c;会触发多次 onfocus 对象获得光标&#xff08;焦点&#xff09;时&#x…

C语言 【自定义类型——结构体】(详细)

目录 1、结构体的定义 2、创建与初始化结构体变量 2.0 举例 2.1 结构体的特殊声明 2.1.0 匿名结构体 2.1.1 结构体的自引用 3、结构体内存对齐 3.0 为什么要内存对齐 3.1 对齐规则 3.2 如何修改默认对齐数 4、结构体传参 5、结构体中的位段使用 5.0 什么是位段&…

【研发日记】嵌入式处理器技能解锁(四)——TI C2000 DSP的Memory

文章目录 前言 背景介绍 Memory映射 RAM ROM 外设Register Memory分配 应用实例 总结 参考资料 前言 见《【研发日记】嵌入式处理器技能解锁(一)——多任务异步执行调度的三种方法》 见《【研发日记】嵌入式处理器技能解锁(二)——TI C2000 DSP的SCI(串口)通信》 见《…

编程修炼之Hibernate--- springboot启动初始化ddl过程

文章目录 跟踪Springboot整合hibernate的启动代码&#xff1a; 开始初始化 entityManagerFactory 创建方言 dialect 继续排查

EmguCV学习笔记 C# 2.5 Mat类、Matrix类和Image类的相互转换

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 EmguCV学习笔记目录 Vb.net EmguCV学习笔记目录 C# 笔者的博客网址&#xff1a;VB.Net-CSDN博客 教程相关说明以及如何获得pdf教…

从直播美颜工具到视频美颜SDK:实时美颜技术的实现与挑战

从最初的直播美颜工具到如今的高级视频美颜SDK&#xff0c;这一技术经历了快速的发展和演进。今天&#xff0c;笔者将讲解实时美颜技术的实现过程与其面临的挑战。 一、实时美颜技术的背景与需求 美颜技术不仅仅是对皮肤瑕疵的简单修饰&#xff0c;更涵盖了智能化的人脸识别、…

FreeRTOS信号量

文章目录 一、信号量的特性1、信号量的常规操作2、信号量跟队列的对比3、两种信号量的对比 二、信号量函数1、创建2、删除3、give/take 三、示例: 使用计数型信号量四、示例: 二进制信号量五、优先级反转 前面介绍的队列(queue)可以用于传输数据&#xff1a;在任务之间、任务和…

开源免费的表单收集系统TDuck

TDuck&#xff08;填鸭表单&#xff09;是一款开源免费的表单收集系统&#xff0c;它基于Apache 2.0协议开源&#xff0c;用户可以随时下载源码&#xff0c;自由修改和定制&#xff0c;也可以参与到项目的贡献和反馈中。TDuck表单系统不仅支持私有化部署&#xff0c;还提供了丰…

用后端实现一个简单的登录模块3 注册

该模块能做到的功能&#xff1a; 1阶&#xff1a;输入账号和密码&#xff0c;输入正确即可返回登录成功的信息&#xff0c;反之则登录失败 2阶&#xff1a;有简单的前端页面&#xff0c;有登录成功和失败的弹窗&#xff0c;还有登录成功的主页面 3阶&#xff1a;添加注册功能…

[Sqlserver][索引]SQL Server 索引概述

SQL Server 索引概述 索引简介 目的&#xff1a;提升SQL Server性能&#xff0c;加快查询速度&#xff0c;减少响应时间。限制&#xff1a;合理使用索引&#xff0c;避免过多索引影响数据更新操作和浪费硬盘空间。 索引分类 唯一索引 (UNIQUE)&#xff1a;确保索引值唯一。…

QT 数据导出到Excel

原创&#xff1a;QT 数据导出到Excel 在Qt自带的axcontainer模块中&#xff0c;我们可以使用QAxObject类来将数据保存到Excel中。Qt中将数据保存到Excel通常有两种方式&#xff1a;一种是以Excel格式导出&#xff0c;需要电脑上安装Office软件&#xff1b;另一种是以CSV格式导出…

React 学习——打包后,包体积可视化

1、安装插件 &#xff08; source-map-explorer &#xff09; npm i source-map-explorer 2、在配置文件package.json中加入 &#xff08; "analyze": "source-map-explorer build/static/js/*.js" &#xff09;&#xff0c;位置截图 "analyze&q…

力扣热题100_二分查找_35_搜索插入位置

文章目录 题目链接解题思路解题代码 题目链接 35. 搜索插入位置 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 …

聊聊JS中的WebSocket

你好&#xff0c;我是沐爸&#xff0c;欢迎点赞、收藏和关注。个人知乎 在JavaScript中&#xff0c;使用WebSocket非常简单直观。通过几行代码&#xff0c;你就可以轻松创建一个WebSocket连接&#xff0c;并监听来自服务器的消息。无论是开发实时聊天应用、在线游戏、实时数据分…

【项目】多设计模式下的同步异步日志系统(一)

继完成仿RabbitMq后&#xff0c;日志消息的不规范在&#xff0c;导致在调试的时候非常的麻烦。吸取了之前的经验后&#xff0c;以后要好好的打日志。博主在学习了设计模式后&#xff0c;做了这个日志系统项目。 总体来说&#xff0c;相对简易RabbitMq的实现更加简单。错误也明…

Containerd初体验

containerd概述 一、定义与功能 定义&#xff1a;Containerd是一个管理容器生命周期、镜像拉取和存储的工业级容器运行时。它提供了容器运行所需的核心功能&#xff0c;如镜像管理、容器生命周期管理、网络和存储管理等。功能&#xff1a; 管理容器的生命周期&#xff1a;从创…

【QT】基于UDP/TCP/串口 的Ymodom通讯协议客户端

【QT】基于UDP/TCP/串口的Ymodom通讯协议客户端 前言Ymodom实现QT实现开源库的二次开发-1开源库的二次开发-2 串口方式实现TCP方式实现UDP方式实现补充&#xff1a;文件读取补充&#xff1a;QT 封装成EXE 前言 Qt 运行环境 Desktop_Qt_5_11_2_MSVC2015_64bit &#xff0c;基于…