一,消息队列的背景知识
我们以前学过阻塞队列,其实阻塞队列和消息队列的原理差不多。
在实际的后端开发中, 尤其是分布式系统⾥, 跨主机之间使⽤⽣产者消费者模型, 也是⾮常普遍的需求.
因此, 我们通常会把阻塞队列, 封装成⼀个独⽴的服务器程序, 并且赋予其更丰富的功能.
这样的程序我们就称为 消息队列 (Message Queue, MQ)
二,需求分析
具体的生产者 消费者 阻塞队列之间关系如图:
我们主要就是对Broker server进行开发:
在 Broker 中, ⼜存在以下概念.
- 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可以存在多个 VirtualHost.
- 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发给不同的 Queue.
- 队列 (Queue): 真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息.
- 绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关系. 使⽤⼀个关联表就可以把这两个概念联系起来.
- 消息 (Message): 传递的内容.
- 如图所示:
核心API
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
交换机类型
• Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.
• Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
• Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.(这里看不明白就当先了解概念,后面涉及到的时候会详谈)。
持久化
Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失.
网络通信
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.Connection 对应⼀个 TCP 连接.Channel 则是 Connection 中的逻辑通道.⼀个 Connection 中可以包含多个 Channel.Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接.
这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接
消息应答
• ⾃动应答: 消费者只要消费了消息, 就算应答完毕了. Broker 直接删除这个消息.
• ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息.(⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐较常⻅.)
三,模块划分
四,项目创建
创建 SpringBoot 项⽬.
使⽤ SpringBoot 2 系列版本, Java 17.
依赖引⼊ Spring Web 和 MyBatis.
五,创建核心类
先将这几个包创建了,common(共同部分),mqclient(客户端),mqserver(服务器)
mqserver里:core(核心)mapper(数据库的映射),model
在core包里创建核心类:Exchange
java">/*** 这个类表示一个交换机*/
public class Exchange {//此处用name表示交换机的身份识别:(唯一)private String name;//交换机类型:direct,fanout,topicprivate ExchangeType type = ExchangeType.DIRECT;//该交换机是否要持久化储存, true为持久化,false为不持久化private boolean durable = false;//自动删除:如果当前交换机没人用了,就自动删除,此功能我们只是列出来,后续不会进行实现(RabbiteMq是有的)private boolean autoDelete = false;//arguments表示创建交换机的一些额外参数,今后我们并不会实现,只是先列出来。private Map<String, Object> arguments = new HashMap<>();//先省略getter和setter,因为这里还有细节,后面会细说。
}
创建ExchangeType类
java">public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;ExchangeType(int type){this.type = type;}public int getType(){return type;}
}
创建MESGQueue 类
(由于直接使用名字Queue和标准库里的queue重复,所以就以MESGQueue命名消息队列)
java">/*** MESG ->message MESGQueue = 消息队列,储存消息的队列*/
public class MESGQueue {//名字:private String name;//是否持久化保存:true持久化保存, false不持久化保存private boolean durable = false;//独有的,如果为true表示只能有一个消费者使用,如果为false表示所有的消费者都能使用,此处只是列出来,暂时不做实现private boolean exclusive = false;//是否自动删除:当不再使用这个消息队列的时候是否自动删除。此处只是列出来,暂时不做实现private boolean autoDelete = false;//表示拓展参数:此处只是列出来,暂时不做实现private Map<String,Object> arguments = new HashMap<>();//先省略getter和setter,因为这里还有细节,后面会细说。
创建Binding类
java">
/*** 队列与交换机之间的关联关系*/
public class Binding {private String exchangeName;private String queueName;//bindingKey就是在出题,当发来一个消息的时候会附带一个routingKey,此时会验证routingKey是否和bindingKey符合//某种匹配规则,如果符合,就将这个消息加入到该消息队列当中。private String bindingKey;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getBindingKey() {return bindingKey;}public void setBindingKey(String bindingKey) {this.bindingKey = bindingKey;}
}
创建消息 Message类
由于以后的消息要储存在文件当中,所以要进行序列化,因此这个Message要实现Serializable接口。
java">
/*** 表示一个要传递的消息*/
public class Message implements Serializable {//要传递消息的属性:private BasicProperties basicProperties = new BasicProperties();private byte[] body;//表示偏移量,由于我们要将消息储存在一个文件中,所以记忆好 begin和end 能找到这个消息的具体存在的位置[begin,end)private transient long offsetBeg;//transient表示不被序列化private transient long offsetEnd;//0x1表示有效 0x0表示无效private byte isVail = 0x1;//创建一个能自动生成message的工厂类:public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){Message message = new Message();if(basicProperties != null) {message.setBasicProperties(basicProperties);}message.setMessageId("M+"+ UUID.randomUUID().toString());message.setRoutingKey(routingKey);message.setBody(body);//对于offsetBeg,offsetEnd,isVail,此时只是在内存中创建了一个对象,这些值会在之后的持久化操作中进行设置。return message;}public BasicProperties getBasicProperties() {return basicProperties;}public void setMessageId(String messageId){this.basicProperties.setMessageId(messageId);}public String getMessageId(){return this.basicProperties.getMessageId();}public void setRoutingKey(String routingKey){this.basicProperties.setRoutingKey(routingKey);}public String getRoutingKey(){return this.basicProperties.getRoutingKey();}public void setDeliverMode(int deliverMode){this.basicProperties.setDeliverMode(deliverMode);}public int getDeliverMode(){return this.basicProperties.getDeliverMode();}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}public long getOffsetBeg() {return offsetBeg;}public void setOffsetBeg(long offsetBeg) {this.offsetBeg = offsetBeg;}public long getOffsetEnd() {return offsetEnd;}public void setOffsetEnd(long offsetEnd) {this.offsetEnd = offsetEnd;}public byte getIsVail() {return isVail;}public void setIsVail(byte isVail) {this.isVail = isVail;}
}
消息里的基本属性 BasicProperties 类
java">
public class BasicProperties implements Serializable {//使用String作为唯一身份标识,使用UUID 生成messageidprivate String messageId;//是一个消息 带有的内容,为了和bindingKey做匹配//如果当前交换机是direct,routingKey就是转发的队列名//如果当前交换机是fanout,不会使用routingKey,因为那样无意义。//如果当前交换机是topic,此时bindingKey就要和routingKey做匹配,只有符合要求才会转发给相应的队列。private String routingKey;//表示是否要持久化,不持久化 1, 持久化:2private int deliverMode = 1;public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey = routingKey;}public int getDeliverMode() {return deliverMode;}public void setDeliverMode(int deliverMode) {this.deliverMode = deliverMode;}
}
总的来说,现在创建的类一共就这些:
六,数据库设计
对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库.
SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件.
1,配置sqlite,引⼊ pom.xml 依赖
java"><dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.41.0.1</version>
</dependency>
简图如下:
2,配置数据源 application.yml (默认是 .properties后缀,可以直接重命名改成yml后缀,这是常用的方法,但是改完以后就要注意格式对齐了)
java">spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
mybatis:mapper-locations: classpath:mapper/**Mapper.xml
Username 和 password 空着即可.
此处我们约定, 把数据库⽂件放到 ./data/meta.db 中.
SQLite 只是把数据单纯的存储到⼀个⽂件中. ⾮常简单⽅便.
3,创建数据库表
准备工作:
创建MetaMapper类,和MetaMapper.xml
metaMapper.xml里的配置
java"><?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper">
创建数据库表的方法实现
java">@Mapper
public interface MetaMapper {//创建 交换机数据库表void createExchangeTable();//创建 队列数据库表void createMESGQueueTable();//创建 绑定数据库表void createBindingTable();}
但是,对于数据库的操作,有四种 insert,delete,select,update
可是,没有创建create啊,所以我们将create语句写在update里。
创建对应xml
java"><update id="createExchangeTable">create table if not exists exchange(name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createMESGQueueTable">create table if not exists MESGQueue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createBindingTable">create table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256));</update>
插入
java"> //对ExchangeTable进行插入操作void insertExchangeTable(Exchange exchange);//对MESGQueueTable进行插入操作void insertMESGQueueTable(MESGQueue mesgQueue);//对BindingTable进行插入操作void insertBindingTable(Binding binding);
插入对应 xml
java"> <insert id="insertExchangeTable" parameterType="com.example.mq.mqserver.core.Exchange">insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments});</insert><insert id="insertMESGQueueTable" parameterType="com.example.mq.mqserver.core.MESGQueue">insert into MESGQueue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments});</insert><insert id="insertBindingTable" parameterType="com.example.mq.mqserver.core.Binding">insert into binding values (#{exchangeName},#{queueName},#{bindingKey});</insert>
删除
java">//对ExchangeTable进行删除操作void deleteExchangeTable(String exchangeName);//对MESGQueueTable进行删除操作void deleteMESGQueueTable(String mesgQueueName);//对BindingTable进行删除操作void deleteBindingTable(Binding binding);
删除对应 xml
java"> <delete id="deleteExchangeTable">delete from exchange where name = #{exchangeName};</delete><delete id="deleteMESGQueueTable">delete from MESGQueue where name = #{mesgQueueName};</delete><delete id="deleteBindingTable" parameterType="com.example.mq.mqserver.core.Binding">delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};</delete>
查询
java">//查找所有的exchange交换机List<Exchange> selectExchangeTable();//查找所有的DESGQueue消息队列List<MESGQueue> selectMESGQueueTable();//查找所有的Binding绑定List<Binding> selectBindingTable();
查询对应 xml
java"><select id="selectExchangeTable" resultType="com.example.mq.mqserver.core.Exchange">select * from exchange;</select><select id="selectMESGQueueTable" resultType="com.example.mq.mqserver.core.MESGQueue">select * from MESGQueue;</select><select id="selectBindingTable" resultType="com.example.mq.mqserver.core.Binding">select * from binding;</select>