什么是RabbitMQ?
它基于AMQP协议(Advanced Message Queuing Protocol),一种为应用构建消息队列的标准协议。过程中,它提供了一些重要模块:为消息发送的Producer(生产者),分发消息的Exchange(交换器),写入消息的Queue(队列),和读取消息的Consumer(消费者)。
MQ 的核心概念
1. 异步处理
问题:系统需要执行一些耗时操作(如发送邮件、生成报告),如果这些操作在主线程执行,会影响用户体验或导致系统响应变慢。
MQ 的解决方式:
- 生产者将任务消息放入队列,不需要等待任务完成。
- 消费者在后台异步处理任务。
示例:
- 用户下单后,系统需要发送订单确认邮件。如果没有 MQ,用户可能需要等待邮件发送完成后才能收到订单确认。
- 使用 MQ 后,生产者(订单服务)将“发送邮件”任务放入队列,消费者(邮件服务)异步处理。
2. 系统解耦
问题:系统服务之间高度耦合,一个服务的变化会导致多个服务需要修改,降低开发效率和系统灵活性。
MQ 的解决方式:
- 服务之间通过消息队列通信,而不是直接调用。
- 生产者只需要发送消息到 MQ,消费者负责处理消息,二者互不影响。
示例:
- 用户下单后,订单服务需要通知库存服务扣减库存、物流服务生成物流单。如果没有 MQ,订单服务需同步调用这些服务的接口,导致用户必须等待所有操作完成(线性操作),响应时间较长,且系统耦合度高。
- 使用 MQ 后,订单服务将消息发送到消息队列,并立即返回“下单成功”的响应。库存服务和物流服务异步订阅消息进行处理,各服务独立运行,彼此解耦。这样既提升了用户体验,也增强了系统的扩展性和稳定性。
3. 削峰填谷
问题:在高并发场景下,大量请求瞬间涌入,可能导致服务过载或崩溃。
MQ 的解决方式:
- 将高并发的请求存入队列,消费者按自己的能力逐步处理。
- 队列可以作为缓冲区,平衡生产者和消费者之间的处理速度。
示例:
- 秒杀活动中,用户请求大量涌入库存系统。没有 MQ,库存服务可能因并发过高而宕机。
- 使用 MQ 后,所有秒杀请求进入队列,库存服务按顺序逐一处理。
好处:
- 防止系统崩溃,保障服务稳定性。
4. 数据可靠性
问题:数据传输过程中,可能因为网络故障、系统宕机等原因导致消息丢失。
MQ 的解决方式:
- MQ 提供消息持久化功能,确保即使系统故障,消息也不会丢失。
- 支持消息重试机制,确保消息至少被处理一次。
示例:
- 支付系统发送“支付成功”消息给订单系统。如果没有 MQ,网络抖动可能导致消息丢失,订单状态无法更新。
- 使用 MQ 后,消息持久化到磁盘,消费者故障恢复后可继续消费消息。
1. RabbitMQ 的核心组件
Virtual Host(虚拟主机)
定义:RabbitMQ 中的逻辑隔离单位,类似于一个独立的命名空间。
作用:
- 用于实现不同用户或系统之间的隔离。
- 每个 Virtual Host 下可以有独立的 Exchange(交换机)、Queue(队列)和绑定关系。
- 一个 RabbitMQ 服务器可以有多个 Virtual Host。
应用场景:
- 多租户系统(例如,不同的业务模块可以使用不同的 Virtual Host)。
Publisher(消息发送者)
- 定义:负责向 RabbitMQ 发送消息的生产者应用程序。
- 功能:
- 将消息发送到 Exchange(交换机),而不是直接发送到 Queue。
- 注意:
- Publisher 和 Exchange 通过绑定关系决定消息的路由。
Consumer(消息消费者)
定义:负责从 Queue(队列)中接收消息并处理的应用程序。
功能:
- 消费者直接从队列中读取消息。
- 每个消息只会被一个消费者处理(点对点模式)。
Queue(队列)
定义:存储消息的缓冲区,用于临时保存消息。
功能:
- 消息最终会路由到队列,并由消费者从队列中消费。
- 队列可以绑定到多个 Exchange,并可根据路由规则接收不同的消息。
特点:
- 持久化:队列可以配置为持久化(即使 RabbitMQ 服务重启,消息也不会丢失)。
- 排队顺序:消息按照 FIFO(先进先出)的顺序进行消费。
Exchange(交换机)
定义:负责根据路由规则分发消息的组件。
功能:
- 接收 Publisher 发送的消息,并根据路由规则决定将消息发送到哪个 Queue。
- 不直接存储消息,消息总是路由到队列中。
2. Exchange 的类型
根据不同的消息路由方式,Exchange 有以下几种类型:
Direct(直连交换机):
- 根据完全匹配的路由键(Routing Key)将消息发送到指定的队列。
- 适用场景:精准匹配,例如订单状态更新。
Fanout(广播交换机):
- 将消息广播到所有绑定的队列,而不考虑路由键。
- 适用场景:日志广播、通知推送。
Topic(交换机):
- 根据通配符匹配路由键,将消息路由到符合条件的队列。
- 适用场景:动态路由,例如按照“日志级别.模块名”匹配日志消息。
3. 数据流向说明
Publisher -> Exchange:
- 消息发送者(Publisher)将消息发送到 RabbitMQ 的 Exchange(交换机)。
- 发送时需要指定 Routing Key,用于路由消息。
Exchange -> Queue:
- Exchange 根据绑定关系和路由规则,将消息分发到一个或多个队列(Queue)。
- 如果没有匹配的队列,消息可能会被丢弃或进入死信队列(DLQ,Dead Letter Queue)。
Queue -> Consumer:
- 消费者(Consumer)从队列中拉取消息并进行处理。
- 消费者可以是多个,每个消息只能被一个消费者消费(在同一队列中)。
如何安装RabbitQueue?
day06-MQ基础 - 飞书云文档 (feishu.cn)
查看以上文档的第二部分即可
WorkQueue(工作队列):
工作队列的核心理念
-
任务分发:
- 消息(任务)由一个生产者(Producer)发送到队列。
- 多个消费者(Consumer)从同一个队列中取出消息并处理。
-
负载均衡:
- 消息会按照一定规则分发给消费者,通常每个消费者会处理相同数量的任务(轮询机制)。
- 不同消费者可以根据其处理能力自行调整消费速率。可以通过配置的
prefetch来设置
-
解耦和异步:
- 生产者和消费者无需直接交互。生产者将任务发送到队列即可,消费者独立从队列中获取任务并处理。
Fanout交换机 :广播
通过 Fanout Exchange 可以实现消息的广播分发,将消息发送给所有绑定的队列。结合 Spring 提供的 RabbitTemplate
工具,可以方便地向交换机发送消息,极大简化了开发流程。这种模式非常适用于日志收集、通知广播等场景。
发送消息到交换机的 API 示例
java">@Test
public void testFanoutExchange() {// 定义交换机的名称String exchangeName = "itcast.fanout";// 定义消息内容String message = "Hello, everyone!";// 发送消息到指定交换机// 参数解释:// - exchangeName:交换机名称// - routingKey:路由键(在 Fanout 交换机中会被忽略)// - message:发送的消息内容rabbitTemplate.convertAndSend(exchangeName, "", message);
}
Direct交换机
Direct 交换机会根据消息携带的 RoutingKey
,将消息发送到与交换机绑定且 RoutingKey
完全匹配的队列。
路由机制 | 根据 RoutingKey 精确匹配分发消息 | 将消息广播到所有绑定的队列 |
RoutingKey 是否重要 | 必须匹配 | 不考虑,直接广播 |
绑定关系 | 每个队列可以绑定不同的 RoutingKey | 所有绑定的队列都会接收消息 |
适用场景 | 精确路由,如订单状态更新 | 广播通知,如日志、系统更新 |
Direct 交换机
场景:订单服务需要将不同类型的订单路由到对应的队列(如普通订单和优先订单)。
绑定关系:
- 队列
normal_orders
绑定RoutingKey = normal
。 - 队列
priority_orders
绑定RoutingKey = priority
。
消息发送:
- 消息
RoutingKey = normal
,会被路由到normal_orders
队列。 - 消息
RoutingKey = priority
,会被路由到priority_orders
队列。
生产者代码
生产者将消息发送到 Direct 交换机,并指定不同的 RoutingKey
。
java">import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;@Service
public class SimpleProducer {private final RabbitTemplate rabbitTemplate;public SimpleProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendMessage(String message) {// 直接发送消息到交换机rabbitTemplate.convertAndSend("simple_exchange", "simple_routing_key", message);System.out.println("Sent message: " + message);}
}
消费者代码
定义两个消费者,分别监听普通队列和优先队列。
java">import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class SimpleConsumer {// 监听队列,直接声明队列名称@RabbitListener(queues = "simple_queue")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}
Topic交换机
opic Exchange 是 RabbitMQ 的一种交换机类型,它根据消息的路由键(RoutingKey
)和绑定键(BindingKey
)的模式匹配规则,将消息路由到一个或多个队列。它是 Direct Exchange 的增强版,支持模糊匹配和通配符。
order.* | order.created | ✔️ |
order.* | order.created.new | ❌ |
匹配规则 | 精确匹配(完全相等) | 模糊匹配(支持 * 和 # 通配符) |
RoutingKey 示例 | order.created | order.* 或 order.# |
适用场景 | 简单、明确的路由需求 | 复杂、动态的路由需求 |
扩展性 | 固定路由,灵活性低 | 动态路由,灵活性高 |
SpringAMQP
Spring AMQP 是 Spring 提供的一个用于与 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 通信的模块化框架。它为基于 Spring 的应用程序集成 AMQP 消息中间件(例如 RabbitMQ)提供了便捷的方法,简化了消息的发送、接收和处理。
1. maven依赖:
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2. 配置服务端信息
spring:rabbitmq:host: 192.168.88.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码listener:simple:prefetch: 1 # 控制消费者预取的消息数量,处理完一条再处理,如果希望RabbitMQ轮询访问可以不设置这个
什么是 prefetch
?
prefetch
(预取数量) 是 RabbitMQ 的一个设置,用于控制消息消费者(Consumer)每次从队列中预取消息的数量。
它定义了在消费者确认(ACK)之前,RabbitMQ 可以向消费者发送的未确认消息的最大数量。
场景:
假如 prefetch
设置为 1,RabbitMQ 会向消费者一次发送 1 条消息,只有这条消息被确认后,才会发送下一条消息。
如果设置为一个较大的数字(例如 10),RabbitMQ 会一次性发送多条消息,消费者可以并行处理这些消息。
为什么需要设置 prefetch
?
在多个消费者监听同一个队列的场景下,prefetch
设置为 1,可以确保消息在消费者之间更均匀地分布。
防止某些消费者处理速度慢但仍然接收大量消息,导致处理延迟。
简单示例
发送消息
java">@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("exchange_name", "routing_key", message);
}
接收消息
java">@RabbitListener(queues = "queue_name")
public void receiveMessage(String message) {System.out.println("Received: " + message);
}
注解式声明队列和交换机
@RabbitListener:
用于监听队列,当队列接收到消息时,触发对应方法处理消息。
@QueueBinding:
声明队列与交换机的绑定关系。
- 包含:
@Queue:声明队列名称和属性。
- @Exchange:声明交换机名称、类型和属性。
- key:指定绑定时使用的路由键。
java"> @RabbitListener(bindings = @QueueBinding(value =@Queue(name="direct.queue1",durable = "true"),exchange = @Exchange(name="hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue(String message) throws InterruptedException {System.err.println("消费者1.....................接收到消息"+ message+","+ LocalTime.now());}
消息转换器
在 RabbitMQ 中,消息默认是以字节数组的形式在队列中传输的。如果我们希望以更方便的方式传递和处理对象(如 JSON、XML 或 Java 对象),就需要使用 消息转换器(Message Converter) 来完成消息的序列化与反序列化。
消息转换器主要负责:
- 序列化:将 Java 对象转换为消息格式(如 JSON、XML 或字节数组)发送到 RabbitMQ。
- 反序列化:将从 RabbitMQ 接收到的消息转换为 Java 对象,供消费者处理。
转换器 | 功能 |
---|---|
SimpleMessageConverter | 默认的消息转换器,支持简单类型(如 String 、byte[] 和 Serializable 对象)。 |
Jackson2JsonMessageConverter | 使用 Jackson 将 Java 对象转换为 JSON 格式,或将 JSON 消息转换为 Java 对象。 |
Jaxb2MarshallerMessageConverter | 使用 JAXB 将 Java 对象转换为 XML 格式,或将 XML 消息转换为 Java 对象。 |
ContentTypeDelegatingMessageConverter | 根据消息的 content_type 动态选择合适的消息转换器。 |
导入依赖:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
记得要给生产者和消费者都需要设置消息转换器。
java"> @Beanpublic MessageConverter messageCoverter(){return new Jackson2JsonMessageConverter();}
- JSON 格式:需要配置
Jackson2JsonMessageConverter
。 - XML 格式:需要配置
Jaxb2MarshallerMessageConverter
。 - 字符串/字节数组:不需要额外配置,默认的
SimpleMessageConverter
即可。