仿 RabbitMQ 的消息队列1(实战项目)

embedded/2025/1/22 14:45:46/

一,消息队列的背景知识

我们以前学过阻塞队列,其实阻塞队列和消息队列的原理差不多。
在实际的后端开发中, 尤其是分布式系统⾥, 跨主机之间使⽤⽣产者消费者模型, 也是⾮常普遍的需求.
因此, 我们通常会把阻塞队列, 封装成⼀个独⽴的服务器程序, 并且赋予其更丰富的功能.
这样的程序我们就称为 消息队列 (Message Queue, MQ)

二,需求分析

具体的生产者 消费者 阻塞队列之间关系如图:
在这里插入图片描述
我们主要就是对Broker server进行开发:
在 Broker 中, ⼜存在以下概念.

  • 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可以存在多个 VirtualHost.
  • 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发给不同的 Queue.
  • 队列 (Queue): 真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息.
  • 绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关系. 使⽤⼀个关联表就可以把这两个概念联系起来.
  • 消息 (Message): 传递的内容.
  • 如图所示:
    在这里插入图片描述

核心API

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

交换机类型

• Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.
• Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
• Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.(这里看不明白就当先了解概念,后面涉及到的时候会详谈)。

持久化

Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失.

网络通信

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.Connection 对应⼀个 TCP 连接.Channel 则是 Connection 中的逻辑通道.⼀个 Connection 中可以包含多个 Channel.Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接.

这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接
在这里插入图片描述

消息应答

• ⾃动应答: 消费者只要消费了消息, 就算应答完毕了. Broker 直接删除这个消息.
• ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息.(⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐较常⻅.)

三,模块划分

在这里插入图片描述

四,项目创建

创建 SpringBoot 项⽬.
使⽤ SpringBoot 2 系列版本, Java 17.
依赖引⼊ Spring Web 和 MyBatis.

五,创建核心类

先将这几个包创建了,common(共同部分),mqclient(客户端),mqserver(服务器)
mqserver里:core(核心)mapper(数据库的映射),model
在这里插入图片描述

在core包里创建核心类:Exchange

java">/*** 这个类表示一个交换机*/
public class Exchange {//此处用name表示交换机的身份识别:(唯一)private String name;//交换机类型:direct,fanout,topicprivate ExchangeType type = ExchangeType.DIRECT;//该交换机是否要持久化储存, true为持久化,false为不持久化private boolean durable = false;//自动删除:如果当前交换机没人用了,就自动删除,此功能我们只是列出来,后续不会进行实现(RabbiteMq是有的)private boolean autoDelete = false;//arguments表示创建交换机的一些额外参数,今后我们并不会实现,只是先列出来。private Map<String, Object> arguments = new HashMap<>();//先省略getter和setter,因为这里还有细节,后面会细说。
}

创建ExchangeType类

java">public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;ExchangeType(int type){this.type = type;}public int getType(){return type;}
}

创建MESGQueue 类
(由于直接使用名字Queue和标准库里的queue重复,所以就以MESGQueue命名消息队列)

java">/*** MESG ->message  MESGQueue = 消息队列,储存消息的队列*/
public class MESGQueue {//名字:private String name;//是否持久化保存:true持久化保存, false不持久化保存private boolean durable = false;//独有的,如果为true表示只能有一个消费者使用,如果为false表示所有的消费者都能使用,此处只是列出来,暂时不做实现private boolean exclusive = false;//是否自动删除:当不再使用这个消息队列的时候是否自动删除。此处只是列出来,暂时不做实现private boolean autoDelete = false;//表示拓展参数:此处只是列出来,暂时不做实现private Map<String,Object> arguments = new HashMap<>();//先省略getter和setter,因为这里还有细节,后面会细说。

创建Binding类

java">
/*** 队列与交换机之间的关联关系*/
public class Binding {private String exchangeName;private String queueName;//bindingKey就是在出题,当发来一个消息的时候会附带一个routingKey,此时会验证routingKey是否和bindingKey符合//某种匹配规则,如果符合,就将这个消息加入到该消息队列当中。private String bindingKey;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getBindingKey() {return bindingKey;}public void setBindingKey(String bindingKey) {this.bindingKey = bindingKey;}
}

创建消息 Message类
由于以后的消息要储存在文件当中,所以要进行序列化,因此这个Message要实现Serializable接口。

java">
/*** 表示一个要传递的消息*/
public class Message implements Serializable {//要传递消息的属性:private BasicProperties basicProperties = new BasicProperties();private byte[] body;//表示偏移量,由于我们要将消息储存在一个文件中,所以记忆好 begin和end 能找到这个消息的具体存在的位置[begin,end)private transient long offsetBeg;//transient表示不被序列化private transient long offsetEnd;//0x1表示有效  0x0表示无效private byte isVail = 0x1;//创建一个能自动生成message的工厂类:public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){Message message = new Message();if(basicProperties != null) {message.setBasicProperties(basicProperties);}message.setMessageId("M+"+ UUID.randomUUID().toString());message.setRoutingKey(routingKey);message.setBody(body);//对于offsetBeg,offsetEnd,isVail,此时只是在内存中创建了一个对象,这些值会在之后的持久化操作中进行设置。return message;}public BasicProperties getBasicProperties() {return basicProperties;}public void setMessageId(String messageId){this.basicProperties.setMessageId(messageId);}public String getMessageId(){return this.basicProperties.getMessageId();}public void setRoutingKey(String routingKey){this.basicProperties.setRoutingKey(routingKey);}public String getRoutingKey(){return this.basicProperties.getRoutingKey();}public void setDeliverMode(int deliverMode){this.basicProperties.setDeliverMode(deliverMode);}public int getDeliverMode(){return this.basicProperties.getDeliverMode();}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}public long getOffsetBeg() {return offsetBeg;}public void setOffsetBeg(long offsetBeg) {this.offsetBeg = offsetBeg;}public long getOffsetEnd() {return offsetEnd;}public void setOffsetEnd(long offsetEnd) {this.offsetEnd = offsetEnd;}public byte getIsVail() {return isVail;}public void setIsVail(byte isVail) {this.isVail = isVail;}
}

消息里的基本属性 BasicProperties 类

java">
public class BasicProperties implements Serializable {//使用String作为唯一身份标识,使用UUID 生成messageidprivate String messageId;//是一个消息 带有的内容,为了和bindingKey做匹配//如果当前交换机是direct,routingKey就是转发的队列名//如果当前交换机是fanout,不会使用routingKey,因为那样无意义。//如果当前交换机是topic,此时bindingKey就要和routingKey做匹配,只有符合要求才会转发给相应的队列。private String routingKey;//表示是否要持久化,不持久化 1,  持久化:2private int deliverMode = 1;public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey = routingKey;}public int getDeliverMode() {return deliverMode;}public void setDeliverMode(int deliverMode) {this.deliverMode = deliverMode;}
}

总的来说,现在创建的类一共就这些:
在这里插入图片描述

六,数据库设计

对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库.
SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件.
1,配置sqlite,引⼊ pom.xml 依赖

java"><dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.41.0.1</version>
</dependency>

简图如下:
在这里插入图片描述

2,配置数据源 application.yml (默认是 .properties后缀,可以直接重命名改成yml后缀,这是常用的方法,但是改完以后就要注意格式对齐了)

java">spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
mybatis:mapper-locations: classpath:mapper/**Mapper.xml

Username 和 password 空着即可.
此处我们约定, 把数据库⽂件放到 ./data/meta.db 中.
SQLite 只是把数据单纯的存储到⼀个⽂件中. ⾮常简单⽅便.

在这里插入图片描述

3,创建数据库表
准备工作:
创建MetaMapper类,和MetaMapper.xml
在这里插入图片描述
在这里插入图片描述
metaMapper.xml里的配置

java"><?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper">

创建数据库表的方法实现

java">@Mapper
public interface MetaMapper {//创建 交换机数据库表void createExchangeTable();//创建 队列数据库表void createMESGQueueTable();//创建 绑定数据库表void createBindingTable();}

但是,对于数据库的操作,有四种 insert,delete,select,update
可是,没有创建create啊,所以我们将create语句写在update里。
创建对应xml

java"><update id="createExchangeTable">create table if not exists exchange(name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createMESGQueueTable">create table if not exists MESGQueue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createBindingTable">create table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256));</update>

插入

java">	//对ExchangeTable进行插入操作void insertExchangeTable(Exchange exchange);//对MESGQueueTable进行插入操作void insertMESGQueueTable(MESGQueue mesgQueue);//对BindingTable进行插入操作void insertBindingTable(Binding binding);

插入对应 xml

java"> <insert id="insertExchangeTable" parameterType="com.example.mq.mqserver.core.Exchange">insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments});</insert><insert id="insertMESGQueueTable" parameterType="com.example.mq.mqserver.core.MESGQueue">insert into MESGQueue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments});</insert><insert id="insertBindingTable" parameterType="com.example.mq.mqserver.core.Binding">insert into binding values (#{exchangeName},#{queueName},#{bindingKey});</insert>

删除

java">//对ExchangeTable进行删除操作void deleteExchangeTable(String exchangeName);//对MESGQueueTable进行删除操作void deleteMESGQueueTable(String mesgQueueName);//对BindingTable进行删除操作void deleteBindingTable(Binding binding);

删除对应 xml

java"> <delete id="deleteExchangeTable">delete from exchange where name = #{exchangeName};</delete><delete id="deleteMESGQueueTable">delete from MESGQueue where name = #{mesgQueueName};</delete><delete id="deleteBindingTable" parameterType="com.example.mq.mqserver.core.Binding">delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};</delete>

查询

java">//查找所有的exchange交换机List<Exchange> selectExchangeTable();//查找所有的DESGQueue消息队列List<MESGQueue> selectMESGQueueTable();//查找所有的Binding绑定List<Binding> selectBindingTable();

查询对应 xml

java"><select id="selectExchangeTable" resultType="com.example.mq.mqserver.core.Exchange">select * from exchange;</select><select id="selectMESGQueueTable" resultType="com.example.mq.mqserver.core.MESGQueue">select * from MESGQueue;</select><select id="selectBindingTable" resultType="com.example.mq.mqserver.core.Binding">select * from binding;</select>

http://www.ppmy.cn/embedded/156073.html

相关文章

基于微信小程序的手机银行系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

Java 在包管理与模块化中的优势:与其他开发语言的比较

在开发复杂的、规模庞大的软件系统时&#xff0c;包管理和模块化设计起着至关重要的作用。它们不仅决定了代码的组织和可维护性&#xff0c;还直接影响到团队协作效率、扩展性和性能。在众多编程语言中&#xff0c;Java 凭借其成熟的生态系统、强类型系统和标准化的包管理机制&…

matlab计算功率谱的四种方法

%{ 计算功率谱的四种方法&#xff0c;前三种方法采用归一化频率&#xff0c;第四种方法采用原始频率 %} clear;clc;close all; % 生成一个示例时间序列&#xff08;例如正弦波加上一些随机噪声&#xff09; t 0:0.01:100-0.1; % 时间向量 % x sin(t) 0.5 * randn(size(t))…

题解 CodeForces 131D Subway BFS C++

题目传送门 Problem - 131D - Codeforceshttps://codeforces.com/problemset/problem/131/Dhttps://codeforces.com/problemset/problem/131/D 翻译 地铁方案&#xff0c;对于Berland城市来说是一种经典的表示&#xff0c;由一组n站点和连接这些站点的n通道组成&#xff0c;…

Linux C\C++方式下的文件I/O编程

【图书推荐】《Linux C与C一线开发实践&#xff08;第2版&#xff09;》_linux c与c一线开发实践pdf-CSDN博客 《Linux C与C一线开发实践&#xff08;第2版&#xff09;&#xff08;Linux技术丛书&#xff09;》(朱文伟&#xff0c;李建英)【摘要 书评 试读】- 京东图书 Lin…

路径规划之启发式算法之二十八:候鸟优化算法(Migrating Birds Optimization, MBO)

候鸟优化算法(Migrating Birds Optimization, MBO)是一种基于群体智能的元启发式优化算法,其灵感来源于候鸟迁徙时的“V”字形飞行队列。这种队列结构能够有效减少能量消耗,同时提高飞行效率。MBO算法通过模拟候鸟的迁徙行为,利用群体间的协作和信息共享来优化问题的解。 …

JavaScript DOM 操作与事件处理

Hi&#xff0c;我是布兰妮甜 &#xff01;在现代Web开发中&#xff0c;JavaScript不仅是用来增强用户体验的工具&#xff0c;它更是创建动态、交互式网页的关键。通过操作文档对象模型&#xff08;DOM&#xff09;和处理用户事件&#xff0c;开发者能够构建出响应迅速且功能丰富…

Oracle之RMAN备份异机恢复(单机到单机)

Oracle之RMAN备份异机恢复&#xff08;单机到单机&#xff09; 一、环境说明二、正式库进行RMAN备份三、将正式库备份与参数文件拷贝到测试库四、测试库异机恢复五、验证数据 一、环境说明 系统版本主机名DB版本DB名实例名Public-IP正式库Redhat9.5lemonEnterprise 19.25lemon…