Spring Boot 3 集成 RabbitMQ 实践指南

ops/2025/2/28 12:55:37/

Spring Boot 3 集成 RabbitMQ 实践指南

1. RabbitMQ 核心原理

1.1 什么是RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,使用Erlang语言开发,基于AMQP(Advanced Message Queuing Protocol)协议实现。它支持多种消息传递模式,具有高可用性、可扩展性和可靠性等特点。

1.2 核心概念

1.2.1 基础组件
  1. Producer(生产者)

    • 消息的发送者
    • 负责创建消息并发布到RabbitMQ中
  2. Consumer(消费者)

    • 消息的接收者
    • 连接到RabbitMQ服务器并订阅队列
  3. Exchange(交换机)

    • 接收生产者发送的消息并根据路由规则转发到队列
    • 类型:
      • Direct Exchange:根据routing key精确匹配
      • Topic Exchange:根据routing key模式匹配
      • Fanout Exchange:广播到所有绑定队列
      • Headers Exchange:根据消息属性匹配
  4. Queue(队列)

    • 消息存储的地方
    • 支持持久化、临时、自动删除等特性
  5. Binding(绑定)

    • 交换机和队列之间的虚拟连接
    • 定义消息路由规则
1.2.2 高级特性
  1. 消息持久化

    • 交换机持久化:创建时设置durable=true
    • 队列持久化:创建时设置durable=true
    • 消息持久化:设置delivery-mode=2
  2. 消息确认机制

    • 生产者确认:Publisher Confirm和Return机制
    • 消费者确认:自动确认、手动确认、批量确认
  3. 死信队列(DLX)

    • 消息被拒绝且不重新入队
    • 消息过期(TTL)
    • 队列达到最大长度

1.3 应用场景

  1. 异步处理

    • 发送邮件、短信通知
    • 日志处理、报表生成
    • 文件处理、图片处理
  2. 应用解耦

    • 系统间通信
    • 服务解耦
    • 流程分离
  3. 流量控制

    • 削峰填谷
    • 请求缓冲
    • 流量整形
  4. 定时任务

    • 延迟队列
    • 定时处理
    • 任务调度

2. 环境搭建

2.1 基础环境

  • Spring Boot: 3.x
  • Java: 17+
  • RabbitMQ: 3.12+
  • Maven/Gradle

2.2 依赖配置

<dependencies><!-- Spring Boot Starter AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- Jackson --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>

2.3 基础配置

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /# 消息确认配置publisher-confirm-type: correlated  # 开启发布确认publisher-returns: true             # 开启发布返回template:mandatory: true                   # 消息路由失败返回# 消费者配置listener:simple:acknowledge-mode: manual        # 手动确认prefetch: 1                     # 每次获取消息数量retry:enabled: true                 # 开启重试initial-interval: 1000        # 重试间隔时间max-attempts: 3               # 最大重试次数multiplier: 1.0              # 重试时间乘数# SSL配置(可选)ssl:enabled: falsekey-store: classpath:keystore.p12key-store-password: passwordtrust-store: classpath:truststore.p12trust-store-password: password

3. 核心配置类

3.1 RabbitMQ配置类

@Configuration
@EnableRabbit
public class RabbitMQConfig {// 交换机名称public static final String BUSINESS_EXCHANGE = "business.exchange";public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";// 队列名称public static final String BUSINESS_QUEUE = "business.queue";public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";// 路由键public static final String BUSINESS_KEY = "business.key";public static final String DEAD_LETTER_KEY = "dead.letter.key";// 业务交换机@Beanpublic DirectExchange businessExchange() {return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE).durable(true).build();}// 死信交换机@Beanpublic DirectExchange deadLetterExchange() {return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();}// 业务队列@Beanpublic Queue businessQueue() {Map<String, Object> args = new HashMap<>(3);// 消息过期时间args.put("x-message-ttl", 60000);// 队列最大长度args.put("x-max-length", 1000);// 死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();}// 死信队列@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}// 业务绑定@Beanpublic Binding businessBinding() {return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(BUSINESS_KEY);}// 死信绑定@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_KEY);}// 消息转换器@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// RabbitTemplate配置@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;}
}

3.2 消息确认配置

@Configuration
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机成功: correlationData={}", correlationData);} else {log.error("消息发送到交换机失败: correlationData={}, cause={}", correlationData, cause);// 处理失败逻辑,如重试、告警等}}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("消息路由到队列失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",returned.getExchange(),returned.getRoutingKey(),returned.getReplyCode(),returned.getReplyText(),new String(returned.getMessage().getBody()));// 处理失败逻辑,如重试、告警等}
}

4. 消息生产者

4.1 消息发送服务

@Service
@Slf4j
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(Object message, String exchange, String routingKey) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());try {rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);log.info("消息发送成功: message={}, exchange={}, routingKey={}, correlationData={}",message, exchange, routingKey, correlationData);} catch (Exception e) {log.error("消息发送异常: message={}, exchange={}, routingKey={}, correlationData={}, error={}",message, exchange, routingKey, correlationData, e.getMessage());throw new RuntimeException("消息发送失败", e);}}public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());MessagePostProcessor messagePostProcessor = msg -> {msg.getMessageProperties().setDelay((int) delayMillis);return msg;};try {rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);log.info("延迟消息发送成功: message={}, exchange={}, routingKey={}, delay={}, correlationData={}",message, exchange, routingKey, delayMillis, correlationData);} catch (Exception e) {log.error("延迟消息发送异常: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}",message, exchange, routingKey, delayMillis, correlationData, e.getMessage());throw new RuntimeException("延迟消息发送失败", e);}}
}

5. 消息消费者

5.1 消息处理服务

@Service
@Slf4j
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)public void handleMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 获取消息内容String messageBody = new String(message.getBody());log.info("收到消息: message={}, deliveryTag={}", messageBody, deliveryTag);// 业务处理processMessage(messageBody);// 手动确认消息channel.basicAck(deliveryTag, false);log.info("消息处理成功: deliveryTag={}", deliveryTag);} catch (Exception e) {log.error("消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());// 判断是否重新投递if (message.getMessageProperties().getRedelivered()) {log.error("消息已重试,拒绝消息: deliveryTag={}", deliveryTag);channel.basicReject(deliveryTag, false);} else {log.info("消息首次处理失败,重新投递: deliveryTag={}", deliveryTag);channel.basicNack(deliveryTag, false, true);}}}private void processMessage(String message) {// 实现具体的业务逻辑log.info("处理消息: {}", message);}
}

5.2 死信消息处理

@Service
@Slf4j
public class DeadLetterConsumer {@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)public void handleDeadLetter(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String messageBody = new String(message.getBody());log.info("收到死信消息: message={}, deliveryTag={}", messageBody, deliveryTag);// 死信消息处理逻辑processDeadLetter(messageBody);channel.basicAck(deliveryTag, false);log.info("死信消息处理成功: deliveryTag={}", deliveryTag);} catch (Exception e) {log.error("死信消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());channel.basicReject(deliveryTag, false);}}private void processDeadLetter(String message) {// 实现死信消息处理逻辑log.info("处理死信消息: {}", message);}
}

6. 接口控制器

@RestController
@RequestMapping("/api/mq")
@Slf4j
public class MessageController {@Autowiredprivate MessageProducer messageProducer;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestBody MessageDTO message) {try {messageProducer.sendMessage(message.getContent(),RabbitMQConfig.BUSINESS_EXCHANGE,RabbitMQConfig.BUSINESS_KEY);return ResponseEntity.ok("消息发送成功");} catch (Exception e) {log.error("消息发送失败: {}", e.getMessage());return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("消息发送失败: " + e.getMessage());}}@PostMapping("/send/delay")public ResponseEntity<String> sendDelayMessage(@RequestBody MessageDTO message,@RequestParam long delayMillis) {try {messageProducer.sendDelayMessage(message.getContent(),RabbitMQConfig.BUSINESS_EXCHANGE,RabbitMQConfig.BUSINESS_KEY,delayMillis);return ResponseEntity.ok("延迟消息发送成功");} catch (Exception e) {log.error("延迟消息发送失败: {}", e.getMessage());return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("延迟消息发送失败: " + e.getMessage());}}
}

7. 监控与运维

7.1 RabbitMQ管理界面

  • 访问地址:http://localhost:15672
  • 默认账号:guest/guest
  • 主要功能:
    • 队列监控
    • 交换机管理
    • 连接状态
    • 消息追踪

7.2 Prometheus + Grafana监控

# prometheus.yml
scrape_configs:- job_name: 'rabbitmq'static_configs:- targets: ['localhost:15692']

7.3 日志配置

logging:level:org.springframework.amqp: INFOcom.your.package: DEBUGpattern:console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"

7.4 告警配置

@Configuration
public class RabbitMQAlertConfig {@Value("${alert.dingtalk.webhook}")private String webhookUrl;@Beanpublic AlertService alertService() {return new DingTalkAlertService(webhookUrl);}
}

8. 最佳实践

8.1 消息幂等性处理

@Service
public class MessageIdempotentHandler {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public boolean isProcessed(String messageId) {String key = "mq:processed:" + messageId;return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));}
}

8.2 消息重试策略

@Configuration
public class RetryConfig {@Beanpublic RetryTemplate retryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();backOffPolicy.setBackOffPeriod(1000);retryTemplate.setBackOffPolicy(backOffPolicy);SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3);retryTemplate.setRetryPolicy(retryPolicy);return retryTemplate;}
}

8.3 消息序列化

@Configuration
public class MessageConverterConfig {@Beanpublic MessageConverter jsonMessageConverter() {Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();converter.setCreateMessageIds(true);return converter;}
}

8.4 消息追踪

@Aspect
@Component
@Slf4j
public class MessageTraceAspect {@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable {String messageId = MDC.get("messageId");log.info("开始处理消息: messageId={}", messageId);try {Object result = joinPoint.proceed();log.info("消息处理完成: messageId={}", messageId);return result;} catch (Exception e) {log.error("消息处理异常: messageId={}, error={}", messageId, e.getMessage());throw e;}}
}

9. 常见问题与解决方案

9.1 消息丢失问题

  1. 生产者确认机制
  2. 消息持久化
  3. 手动确认模式
  4. 集群高可用

9.2 消息重复消费

  1. 幂等性处理
  2. 消息去重
  3. 业务检查

9.3 消息堆积问题

  1. 增加消费者数量
  2. 提高处理效率
  3. 队列分片
  4. 死信队列处理

9.4 性能优化

  1. 合理设置预取数量
  2. 批量确认消息
  3. 消息压缩
  4. 连接池优化

10. 高可用部署

10.1 集群配置

spring:rabbitmq:addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672username: adminpassword: passwordvirtual-host: /

10.2 镜像队列

# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

10.3 负载均衡

# nginx.conf
upstream rabbitmq_cluster {server rabbit1:15672;server rabbit2:15672;server rabbit3:15672;
}

11. 参考资源

  1. Spring AMQP官方文档
  2. RabbitMQ官方文档
  3. Spring Boot官方文档

http://www.ppmy.cn/ops/161967.html

相关文章

【FL0086】基于SSM和微信小程序的垃圾分类小程序

&#x1f9d1;‍&#x1f4bb;博主介绍&#x1f9d1;‍&#x1f4bb; 全网粉丝10W,CSDN全栈领域优质创作者&#xff0c;博客之星、掘金/知乎/b站/华为云/阿里云等平台优质作者、专注于Java、小程序/APP、python、大数据等技术领域和毕业项目实战&#xff0c;以及程序定制化开发…

etcd 3.15 三节点集群管理指南

本文档旨在提供 etcd 3.15 版本的三节点集群管理指南&#xff0c;涵盖节点的新增、删除、状态检查、数据库备份和恢复等操作。 1. 环境准备 1.1 系统要求 操作系统&#xff1a;Linux&#xff08;推荐 Ubuntu 18.04 或 CentOS 7&#xff09; 内存&#xff1a;至少 2GB 磁盘&a…

编写第一个 C++ 程序 – Hello World 示例

“Hello World”程序是学习任何编程语言的第一步&#xff0c;也是您将学习的最直接的程序之一。它是用于演示编码过程如何工作的基本程序。您所要做的就是在输出屏幕上显示 “Hello World”。 C Hello World 程序 下面是在控制台屏幕上打印 “Hello World” 的 C 程序。 // …

SQL Server 视图的更新排查及清除缓存

目录 前言排查方向 前言 获取数据的时候&#xff0c;发现数据少了两个字段值&#xff0c;归根原因是Java中的实体类少写了两个&#xff0c;后续补充上就好了&#xff01; 但也正了解到视图中的刷新原理以及排查机制&#xff0c;如果确认是视图等引起&#xff0c;可结合如下文…

nginx 搭建 IPv6 -> IPv4 反向代理服务器

背景 在实际生产过程中&#xff0c;由于各种原因&#xff0c;我们的在线服务搭建在火山云服务器上&#xff0c;使用火山云包括 ECS、CLB、PLB 等组件进行网络通信&#xff0c;并且通过专线接受来自某公司内部流量。但是在大概 22~23 年&#xff0c;某公司要把所有网络流量变为…

C#与AI的交互(以DeepSeek为例)

C#与ai的交互 与AI的交互使用的Http请求的方式&#xff0c;通过发送请求&#xff0c;服务器响应ai生成的文本 下面是完整的代码&#xff0c;我这里使用的是Ollama本地部署的deepseek&#xff0c;在联网调用api时&#xff0c;则url会有不同 public class OllamaRequester {[Se…

十一、大数据治理平台总体功能架构

大数据治理平台的功能架构图中心主题&#xff1a;数据治理 核心重点是建立健全大数据资产管理框架&#xff0c;确保数据质量、安全性、可访问性和合规性。 大数据治理平台总体功能架构图 关键功能领域 1.数据资产平台&#xff08;左侧&#xff09; 此部分主要关注数据资产本身…

Stale file handle

Stale file handle 错误通常表示文件句柄已失效&#xff0c;这往往在多种场景下出现&#xff0c;下面为你详细分析可能的原因及对应的解决办法。 可能的原因 NFS&#xff08;网络文件系统&#xff09;挂载问题 当 NFS 服务器端的文件系统结构发生改变&#xff0c;例如文件被移…