2024.2.29 模拟实现 RabbitMQ —— 项目展示

news/2025/1/3 5:31:42/

目录

项目介绍

核心功能

核心技术

演示直接交换机

演示扇出交换机

演示主题交换机


项目介绍

  • 此处我们模拟 RabbitMQ 实现了一个消息队列服务器

核心功能

  • 提供了 虚拟主机交换机队列绑定消息 概念的管理
  • 九大核心 API 创建队列销毁队列创建交换机销毁交换机创建绑定解除绑定发布消息订阅消息确认消息
  • 实现了三种典型的消息转换方式 直接交换机(Direct)扇出交换机(Fanout)主题交换机(Topic)
  • 交换机队列绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
  • 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作

核心技术

  • Spring Boot / MyBatis / Lombok
  • SQLite
  • TCP

  • 关于该项目的需求分析,可点击下方链接跳转

模拟实现 RabbitMQ —— 需求分析


  • 关于该项目的核心类,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现核心类


  • 关于该项目的数据库操作,可点击下方链接跳转

模拟实现 RabbitMQ —— 数据库操作


  • 关于该项目的消息持久化,可点击下方链接跳转

模拟实现 RabbitMQ —— 消息持久化


  • 关于该项目的内存数据管理,可点击下方链接跳转

模拟实现 RabbitMQ —— 内存数据管理


  • 关于该项目的虚拟机设计,可点击下方链接跳转

模拟实现 RabbitMQ —— 虚拟主机设计


  • 关于该项目的交换机转发规则,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现转发规则


  • 关于该项目的消费逻辑,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现消费消息逻辑


  • 关于该项目网络通信设计,可点击下方链接跳转

模拟实现 RabbitMQ —— 网络通信设计(服务器)

模拟实现 RabbitMQ —— 网络通信设计(客户端)

演示直接交换机

  • 简单写一个 demo 模拟 跨主机的生产者消费者模型
  • 此处为了方便,就在本机演示

  • 此处我们创建的交换机类型为 直接交换机

1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!

@SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}
}

2、编写生产者代码

/*
* 这个类用来表示一个生产着
* 通常这是一个单独的服务器程序
* */
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//        创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);//        创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();}
}

3、编写消费者代码

/*
* 这个类表示一个消费者
* 通常这个类也应该是在一个独立的服务器中被执行
* */
public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}});//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}}
}

4、启动 Spring Boot 项目(启动 Broker Server)


5、运行消费者代码


6、运行生产者代码


7、继续观察消费者的控制台

演示扇出交换机

  • 此处我们创建的交换机类型为 扇出交换机

 1、编写生产者代码

/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
//        创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
//        创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();}
}

3、编写消费者A 代码

/** 这个类表示一个消费者A* 通常这个类也应该是在一个独立的服务器中被执行* */
public class DemoConsumerA {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
//        创建交换机channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null);
//        创建队列channel.queueDeclare("testQueue1",true,false,false,null);
//        设置绑定channel.queueBind("testQueue1","testExchange","");
//        订阅消息channel.basicConsume("testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}}
}

4、编写消费者B 代码

/** 这个类表示一个消费者B* 通常这个类也应该是在一个独立的服务器中被执行* */
public class DemoConsumerB {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
//        创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
//        创建队列channel.queueDeclare("testQueue2",true,false,false,null);
//        设置绑定channel.queueBind("testQueue2","testExchange","");
//        订阅消息channel.basicConsume("testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}}
}

5、启动 Spring Boot 项目(启动 Broker Server)


6、运行消费者A 代码


7、运行消费者B 代码


8、运行生产者代码


9、继续观察消费者A 的控制台


10、继续观察消费者B 的控制台

演示主题交换机

  • 此处我们创建的交换机为 主题交换机

 1、编写生产者代码

/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//        创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);//        创建消息A 并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);//        创建消息B 并发送body = "hi".getBytes();ok = channel.basicPublish("testExchange","aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();}
}

3、编写消费者代码

/** 这个类表示一个消费者* 通常这个类也应该是在一个独立的服务器中被执行* */
public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
//        创建交换机channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);
//        创建队列channel.queueDeclare("testQueue",true,false,false,null);
//        设置绑定channel.queueBind("testQueue","testExchange","*.aaa.bbb");
//        订阅消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}});
//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}}
}

4、启动 Spring Boot 项目(启动 Broker Server)


5、运行消费者代码


6、运行生产者代码


7、继续观察消费者的控制台


http://www.ppmy.cn/news/1363057.html

相关文章

Linux ip route命令

理解ip route命令 ip route是Linux系统中的一个非常常用的命令,它用于配置和管理Linux的路由表。通过ip route命令,管理员可以查看、添加、删除或修改Linux系统的路由表,从而决定数据包如何在网络中传输。例如,当一台Linux机器需要…

Visual Studio清单作用

1、作用: 制定程序依赖的C运行库的dll及版本,包括mfc,atl,crt等,在Visual Studio安装目录下的vc/redist下有debug和release版本 2、确定应用程序依赖哪些visual C 库方法: 查看项目-》项目设置-》常规&…

C语言--贪吃蛇

目录 1. 实现目标2. 需掌握的技术3. Win32 API介绍控制台程序控制台屏幕上的坐标COORDGetStdHandleGetConsoleCursorinfoCONSOLE_CURSOR_INFOSetConsoleCursorInfoSetConsoleCursorPositionGetAsyncKeyState 4. 贪吃蛇游戏设计与分析地图<locale.h>本地化类项setlocale函…

2279. 网络战争(最小割,01分数规划,二分)

活动 - AcWing 给出一个带权无向图 G(V,E)&#xff0c;每条边 e 有一个权 we。 求将点 s 和点 t 分开的一个边割集 C&#xff0c;使得该割集的平均边权最小&#xff0c;即最小化&#xff1a; ∑(e∈C)we/|C| 注意&#xff1a; 边割集的定义与最小割中的割边的集合不同。在本…

『NLP学习笔记』图解 BERT、ELMo和GPT(NLP如何破解迁移学习)

图解 BERT、ELMo和GPT(NLP如何破解迁移学习) 文章目录 一. 前言二. 示例-句子分类三. 模型架构3.1. 模型输入3.2. 模型输出四. BERT VS卷积神经网络五. 词嵌入新时代5.1. 简要回顾词嵌入Word Embedding5.2. ELMo: 上下文语境很重要5.3. ELMo的秘密是什么?5.4. ULM-FiT:将迁移…

程序员的护城河是什么?最终走向……?

程序员未来会大量失业&#xff0c;就是因为社会需求少&#xff0c;导致开发者岗位减少&#xff0c;人力资源过剩所导致。Android刚开始的零几年非常火热&#xff0c;是个人都要。到如今的内卷&#xff0c;高级开发都拿着中低程序员的薪资。这是因为头部大厂形成标准化&#xff…

Java 中常用的数据结构类 API

目录 常用数据结构API 对应的线程安全的api 高可用衡量标准 常用数据结构API ArrayList: 实现了动态数组&#xff0c;允许快速随机访问元素。 import java.util.ArrayList; LinkedList: 实现了双向链表&#xff0c;适用于频繁插入和删除操作。 import java.util.LinkedLis…

xss靶场实战(xss-labs-master靶场)

xss-labs-master靶场链接&#xff1a;https://pan.baidu.com/s/1X_uZLF3CWw2Cmt3UnZ1bTw?pwdgk9c 提取码&#xff1a;gk9c xss-labs level 1 修改 url 地址中的name<script>alert(1)</script>&#xff0c;便可以通关 level 2 在搜索框中输入的 JS 代码无法执行 …