RabbitMQ五种消息模型

devtools/2025/3/14 22:15:48/

RabbitMQ 是一款基于 AMQP 协议的高性能消息中间件,广泛应用于分布式系统中,用于实现服务之间的异步通信、解耦和负载均衡。RabbitMQ 提供了五种常见的消息模型,每种模型都有其独特的特点和适用场景。本文将详细介绍这五种消息模型,帮助读者更好地理解和使用 RabbitMQ。

一、简单模式(Simple Queue)

1.1 模型介绍

简单模式是最基础的消息传递模型,包含一个生产者、一个队列和一个消费者。生产者将消息发送到队列,消费者从队列中接收消息。这种模式适用于一对一的通信场景。

1.2 工作流程

  1. 生产者将消息发送到指定的队列。

  2. 消费者监听队列,获取并处理消息。

  3. 消息被消费后从队列中删除。

1.3 应用场景

适用于简单的任务分配场景,例如日志记录、邮件通知等。

创建一个工程demo,两个子模块consumer和publisher

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

配置:我是把mq安装到的windows

spring:rabbitmq:host: localhost # 你的虚拟机IPport: 5672 # 端口virtual-host: /yyf # 虚拟主机username: yyf # 用户名password: 123456 # 密码

 根基mq配置

生产消息

package com.itfly;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;/*** Unit test for simple App.*/
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "test.queue";// 消息String message = "hello, spring amqp! yyf";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

 接收消息:在消费者模块

package com.itfly.controller;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "test.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

启动消费者服务,再启动测试类

二、工作模式(Work Queue)

2.1 模型介绍

工作模式用于在多个消费者之间分配任务。一个生产者将消息发送到队列,多个消费者可以并发地从队列中获取任务并处理。这种模式可以实现任务的负载均衡。

一条消息只能被一个消费者处理

2.2 工作流程

  1. 生产者将消息发送到队列。

  2. 多个消费者监听同一个队列,竞争获取消息。

  3. 消费者处理完消息后,消息从队列中删除。

2.3 应用场景

适用于需要并发处理的任务分配场景,例如批量处理订单、视频转码等。

 也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

 

三、发布/订阅模式(Publish/Subscribe)

发布/订阅模式是一种一对多的消息传递模型,生产者将消息发送到交换机(Exchange),交换机根据绑定规则将消息分发到多个队列,从而实现消息的广播。

交换机的作用是什么?

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列

3.1 广播模式(Fanout Exchange)

Fanout 交换机将消息广播到所有绑定的队列,不关心消息的路由键。

 

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "test.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);//三个参数
}

3.2 路由模式(Direct Exchange)

Direct 交换机根据路由键将消息发送到指定的队列。生产者在发送消息时指定路由键,队列在绑定交换机时也指定路由键,只有匹配的队列才能接收消息。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

  • @Test
    public void testSendDirectExchange() {// 交换机名称String exchangeName = "test.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }

    消息只会被路由到Routingkey为red的队列

3.3 主题模式(Topic Exchange)

Topic 交换机允许使用通配符匹配路由键,从而实现更灵活的消息订阅。例如,路由键可以使用 *#,其中 * 表示一个单词,# 表示零个或多个单词。

3.4 应用场景

适用于需要一对多消息传递的场景,例如日志收集、事件通知等。

 

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

四、RPC 模式(Remote Procedure Call)

RPC 模式用于实现远程过程调用。客户端发送请求消息到队列,服务端处理请求后将响应消息发送到另一个队列,客户端从该队列中获取响应。

4.1 工作流程

  1. 客户端发送请求消息到请求队列。

  2. 服务端处理请求,将响应消息发送到响应队列。

  3. 客户端从响应队列中获取响应消息。

4.2 应用场景

适用于需要同步调用的场景,例如远程接口调用。

五、延迟队列模式(Delayed Queue)

延迟队列用于实现消息的延迟处理。生产者发送消息时可以指定延迟时间,消息在达到延迟时间后才会被消费者消费。

5.1 工作流程

  1. 生产者发送消息到队列,并指定延迟时间。

  2. 消息在队列中等待指定的延迟时间。

  3. 延迟时间到达后,消费者从队列中获取消息并处理。

5.2 应用场景

适用于需要延迟处理的场景,例如定时任务、订单超时处理等。

 


http://www.ppmy.cn/devtools/167127.html

相关文章

在 VMware 中安装 Ubuntu 的超详细实战分享

目录 1. 安装准备VMware 软件获取Ubuntu 镜像获取 2. 创建新的虚拟机基础配置自定义硬件设置 3. Ubuntu 系统安装过程启动虚拟机正式安装 Ubuntu安装过程中常见问题 4. 安装后优化安装 VMware Tools系统更新与软件安装分辨率与显示设置 5. 常见故障及解决方案黑屏或安装卡顿网络…

接口测试工具:postman详解

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 Postman 是一款功能强大的 API 开发和测试工具&#xff0c;以下是一些高级用法的详细介绍和操作步骤。 一、环境和全局变量 环境变量允许你设置特定于环境&#…

Gartner发布量子网络安全策略指南:2030年量子计算将能够破坏传统的加密算法

攻击者采用“先收集后解密”策略&#xff0c;为企业带来隐患。加密数据流目前无法读取&#xff0c;但可以保存&#xff0c;直到量子计算能够解密。I&O 领导者可以通过实施后量子密码学策略来降低这种风险。 主要发现 密码相关量子计算机 (CRQC) 将能够在数小时而不是数年内…

Kubernetes(K8s)集群中使用 GPU

在 Kubernetes&#xff08;K8s&#xff09;集群中使用 GPU&#xff0c;需要完成安装驱动、部署插件、配置 containerd、实现 GPU 虚拟化及部分使用等一系列步骤&#xff0c;下面为你详细介绍。 1. 安装 GPU 驱动 以 NVIDIA GPU 为例&#xff0c;因为在深度学习和机器学习场景…

大语言模型-1.3-GPT、DeepSeek模型介绍

简介 本博客内容是《大语言模型》一书的读书笔记&#xff0c;该书是中国人民大学高瓴人工智能学院赵鑫教授团队出品&#xff0c;覆盖大语言模型训练与使用的全流程&#xff0c;从预训练到微调与对齐&#xff0c;从使用技术到评测应用&#xff0c;帮助学员全面掌握大语言模型的…

【面试题系列】 Redis 核心面试题(二)答案

本文主要介绍Redis 的面试题&#xff0c;涵盖持久化、集群、缓存策略、事务等方面 一、持久化机制 1. RDB 与 AOF 的核心区别及适用场景&#xff1f; 答案&#xff1a; 特性RDBAOF存储内容内存快照&#xff08;二进制文件&#xff09;写命令日志&#xff08;文本格式&#x…

k8s面试题总结(十二)

1.简述ETCD适应的场景&#xff1f; 适用于数据高一致性的场景&#xff0c;确保分布式环境中的数据是一致的。适用于服务高可用时的场景。适用于多节点数据分布式存储的场景。适用于服务之间协调和交互使用的场景。 2.Etcd集群之间是怎么同步数据的&#xff1f; 在etcd集群中…

Vue:其他指令

Vue&#xff1a;其他指令 2.13.1、v-text v-text 指令用于将数据填充到标签体当中&#xff0c;并且是以覆盖的形式填充。与原生JS中的 innerText 功能类似&#xff0c;填充的内容中即使存在HTML标签也只是会当做一个普通的字符串处理&#xff0c;不会解析。例如&#xff1a; …