【Java】从0实现一个消息队列中间件

news/2024/11/14 15:03:18/

从0实现一个消息队列中间件

  • 什么是消息队列
  • 需求分析
    • 核心概念
    • 核心API
    • 交换机类型
    • 持久化
    • 网络通信
      • 网络通信API
    • 消息应答
  • 模块划分
  • 项目创建
  • 创建核心类
    • 创建Exchange
    • 创建MSGQueue
    • 创建Binding
    • 创建Message
  • 数据库设计
    • 配置sqlite
      • 实现创建表和数据库基本操作
  • 实现DataBaseManager
    • 创建DataBaseManager类
      • 初始化数据库
      • 实现checkDBExists
      • 实现createTable
      • 实现createDefaultData
      • 封装其他数据库操作
  • 消息存储设计
    • 文件格式
      • queue_data.txt 文件格式
      • queue_stat.txt文件格式
    • 创建MessageFileManager类
    • 实现统计文件读写
    • 实现创建队列目录
    • 实现删除队列目录
    • 检查队列文件是否存在
    • 实现消息对象序列化/反序列化
    • 实现写入消息文件
    • 实现删除消息
    • 实现消息加载
    • 实现垃圾回收
  • 整合数据库和文件
    • 创建DiskDataCenter
  • 内存数据结构设计
    • 创建MemoryDataCenter
  • 虚拟主机设计
    • 创建VirtualHost
  • 订阅消息
    • 添加一个订阅者
    • 创建订阅者管理类
    • 消息确认
  • 网络通信协议设计
    • 设计应用层协议
    • 定义Request/Response
    • 定义参数父类
    • 定义返回值父类
    • 定义其他参数类
      • ExchangeDeclareArguments
      • ExchangeDeleteArguments
      • QueueDeclareArguments
      • QueueDeleteArguments
      • QueueBindArguments
      • QueueUnbindArguments
      • BasicPublishArguments
      • BasicConsumeArguments
      • SubScribeReturns
  • 实现BrokerServer
    • 启动/停止服务器
    • 实现处理连接
      • 实现readRequest
      • 实现writeResponse
      • 实现处理请求
    • 实现clearClosedSessio
  • 实现客户端
    • 创建 ConnectionFactory
    • Connection 和Channel的定义
      • Connection的定义
        • 封装请求响应读写操作
        • 创建channel
      • Channel的定义
        • 创建channel
          • 实现generateRid
        • 实现waitResult
        • 关闭channel
        • 创建交换机
        • 删除交换机
        • 创建队列
        • 创建绑定
        • 删除绑定
        • 发送消息
        • 订阅消息
        • 确认消息
      • 处理响应
        • 创建扫描线程
        • 实现响应的分发
      • 关闭Connection

什么是消息队列

曾经我们学习过阻塞队列(BlockingQueue),我们说,阻塞队列最⼤的⽤途,就是⽤来实现⽣产者消费者模型.

⽣产者消费者模型,存在诸多好处,是后端开发的常⽤编程⽅式.

在实际的后端开发中,尤其是分布式系统⾥,跨主机之间使⽤⽣产者消费者模型,也是⾮常普遍的需求.因此,我们通常会把阻塞队列,封装成⼀个独⽴的服务器程序,并且赋予其更丰富的功能.这样的程序我们就称为消息队列(MessageQueue,MQ)市⾯上成熟的消息队列⾮常多.

RabbitMQ Kafka RocketMQ ActiveMQ…

其中,RabbitMQ是⼀个⾮常知名,功能强⼤,⼴泛使⽤的消息队列.咱们就仿照RabbitMQ,模拟实现⼀个简单的消息队列

需求分析

核心概念

⽣产者(Producer) 消费者(Consumer) 中间⼈(Broker) 发布(Publish) 订阅(Subscribe

在这里插入图片描述
其中,Broker是最核⼼的部分.负责消息的存储和转发

在Broker中,⼜存在以下概念.

虚拟机(VirtualHost): 类似于MySQL的"database",是⼀个逻辑上的集合.⼀个BrokerServer上可以存在多个VirtualHost.
交换机(Exchange):⽣产者把消息先发送到Broker的Exchange上.再根据不同的规则,把消息转发 给不同的Queue.
队列(Queue):真正⽤来存储消息的部分.每个消费者决定⾃⼰从哪个Queue上读取消息.
绑定(Binding):Exchange和Queue之间的关联关系.Exchange和Queue可以理解成"多对多"关系.使⽤⼀个关联表就可以把这两个概念联系起来. 消息(Message):传递的内容

在这里插入图片描述
这些概念,既需要在内存中存储,也需要在硬盘上存储.

  • 内存存储:⽅便使⽤.
  • 硬盘存储:重启数据不丢失

核心API

对于Broker来说,要实现以下核⼼API.通过这些API来实现消息队列的基本功能.

  1. 创建队列(queueDeclare)
  2. 销毁队列(queueDelete)
  3. 创建交换机(exchangeDeclare)
  4. 销毁交换机(exchangeDelete)
  5. 创建绑定(queueBind)
  6. 解除绑定(queueUnbind)
  7. 发布消息(basicPublish)
  8. 订阅消息(basicConsume)
  9. 确认消息(basicAck)

另⼀⽅⾯,Producer和Consumer则通过⽹络的⽅式,远程调⽤这些API,实现⽣产者消费者模型.

交换机类型

对于RabbitMQ来说,主要⽀持四种交换机类型.

Direct
Fanout
Topic
Header

其中Header这种⽅式⽐较复杂,⽐较少⻅.常⽤的是前三种交换机类型.咱们此处也主要实现这三种.

  1. Direct: ⽣产者发送消息时,直接指定被该交换机绑定的队列名.
  2. Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
  3. Topic: 绑定队列到交换机上时,指定⼀个字符串为bindingKey.发送消息指定⼀个字符串为routingKey. 当 routingKey 和bindingKey满⾜⼀定的匹配条件的时候,则把消息投递到指定队列

持久化

Exchange, Queue, Binding, Message 都有持久化需求.当程序重启/主机重启,保证上述内容不丢失

网络通信

⽣产者和消费者都是客⼾端程序,broker则是作为服务器.通过⽹络进⾏通信.在⽹络通信的过程中,客⼾端部分要提供对应的api,来实现对服务器的操作

网络通信API

  1. 创建Connection
  2. 关闭Connection
  3. 创建Channel
  4. 关闭Channel
  5. 创建队列(queueDeclare)
  6. 销毁队列(queueDelete)
  7. 创建交换机(exchangeDeclare)
  8. 销毁交换机(exchangeDelete)
  9. 创建绑定(queueBind)
  10. 解除绑定(queueUnbind)
  11. 发布消息(basicPublish)
  12. 订阅消息(basicConsume)
  13. 确认消息(basicAck)

可以看到,在broker的基础上,客⼾端还要增加Connection操作和Channel操作.

Connection 对应⼀个TCP连接. Channel 则是Connection中的逻辑通道.⼀个Connection中可以包含多个Channel. Channel 和Channel之间的数据是独⽴的.不会相互⼲扰.
这样的设定主要是为了能够更好的复⽤TCP连接,达到⻓连接的效果,避免频繁的创建关闭TCP连接.

消息应答

被消费的消息,需要进⾏应答

应答模式分成两种:

⾃动应答:消费者只要消费了消息,就算应答完毕了.Broker直接删除这个消息.
⼿动应答:消费者⼿动调⽤应答接⼝,Broker收到应答请求之后,才真正删除这个消息.

⼿动应答的⽬的,是为了保证消息确实被消费者处理成功了.在⼀些对于数据可靠性要求⾼的场景,⽐较常⻅

模块划分

在这里插入图片描述

项目创建

创建SpringBoot项⽬.
使⽤SpringBoot2系列版本,Java8.
依赖引⼊SpringWeb和MyBatis

创建核心类

创建包mqserver.core

创建Exchange

java">public class Exchange {//使用name作为交换机的唯一身份标识private String name;//交换机类型DIRECT FANOUT TOPICprivate ExchangeType type = ExchangeType.DIRECT;//该交换机是否需要持久化存储private boolean durable = false;//Todo: 如果当前交换机没人使用 就自动删除private boolean autoDelete = false;//Todo: 额外参数选项//为了将这个数据存储到数据库中 要转换为json字符串存储private Map<String , Object> arguments  = new HashMap<>();public String getArguments(){//把当前arguments参数从map转化为json字符串ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}//如果异常 返回空json字符串return "{}";}public void setArguments(String argumentsJson){//吧argumentsJson转化为map对象ObjectMapper objectMapper = new ObjectMapper();try {objectMapper.readValue(argumentsJson,new TypeReference<HashMap<String,Object>>(){});} catch (JsonProcessingException e) {e.printStackTrace();}}//针对arguments 再提供一组getter setter方法 在代码内部测试使用public Object getArguments(String key){return arguments.get(key);}public void setArguments(String key, Object value){arguments.put(key,value);}public void setArguments(Map<String,Object> arguments){this.arguments = arguments;}}

交换机类型作为枚举类

java">public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type){this.type = type;}public int getType(){return type;}}

name :交换机的名字.相当于交换机的⾝份标识.
type :交换机的类型.三种取值,DIRECT,FANOUT,TOPIC.
durable :交换机是否要持久化存储.true为持久化,false不持久化.
autoDelete:使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤.
arguments :交换机的其他参数属性.预留字段,暂时未使⽤

创建MSGQueue

java">@Data
public class MSGQueue {//队列的唯一身份标识private String name;//表示队列是否持久化private boolean durable;//Todo: 表示是否独占//true 表示只能为一个消费者使用 false 表示为大家都能使用private boolean exclusive;//todo:自动删除private boolean autoDelete = false;//todo:参数列表private Map<String, Object> arguments = new HashMap<>();//表示当前队列都有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();//记录当前取到了第几个消费者 方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);//添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}//暂时不考虑订阅者删除//挑选一个订阅者 来处理当前的消息public ConsumerEnv chooseConsumerEnv() {//轮询的方式取if (consumerEnvList.size() == 0) {//无人订阅return null;}//计算当前下标int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return "{}";}public void setArguments(String argumentsJson) {ObjectMapper objectMapper = new ObjectMapper();try {objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key, value);}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}}

name :队列的名字.相当于队列的⾝份标识.
durable :交换机是否要持久化存储.true为持久化,false不持久化.
exclusive :独占(排他),队列只能被⼀个消费者使⽤.
autoDelete :使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤.
arguments :交换机的其他参数属性.预留字段,暂时未使⽤

创建Binding

java">@Data
public class Binding {//交换机名private String exchangeName;//队列名private String queueName;//绑定关键字private String bindingKey;
}

exchangeName 交换机名字
queueName 队列名字
bindingKey 只在交换机类型为 TOPIC 时才有效.⽤于和消息中的 routingKey 进⾏匹配

创建Message

java">{private BasicProperties basicProperties = new BasicProperties();private byte[] body;//辅助属性//这两个属性不需要序列化//一个文件存储很多消息 找到某个消息使用下面两个偏移量来找到消息 前闭后开[)//文件开头到消息数据的位置偏移private transient long offsetBeg = 0;//文件结尾到消息数据的位置偏移private transient long offsetEnd = 0;//使用这个属性表示该消息在文件中是否为有效消息(逻辑删除)//0x1有效 0x0无效private byte isValid = 0x1;//创建工厂方法 让工厂方法帮我们封装一下message对象的过程//万一两个参数冲突 一外面的为主public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){Message message = new Message();if(basicProperties != null){message.setBasicProperties(basicProperties);}//M-为前缀 和其他uuid做区分message.setMessageId("M-" + UUID.randomUUID().toString());message.basicProperties.setRoutingKey(routingKey);message.body = body;//此处是吧body和basicPro设置出来 其他属性暂时不设置return message;}//获取messageIdpublic String getMessageId(){return basicProperties.getMessageId();}//设置messageIdpublic void setMessageId(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliverMode(){return basicProperties.getDeliverMode();}public void setDeliverMode(int mode){basicProperties.setDeliverMode(mode);}
}

BasicProperties参数类

java">@Data
public class BasicProperties implements Serializable {//消息的唯一身份标识 保证唯一性 使用UUID来创建private String messageId;//消息上带有的 和bindingKey做匹配private String routingKey;// 表示消息是否要持久化 1表示不持久化 2表示持久化private int deliverMode = 1;
}

Message 需要实现 Serializable 接⼝.后续需要把Message写⼊⽂件以及进⾏⽹络传输.
basicProperties 是消息的属性信息. body 是消息体.
offsetBeg 和 offsetEnd 表⽰消息在消息⽂件中所在的起始位置和结束位置.这⼀块具体的 设计后⾯再详细介绍.使⽤transient 关键字避免属性被序列化.
isValid ⽤来表⽰消息在⽂件中是否有效.这⼀块具体的设计后⾯再详细介绍.
createMessageWithId 相当于⼀个⼯⼚⽅法,⽤来创建⼀个Message实例.messageId通过 UUID的⽅式⽣成.

数据库设计

对于Exchange,MSGQueue,Binding,我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是SQLite,是⼀个更轻量的数据库. SQLite 只是⼀个动态库(当然,官⽅也提供了可执⾏程序exe),我们在Java中直接引⼊SQLite依赖,即可直接使⽤,不必安装其他的软件

配置sqlite

引入依赖

 <dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency>

配置数据源

 spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBCmybatis:mapper-locations: classpath:mapper/**Mapper.xml

此处我们约定,把数据库⽂件放到./data/meta.db 中. SQLite 只是把数据单纯的存储到⼀个⽂件中.⾮常简单⽅便

实现创建表和数据库基本操作

java">@Mapper
public interface MetaMapper {//提供三个核心建表方法void createExchangeTable();void createQueueTable();void createBindingTable();//针对上面三个基本概念进行插入删除void insertExchange(Exchange exchange);void deleteExchange(String exchange);void insertQueue(MSGQueue queue);void deleteQueue(String queueName);void insertBinding(Binding binding);void deleteBinding(Binding binding);List<Exchange> selectAllExchanges();List<MSGQueue> selectAllQueues();List<Binding> selectAllBindings();}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper"><update id="createExchangeTable">create table if not exists exchange(name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createQueueTable">create table if not exists queue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createBindingTable">create table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256));</update>

实现DataBaseManager

管理数据库的

mqserver.datacenter.DataBaseManage

创建DataBaseManager类

通过这个类来封装针对数据库的操作

初始化数据库

java">public void init() {//手动获取到metaMappermetaMapper = MqApplication.context.getBean(MetaMapper.class);//建库建表 插入一些默认数据//如果数据库已经存在 不做任何操作 , 如果数据库不存在 则创建库创建表 构造默认数据//根据meta.db文件是否存在来做判断if (!checkDBExists()) {//数据库不存在 就进行建库建表操作//先创建一个data目录File dataDir = new File("./data");dataDir.mkdirs();//不存在 创建表createTable();createDefaultData();log.info("[DataBaseManager] 数据库初始化完成");} else {//数据库已经存在log.info("[DataBaseManager] 数据库已经存在");}}

针对MqApplication,需要新增⼀个context属性.并初始化.

java">@SpringBootApplication
public class MqApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(MqApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}

实现checkDBExists

java">private boolean checkDBExists() {//判断meta.db文件是否存在File file = new File("./data/meta.db");return file.exists();}

实现createTable

java">//建表操作 不需要建库//首次执行数据库造作 就会自动创建meta.db文件(Mybatis执行)private void createTable() {metaMapper.createExchangeTable();metaMapper.createQueueTable();metaMapper.createBindingTable();log.info("[DataBaseManager] 创建表完成");}

实现createDefaultData

java">//创建默认数据//主要是添加默认交换机private void createDefaultData() {//构造默认交换机Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);log.info("[DataBaseManager] 创建默认数据完成");}

封装其他数据库操作

java">//封装其他数据库的操作public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public void insertQueue(MSGQueue queue) {metaMapper.insertQueue(queue);}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public List<Exchange> selectAllExchanges() {return metaMapper.selectAllExchanges();}public List<MSGQueue> selectAllQueues(){return metaMapper.selectAllQueues();}public List<Binding> selectAllBindings(){return metaMapper.selectAllBindings();}

消息存储设计

  • 设计思路:

消息需要在硬盘上存储.但是并不直接放到数据库中,⽽是直接使⽤⽂件存储

原因如下:

  1. 对于消息的操作并不需要复杂的增删改查.
  2. 对于⽂件的操作效率⽐数据库会⾼很多

我们给每个队列分配⼀个⽬录.⽬录的名字为data+队列名.形如 ./data/testQueue 该⽬录中包含两个固定名字的⽂件.

  • queue_data.txt 消息数据文件 用来保存消息内容
  • queue_stat.txt消息统计文件 用来保存消息统计信息

文件格式

queue_data.txt 文件格式

使用二进制方式存储 每个消息分为四个部分:

  • 前四个字节表示Message对象的长度
  • 后面若干字节 表示Message 内容
  • 消息和消息之间首尾相连

每个Message基于Java标准库的 ObjectInputStream/ObjectOutputStream序列化
在这里插入图片描述

Message 对象中的offsetBeg和offsetEnd正是⽤来描述每个消息体所在的位置.

queue_stat.txt文件格式

使用文本方式存储 文件中只包含一行 里面包含两列 使用\t分割
第一列表示当前的总消息数目 第二列表示有效消息数目
形如

2000\t1500

创建MessageFileManager类

创建 mqserver.database.MessageFileManager

java">public class MessageFileManager {//定义内部类表示该队列的统计信息static public class Stat {public int totalCount;//总数量public int validCount;//有效消息数量}public void init(){//暂时不需要初始化}//约定消息文件所在的目录和文件名//获取指定队列对应的消息文件所在路径private String getQueueDir(String queueName) {return "./data/" + queueName;}//这个方法用来获取该队列的消息数据文件路径private String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}//这个方法用来获取该队列的消息统计文件路径private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}
}

包含一个内部类stat 用来表示消息统计文件的内容

实现统计文件读写

java">private Stat readStat(String queueName) {Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}//写统计文件private void writeStat(String queueName, Stat stat) {//使用printWrite//outputstream打开文件默认情况下啊 会把源文件清空 此时相当于旧文件覆盖新文件try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}

实现创建队列目录

每个队列都有⾃⼰的⽬录和配套的⽂件.通过下列⽅法把⽬录和⽂件先准备好.

java">    public void createQueueFiles(String queueName) throws IOException {// 1.创建⽬录指定队列的⽬录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException(" 创建⽬录失败 ! baseDir=" + baseDir.getAbsolutePath());}}// 2.创建队列数据⽂件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException(" 创建⽂件失败 ! queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3.创建队列统计⽂件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException(" 创建⽂件失败 ! queueStatFile=" + queueStatFile.getAbsolutePath());}}// 4.给队列统计⽂件写⼊初始数据Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);}

把上述约定的⽂件都创建出来,并对消息统计⽂件进⾏初始化

实现删除队列目录

如果队列需要删除,则队列对应的⽬录/⽂件也需要删除

java">//删除队列的目录和文件//队列也是可以删除的 当队列删除之后 对应的消息文件也会被删除public void destroyQueueFiles(String queueName) throws IOException {//先删除里面的文件 在删除目录File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {//其中有任意一个失败 都算整体删除失败throw new IOException("删除队列目录和文件失败 baseDir=" + baseDir.getAbsolutePath());}}

注意:File类的delete⽅法只能删除空⽬录.因此需要先把内部的⽂件先删除掉.

检查队列文件是否存在

判定该队列的消息⽂件和统计⽂件是否存在.⼀旦出现缺失,则不能进⾏后续⼯作

java">//检查队列的目录和文件是否存在public boolean checkFilesExists(String queueName) {//判定队列的数据文件 和统计文件是否都存在File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}

实现消息对象序列化/反序列化

Message对象需要转成二进制写入文件,并且也需要吧文件中的二进制读出来解析成Message对象 此处针对这里的逻辑进行封装

创建common.BinaryTool包

java">public class BinaryTool {//将一个对象 序列化成一个数组public static byte[] toBytes(Object object) throws IOException {//这个流对象相当于一个变长的字节数组//就可以把object序列化的数据给逐渐的写入到byteArrayOutputStream中 在同一转化成byte[]try( ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){//此处的writeObject就会把该对象进行序列化 生成的二进制数据就会写入到ObjectOutputStream中//由于objectOutputStream关联到byteArrayOutputStream 所以最终结果写入到byteArrayOutputStreamobjectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}//将一个字节数字 反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){//从data这个byte[]数组中获取数据并进行反序列化object = objectInputStream.readObject();}}return object;}
}
  • 使⽤ByteArrayInputStream/ByteArrayOutputStream针对byte[]进⾏封装,⽅便后续操作.(这两个流对象是纯内存的,不需要进⾏close).
  • 使⽤ObjectInputStream/ObjectOutputStream进⾏序列化/反序列化操作.通过内部的readObject/writeObject即可完成对应操作.
  • 此处涉及到的序列化对象,需要实现Serializable接⼝.这⼀点咱们的Message对象已经实现过了.

实现写入消息文件

java">//把一个新的消息 放到队列对应的文件中public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {//检查档期那要写入的队列对应的文件是否存在if (!checkFilesExists(queue.getName())) {throw new MqException("[MessageFileManager] 队列对应文件不存在 queueName = " + queue.getName());}//把message对象进行序列化 转化成二进制字节数组byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {//获取到当前队列数据文件的长度 用来计算message的 offsetBeg和offsetEnd//offsetEnd 就是当前文件长度 + 4 + message自身长度File queueDataFile = new File(getQueueDataPath(queue.getName()));message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);//写消息到文件中 追加到数据文件末尾 而不是覆盖try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {//接下来先写当前消息的长度//这个操作就是写入四个字节了dataOutputStream.writeInt(messageBinary.length);//写入消息本体dataOutputStream.write(messageBinary);}}//更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
  • 考虑线程安全 按照队列维度进行加锁
  • 需要记录Message对象在⽂件中的偏移量.后续的删除操作依赖这个偏移量定位到消息.offsetBeg是原有⽂件⼤⼩的基础上,再+4.4个字节是存放消息⼤⼩的空间.(参考上⾯的图)
  • 写完消息,要同时更新统计信息

创建common.MqException ,作为⾃定义异常类.后续业务上出现问题,都统⼀抛出这个异常..

java">/*** 自定义异常类*/
public class MqException extends Exception{public MqException(String reason) {super(reason);}
}

实现删除消息

此处删除消息只是逻辑删除 ",即把Message类中的isValid字段设置为0

java">    //删除消息//逻辑删除 将isValid属性设置成0//先把文件中的数据还原为message对象 isValid改成0 在重新写回去//此处message必须包含有效的offsetBeg 和OffsetEndpublic void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {//随机访问 在文件的指定位置读写//seek方法移动光标synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {//先从文件中读取相应的message数据byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);//把二进制数据转化为Message对象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);//设置isValiddiskMessage.setIsValid((byte) 0x0);//重新写入byte[] bufferDest = BinaryTool.toBytes(diskMessage);//此时重新 seek 虽然上面已经seek过 但是进行了读操作 导致文件光标向后移动到下一个消息的位置//因此需要重新seek 调整光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);}//更新统计文件Stat stat = readStat(queue.getName());if ((stat.validCount > 0)) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}

实现消息加载

把消息内容从⽂件加载到内存中.这个功能在服务器重启,和垃圾回收的时候都很关键

java">public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {//模拟光标位置long currentOffset = 0;//循环读取消息while (true) {//读取当前消息//这里可能会读到文件末尾 readInt方法到达末尾会抛出EOFException异常int messageSize = dataInputStream.readInt();//根据消息长度 读取消息byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {throw new MqException("[MessageFileManager] 文件格式错误 queueName = " + queueName);}//把读到的二进制数据 反序列化为Message对象Message message = (Message) BinaryTool.fromBytes(buffer);//判断这个小时是否为无效对象if (message.getIsValid() != 0x1) {//无效数据直接跳过currentOffset += (4 + messageSize);continue;}//有效数据 则需要把这个数据加入到链表中 加入前要填写offsetBeg offsetEnd//进行计算时 需要知道当前光标的位置//此时手动计算下标message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);//添加到链表中messages.add(message);}}catch (EOFException e){//此时不是真正处理异常 而是处理正常业务逻辑System.out.println("[MessageFileManager] 恢复Message数据完成");}}return messages;}

实现垃圾回收

上述删除操作,只是把消息在⽂件上标记成了⽆效.并没有腾出硬盘空间.最终⽂件⼤⼩可能会越积越多.因此需要定期的进⾏批量清除.此处使⽤类似于复制算法.当总消息数超过2000,并且有效消息数⽬少于50%的时候,就触发GC. GC的时候会把所有有效消息加载出来,写⼊到⼀个新的消息⽂件中,使⽤新⽂件,代替旧⽂件即可.

java">//检查当前是否要针对该队列的消息数据文件进行GCpublic boolean checkGC(String queueName){//判定是否要GC 是根据总消息数和有效消息数 这两个值 是在消息统计文件中的Stat stat = readStat(queueName);if(stat.totalCount > 2000 && (double)stat.validCount /(double) stat.totalCount < 0.5){return true;}return false;}

gc

java">//垃圾回收操作//使用复制算法//创建新文件 名字为queue_data_new.txt//把之前的消息数据文件中有效的消息读取出来 写到新文件中//删除旧的消息 把文件名改回queue_data.txt//同时要记得更新消息统计文件public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {//进行gc时 其他线程不能对文件进行修改synchronized (queue){//由于gc操作比较耗时 此处统计消耗时间long gcBeg = System.currentTimeMillis();//创建一个新文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if(queueDataNewFile.exists()){//正常情况下 这个文件不应该存在 说明上一次gc一半 程序意外结束throw  new MqException("[MessageFileManager] gc 时发现该队列的queue_data_new 已经存在");}boolean ok = queueDataNewFile.createNewFile();if(!ok){throw new MqException("[MessageFileManager] 创建文件失败 queueDataNewFile =" + queueDataNewFile.getAbsolutePath() );}//从旧文件中获取出所有有效消息LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());//把有效消息 写入到新文件try(OutputStream outputStream = new FileOutputStream(queueDataNewFile,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for(Message message : messages){byte[] buffer = BinaryTool.toBytes(message);dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}//删除旧文件 把新文件重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if(!ok){throw new MqException("[MessageFileManager] 删除旧数据失败 queueDataOldFile" + queueDataOldFile.getAbsolutePath());}//重命名文件ok = queueDataNewFile.renameTo(queueDataOldFile);if(!ok){throw new MqException("[MessageFileManager] 文件重命名失败 queueDataNewFile" + queueDataNewFile.getAbsolutePath() +", queueDataOldFile = " + queueDataOldFile.getAbsolutePath());}//更新统计文件Stat stat = new Stat();stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕 queueName = " + queue.getName() +"time: " + (gcEnd - gcBeg) + " ms");}}

整合数据库和文件

上述代码中,使⽤数据库存储了Exchange,Queue,Binding,使⽤⽂本⽂件存储了Message.接下来我们把两个部分整合起来,统⼀进⾏管理

创建DiskDataCenter

使⽤DiskDataCenter来综合管理数据库和⽂本⽂件的内容.DiskDataCenter 会持有DataBaseManager和MessageFileManager对象

java">public class DiskDataCenter {private DataBaseManager dataBaseManager = new DataBaseManager();private MessageFileManager messageFileManager = new MessageFileManager();public void init(){//对上面两个类进行初始化dataBaseManager.init();messageFileManager.init();}//封装交换机操作public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges(){return dataBaseManager.selectAllExchanges();}//封装队列操作public void insertQueue(MSGQueue queue) throws IOException {//创建队列的同时 还要创建对应的目录dataBaseManager.insertQueue(queue);messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {//删除队列也要删除对应的目录dataBaseManager.deleteQueue(queueName);messageFileManager.destroyQueueFiles(queueName);}public List<MSGQueue> selectAllQueues(){return dataBaseManager.selectAllQueues();}//封装绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings(){return dataBaseManager.selectAllBindings();}//封装消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if(messageFileManager.checkGC(queue.getName())){messageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
}

⼩结

通过上述封装,把数据库和硬盘⽂件两部分合并成⼀个整体.上层代码在调⽤的时候则不再关⼼该数据 是存储在哪个部分的.
这个类的整体实现并不复杂,关键逻辑在之前都已经准备好了

内存数据结构设计

硬盘上存储数据,只是为了实现"持久化"这样的效果.但是实际的消息存储/转发,还是主要靠内存的结构.
对于MQ来说,内存部分是更关键的,内存速度更快,可以达成更⾼的并发

创建MemoryDataCenter

创建mqserver.datacenter.MemoryDataCenter
使用这个类来管理所有内存中的数据

  • 使⽤四个哈希表,管理Exchange,Queue,Binding,Message.
  • 使⽤⼀个哈希表+链表管理队列->消息之间的关系.
  • 使⽤⼀个哈希表+哈希表管理所有的未被确认的消息.

为了保证消息被正确消费了,会使⽤两种⽅式进⾏确认.⾃动ACK和⼿动ACK.
其中⾃动ACK是指当消息被消费之后,就会⽴即被销毁释放.
其中⼿动ACK是指当消息被消费之后,由消费者主动调⽤⼀个basicAck⽅法,进⾏主动确认.服务器
收到这个确认之后,才能真正销毁消息.
此处的"未确认消息"就是指在⼿动ACK模式下,该消息还没有被调⽤basicAck.此时消息不能删除,
但是要和其他未消费的消息区分开.于是另搞了个结构.
当后续basicAck到了,就可以删除消息了

java">/*** 使用这个类来统一管理内存中的所有数据*/
public class MemoryDataCenter {//key是exchangeName, value是exchange对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();//key是queueName value是MSGQueue对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();//key是exchangeName 第二个key是queueNameprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();//key是messageId value是一个Message对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();//key是queueName value是一个Message的链表private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);System.out.println("[MemoryDataCenter] 新交换机添加成功 exchangeName = " + exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功 exchangeName = " + exchangeName);}public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(), queue);System.out.println("[MemoryDataCenter] 新队列添加成功 queueName = " + queue.getName());}public MSGQueue getQueue(String queueName) {return queueMap.get(queueName);}public void deleteQueue(String queueName) {queueMap.remove(queueName);System.out.println("[MemoryDataCenter]  队列删除成功 queueName = " + queueName);}public void insertBinding(Binding binding) throws MqException {
//        //先使用exchangeName来查找哈希表是否有存在
//        ConcurrentHashMap<String, Binding> bindingMap = this.bindingsMap.get(bingding.getExchangeName());
//        if (bindingMap == null){
//            bindingsMap.put(binding.getExchangeName(),bindingMap);
//        }//现根据exchangeName查一下 对应的哈希表是否存在 不存在就创建一个ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());synchronized (bindingMap) {//再根据queueName查找一下 如果已经存在 就抛出异常 不存在才能插入if (bindingsMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定已经存在 exchangeName = " + binding.getExchangeName() +", queueName = " + binding.getQueueName());}bindingMap.put(binding.getQueueName(), binding);}System.out.println("[MemoryDataCenter] 新绑定添加成功 exchangeName = " + binding.getExchangeName() +",queueName" + binding.getQueueName());}//获取绑定//1. 根据exchangeName和queueName确定唯一一个Binding//2. 根据exchangeName获取到所有的bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {return null;}return bindingMap.get(queueName);}public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}//删除绑定public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if (bindingMap == null) {//该交换机没有绑定任何队列throw new MqException("[MemoryDataCenter] 绑定不存在 exchangeName = " + binding.getExchangeName() +", queueName = " + binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 删除绑定成功 exchangeName = " + binding.getExchangeName() +",queueName" + binding.getQueueName());}//添加消息public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 新消息添加成功 messageId = " + message.getMessageId());}//根据id查消息public Message getMessage(String messageId) {return messageMap.get(messageId);}//根据id删消息public void removeMessage(String messageId) {messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息被移除 messageId = " + messageId);}//发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {//先把消息放在指定数据结构中//现根据队列的名称 找到该队列对应的消息链表
//        LinkedList<Message> messages = queueMessageMap.get(queue.getName());
//        if (messages == null) {
//            messages = new LinkedList<>();
//            queueMessageMap.put(queue.getName(), messages);
//        }LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());//把新消息添加到message中synchronized (messages){messages.add(message);}//在这里把该消息向消息中心插入addMessage(message);System.out.println("[MemoryDataCenter] 消息被投递到队列中 messageId = " + message.getMessageId());}//从队列中取消息public Message pollMessage(String queueName){//根据队列名 查找对应的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName);//如果没找到 则说明队列中没有任何消息if(messages == null){return null;}synchronized (messages){if(messages.size() == 0){return null;}//链表中有元素 就进行头删 取出该元素Message currentMessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息从队列中取出 messageId = " + currentMessage.getMessageId());return currentMessage;}}//获取指定队列中消息的个数public int getMessageCount(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null){//队列中无消息return 0;}synchronized (messages){return messages.size();}}//未确认消息的操作//添加未确认的消息public void addMessageWaitAck(String queueName, Message message){ConcurrentMap<String,Message> messageMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 消息进入到待确认队列 messageId=" + message.getMessageId());}//删除未确认的消息public void removeMessageWaitAck(String queueName, String messageId){ConcurrentHashMap<String, Message> messageMap = queueMessageWaitAckMap.get(queueName);if(messageMap == null){return;}messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息从待确认队列删除 messageId=" + messageId);}//获取指定未确认的消息public Message getMessageWaitAck(String queueName, String messageId){ConcurrentHashMap<String, Message> messageMap = queueMessageWaitAckMap.get(queueName);if(messageMap == null){return null;}return messageMap.get(messageId);}//当服务器重启后 要从硬盘上读取数据 回复到内存中public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {//清空所有旧数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();//恢复所有交换机List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges){exchangeMap.put(exchange.getName(),exchange);}//恢复所有队列List<MSGQueue> queueList = diskDataCenter.selectAllQueues();for(MSGQueue queue : queueList){queueMap.put(queue.getName(),queue);}//恢复所有绑定List<Binding> bindingList = diskDataCenter.selectAllBindings();for(Binding binding : bindingList){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(),binding);}//恢复所有消息//遍历所有的队列 根据每个队列的名字 获取到所有的信息for(MSGQueue queue : queueList){LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());//将消息加载到内存queueMessageMap.put(queue.getName(),messages);for(Message message : messages){messageMap.put(message.getMessageId(),message);}}//未确认的消息 不需要从硬盘获取//在等待ack的过程中 未被确认的消息就转变为未被取走的消息}
}

虚拟主机设计

⾄此,内存和硬盘的数据都已经组织完成.接下来使⽤"虚拟主机"这个概念,把这两部分的数据也串起来.并且实现⼀些MQ的关键API

注意:在RabbitMQ中,虚拟主机是可以随意创建/删除的.咱们此处为了实现简单,并没有实现虚拟主机的管理.因此我们默认就只有⼀个虚拟主机的存在.但是在数据结构的设计上我们预留了对于多虚拟主机的管理.保证不同虚拟主机中的Exchange,Queue,Binding,Message都是相互隔离的

创建VirtualHost

创建mqserver.VirtualHost

java">@Slf4j
@Getter
//通过这个类表示虚拟主机
//作为业务逻辑的整合 就需要对代码中抛出的异常进行处理了
public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();private DiskDataCenter diskDataCenter = new DiskDataCenter();private Router router = new Router();private ConsumerManager consumerManager = new ConsumerManager(this);private final Object exchangeLocker = new Object();private final Object queueLocker = new Object();public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;//对于memoryDataCenter不需要初始化 在类内部已经初始化过了//对于diskDataCenter来说 要进行初始化操作diskDataCenter.init();try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();log.info("恢复内存数据失败");}}//创建交换机//如果交换机不存在 则创建 如果存在 直接返回//返回值是boolean 创建成功true 失败 返回falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) {//把交换机的名字 加上虚拟主机作为前缀exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange != null) {//该交换机已经存在log.info("交换机已经存在 exchangeName = " + exchangeName);return true;}//真正创建交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);//把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}//4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);log.info("交换机创建完成 exchangeName = " + exchangeName);//先硬盘 后内存 因为内存容易失败 如果硬盘失败 就不向内存中存储}return true;} catch (Exception e) {log.info("交换机创建失败 exchangeName = " + exchangeName);e.printStackTrace();return false;}}public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {//先找到对应的交换机Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if (toDelete == null) {throw new MqException("交换机不存在 无法删除");}//删除硬盘上的数据if (toDelete.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}//删除内存中的交换机数据memoryDataCenter.deleteExchange(exchangeName);log.info("交换机删除成功 exchangeName = " + exchangeName);}return true;} catch (Exception e) {log.info("交换机删除失败 exchangeName = " + exchangeName);e.printStackTrace();return false;}}//创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) {//拼接队列名字queueName = virtualHostName + queueName;try {synchronized (queueLocker) {MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if (existsQueue != null) {log.info("队列已经存在 queueName = " + queueName);return true;}//创建队列对象MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);//写硬盘if (durable) {diskDataCenter.insertQueue(queue);}//写内存memoryDataCenter.insertQueue(queue);log.info("队列创建成功 queueName = " + queueName);}return true;} catch (Exception e) {log.info("队列创建失败 queueName = " + queueName);e.printStackTrace();return false;}}//删除队列public boolean queueDelete(String queueName) {queueName = virtualHostName + queueName;try {MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("队列不存在 无法删除 queueName = " + queueName);}//删除硬盘数据if (queue.isDurable()) {diskDataCenter.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);log.info("交换机删除成功 exchangeName = " + queueName);return true;} catch (Exception e) {log.info("队列创建失败 + queueName = " + queueName);e.printStackTrace();return false;}}//绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) {queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);if (existsBinding != null) {throw new MqException("binding 已经存在 queueName = " + queueName);}//验证bindingkey是否合法if (!router.checkBindingKey(bindingKey)) {throw new MqException("bindingKey非法 bindingKey = " + bindingKey);}//创建Binding对象Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);//获取对应的交换机和队列 如果交换机或者队列不存在 也无法创建MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("队列不存在 queueName = " + queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("交换机不存在 exchangeName = " + exchangeName);}//先写硬盘if (queue.isDurable() && exchange.isDurable()) {diskDataCenter.insertBinding(binding);}//在写内存memoryDataCenter.insertBinding(binding);log.info("绑定成功 exchangeName = " + exchangeName + "queueName + " + queueName);}}return true;} catch (Exception e) {log.info("绑定失败 exchangeName = " + exchangeName + "queueName + " + queueName);e.printStackTrace();return false;}}public boolean queueUnbind(String queueName, String exchangeName) {queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {//获取binding看是否已经存在Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("删除绑定失败 绑定不存在 exchangeName" + exchangeName + ", queueName" + queueName);}
//            //获取一下对应的队列和交换机 看是否存在
//            MSGQueue queue = memoryDataCenter.getQueue(queueName);
//            if(queue == null){
//                throw new MqException("对应的队列不存在 queueName = " + queueName);
//            }
//            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
//            if(exchange == null){
//                throw new MqException("对应的交换机不存在 exchangeName" + exchangeName);
//            }//删除硬盘上的数据
//            if(queue.isDurable() && exchange.isDurable()){
//                diskDataCenter.deleteBinding(binding);
//            }//无论绑定是否持久化 都进行删除diskDataCenter.deleteBinding(binding);//删除内存的数据memoryDataCenter.deleteBinding(binding);}}return true;} catch (Exception e) {log.info("删除绑定失败");e.printStackTrace();return false;}}//发送交换机到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {//转换交换机的名字exchangeName = virtualHostName + exchangeName;//检查routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法 routingKey = " + routingKey);}//查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在 exchangeName = " + exchangeName);}//判定交换机的类型if (exchange.getType() == ExchangeType.DIRECT) {//直接交换机的转发规则//以routingKey作为队列的名字 直接把消息写入指定的队列中//此时可以无视绑定关系String queueName = virtualHostName + routingKey;//构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);//查找该队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new Exception("[VirualHost] 队列不存在 queueName = " + queueName);}//队列存在 直接给队列中写入消息sendMessage(queue, message);} else {//以fanout和topic的方式转发//找到该交换机关联的所有绑定 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {//获取到绑定对象Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null) {//此处不跑出异常 可能有多个这样的队列//希望不要因为一个队列的失败 影响其他队列消息的传输log.info("basicPublish 发送消息时 发现消息不存在 queueName = " + binding.getQueueName());continue;}//构造消息对象Message message = Message.createMessageWithId(exchangeName, basicProperties, body);//判断这个消息是否能转发给该队列//如果是fanout 则所有绑定的队列都要转发//如果是topic 还要判定下 bindkey和routingKey是否匹配if (!router.route(exchange.getType(), binding, message)) {continue;}//真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {log.info("消息发送失败");e.printStackTrace();return false;}}private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息 就是把消息写入硬盘和内存中int deliverMode = message.getDeliverMode();//deliverMode 为1 不持久化 为2 不持久化if (deliverMode == 2) {diskDataCenter.sendMessage(queue, message);}memoryDataCenter.sendMessage(queue, message);//通知消费者可以消费消息了consumerManager.notifyConsumer(queue.getName());}//添加一个队列的订阅者 但队列收到消息之后 就要把消息推送给对应的订阅者public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {//构造一个consumerEnv对象 把这个对应的队列找到 再把这个Consumer对象添加到队列中queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info("basicConsumer成功 queueName :{}", queueName);return true;} catch (Exception e) {log.info("basicConsumer 失败 queueName :{}", queueName);e.printStackTrace();return false;}}public boolean basicAck(String queueName, String messageId){queueName = virtualHostName + queueName;try {//获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null){throw new MqException("[VirtualHost] 要确认的消息不存在 messageId = " +messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null){throw new MqException("[virtualHost] 要确认的队列不存在 queueName = " + queueName);}//删除硬盘上的数据if(message.getDeliverMode() == 2){diskDataCenter.deleteMessage(queue,message);}//删除消息中心的数据memoryDataCenter.removeMessage(messageId);//删除待确认集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info("basicAck成功 消息被成功确认 queueName:{},messageId:{}",queueName,messageId);return true;} catch (MqException | IOException | ClassNotFoundException e) {log.info("basicAck失败 消息确认失败 queueName:{},messageId:{}",queueName,messageId);return false;}}
}

路由规则

java">//实现交换机的转发规则 和验证routingKey是否合法
public class Router {//bindingKey构造规则//数字字母下划线//使用.分割//允许*和#public boolean checkBindingKey(String bindingKey) {//todoif (bindingKey.length() == 0) {//空字符串 也是合法情况return true;}//检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}//检查*或者#是否是独立的部分String[] words = bindingKey.split("\\.");for (String word : words) {//检查word长度 >1 且包含#或*就不合法if (word.length() > 1 && word.contains("*") || word.contains("#")) {return false;}}//约定通配符之间的相邻关系//形如这种 aaa.#.#.bbb aaa.*.#.bbb aaa.#.*.bbb非法//aaa.*.*.bbb合法for (int i = 0; i < words.length - 1; i++) {//是否为连续两个#if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}//#连着*if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}//*连着#if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}//数字字母下划线//使用.分割public boolean checkRoutingKey(String routingKey) {if (routingKey.length() == 0) {//空字符串 合法 在使用fanout交换机时 routingKey用不上return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);//判断该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}//判断该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}//判断该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}//判定是否是_或者.if (ch == '_' || ch == '.') {continue;}//该字符不满足任何一种情况 就返回falsereturn false;}return true;}//这个方法用来判定该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {//TODO//根据不同的exchangeType使用不同的转发规则if (exchangeType == ExchangeType.FANOUT) {//所有都转发 直接返回true;return true;} else if (exchangeType == ExchangeType.TOPIC) {return routeTopic(binding, message);} else {//不应该存在这种情况throw new MqException("[Router] 交换机类型非法 exchangeType = " + exchangeType);}}private boolean routeTopic(Binding binding, Message message) {//先把这两个key进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");int bindingIndex = 0;int routingIndex = 0;//此处使用whilewhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {//如果遇到* 直接进入下一轮 *可以匹配到任意一个部分if (bindingTokens[bindingIndex].equals("*")) {bindingIndex++;routingIndex++;continue;} else if(bindingTokens[bindingIndex].equals("#")){//如果遇到#号 要先看看有没有下一个位置bindingIndex++;if(bindingIndex == bindingTokens.length){//该#后面没有东西 是最后一个字符return true;}//后面还有东西//拿着这个内容 在routingKey中往后找 找到对应的位置//findNextMatch 这个方法用来查找该部分 在routingKey中的位置 返回该下标 没找到返回-1routingIndex = findNextMatch(routingTokens,routingIndex,bindingTokens[bindingIndex]);if(routingIndex == -1){return false;}//找到的匹配的情况 就继续往后匹配bindingIndex++;routingIndex++;}else {//如果遇到普通字符串if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex++;routingIndex++;}}//判定是否是双方到达末尾//比如 aaa.bbb.ccc和aaa.bbb是要匹配失败的if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length){return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if(routingTokens[i].equals(bindingToken)){return i;}}return -1;}}

订阅消息

添加一个订阅者

java">/添加一个队列的订阅者 但队列收到消息之后 就要把消息推送给对应的订阅者public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {//构造一个consumerEnv对象 把这个对应的队列找到 再把这个Consumer对象添加到队列中queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info("basicConsumer成功 queueName :{}", queueName);return true;} catch (Exception e) {log.info("basicConsumer 失败 queueName :{}", queueName);e.printStackTrace();return false;}}

Consumer相当于一个回调函数 放在common.Consumer

java">@FunctionalInterface
public interface Consumer {//每次服务器收到消息 调用 把消息推送给消费者//此处参考rabbitMqvoid handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException;
}

创建订阅者管理类

创建mqserver.core.ConsumerManager

java">//通过这个类来实现消费者消费消息的逻辑
public class ConsumerManager {//持有virualhost来操作数据private VirtualHost parent;//指定一个线程池 执行具体的回调函数private ExecutorService workPool = Executors.newFixedThreadPool(4);//存放令牌的队列 实际上就是队列名 为了让线程池知道是哪一个队列的消息private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();//扫描线程private Thread scannerThread = null;public ConsumerManager(VirtualHost parent) {this.parent = parent;scannerThread = new Thread(()->{while (true){try {//1.拿到令牌String queueName = tokenQueue.take();//2.根据令牌 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null){throw new MqException("[ConsumerManager] 取令牌后发现 该队列名不存在 queueName = "+ queueName);}//3.从这个队列中消费一个消息synchronized (queue){consumerMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});//设置为后台线程scannerThread.setDaemon(true);scannerThread.start();}public void notifyConsumer(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {//找到对应的队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null){throw new MqException("[ConsumerManager] 队列不存在 queueName = " + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);//如果此时队列中已经有一些消息了 就需要立即消费int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {//这个方法调用一次就消费一条消息consumerMessage(queue);}}}private void consumerMessage(MSGQueue queue) {//消费消息//1.按照轮询的方式 找到消费者ConsumerEnv luckyDog = queue.chooseConsumerEnv();if(luckyDog == null){//当前对象没有消费者 暂时不需要 等有消费者再说return;}//从队列中取出一条消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null){//当队列中还没有消息,也不需要消费return;}/*** 为了达成消息不丢失这样的效果* 1. 在真正执行回调之前 先把这个消息放在待确认集合中 避免因为回调失败 导致的消息丢失* 2. 真正执行回调* 3. 如消费者采用的是autoAck = true 默认回调函数执行结束之后不抛出异常 就算消费成功 然后就可以删除消息*    硬盘 内存消息中心的哈希表 上面的待确认消息集合* 4. 当前消费者采取的是autoAck = false 手动应答 需要消费者自己在自己的回调函数内部 调用basicAck这个API**///把消息带入到消费者的回调方法中 去给线程池执行parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);//真正执行回调workPool.submit(()->{try{luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody());log.info("消息成功消费 queueName = " + queue.getName());//如果是自动应答 就可以直接把消息删除了if(luckyDog.isAutoAck()){//删硬盘if(message.getDeliverMode() == 2){parent.getDiskDataCenter().deleteMessage(queue,message);}//删待确认集合parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId());//删除内存中的消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());log.info("消息成功消费 queueName :{}" ,queue.getName());}//如果是手动应答 先不处理 等消费者调用basicAck处理}catch (Exception e){e.printStackTrace();}});}
}
  • parent ⽤来记录虚拟主机.
  • 使⽤⼀个阻塞队列⽤来触发消息消费.称为令牌队列.每次有消息过来了,都往队列中放⼀个令牌(也就是队列名),然后消费者再去消费对应队列的消息.
  • 使⽤⼀个线程池⽤来执⾏消息回调

这样令牌队列的设定避免搞出来太多线程.否则就需要给每个队列都安排⼀个单独的线程了,如果队列很多则开销就⽐较⼤了

消息确认

java">//只有在手动应答时 才调用 应答成功 删除这条消息public boolean basicAck(String queueName, String messageId){queueName = virtualHostName + queueName;try {//获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null){throw new MqException("[VirtualHost] 要确认的消息不存在 messageId = " +messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null){throw new MqException("[virtualHost] 要确认的队列不存在 queueName = " + queueName);}//删除硬盘上的数据if(message.getDeliverMode() == 2){diskDataCenter.deleteMessage(queue,message);}//删除消息中心的数据memoryDataCenter.removeMessage(messageId);//删除待确认集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info("basicAck成功 消息被成功确认 queueName:{},messageId:{}",queueName,messageId);return true;} catch (MqException | IOException | ClassNotFoundException e) {log.info("basicAck失败 消息确认失败 queueName:{},messageId:{}",queueName,messageId);return false;}}

网络通信协议设计

⽣产者和消费者都是客⼾端,都需要通过⽹络和BrokerServer进⾏通信.

此处我们使⽤TCP协议,来作为通信的底层协议.同时在这个基础上⾃定义应⽤层协议,完成客⼾端对服务器这边功能的远程调⽤

要调⽤的功能有:

  • 创建channel
  • 关闭channel
  • 创建exchange
  • 删除exchange
  • 创建queue
  • 删除queue
  • 创建binding
  • 删除binding
  • 发送message
  • 订阅message
  • 发送ack
  • 返回message(服务器->客⼾端)

设计应用层协议

因为Message的消息体本⾝就是⼆进制的.因此不太⽅便使⽤json等⽂本格式的协议
在这里插入图片描述
其中type表⽰请求响应不同的功能.取值如下:

  • 0x1 创建channel
  • 0x2 关闭channel
  • 0x3 创建exchange
  • 0x4 销毁exchange
  • 0x5 创建queue
  • 0x6 销毁queue
  • 0x7 创建binding
  • 0x8 销毁binding
  • 0x9 发送message
  • 0xa 订阅message
  • 0xb 返回ack
  • 0xc 服务器给客⼾端推送的消息.(被订阅的消息)响应独有的.

其中payload部分,会根据不同的type,存在不同的格式.
对于请求来说,payload表⽰这次⽅法调⽤的各种参数信息.
对于响应来说,payload表⽰这次⽅法调⽤的返回值

定义Request/Response

创建common.Request common.Response

java">@Data
//表示一个网络通信中的请求对象 按照自定义协议的格式来展开的
public class Request {private  int type;private  int length;private  byte[] payload;
}
java">//这个对象表示一个响应
@Data
public class Response {private int type;private int length;private byte[] payload;
}

定义参数父类

构造⼀个类表⽰⽅法的参数,作为Request的payload.

不同的⽅法中,参数形态各异,但是有些信息是通⽤的,使⽤⼀个⽗类表⽰出来.具体每个⽅法的参数再通过继承的⽅式体现

common.BasicArguments

java">@Data
//使用这个类表示方法的公共参数
//后续每个方法都会有一些不同的参数 不同的参数再分别使用不同的子类来表示
public class BasicArguments implements Serializable {//表示一次请求/响应的身份标识 可以把请求和响应对应protected String rid;//这次通信使用的channel的身份标识protected String channelId;
}

定义返回值父类

和参数同理,也需要构造⼀个类表⽰返回值,作为Response的payload
common.BasicReturns

java">@Data
//表示返回值的公共信息
public class BasicReturns implements Serializable {//表示一次请求/响应的身份标识 可以把请求和响应对应protected String rid;//这次通信使用的channel的身份标识protected String channelId;//表示方法的返回值protected boolean ok;
}

定义其他参数类

针对每个VirtualHost提供的⽅法,都需要有⼀个类表⽰对应的参数

ExchangeDeclareArguments

java">@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private Map<String, Object> arguments;
}

⼀个创建交换机的请求,形如:

  • 可以把ExchangeDeclareArguments转成byte[],就得到了下列图⽚的结构.
  • 按照length⻓度读取出payload,就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象
    在这里插入图片描述

ExchangeDeleteArguments

java">@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;
}

QueueDeclareArguments

java">@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean exclusive;private boolean autoDelete;private Map<String,Object> arguments;}

QueueDeleteArguments

java">@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;}

QueueBindArguments

java">@Data
public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;
}

QueueUnbindArguments

java">@Data
public class QueueUnbindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;}

BasicPublishArguments

java">@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}

BasicConsumeArguments

java">@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;//这个类对应的方法中 还有一个参数 是回调函数 这个回调函数是不能网络传输的}

SubScribeReturns

  • 这个不是参数,是返回值.是服务器给消费者推送的订阅消息.
  • consumerTag其实是channelId.
  • basicProperties 和 body 共同构成了Message
java">@Data
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
}

实现BrokerServer

java">/*** 本质上就是一个TCP服务器*/
@Slf4j
public class BrokerServer {private ServerSocket serverSocket = null;//当前考虑一个BrokerServer上只有一个虚拟主机private VirtualHost virtualHost = new VirtualHost("default");//使用这个哈希表 表示当前所有的会话//此处的key是channelId value为对应的socket对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();private ExecutorService executorService = null;//引入一个boolean变量 控制服务器是否继续运行private volatile boolean runnable = true;public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(port);}public void start() throws IOException {log.info("brokerServer 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();//把处理连接的逻辑丢给这个线程池executorService.submit(() -> {processConnection(clientSocket);});}}catch (SocketException e){log.info("服务器停止运行");}}//停止服务器public void stop() throws IOException {runnable = false;//把线程池中的任务都放弃了 让线程都销毁executorService.shutdown();serverSocket.close();}//通过这个方法 来处理一个客户端的连接//在这一个方法中 可能会涉及到多个请求和响应private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {//需要按照特定格式解析 使用DataInputStream和DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {//1.读取请求并解析Request request = readRequest(dataInputStream);//2.根据请求计算响应Response response = process(request, clientSocket);//3.把响应写回客户端writeResponse(dataOutputStream, response);}} catch (EOFException e) {//对于这个代码  如果DataInputStream 如果读到EOF 就会抛出一个eofException异常//需要借助这个异常来结束循环log.info("connection关闭! 客户端的地址: {} : {}", clientSocket.getInetAddress().toString(), clientSocket.getPort());}} catch (ClassNotFoundException | MqException | IOException e) {log.info("connection 出现异常");e.printStackTrace();} finally {try {clientSocket.close();//一个tcp连接中 可能包含多个channel 需要把这个socket对应的所有channel也顺便清理掉clearCloseSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}//遍历上述sessions哈希表 把被关闭的socket对应的键值对删除private void clearCloseSession(Socket clientSocket) {List<String> toDeleteChannelId = new ArrayList<>();for(Map.Entry<String,Socket> entry : sessions.entrySet()){if(entry.getValue() == clientSocket){//不能一边遍历一边删除 此时影响结构影响遍历toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId){sessions.remove(channelId);}log.info("清理session完成 被清理的sessionId :{}",toDeleteChannelId);}//处理一次请求 返回一次响应private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//先对request中的payload做一个初步的解析BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());log.info("rid:{} , channelId:{} , type:{}, length:{}", basicArguments.getRid(), basicArguments.getChannelId(),request.getType(), request.getLength());//根据type的值 来进一步区分接下来这次请求要干什么boolean ok = true;if (request.getType() == 0x1) {//创建channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info("创建channel完成 channelId:{}", basicArguments.getChannelId());} else if (request.getType() == 0x2) {//销毁channelsessions.remove(basicArguments.getChannelId());log.info("销毁channel完成 channelId:{}", basicArguments.getChannelId());} else if (request.getType() == 0x3) {//创建交换机 此时payload是ExchangeDeclareArguments 对象了ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {//删除交换机ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {//创建队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {//删除队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDelete(arguments.getQueueName());} else if (request.getType() == 0x7) {//创建绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(),arguments.getBindingKey());} else if (request.getType() == 0x8) {//删除绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {//发布消息BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {//订阅消息BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {//这个回调函数要做的工作 就是把服务器收到的消息可以直接推送到对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//先知道当前收到的消息要发给那个客户端//此处consumerTag 其实是channelId 根据channelId去sessions中查询 就可以得到相应的socket\对象 就可以发送数据//1.根据channelId找到 socket对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokeServer] 订阅消息的客户端已经关闭");}//构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid("");//这里只有响应 没有请求 不需要ridsubScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();//0xc表示服务器给消费者客户端推送的消息数据response.setType(0xc);response.setLength(payload.length);response.setPayload(payload);//把数据写回 给客户端DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {//调用basicAck确认消息BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {//当前type是非法的throw new MqException("[BrokerServer] 未知的 type type:" + request.getType());}//构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info("rid:{} , channelId:{}, type:{}, length:{}", basicReturns.getRid(), basicReturns.getChannelId(),response.getType(), response.getLength());return response;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());//刷新缓冲区dataOutputStream.flush();}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错");}request.setPayload(payload);return request;}
}
  • virtualHost 表⽰服务器持有的虚拟主机.队列,交换机,绑定,消息都是通过虚拟主机管理.
  • sessions ⽤来管理所有的客⼾端的连接.记录每个客⼾端的socket.
  • serverSocket 是服务器⾃⾝的socket
  • executorService 这个线程池⽤来处理响应.
  • runnable 这个标志位⽤来控制服务器的运⾏停⽌

启动/停止服务器

java">public void start() throws IOException {log.info("brokerServer 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();//把处理连接的逻辑丢给这个线程池executorService.submit(() -> {processConnection(clientSocket);});}}catch (SocketException e){log.info("服务器停止运行");}}//停止服务器public void stop() throws IOException {runnable = false;//把线程池中的任务都放弃了 让线程都销毁executorService.shutdown();serverSocket.close();}

实现处理连接

对于EOFException和SocketException,我们视为客⼾端正常断开连接. ◦

如果是客⼾端先close,后调⽤DataInputStream的read,则抛出EOFException ◦
如果是先调⽤DataInputStream的read,后客⼾端调⽤close,则抛出SocketException

java">//通过这个方法 来处理一个客户端的连接//在这一个方法中 可能会涉及到多个请求和响应private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {//需要按照特定格式解析 使用DataInputStream和DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {//1.读取请求并解析Request request = readRequest(dataInputStream);//2.根据请求计算响应Response response = process(request, clientSocket);//3.把响应写回客户端writeResponse(dataOutputStream, response);}} catch (EOFException e) {//对于这个代码  如果DataInputStream 如果读到EOF 就会抛出一个eofException异常//需要借助这个异常来结束循环log.info("connection关闭! 客户端的地址: {} : {}", clientSocket.getInetAddress().toString(), clientSocket.getPort());}} catch (ClassNotFoundException | MqException | IOException e) {log.info("connection 出现异常");e.printStackTrace();} finally {try {clientSocket.close();//一个tcp连接中 可能包含多个channel 需要把这个socket对应的所有channel也顺便清理掉clearCloseSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}

实现readRequest

java">private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错");}request.setPayload(payload);return request;}

实现writeResponse

java">private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());//刷新缓冲区dataOutputStream.flush();}

实现处理请求

  • 先把请求转换成BaseArguments,获取到其中的channelId和rid
  • 再根据不同的type,分别处理不同的逻辑.(主要是调⽤virtualHost中不同的⽅法).
  • 针对消息订阅操作,则需要在存在消息的时候通过回调,把响应结果写回给对应的客⼾端.
  • 最后构造成统⼀的响应
java">private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//先对request中的payload做一个初步的解析BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());log.info("rid:{} , channelId:{} , type:{}, length:{}", basicArguments.getRid(), basicArguments.getChannelId(),request.getType(), request.getLength());//根据type的值 来进一步区分接下来这次请求要干什么boolean ok = true;if (request.getType() == 0x1) {//创建channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info("创建channel完成 channelId:{}", basicArguments.getChannelId());} else if (request.getType() == 0x2) {//销毁channelsessions.remove(basicArguments.getChannelId());log.info("销毁channel完成 channelId:{}", basicArguments.getChannelId());} else if (request.getType() == 0x3) {//创建交换机 此时payload是ExchangeDeclareArguments 对象了ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {//删除交换机ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {//创建队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {//删除队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDelete(arguments.getQueueName());} else if (request.getType() == 0x7) {//创建绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(),arguments.getBindingKey());} else if (request.getType() == 0x8) {//删除绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {//发布消息BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {//订阅消息BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {//这个回调函数要做的工作 就是把服务器收到的消息可以直接推送到对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//先知道当前收到的消息要发给那个客户端//此处consumerTag 其实是channelId 根据channelId去sessions中查询 就可以得到相应的socket\对象 就可以发送数据//1.根据channelId找到 socket对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokeServer] 订阅消息的客户端已经关闭");}//构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid("");//这里只有响应 没有请求 不需要ridsubScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();//0xc表示服务器给消费者客户端推送的消息数据response.setType(0xc);response.setLength(payload.length);response.setPayload(payload);//把数据写回 给客户端DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {//调用basicAck确认消息BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {//当前type是非法的throw new MqException("[BrokerServer] 未知的 type type:" + request.getType());}//构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info("rid:{} , channelId:{}, type:{}, length:{}", basicReturns.getRid(), basicReturns.getChannelId(),response.getType(), response.getLength());return response;}

实现clearClosedSessio

  • 如果客⼾端只关闭了Connection,没关闭Connection中包含的Channel,也没关系,在这⾥统⼀进⾏清理.
  • 注意迭代器失效问题
java">//遍历上述sessions哈希表 把被关闭的socket对应的键值对删除private void clearCloseSession(Socket clientSocket) {List<String> toDeleteChannelId = new ArrayList<>();for(Map.Entry<String,Socket> entry : sessions.entrySet()){if(entry.getValue() == clientSocket){//不能一边遍历一边删除 此时影响结构影响遍历toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId){sessions.remove(channelId);}log.info("清理session完成 被清理的sessionId :{}",toDeleteChannelId);}

实现客户端

创建包mqclient

创建 ConnectionFactory

⽤来创建连接的⼯⼚类.

java">@Data
public class ConnectionFactory {//broker server的ip地址private String host;//broker server的端口号private int port;public Connection newConnection() throws IOException {Connection connection = new Connection(host,port);return connection;}
}

Connection 和Channel的定义

⼀个客⼾端可以创建多个Connection.
⼀个Connection对应⼀个socket,⼀个TCP连接.
⼀个Connection可以包含多个Channel

Connection的定义

java">public class Connection {private Socket socket = null;//使用哈希表 把若干个channel对象组织起来private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;private DataOutputStream dataOutputStream;private DataInputStream dataInputStream;private ExecutorService callbackPool = null;
}
封装请求响应读写操作
java">public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();log.info("发送请求 type:{} ,length:{}", request.getType(), request.getLength());}//读取响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整");}response.setPayload(payload);log.info("收到响应 type:{}, length:{}", response.getType(), response.getLength());return response;}
创建channel
java">//创建Channelpublic Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID();Channel channel = new Channel(channelId, this);//把这个 Channel对象 放到Connection 管理channel 的哈希表中channelMap.put(channelId, channel);//同时也需要把创建channel这个消息告诉服务器boolean ok = channel.createChannel();if (!ok) {//整个这次创建channel操作不顺利//把哈希表中的键值对删除channelMap.remove(channelId);return null;}return channel;}

Channel的定义

java">public class Channel {private String channelId;//当前channel属于哪个连接private Connection connection;//记录后续客户端收到的服务器的响应private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();//如果当前Channel订阅了某个队列 就需要在此处记录下对应回调是啥,当该队列的消息返回来的时候,调用回调//此处约定一个Channel中只能有一个回调private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}
}
  • channelId 为channel的⾝份标识,使⽤UUID标识.
  • Connection 为channel对应的连接.
  • baseReturnsMap ⽤来保存响应的返回值.放到这个哈希表中⽅便和请求匹配.
  • consumer为消费者的回调(⽤⼾注册的).对于消息响应,应该调⽤这个回调处理消息.
创建channel
java"> //在这个方法中 和服务器进行交互 告诉服务器 此时客户端创建了新的channelpublic boolean createChannel() throws IOException {//对于创建Channel操作来说 payload就是一个basicArguments对象BasicArguments arguments = new BasicArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(arguments);//构造type = 0x1的对象Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//等待服务器的响应BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
实现generateRid
java">private String generateRid() {return "R-" + UUID.randomUUID();}
实现waitResult
  • 由于服务器的响应是异步的.此处通过waitResult实现同步等待的效果
java">private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {//如果查询结果为null 说明包裹还没有回来//此时就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}//读取成功后 把消息从哈希表中删除掉basicReturnsMap.remove(rid);return basicReturns;}
关闭channel
java">//关闭channel 发送type = 0x2public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}
创建交换机
java">//创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}
删除交换机
java">//删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();exchangeDeleteArguments.setRid(generateRid());exchangeDeleteArguments.setChannelId(channelId);exchangeDeleteArguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeleteArguments.getRid());return basicReturns.isOk();}
创建队列
java">//创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}```####  删除队列```java
//删除队列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments arguments = new QueueDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
创建绑定
java">//创建绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments arguments = new QueueBindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);arguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
删除绑定
java">//解除绑定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnbindArguments arguments = new QueueUnbindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
发送消息
java">//发送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments = new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setBasicProperties(basicProperties);arguments.setRoutingKey(routingKey);arguments.setBody(body);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
订阅消息
java">//订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {//先设置回调if (this.consumer != null) {throw new MqException("该channel已经设置过消费信息的回调了 不能重复设置");}this.consumer = consumer;BasicConsumeArguments arguments = new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setConsumerTag(channelId);//consumerTag 也是用channelId来表示了arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
确认消息
java">//确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments = new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

处理响应

创建扫描线程

创建⼀个扫描线程,⽤来不停的读取socket中的响应数据

注意:⼀个Connection中可能包含多个channel,需要把响应分别放到对应的channel中

java">public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//创建一个扫描线程 由这个线程负责不停的对socket中读取响应数据 把这个响应数据再交给对应的channel负责处理Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {//连接正常断开 忽略这个异常log.info("连接正常断开");} catch (IOException | ClassNotFoundException | MqException e) {log.info("连接异常断开");e.printStackTrace();}});t.start();}
实现响应的分发

给Connection创建dispatchResponse⽅法

java">//使用这个方法 分别来处理//这个消息是 针对控制请求的响应 还是服务器推送的消息private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {//服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//根据channelId 找到对应的channel对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connect] 该消息对应的channel 在客户端中不存在 channelId = " + channel.getChannelId());}//执行该channel对象内部的回调callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {//当前响应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());//把这个结果放在对应的channel的hash表中Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该队列对应的channel在客户端中不存在");}channel.putReturns(basicReturns);}}
java">public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this){//当前也不知道多少线程在等待上述的这个响应//把所有的等待线程都唤醒notifyAll();}}

关闭Connection

java">//关闭channel 发送type = 0x2public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}

http://www.ppmy.cn/news/1438927.html

相关文章

Linux安装Kubernetes(k8s)详细教程

系统初始化 生产环境肯定要更高配置&#xff0c;虚拟机以保守的最低配置。 机器ip规格master192.168.203.111核2线程、2G内存、40G磁盘node2192.168.203.121核2线程、2G内存、40G磁盘node3192.168.203.131核2线程、2G内存、40G磁盘 修改为静态ip vi /etc/resolv.conf追加内容…

如何在Pycharm中使用Git来进行版本管理

推荐视频:git pycharm的使用 连接github_哔哩哔哩_bilibilipycharm git的使用简单介绍 最近应该不会更新技能相关视频了 准备开题, 视频播放量 13042、弹幕量 2、点赞数 208、投硬币枚数 143、收藏人数 343、转发人数 58, 视频作者 呃呃燕, 作者简介 努力入门的计算机双非研究生…

原型模式(上机考试抽题)

定义 原型模式主要解决的问题就是创建复对象&#xff0c;⽽这部分 对象 内容本身⽐较复杂&#xff0c;⽣成过程可能从库或者RPC接⼝中获取数据的耗时较⻓&#xff0c;因此采⽤克隆的⽅式节省时间。 上机考试抽题 从⼀部分可以上机考试的内容开始&#xff0c;在保证⼤家的公平…

《第二行代码》第二版学习笔记(6)——内容提供器

文章目录 一 运行时权限2.权限分类3 运行时申请权限 二、内容提供器1、 ContentResolver的基本用法2、现有的内容提供器3、创建自己的内容提供器2.1 创建内容提供器的步骤2.2 跨程序数据共享 内容提供器&#xff08;Content Provider&#xff09;主要用于在不同的应用程序之间实…

海外市场稳步推进,俄罗斯客户莅临湖南创远洽谈合作

4月18日&#xff0c;来自俄罗斯的意向合作客户到访湖南创远参观考察&#xff0c;并就双方今后合作进行了深入沟通。王毅董事长代表公司热情接待了远道而来的客人&#xff0c;对他们的到来表示热烈的欢迎。 客户在参观公司生产车间时&#xff0c;对公司天井钻机产品进行了详细了…

vue 关键字变红

1.html <div v-html"replaceKeywordColor(item.title)" ></div> 2.js //value为搜索框内绑定的值 replaceKeywordColor(val) {if (val?.includes(this.value) && this.value ! ) {return val.replace(this.value,<font color"red&…

SVM原理:怎么实现维度映射的

目录 SVM原理:怎么实现维度映射的 忽略异常点​ 软间隔:忽略部分异常值

泛微E9开发 如何自定义流程标题

1、功能背景 主表中有“选择类别”下拉框字段&#xff0c;用户可以根据需求来选择申请类别&#xff0c;一般多个相似流程的申请可以合并成一个&#xff0c;但是为了区分&#xff0c;我们可以通过将标题修改的方式来使整个显示页面更明确。 2、展示效果 3、实现方法 注意&…