仿 RabbitMQ 的消息队列1(实战项目)

ops/2025/1/20 19:43:25/

一,消息队列的背景知识

我们以前学过阻塞队列,其实阻塞队列和消息队列的原理差不多。
在实际的后端开发中, 尤其是分布式系统⾥, 跨主机之间使⽤⽣产者消费者模型, 也是⾮常普遍的需求.
因此, 我们通常会把阻塞队列, 封装成⼀个独⽴的服务器程序, 并且赋予其更丰富的功能.
这样的程序我们就称为 消息队列 (Message Queue, MQ)

二,需求分析

具体的生产者 消费者 阻塞队列之间关系如图:
在这里插入图片描述
我们主要就是对Broker server进行开发:
在 Broker 中, ⼜存在以下概念.

  • 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可以存在多个 VirtualHost.
  • 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发给不同的 Queue.
  • 队列 (Queue): 真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息.
  • 绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关系. 使⽤⼀个关联表就可以把这两个概念联系起来.
  • 消息 (Message): 传递的内容.
  • 如图所示:
    在这里插入图片描述

核心API

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

交换机类型

• Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.
• Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
• Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.(这里看不明白就当先了解概念,后面涉及到的时候会详谈)。

持久化

Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失.

网络通信

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.Connection 对应⼀个 TCP 连接.Channel 则是 Connection 中的逻辑通道.⼀个 Connection 中可以包含多个 Channel.Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接.

这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接
在这里插入图片描述

消息应答

• ⾃动应答: 消费者只要消费了消息, 就算应答完毕了. Broker 直接删除这个消息.
• ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息.(⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐较常⻅.)

三,模块划分

在这里插入图片描述

四,项目创建

创建 SpringBoot 项⽬.
使⽤ SpringBoot 2 系列版本, Java 17.
依赖引⼊ Spring Web 和 MyBatis.

五,创建核心类

先将这几个包创建了,common(共同部分),mqclient(客户端),mqserver(服务器)
mqserver里:core(核心)mapper(数据库的映射),model
在这里插入图片描述

在core包里创建核心类:Exchange

java">/*** 这个类表示一个交换机*/
public class Exchange {//此处用name表示交换机的身份识别:(唯一)private String name;//交换机类型:direct,fanout,topicprivate ExchangeType type = ExchangeType.DIRECT;//该交换机是否要持久化储存, true为持久化,false为不持久化private boolean durable = false;//自动删除:如果当前交换机没人用了,就自动删除,此功能我们只是列出来,后续不会进行实现(RabbiteMq是有的)private boolean autoDelete = false;//arguments表示创建交换机的一些额外参数,今后我们并不会实现,只是先列出来。private Map<String, Object> arguments = new HashMap<>();//先省略getter和setter,因为这里还有细节,后面会细说。
}

创建ExchangeType类

java">public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;ExchangeType(int type){this.type = type;}public int getType(){return type;}
}

创建MESGQueue 类
(由于直接使用名字Queue和标准库里的queue重复,所以就以MESGQueue命名消息队列)

java">/*** MESG ->message  MESGQueue = 消息队列,储存消息的队列*/
public class MESGQueue {//名字:private String name;//是否持久化保存:true持久化保存, false不持久化保存private boolean durable = false;//独有的,如果为true表示只能有一个消费者使用,如果为false表示所有的消费者都能使用,此处只是列出来,暂时不做实现private boolean exclusive = false;//是否自动删除:当不再使用这个消息队列的时候是否自动删除。此处只是列出来,暂时不做实现private boolean autoDelete = false;//表示拓展参数:此处只是列出来,暂时不做实现private Map<String,Object> arguments = new HashMap<>();//先省略getter和setter,因为这里还有细节,后面会细说。

创建Binding类

java">
/*** 队列与交换机之间的关联关系*/
public class Binding {private String exchangeName;private String queueName;//bindingKey就是在出题,当发来一个消息的时候会附带一个routingKey,此时会验证routingKey是否和bindingKey符合//某种匹配规则,如果符合,就将这个消息加入到该消息队列当中。private String bindingKey;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getBindingKey() {return bindingKey;}public void setBindingKey(String bindingKey) {this.bindingKey = bindingKey;}
}

创建消息 Message类
由于以后的消息要储存在文件当中,所以要进行序列化,因此这个Message要实现Serializable接口。

java">
/*** 表示一个要传递的消息*/
public class Message implements Serializable {//要传递消息的属性:private BasicProperties basicProperties = new BasicProperties();private byte[] body;//表示偏移量,由于我们要将消息储存在一个文件中,所以记忆好 begin和end 能找到这个消息的具体存在的位置[begin,end)private transient long offsetBeg;//transient表示不被序列化private transient long offsetEnd;//0x1表示有效  0x0表示无效private byte isVail = 0x1;//创建一个能自动生成message的工厂类:public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){Message message = new Message();if(basicProperties != null) {message.setBasicProperties(basicProperties);}message.setMessageId("M+"+ UUID.randomUUID().toString());message.setRoutingKey(routingKey);message.setBody(body);//对于offsetBeg,offsetEnd,isVail,此时只是在内存中创建了一个对象,这些值会在之后的持久化操作中进行设置。return message;}public BasicProperties getBasicProperties() {return basicProperties;}public void setMessageId(String messageId){this.basicProperties.setMessageId(messageId);}public String getMessageId(){return this.basicProperties.getMessageId();}public void setRoutingKey(String routingKey){this.basicProperties.setRoutingKey(routingKey);}public String getRoutingKey(){return this.basicProperties.getRoutingKey();}public void setDeliverMode(int deliverMode){this.basicProperties.setDeliverMode(deliverMode);}public int getDeliverMode(){return this.basicProperties.getDeliverMode();}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}public long getOffsetBeg() {return offsetBeg;}public void setOffsetBeg(long offsetBeg) {this.offsetBeg = offsetBeg;}public long getOffsetEnd() {return offsetEnd;}public void setOffsetEnd(long offsetEnd) {this.offsetEnd = offsetEnd;}public byte getIsVail() {return isVail;}public void setIsVail(byte isVail) {this.isVail = isVail;}
}

消息里的基本属性 BasicProperties 类

java">
public class BasicProperties implements Serializable {//使用String作为唯一身份标识,使用UUID 生成messageidprivate String messageId;//是一个消息 带有的内容,为了和bindingKey做匹配//如果当前交换机是direct,routingKey就是转发的队列名//如果当前交换机是fanout,不会使用routingKey,因为那样无意义。//如果当前交换机是topic,此时bindingKey就要和routingKey做匹配,只有符合要求才会转发给相应的队列。private String routingKey;//表示是否要持久化,不持久化 1,  持久化:2private int deliverMode = 1;public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey = routingKey;}public int getDeliverMode() {return deliverMode;}public void setDeliverMode(int deliverMode) {this.deliverMode = deliverMode;}
}

总的来说,现在创建的类一共就这些:
在这里插入图片描述

六,数据库设计

对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库.
SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件.
1,配置sqlite,引⼊ pom.xml 依赖

java"><dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.41.0.1</version>
</dependency>

简图如下:
在这里插入图片描述

2,配置数据源 application.yml (默认是 .properties后缀,可以直接重命名改成yml后缀,这是常用的方法,但是改完以后就要注意格式对齐了)

java">spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
mybatis:mapper-locations: classpath:mapper/**Mapper.xml

Username 和 password 空着即可.
此处我们约定, 把数据库⽂件放到 ./data/meta.db 中.
SQLite 只是把数据单纯的存储到⼀个⽂件中. ⾮常简单⽅便.

在这里插入图片描述

3,创建数据库表
准备工作:
创建MetaMapper类,和MetaMapper.xml
在这里插入图片描述
在这里插入图片描述
metaMapper.xml里的配置

java"><?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper">

创建数据库表的方法实现

java">@Mapper
public interface MetaMapper {//创建 交换机数据库表void createExchangeTable();//创建 队列数据库表void createMESGQueueTable();//创建 绑定数据库表void createBindingTable();}

但是,对于数据库的操作,有四种 insert,delete,select,update
可是,没有创建create啊,所以我们将create语句写在update里。
创建对应xml

java"><update id="createExchangeTable">create table if not exists exchange(name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createMESGQueueTable">create table if not exists MESGQueue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createBindingTable">create table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256));</update>

插入

java">	//对ExchangeTable进行插入操作void insertExchangeTable(Exchange exchange);//对MESGQueueTable进行插入操作void insertMESGQueueTable(MESGQueue mesgQueue);//对BindingTable进行插入操作void insertBindingTable(Binding binding);

插入对应 xml

java"> <insert id="insertExchangeTable" parameterType="com.example.mq.mqserver.core.Exchange">insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments});</insert><insert id="insertMESGQueueTable" parameterType="com.example.mq.mqserver.core.MESGQueue">insert into MESGQueue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments});</insert><insert id="insertBindingTable" parameterType="com.example.mq.mqserver.core.Binding">insert into binding values (#{exchangeName},#{queueName},#{bindingKey});</insert>

删除

java">//对ExchangeTable进行删除操作void deleteExchangeTable(String exchangeName);//对MESGQueueTable进行删除操作void deleteMESGQueueTable(String mesgQueueName);//对BindingTable进行删除操作void deleteBindingTable(Binding binding);

删除对应 xml

java"> <delete id="deleteExchangeTable">delete from exchange where name = #{exchangeName};</delete><delete id="deleteMESGQueueTable">delete from MESGQueue where name = #{mesgQueueName};</delete><delete id="deleteBindingTable" parameterType="com.example.mq.mqserver.core.Binding">delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};</delete>

查询

java">//查找所有的exchange交换机List<Exchange> selectExchangeTable();//查找所有的DESGQueue消息队列List<MESGQueue> selectMESGQueueTable();//查找所有的Binding绑定List<Binding> selectBindingTable();

查询对应 xml

java"><select id="selectExchangeTable" resultType="com.example.mq.mqserver.core.Exchange">select * from exchange;</select><select id="selectMESGQueueTable" resultType="com.example.mq.mqserver.core.MESGQueue">select * from MESGQueue;</select><select id="selectBindingTable" resultType="com.example.mq.mqserver.core.Binding">select * from binding;</select>

http://www.ppmy.cn/ops/151741.html

相关文章

【数据分享】1929-2024年全球站点的逐日平均气温数据(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff0c;其中又以气温指标最为常用&#xff01;说到气温数据&#xff0c;最详细的气温数据是具体到气象监测站点的气温数据&#xff01;本次我们为大家带来的就是具体到气象监…

[实现Rpc] 环境搭建 | JsonCpp | Mudou库 | callBack()

目录 1. 项目介绍 2. 技术选型 3. 开发环境和环境搭建 Ubuntu-22.04环境搭建 1. 安装 wget&#xff08;一般情况下默认会自带&#xff09; 2. 更换国内软件源 ① 备份原始 /etc/apt/sources.list 文件 ② 编辑软件源文件 ③ 更新软件包列表 3. 安装常用工具 3.1 安装…

Spring Boot 整合 Redis:提升应用性能的利器

Redis (Remote Dictionary Server) 是一款高性能的键值对存储数据库&#xff0c;它以内存存储为主&#xff0c;具有速度快、支持丰富的数据类型等特点&#xff0c;被广泛应用于缓存、会话管理、排行榜等场景。 Spring Boot 提供了对 Redis 的良好支持&#xff0c;使得我们可以轻…

yt-dlp脚本下载音频可选设置代理

import yt_dlp# 配置:是否使用代理 use_proxy = True # 设置为 False 可关闭代理# 代理地址 proxy_url = socks5://127.0.0.1:1089URLS = [https://www.bilibili.com/video/BV1WTktYcEcQ/?spm_id_from=333.1007.tianma.6-2-20.click&vd_source=dcb58f8fe1faf749f438620b…

《多模态语言模型的局限性与生态系统发展现状分析》

1. 多模态语言模型的主要局限性 推理能力问题 复杂推理任务表现不稳定图像理解深度差异大推理过程存在逻辑跳跃 技术实现挑战 视觉特征与语言理解的融合不完善训练数据和方法有限跨模态理解算法需优化 2. 生态系统的不成熟表现 评测标准问题 缺乏标准化评测框架性能评估方法…

复杂查询优化:避免 SQL 查询中的 N+1 查询问题

在 SQL 查询优化中&#xff0c;N1 查询问题是一个常见的性能问题&#xff0c;特别是在关系型数据库中。当你的查询不当时&#xff0c;可能会导致对数据库进行大量的额外查询&#xff0c;造成不必要的性能损耗。 什么是 N1 查询问题&#xff1f; N1 查询问题通常出现在一对多或…

T-SQL语言的数据库交互

T-SQL语言的数据库交互 引言 随着信息技术的不断发展&#xff0c;数据库在各个行业中扮演着越来越重要的角色。数据库的有效管理和优化对于企业的数据安全、效率提升和决策支持至关重要。T-SQL&#xff08;Transact-SQL&#xff09;作为微软SQL Server的重要扩展语言&#xf…

【Rust自学】13.3. 闭包 Pt.3:使用泛型参数和fn trait来存储闭包

13.3.0. 写在正文之前 Rust语言在设计过程中收到了很多语言的启发&#xff0c;而函数式编程对Rust产生了非常显著的影响。函数式编程通常包括通过将函数作为值传递给参数、从其他函数返回它们、将它们分配给变量以供以后执行等等。 在本章中&#xff0c;我们会讨论 Rust 的一…