Spring boot框架下的RabbitMQ消息中间件

server/2025/1/18 15:39:18/

1. RabbitMQ 基础概念

1.1 消息处理流程与组件配合

  1. Producer(生产者) 发送消息。消息先发送到 Exchange(交换机),而不是直接到队列。
  2. Exchange(交换机) 接收到消息后,根据 Routing Key(路由键)Binding(绑定规则),决定将消息发送到哪些 Queue(队列)
  3. Queue(队列) 存储消息,等待 Consumer(消费者) 消费。
  4. Consumer(消费者) 从队列中接收并处理消息。

Producer(生产者)

作用:负责发送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。

关键点

  • Producer 只需要知道 Exchange 和 Routing Key,不关心队列。
  • Producer 不直接与队列交互,消息的路由和存储由 Exchange 和 Binding 决定。

代码示例

java">import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String routingKey, String message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);System.out.println("Sent message: " + message);}
}

调用示例

java">producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");
  • direct-exchange:目标交换机。
  • key1:消息的路由键。

Exchange(交换机)

作用:接收来自 Producer 的消息,并根据 Routing Key 和 Binding 的配置,决定将消息发送到哪些队列。

Exchange 通常需要手动注册为 Bean。

  • RabbitMQ 的 Exchange 是通过名称来标识的。

  • 在 Spring Boot 中,您通过 @Bean 方法注册 Exchange 时,实际上是将 Exchange 的名称和类型绑定到 RabbitMQ 服务器。

  • 发送消息时,RabbitMQ 客户端会根据 Exchange 的名称找到对应的 Exchange,并根据 Routing Key 将消息路由到队列。

类型

  • Direct Exchange:精确匹配 Routing Key。消息的 Routing Key 必须与 Binding 的 Routing Key 完全一致。

  • Topic Exchange:支持通配符匹配。例如,with("key.*") 可以匹配 key.1key.2 等。

  • Fanout Exchange:忽略 Routing Key,消息会被广播到所有绑定的队列。

  • Headers Exchange:忽略 Routing Key,根据消息头属性匹配。

代码示例(定义交换机):

java">import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ExchangeConfig {@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct-exchange");}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout-exchange");}@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic-exchange");}@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headers-exchange");}
}

Queue(队列)

作用:消息的存储容器,等待消费者从中取出消息进行处理。

Queue 也需要手动注册为 Bean。Spring Boot 不会自动注册队列,因为队列的名称和属性(如是否持久化、是否排他等)需要根据业务需求进行配置。

关键点

  • 消息会保存在队列中,直到被消费。
  • 队列可以是持久化的(重启 RabbitMQ 后消息仍然存在)或非持久化的。

代码示例(定义队列):

java">import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class QueueConfig {@Beanpublic Queue demoQueue() {return new Queue("demo-queue", true); // 持久化队列}
}

Routing Key(路由键)

作用:决定消息如何从交换机路由到队列。

关键点

  • Routing Key 由 Producer 指定。
  • 在 Direct 和 Topic 类型的 Exchange 中,Routing Key 决定队列是否接收消息。

Binding(绑定)

  • 作用:将队列与交换机连接,并定义路由规则。
  • 关键点
    • Binding 定义了队列接受消息的条件。
    • 结合 Routing Key 和交换机类型,共同决定消息的路由方式。

代码示例(定义绑定):

java">import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class BindingConfig {@Beanpublic Binding binding(Queue demoQueue, DirectExchange directExchange) {return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");}
}

with("key1") 的作用是 指定 Binding 的 Routing Key。它的含义是:

  • 当消息发送到 Exchange 时,Exchange 会根据消息的 Routing Key 和 Binding 的 Routing Key 进行匹配。

  • 如果匹配成功,消息会被路由到对应的队列;如果匹配失败,消息会被丢弃或进入死信队列(如果有配置)。


Consumer(消费者)

作用:从队列中接收并处理消息。

关键点

  • 消费者与队列直接关联。
  • 多个消费者可以监听同一队列,实现负载均衡。

代码示例

java">import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class Consumer {@RabbitListener(queues = "demo-queue")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

1.2 RabbitMQ 消息传输模型

点对点模型

定义:消息从生产者发送到队列,由消费者从队列中接收,消息只能被一个消费者消费。

实现

  • 使用默认交换机(空字符串 "")。
  • 直接将消息发送到队列。

代码示例

java">rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");

发布订阅模型

定义:生产者将消息发送到 Fanout 类型的交换机,消息会广播到所有绑定的队列。

实现

  • 不需要 Routing Key。
  • 所有绑定到 Fanout 交换机的队列都会接收消息。

代码示例

java">rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");

路由模型

定义:生产者将消息发送到 Direct 类型的交换机,根据 Routing Key 精确匹配队列。

实现

  • 队列通过 Binding 绑定到交换机时,指定 Routing Key。
  • 消息的 Routing Key 必须与 Binding 的 Routing Key 一致。

代码示例

java">rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");

2. 环境准备

2.1 安装与配置 RabbitMQ

下载 Docker

  • 访问 Docker 官方网站:Docker: Accelerated Container Application Development。

  • 根据您的操作系统(Windows、macOS 或 Linux)下载并安装 Docker Desktop。

启动 Docker

  • 安装完成后,启动 Docker Desktop。

  • 确保 Docker 正在运行(任务栏或菜单栏中可以看到 Docker 图标)。

使用 Docker 快速部署 RabbitMQ

Docker 是部署 RabbitMQ 的最简单方式。通过以下命令,您可以快速启动一个 RabbitMQ 容器:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

参数说明

  • -d:以后台模式运行容器。

  • --name rabbitmq:为容器指定名称(rabbitmq)。

  • -p 5672:5672:将容器的 5672 端口映射到主机的 5672 端口(RabbitMQ 的消息通信端口)。

  • -p 15672:15672:将容器的 15672 端口映射到主机的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。

  • rabbitmq:management:使用带有管理插件的 RabbitMQ 镜像。

验证 RabbitMQ 是否运行

运行以下命令,查看容器是否正常运行:

docker ps

如果看到 rabbitmq 容器正在运行,说明 RabbitMQ 已成功启动。


2.2 使用 RabbitMQ 管理插件

RabbitMQ 提供了一个 Web 管理界面,方便您监控和管理 RabbitMQ。

访问管理界面

  1. 打开浏览器,访问 http://localhost:15672

  2. 使用默认用户名和密码登录:

    • 用户名:guest

    • 密码:guest

管理界面功能

  • Overview:查看 RabbitMQ 的整体状态,如连接数、队列数、消息速率等。

  • Connections:查看当前连接到 RabbitMQ 的客户端。

  • Channels:查看当前打开的通道。

  • Exchanges:查看和管理 Exchange。

  • Queues:查看和管理 Queue。

  • Admin:管理用户和权限。

2.3 用户与权限配置

默认情况下,RabbitMQ 只有一个用户 guest,密码也是 guest。为了安全性和权限管理,建议创建新用户并分配权限。

1. 创建新用户

在 RabbitMQ 管理界面中:

  • 点击顶部导航栏的 Admin

  • 在用户列表下方,点击 Add a user

  • 输入用户名和密码,例如:

    • 用户名:admin

    • 密码:admin123

  • 点击 Add user 完成创建。

2. 分配权限

  • 在用户列表中,找到刚创建的用户(如 admin)。

  • 点击用户右侧的 Set permission

  • 在权限设置页面:

    • Virtual Host:选择 /(默认的虚拟主机)。

    • Configure:输入 .*,表示允许用户配置所有资源。

    • Write:输入 .*,表示允许用户写入所有资源。

    • Read:输入 .*,表示允许用户读取所有资源。

  • 点击 Set permission 完成权限分配。

3. 使用新用户登录

  • 退出当前用户(点击右上角的 guest,选择 Log out)。

  • 使用新用户(如 admin)登录。

2.4  Spring Boot 中引入 RabbitMQ 依赖 

在 pom.xml 中添加以下依赖:

<dependencies><!-- RabbitMQ 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
</dependencies>

spring-boot-starter-amqp 是 Spring Boot 提供的 RabbitMQ 集成依赖,它包含了以下内容:

  • RabbitMQ 客户端库

    • 自动引入 RabbitMQ 的 Java 客户端库(amqp-client),用于与 RabbitMQ 服务器通信。

  • Spring AMQP 支持

    • 提供了 Spring 对 AMQP(Advanced Message Queuing Protocol)的支持,包括 RabbitTemplate@RabbitListener 等。


2.5 Spring Boot 配置 RabbitMQ

在 Spring Boot 项目中,您需要在 application.properties 或 application.yml 中配置 RabbitMQ 的连接信息。

示例配置

# RabbitMQ 连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin123

配置说明

  • spring.rabbitmq.host:RabbitMQ 服务器地址(默认 localhost)。

  • spring.rabbitmq.port:RabbitMQ 消息通信端口(默认 5672)。

  • spring.rabbitmq.username:RabbitMQ 用户名。

  • spring.rabbitmq.password:RabbitMQ 密码。


3. Spring Boot 集成 RabbitMQ 的消息生产和消费

3.1 消息生产者(Producer)

在 Spring Boot 中,我们使用 RabbitTemplate 来发送消息。它由 spring-boot-starter-amqp 自动配置成为一个 Bean,可直接通过 @Autowired 注入。

如果 message 不是 String 类型的处理

  • Spring AMQP(spring-boot-starter-amqp)在使用 RabbitTemplate 时,默认的消息转换器(MessageConverter)通常会将对象序列化为 JSON 或者将字符串消息转换为字节。
  • 如果你的业务数据不是 String,常见做法是:
    1. 在发送时把非字符串对象序列化(如转换为 JSON 字符串);
    2. 或者配置自定义的 MessageConverter,让 Spring 帮你把对象自动序列化/反序列化。

典型做法:手动序列化为 JSON 再发送

java">@Service
public class CustomObjectProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendCustomObject(String queueName, MyCustomObject obj) {// 1. 将自定义对象序列化为 JSON 字符串String jsonString = new Gson().toJson(obj);// 2. 发送 JSON 字符串到 RabbitMQrabbitTemplate.convertAndSend(queueName, jsonString);}
}
  • 在消费者端,你也可以将消息(JSON 字符串)反序列化为 MyCustomObject

配置自定义 Converter(可选)

  • Spring AMQP 提供了 Jackson2JsonMessageConverter 等现成转换器。
java">@Configuration
public class RabbitConfig {@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置 RabbitTemplate 使用该转换器@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(jsonMessageConverter());return template;}
}
  • 这样一来,rabbitTemplate.convertAndSend(queueName, myObject) 会自动把 myObject 转成 JSON 发送;消费者端则自动解析为同样的 Java 对象。

1)基本消息发送

场景
将消息直接发送到指定的队列,跳过交换机的路由,让 RabbitMQ 把消息放到这个队列中。

核心代码示例

java">@Service
public class BasicProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;  // 1.自动注入的 RabbitTemplate/*** 2.发送基本消息到指定的队列* @param queueName  目标队列名称* @param message    消息内容*/public void sendToQueue(String queueName, String message) {// 3.调用 convertAndSend,直接将消息放入指定队列rabbitTemplate.convertAndSend(queueName, message);System.out.println("Message sent to queue: " + queueName + ", content: " + message);}
}

代码详解

  • @Autowired private RabbitTemplate rabbitTemplate;`

    • Spring Boot 自动为我们配置了 RabbitTemplate,不用手动定义 Bean。

    • 通过依赖注入即可使用所有与 RabbitMQ 交互的方法。

  • public void sendToQueue(String queueName, String message)

    • 方法参数包括:

      • queueName: 目标队列的名称。

      • message: 要发送的字符串类型消息内容。

  • rabbitTemplate.convertAndSend(queueName, message)

    • convertAndSend 方法会将消息转换(转换为字节)并发送到指定队列。

    • 如果该队列不存在,RabbitMQ 会尝试自动创建(前提是 Broker 端配置允许自动创建队列)。


2)发送到交换机

场景
将消息发送到一个交换机(Exchange),再由交换机通过 Routing Key 将消息路由到匹配的队列中。

核心代码示例

java">@Service
public class ExchangeProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息到指定交换机* @param exchangeName  交换机名称* @param routingKey    路由键* @param message       消息内容*/public void sendToExchange(String exchangeName, String routingKey, String message) {// 将消息发送到 exchangeName 指定的交换机,使用 routingKey 进行路由rabbitTemplate.convertAndSend(exchangeName, routingKey, message);System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);}
}

代码详解

  • exchangeName

    • 要发送到的交换机名称,例如 "direct-exchange""fanout-exchange" 等。
  • routingKey

    • 路由键,用来匹配绑定(Binding)。
    • 例如:对 DirectExchange 而言,需要队列绑定时的路由键与发送时的路由键相同,消息才能到达队列。
  • rabbitTemplate.convertAndSend(exchangeName, routingKey, message)

    • 将消息先发送到交换机,再根据路由键将消息投递到目标队列。

3)发送带消息属性的消息

场景
需要为消息设置 TTL(过期时间)或优先级等属性,控制消息在队列中的行为。

核心代码示例

java">@Service
public class PropertyProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送带消息属性的消息(如 TTL, 优先级)*/public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {// 1.创建 MessageProperties 对象,用于指定消息的属性MessageProperties properties = new MessageProperties();properties.setExpiration("10000"); // 过期时间:10秒 (单位:毫秒)properties.setPriority(5);        // 优先级设为 5// 2.根据消息体和属性构建 Message 对象Message message = new Message(messageContent.getBytes(), properties);// 3.使用 send 方法(而非 convertAndSend)直接发送 Message 对象rabbitTemplate.send(exchange, routingKey, message);System.out.println("Message with properties sent: " + messageContent);}
}

代码详解

  • MessageProperties properties = new MessageProperties();

    • MessageProperties 用于设置 AMQP 协议层的各种消息头信息。
  • properties.setExpiration("10000");

    • setExpiration 设置消息的 TTL(Time-To-Live),单位是毫秒。
    • 如果到达时间后消息仍未被消费,RabbitMQ 会将其从队列中移除并送入死信队列(如果配置了死信队列)。
  • properties.setPriority(5);

    • 设置消息的优先级为 5,前提是队列本身需要支持优先级队列(创建队列时指定 x-max-priority)。
  • new Message(messageContent.getBytes(), properties)

    • 将纯文本消息转换为 Message 对象,结合了消息属性和消息体。
  • rabbitTemplate.send(exchange, routingKey, message);

    • convertAndSend 不同,它不会尝试进行消息转换(如 JSON、字符串),而是直接发送完整的 AMQP Message 对象。

Message 构造函数 

java">public Message(byte[] body, MessageProperties messageProperties) {this.body = body;this.messageProperties = messageProperties;
}
  • body:消息体的字节数组。
  • messageProperties:AMQP 的消息属性,包括 TTL、优先级、headers 等。、

如果消息体不是String类型

  • 手动转换为字节:你可以先将自定义对象转换为字节数组(例如通过 JSON 序列化或 Java 序列化),再放入 new Message(...) 的第一个参数。
java">MyCustomObject obj = new MyCustomObject();
// 假设你想用 JSON
String jsonString = new Gson().toJson(obj);
byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);MessageProperties properties = new MessageProperties();
// 设置一些属性
Message message = new Message(body, properties);
  • 为什么不会自动转 JSON?使用 new Message(...) 构造方法是“纯” AMQP 层的做法,不会调用 Spring 的转换器,因此你必须自己处理序列化。
  • 使用 Message 构造函数 时,你必须自行处理对象到 byte[] 的转换(无论是字符串、JSON,还是其他格式)。
  • 如果想让 Spring AMQP 自动转换,你通常使用 rabbitTemplate.convertAndSend(Object msg) 这种高级 API,或者配置自定义 MessageConverter

3.2 消息消费者(Consumer)

消费者的核心功能是在指定的队列中监听消息,并根据配置的确认模式(自动确认或手动确认)对消息进行处理或拒绝。

1)监听队列并消费消息

核心代码示例(自动确认模式)

java">@Service
public class Consumer {/*** 使用注解 @RabbitListener 指定要监听的队列* 由于默认为 auto-ack 模式,* 当消息到达后,RabbitMQ 会自动确认并从队列中删除该消息。*/@RabbitListener(queues = "demo-queue")public void receiveMessage(String message) {// 1.从 queueName 队列中取到的消息内容System.out.println("Received message: " + message);// 2.在 auto-ack 模式下,无需手动 ack//  如果这里出现异常,RabbitMQ 不会再次发送消息给消费者,消息会丢失。}
}

代码详解(自动确认模式)

  • @RabbitListener(queues = "demo-queue")

    • 声明监听名为 demo-queue 的队列。
    • 一旦有新消息到达该队列,就会自动回调此方法。
  • public void receiveMessage(String message)

    • 默认参数类型为字符串,当 RabbitMQ 收到消息后会尝试将其转换为 String 并注入到 message 中。
  • 自动确认(auto-ack)的风险

    • 如果消费者在处理消息时抛出异常,消息已经被 RabbitMQ 标记为“已确认”,不会再重新发送或进入死信队列,导致消息丢失。

2)确认机制

自动确认(auto-ack)
  • 行为

    • 当消费者从队列中获取消息后,RabbitMQ 会立即将该消息标记为已确认(acknowledged),并从队列中删除。

  • 问题

    • 如果消息处理失败(例如消费者抛出异常),消息已经被确认并从队列中删除,无法重新处理。

    • 如果消费者崩溃或断开连接,未处理的消息会丢失。

  • 适用场景

    • 对消息处理的可靠性要求不高的场景。


手动确认(manual-ack)

  • 行为

    • 消费者处理完消息后,必须显式调用 basicAck 方法确认消息。

    • 如果消息处理失败,可以调用 basicNack 或 basicReject 方法拒绝消息。

  • 优点

    • 确保消息处理的可靠性。

    • 支持消息重新入队或发送到死信队列。

  • 适用场景

    • 对消息处理的可靠性要求较高的场景。

核心代码示例:

java">@Service
public class ManualAckConsumer {/*** 在 application.properties 中配置:* spring.rabbitmq.listener.simple.acknowledge-mode=manual* 使得 RabbitMQ 使用手动确认模式*/@RabbitListener(queues = "demo-queue")public void receiveMessage(Message message, Channel channel) throws IOException {try {// 1.从消息中获取消息体String body = new String(message.getBody());System.out.println("Processing message: " + body);// 2.如果业务处理成功,则调用 basicAck 手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {System.err.println("Message processing failed: " + e.getMessage());// 3.如果处理失败,需要决定是重新入队还是拒绝并进入死信队列// requeue = true  -> 重新入队// requeue = false -> 丢弃或进入死信队列(根据队列配置)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}
}

代码详解

  • 配置手动确认

    • application.properties 添加
      java">spring.rabbitmq.listener.simple.acknowledge-mode=manual
      
    • 表示 Spring AMQP 使用手动确认模式(manual-ack)。
  • public void receiveMessage(Message message, Channel channel)

    • 与自动确认不同,这里不仅接收字符串,还接收了 org.springframework.amqp.core.Message 对象和 com.rabbitmq.client.Channel
    • Message:包含消息体(body)和消息属性(headers 等)。
    • Channel:给我们提供了 basicAck, basicNack, basicReject 等底层 AMQP 操作。
  • 手动确认成功

    • channel.basicAck(deliveryTag, multiple)
      • deliveryTag:本次消息的唯一标记,从 message.getMessageProperties().getDeliveryTag() 获取。
      • multiple = false:只确认当前这条消息。

basicAck(long deliveryTag, boolean multiple)

  • 这里的 deliveryTag 并不是在你构造 Message 时生成的,而是 RabbitMQ Broker 在投递消息给消费者时由底层 AMQP 协议自动分配的一个递增的序号
  • java">long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
  • 手动确认失败

    • channel.basicNack(deliveryTag, multiple, requeue)basicReject
      • requeue = true:将消息重新放回队列等待下一次消费(可能导致死循环,如处理一直失败)。
      • requeue = false:拒绝消息,若配置了死信队列,则进入死信队列;否则丢弃消息。

3)处理消费失败

自动确认模式下的处理

  • 在自动确认模式下,如果消息处理失败,RabbitMQ 不会重新发送消息,因为消息已经被确认并从队列中删除。

  • 问题

    • 消息丢失,无法重新处理。

手动确认模式下的处理

  • 在手动确认模式下,如果消息处理失败,可以通过以下方式处理:
  • 重新入队

    • 调用 basicNack 或 basicReject 方法,并将 requeue 参数设置为 true

    • 消息会重新进入队列,等待下一次消费。

  • 发送到死信队列

    • 调用 basicNack 或 basicReject 方法,并将 requeue 参数设置为 false

    • 如果队列配置了死信队列,消息会被发送到死信队列。

重试机制(Spring AMQP 提供的简单重试)(只支持手动确认机制)

是重试失败了才会将消息重新入队 ,所以重试在前,重新入队在后

java"># 启用重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 间隔倍数
spring.rabbitmq.listener.simple.retry.multiplier=2.0
# 最大重试间隔
spring.rabbitmq.listener.simple.retry.max-interval=10000
  • Spring AMQP 提供了 重试机制,可以在消费者处理消息失败时,自动进行多次重试,而不是直接将消息重新入队。

行为

  • 当消息处理失败时,Spring AMQP 会在 本地 进行重试(即不将消息重新入队),直到达到最大重试次数。

  • 如果重试次数用尽,消息会被拒绝(basicNack 或 basicReject),并根据配置决定是否重新入队或发送到死信队列。

死信队列(DLQ)

  • 当消息被拒绝或过期时,RabbitMQ 会将其发送到我们配置的死信交换机(DLX),再路由到死信队列(DLQ)。

  • 配置示例

    java">@Configuration
    public class RabbitConfig {@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").withArgument("x-dead-letter-exchange", "dead-letter-exchange")  // 指定死信交换机.withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由键.build();}@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dead-letter-exchange");}@Beanpublic Queue deadLetterQueue() {return new Queue("dead-letter-queue");}@Beanpublic Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");}
    }
    
  • 原理

    • 正常队列通过 x-dead-letter-exchange 指定死信交换机,一旦消息被拒绝(requeue=false)或超时(TTL 到期),RabbitMQ 会把消息发送到 dead-letter-exchange
    • dead-letter-exchangedead-letter-queue 进行绑定(路由键 dead-letter-routing-key),从而实现死信队列的存储。
  • 重新入队 vs 发送到死信队列

    • 重新入队channel.basicNack(deliveryTag, false, true)
      • 适用于临时性错误,比如数据库锁冲突、网络抖动等,等待后续重新处理。
    • 发送到死信队列channel.basicNack(deliveryTag, false, false)
      • 适用于永久性错误,比如消息格式无法解析,或业务逻辑指定不应再尝试。

http://www.ppmy.cn/server/159380.html

相关文章

基于网络爬虫技术的网络新闻分析【源码+文档+部署讲解】

目 录 1 绪论 1.1 论文研究背景与意义 1.2 论文研究内容 2 系统需求分析 2.1 系统需求概述 2.2 系统需求分析 2.2.1 系统功能要求 2.2.2 系统IPO图 2.2 系统非功能性需求分析 3系统概要设计 3.1 设计约束 3.1.1需求约束 3.1.2设计策略 3.1.3 技术实现 3.3 模块…

DeepSeek-v3在训练和推理方面的优化

1. 基础架构&#xff1a;MLA&#xff0c;大幅减少了KV cache大小。&#xff08;计算量能不能减少&#xff1f;&#xff09; 2. 基础架构&#xff1a;MoE&#xff0c;同等参数量&#xff08;模型的”能力“&#xff09;下&#xff0c;训练、推理的计算量大幅减少。 3. MoE的load…

angular项目知识点

目录 前言 一、创建组件时不生成spec.ts 二、angular.json的详解 三、--prod代表意思 四、--base-href和--output-path 前言 记录一下对于angular项目里的配置文件的解析&#xff0c;以前都没有具体了解过 一、创建组件时不生成spec.ts 在使用指令ng g c componenet时&a…

微软确认Win10停更不碍Microsoft 365使用!未来是否更新成谜

快科技1月17日消息&#xff0c;微软澄清了关于Windows 10停止支持后Microsoft 365办公套件使用情况的误解。 前两天微软更新支持文档&#xff0c;表示2025年10月14日Windows 10停止支持之后&#xff0c;Microsoft 365应用程序将不再支持Windows 10设备&#xff0c;引发用户担忧…

【 PID 算法 】PID 算法基础

一、简介 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&#xff09;、Differential&#xff08;微分&#xff09;的缩写。也就是说&#xff0c;PID算法是结合这三种环节在一起的。粘一下百度百科中的东西吧。 顾名思义&#xff0c;…

MAC环境安装(卸载)软件

MAC环境安装&#xff08;卸载&#xff09;软件 mac配置jdk1.7和jdk1.8配置 jdknode安装node&#xff0c;并实现不同版本的切换背景 卸载node从node官网下载pkg安装的node卸载用 homebrew 安装的node如果你感觉删的不够干净&#xff0c;可以再细分删除验证删除结果 在macOS下创建…

Windows图形界面(GUI)-QT-C/C++ - QT 对话窗口

公开视频 -> 链接点击跳转公开课程博客首页 -> ​​​链接点击跳转博客主页 目录 模态对话框 非模态对话框 文件对话框 基本概念 静态函数 常见属性 颜色对话框 基本概念 静态函数 常见属性 字体对话框 基本概念 静态函数 常见属性 输入对话框 基本概念 …

【spark源码修改】hive3.1.3 spark3.5.4编译,需要修改源码,最终编译成功

【spark源码修改】hive3.1.3 spark3.5.4编译,需要修改源码,最终编译成功 1. 准备安装包与maven编译环境1.1 安装环境准备1.2 修改pom1.3 打包命令2. 编译与问题解决2.1 开始编译 失败, 缺包pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde2.2 Hive Spark Remote Client 模块…