RabbitMQ五种消息模型

news/2025/3/19 0:46:13/

RabbitMQ 是一款基于 AMQP 协议的高性能消息中间件,广泛应用于分布式系统中,用于实现服务之间的异步通信、解耦和负载均衡。RabbitMQ 提供了五种常见的消息模型,每种模型都有其独特的特点和适用场景。本文将详细介绍这五种消息模型,帮助读者更好地理解和使用 RabbitMQ。

一、简单模式(Simple Queue)

1.1 模型介绍

简单模式是最基础的消息传递模型,包含一个生产者、一个队列和一个消费者。生产者将消息发送到队列,消费者从队列中接收消息。这种模式适用于一对一的通信场景。

1.2 工作流程

  1. 生产者将消息发送到指定的队列。

  2. 消费者监听队列,获取并处理消息。

  3. 消息被消费后从队列中删除。

1.3 应用场景

适用于简单的任务分配场景,例如日志记录、邮件通知等。

创建一个工程demo,两个子模块consumer和publisher

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

配置:我是把mq安装到的windows

spring:rabbitmq:host: localhost # 你的虚拟机IPport: 5672 # 端口virtual-host: /yyf # 虚拟主机username: yyf # 用户名password: 123456 # 密码

 根基mq配置

生产消息

package com.itfly;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;/*** Unit test for simple App.*/
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "test.queue";// 消息String message = "hello, spring amqp! yyf";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

 接收消息:在消费者模块

package com.itfly.controller;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "test.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

启动消费者服务,再启动测试类

二、工作模式(Work Queue)

2.1 模型介绍

工作模式用于在多个消费者之间分配任务。一个生产者将消息发送到队列,多个消费者可以并发地从队列中获取任务并处理。这种模式可以实现任务的负载均衡。

一条消息只能被一个消费者处理

2.2 工作流程

  1. 生产者将消息发送到队列。

  2. 多个消费者监听同一个队列,竞争获取消息。

  3. 消费者处理完消息后,消息从队列中删除。

2.3 应用场景

适用于需要并发处理的任务分配场景,例如批量处理订单、视频转码等。

 也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

 

三、发布/订阅模式(Publish/Subscribe)

发布/订阅模式是一种一对多的消息传递模型,生产者将消息发送到交换机(Exchange),交换机根据绑定规则将消息分发到多个队列,从而实现消息的广播。

交换机的作用是什么?

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列

3.1 广播模式(Fanout Exchange)

Fanout 交换机将消息广播到所有绑定的队列,不关心消息的路由键。

 

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "test.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);//三个参数
}

3.2 路由模式(Direct Exchange)

Direct 交换机根据路由键将消息发送到指定的队列。生产者在发送消息时指定路由键,队列在绑定交换机时也指定路由键,只有匹配的队列才能接收消息。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

  • @Test
    public void testSendDirectExchange() {// 交换机名称String exchangeName = "test.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }

    消息只会被路由到Routingkey为red的队列

3.3 主题模式(Topic Exchange)

Topic 交换机允许使用通配符匹配路由键,从而实现更灵活的消息订阅。例如,路由键可以使用 *#,其中 * 表示一个单词,# 表示零个或多个单词。

3.4 应用场景

适用于需要一对多消息传递的场景,例如日志收集、事件通知等。

 

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

四、RPC 模式(Remote Procedure Call)

RPC 模式用于实现远程过程调用。客户端发送请求消息到队列,服务端处理请求后将响应消息发送到另一个队列,客户端从该队列中获取响应。

4.1 工作流程

  1. 客户端发送请求消息到请求队列。

  2. 服务端处理请求,将响应消息发送到响应队列。

  3. 客户端从响应队列中获取响应消息。

4.2 应用场景

适用于需要同步调用的场景,例如远程接口调用。

五、延迟队列模式(Delayed Queue)

延迟队列用于实现消息的延迟处理。生产者发送消息时可以指定延迟时间,消息在达到延迟时间后才会被消费者消费。

5.1 工作流程

  1. 生产者发送消息到队列,并指定延迟时间。

  2. 消息在队列中等待指定的延迟时间。

  3. 延迟时间到达后,消费者从队列中获取消息并处理。

5.2 应用场景

适用于需要延迟处理的场景,例如定时任务、订单超时处理等。

 


http://www.ppmy.cn/news/1580196.html

相关文章

汉桑科技IPO:潜藏两大风险 公众投资者权益或受损

冰山之所以危险&#xff0c;是因为只有八分之一在水面上。 ——语出小说家海明威。 引 言 野村证券提供的一份报告显示&#xff0c;2025年前两个月&#xff0c;我国出口同比增长仅有2.3%&#xff0c;与去年四季度9.9%的增长显著下滑。与此同时&#xff0c;从2月1日开始&a…

基于金融产品深度学习推荐算法详解【附源码】

深度学习算法说明 1、简介 神经网络协同过滤模型(NCF) 为了解决启发式推荐算法的问题&#xff0c;基于神经网络的协同过滤算法诞生了&#xff0c;神经网络的协同过滤算法可以 通过将用户和物品的特征向量作为输入&#xff0c;来预测用户对新物品的评分&#xff0c;从而解决…

给文件提添加高亮信息

给文件提添加高亮信息 因为在查看log的时候需要人工校验标签&#xff0c;因此萌生了用插件高亮标签方便查看的想法。 效果展示&#xff1a; 设备&#xff1a;VScode 设置步骤 下载Highlight插件 点击管理→设置→在setting.json中编辑 添加以下内容 "(<…

Android手机中各类安全相关知识总结

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 1. Android 安全威胁2. Android 安全防护措施3. Android 安全建议和最佳实践4. Android 安全工具推荐5. Android 安全常见问题5.1 如何检测设备是否感染恶意软件?5.2 如何防止应用滥用权限?5.3 如何保护设备免受网络攻…

在 Qt 中自定义控件样式:使用 QProxyStyle 代理和修改绘制元素

文章目录 在 Qt 中自定义控件样式&#xff1a;使用 QProxyStyle 代理和修改绘制元素1. 什么是 QProxyStyle&#xff1f;QStyle 和 QProxyStyle何时使用 QProxyStyle&#xff1f;关键方法&#xff1a;drawPrimitive() 2. drawPrimitive() 方法详解参数解析1. PrimitiveElement e…

单元测试、系统测试、集成测试、回归测试的步骤、优点、缺点、注意点梳理说明

单元测试、系统测试、集成测试、回归测试的梳理说明 单元测试 步骤&#xff1a; 编写测试用例&#xff0c;覆盖代码的各个分支和边界条件。使用测试框架&#xff08;如JUnit、NUnit&#xff09;执行测试。检查测试结果&#xff0c;确保代码按预期运行。修复发现的缺陷并重新测…

【解决】XCode不支持旧版本的iOS设备

办法&#xff1a; 手动添加设备支持文件&#xff08;暂时解决方式&#xff09; 如果您无法立即升级 Xcode&#xff0c;也可以通过下载设备支持文件来暂时解决问题。 检查当前设备的 iOS 版本&#xff1a; 连接设备到 Mac&#xff0c;打开 Xcode 查看提示的 iOS 版本。例如&…

Vue中使用到的padStart方法是什么

padStart() 是 JavaScript 字符串对象的一个方法&#xff0c;用于在字符串的开头填充指定的字符&#xff0c;直到字符串达到指定的长度。这在需要对字符串进行格式化&#xff0c;使其保持固定长度时非常有用&#xff0c;比如在日期格式化时&#xff0c;确保月份、日期等为两位数…