SpringCloud源码探析(六)-消息队列RabbitMQ

news/2025/1/11 18:31:28/

1.概述

RabbitMQ是一个开源的消息代理和队列服务器,它是基于Erlang语言开发,并且是基于AMQP协议的。由于Erlang语言最初使用与交换机领域架构,因此使得RabbitMQ在Broker之间的数据交互具有良好的性能。AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种消息队列应用层协议,专门面向消息的中间件而设计,类似于JAVA的JMS协议,基于此规范能够开发出各种各样的消息中间件。RabbitMQ能够与SpringAMQP完美整合,具有较强的扩展性和丰富的API,且具有高可靠性和低延时的优点,被业界广泛使用。本文将介绍SpringAMQP与Rabbit的使用,帮助读者更好地理解消息队列。

2.消息队列RabbitMQ使用

2.1 MQ对比

在这里插入图片描述
由上图可知,从可靠性和消息延迟的角度来说,RabbitMQ都是较为突出的,从吞吐量的角度来说可能一般。因此在选用消息队列中间件时,应根据使用场景选择更为合适的。

2.2 AMQP核心概念

名称概念
Server/Broker可以理解为消息队列实体
VHost虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离
Exchange消息交换机,它指定消息按一定规则,路由到对应队列
Queue消息队列载体,每个消息可能会被投到一个或多个队列
Producer消息生产者,消息投递方
Consumer消息消费者,接收消息的程序
Binding绑定器,它将exchange和queue按照路由规则绑定起来
channel消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
Routing Key路由关键字,exchange根据关键字将消息投递到指定queue

2.3 RabbitMQ架构图

在这里插入图片描述

由上图可知,生产者通过channel连接到Broker,将消息投送到Exchange,Exchange再按照指定规则将消息投送到对应的queue,消费者通过监听指定的queue来获取消息。生产者不需要关注投递到那个队列,消费者也不关心消息是从哪个Exchange发送而来,两者之间是松耦合的关系。Exchange的类型有:Fanout交换机、Direct交换机、Topic交换机。Fanout交换机是一种广播模式,消息进来时会投递到所有与之绑定的队列。Direct交换机是完全根据key进行匹配队列,key一致时才会投送。Topic交换机可以按一定的规则进行匹配key,匹配成功进行投递。

2.4 SpringBoot中使用RabbitMQ

2.4.1 引入pom文件

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.4.2 添加配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

2.4.3 编写生产者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitApplication.class)
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, world!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}}

2.4.3 编写消费者

@Component
public class RabbitMQListener {@RabbitListener(queues = "simple.queue")public void receiveMessage(String message) {System.out.println("接收到消息:" + message);}
}

2.4.4 测试结果

在这里插入图片描述
运行完成测试代码后,消费者收到消息。上述代码展示的是RabbitMQ最基础的发送和接收模型,生成者将消息发送到指定队列,消费者订阅指定队列获取消息。

2.5 RabbitMQ核心消息发送模型

2.5.1 WorkQueue模型

该模型一个队列可以对应多个消费者,适用于发送者发送大量消息,容易发送积压,因此需要多个消费者来消费。
在这里插入图片描述
2.workQueue测试代码
测试类:

    @Testpublic void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "work.queue";// 消息String message = "hello, world";for (int i = 1; i <= 50; i++) {// 发送消息String messageInfo = message + "..." + i;rabbitTemplate.convertAndSend(queueName, messageInfo);log.info("消息发送成功:{}", messageInfo);Thread.sleep(50);}}

消费代码:

    @RabbitListener(queues = "work.queue")public void receiveWorkMessage1(String message) throws InterruptedException {System.out.println("消息队列1...接收到消息:" + message);Thread.sleep(10);}@RabbitListener(queues = "work.queue")public void receiveWorkMessage2(String message) throws InterruptedException {System.out.println("消息队列2...接收到消息:" + message);Thread.sleep(15);}

3.workQueue运行结果
在这里插入图片描述
在这里插入图片描述

2.5.2 FanoutExchange模型

1.FanoutExchange消息发送模型
在这里插入图片描述
FanoutExchanger模型比简单模型和WorkQueue模型多了一个交换机(Exchange),消息先会发送到Exchange,然后Exchange将消息路由到每一个与其绑定的queue。
2.代码配置

@Configuration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.exchange");}@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

发送部分测试代码:

  @Testpublic void testFanout() {// 队列名称String exchangeName = "fanout.exchange";// 消息String message = "hello, world!";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"", message);}

3.运行结果
在这里插入图片描述

2.5.3 DirectExchange模型

DirectExchange的消息发送模式与FanoutExchange模型基本一致,区别在于它会将收到的消息根据规则路由到指定的Queue,也就是说中间多了一层根据规则筛选发送队列,它的每一个Queue都与Exchange设置一个BindingKey,发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
1.代码如下
监听者部分代码如下:

 @RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void receiveDirectQueue1(String message) throws InterruptedException {System.out.println("消息队列1...接收到消息:" + message);}@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void receiveDirectQueue2(String message) throws InterruptedException {System.out.println("消息队列2...接收到消息:" + message);}

测试代码如下:

 @Testpublic void testDirect() {// 队列名称String exchangeName = "direct.exchange";// 消息String message = "hello, world!";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"red", message);}@Testpublic void testDirectYellow() {// 队列名称String exchangeName = "direct.exchange";// 消息String message = "hello, world!";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"yellow", message);}

2.测试结果
在这里插入图片描述
在这里插入图片描述
由上述结果可知,当队列订阅同一个Exchange时,向指定Exchange并携带RoutingKey,只有包含对应RoutingKey的队列才能收到消息。

2.5.4 TopicExchange模型

TopicExchange与DirectExchange类似,区别在于RoutingKey必须是多个单词的列表,并且以.分割,Queue与Exchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词
*:代指一个单词。

1.测试代码

 @RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC),key = {"#.news"}))public void receiveTopicQueue2(String message) throws InterruptedException {System.out.println("消息队列2...接收到消息:" + message);}

发送消息部分代码:

@Testpublic void testTopicQueue() {// 队列名称String exchangeName = "topic.exchange";// 消息String message = "湖人总冠军";// 发送消息rabbitTemplate.convertAndSend(exchangeName,"sports.news", message);}

2.测试结果
在这里插入图片描述

3.小结

1.RabbitMQ是一个遵循AMQP协议的消息中间件,消息从生产者发送到消费者,他能根据规则指定消息发送对象,缓存或进行持久化;
2.Spring AMQP是基于AMQP协议开放的接口,能够无缝对接各种基于AMQP的协议,快速利用Spring进行开发;
3.RabbitMQ延时低,而可靠性高,适用于吞吐量较大且对信息实时性要求较高的场景。

4.参考文献

1.https://www.bilibili.com/video/BV1LQ4y127n4
2.https://juejin.cn/post/6844903903637536775
3.https://www.cnblogs.com/tinmh/p/6026726.html

5.附录

https://gitee.com/Marinc/nacos.git


http://www.ppmy.cn/news/67785.html

相关文章

生态、遥感、大气、水文水资源、地下水土壤、人工智能等多领域教程

理论讲解案例实战动手实操讨论互动 针对 生态农林、遥感、语言土壤、统计、人工智能等领域全套教程。包含:InVEST模型、DSSAT模型、CENTURY模型、CASA模型、SWH蒸散模型、BGC模型、MAXENT模型、CLM模式、CLUE模型、PROSAIL模型、Biomod模型、Hydrus模型、Meta分析、ArcGIS、MAT…

MUX VLAN原理与配置

MUX VLAN原理 主VLAN(Principal VLAN) 主VLAN(Principal VLAN):可以与MUX VLAN内的所有VLAN进行通信。 从VLAN(Subordinate VLAN)分为: 隔离型从VLAN(Separate VLAN):只能和Principal VLAN进行通信,和其他类型的VLAN完全隔离,Separate VLAN内部也完全隔离。 互通…

笔记-指针的进阶

1.字符指针 char arr[] "hello bit." char * p arr 这里p指向的是数组的首元素&#xff0c;arr数组是可以修改的。 &#xff08;const&#xff09;char * pstr "hello bit." 这里的字符串是常量字符串&#xff0c;不能修改。 这里有一个面试题&#xf…

unity实现子弹散射效果和闪电链效果

子弹散射 实现爆炸散射效果可以按照以下步骤进行: 1.准备子弹模型和爆炸特效模型,可以使用粒子特效或者模型。 2.创建子弹和敌人模型,同时添加刚体组件。 3.创建子弹的脚本,绑定到子弹上。 4.在脚本中,对子弹的 OnTriggerEnter 或 OnCollisionEnter 函数做出响应,检测敌…

肠道核心菌属——Lachnoclostridium

谷禾健康 Lachnoclostridium属是一类革兰氏阳性菌&#xff0c;专性厌氧、形成孢子、属于Clostridiales目、Lachnospiraceae科、Firmicutes门。该属最初被描述为Clostridium phytofermentans&#xff0c;后来被重新分类为Lachnoclostridium属。 Lachnoclostridium属包括来自Lach…

浏览csdn博客自动隐藏侧边栏并只看目录

背景 CSDN 总算做了点好事&#xff0c;能够隐藏大部分无关信息&#xff0c;只看博客内容本身。具体如图&#xff0c;还在测试版 以我的一篇博客为例&#xff0c;原始界面&#xff0c;花里胡哨一堆 点击隐藏侧栏后的清爽版 点击只看目录后的清爽版 前提提要 安装油猴脚本&…

ComPDFKit PDF SDK for Windows crack

ComPDFKit PDF SDK for Windows crack 增加了对新文本编辑功能的支持&#xff0c;如添加其他字体、设置粗体/斜体、复制文本样式和修改文本透明度。 增加了对新级别文档加密的支持&#xff0c;包括AES-128和AES-256。 ComPDFKit PDF SDK允许开发人员在Windows(iOS和Android平台…

在 AWS 中存储应用程序参数的最佳方式

许多应用程序现在托管在公共云平台上&#xff0c;因此必须利用云来存储其数据和应用程序参数。在最受欢迎的云提供商中&#xff0c;亚马逊网络服务&#xff08;AWS&#xff09;是使用最广泛的。虽然 AWS 提供了许多用于存储应用程序参数的解决方案&#xff0c;但了解哪个选项最…