RabbitMQ保证消息的可靠性

embedded/2024/9/22 11:04:31/

一、背景

消息丢失:下图是消息从生产者发送到消费者接收的关系图。通过图片可以看出,消息在生产者、MQ、消费者这三个环节都有可能丢失。
在这里插入图片描述

1.1 生产者丢失

  • 生产者发送消息时连接MQ失败
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • 消息到达MQ后,处理消息的进程发生异常

1.2 MQ丢失

  • 消息到达MQ,保存到队列后,尚未消费就突然宕机

1.3 消费者丢失

  • 消息接收后尚未处理突然宕机
  • 消息接收后处理过程中抛出异常

1.4 总结(三方面入手)

  • 确保生产者成功把消息发送到MQ
  • 确保MQ不会丢失消息
  • 确保消费者成功处理消息

二、解决方案

配置

java">package com.qiangesoft.rabbitmq.producer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息发送配置** @author qiangesoft* @date 2024-05-08*/
@Configuration
public class MessageConfig {public static final String EXCHANGE = "simple.exchange";public static final String QUEUE = "simple.queue";public static final String ROUTING_KEY = "simple";@Beanpublic DirectExchange simpleExchange() {return ExchangeBuilder.directExchange(EXCHANGE)// 持久化交换机.durable(true).build();}@Beanpublic Queue simpleQueue() {return QueueBuilder// 持久化队列.durable(QUEUE)// 避免消息堆积、懒加载.lazy().build();}@Beanpublic Binding simpleBinding(Queue simpleQueue, DirectExchange simpleExchange) {return BindingBuilder.bind(simpleQueue).to(simpleExchange).with(ROUTING_KEY);}}

2.1 生产者

2.1.1 生产者重试机制

背景:生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
解决方案:配置连接超时时间、重试机制。

spring:rabbitmq:    # 设置MQ的连接超时时间connection-timeout: 1stemplate:# 连接重试机制retry:enabled: true# 失败后的初始等待时间initial-interval: 1000ms# 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermultiplier: 1# 最大重试次数max-attempts: 3

2.1.2 生产者确认机制

背景:

  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • 消息到达MQ后,处理消息的进程发生异常

解决方案:配置Publisher Confirm机制、Publisher Return机制。

spring:rabbitmq:# 开启publisher confirm机制,并设置confirm类型,确保消息到达交换机publisher-confirm-type: correlated# 开启publisher return机制,确保消息到达队列publisher-returns: true
定义ConfirmCallback
java">package com.qiangesoft.rabbitmq.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.UUID;/*** 生产者** @author qiangesoft* @date 2024-05-08*/
@Slf4j
@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredpublic RabbitTemplate rabbitTemplate;@GetMapping("/send")public void send(String content) {CorrelationData correlation = getCorrelationData();Message message = MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8)).setMessageId(UUID.randomUUID().toString())// 消息持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 正常发送rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);}private static CorrelationData getCorrelationData() {// 异步回调返回回执,开启publisher confirm机制【确保消息到达交换机】CorrelationData correlation = new CorrelationData();correlation.getFuture().addCallback(new ListenableFutureCallback<>() {@Overridepublic void onFailure(Throwable ex) {log.error("消息发送异常,ID:{},原因:{}", correlation.getId(), ex.getMessage());}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.info("触发【publisher confirm】机制");if (result.isAck()) {log.info("消息发送成功到达交换机,ID:{}", correlation.getId());} else {// 消息发送失败log.error("消息发送失败未到达交换机,ID:{},原因:{}", correlation.getId(), result.getReason());}}});return correlation;}}
定义ReturnCallback
java">package com.qiangesoft.rabbitmq.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 消息路由失败回退配置** @author qiangesoft* @date 2024-05-08*/
@Slf4j
@Configuration
public class ReturnsCallbackConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 消息路由失败退回,设置ReturnsCallback【消息到达交换机没有达到队列】rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("触发【publisher return】机制");log.error("消息投递失败未到达队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}", returned.getReplyCode(), returned.getReplyText(),returned.getExchange(), returned.getRoutingKey(), returned.getMessage());}});}}

2.2 MQ

  • Exchange持久化
  • Queue持久化
  • Message持久化

2.2.1 Exchange

java">@Bean
public DirectExchange simpleExchange() {return ExchangeBuilder.directExchange(EXCHANGE)// 持久化交换机.durable(true).build();
}

2.2.2 Queue

java">@Bean
public Queue simpleQueue() {return QueueBuilder// 持久化队列.durable(QUEUE)// 避免消息堆积、懒加载.lazy().build();
}

2.2.3 Message

java">Message message = MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8)).setMessageId(UUID.randomUUID().toString())// 消息持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 发送
rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);

2.3 消费者

2.3.1 消费者确认机制

spring:rabbitmq:listener:simple:# 自动ackacknowledge-mode: auto

2.3.2 消费者重试机制

spring:rabbitmq:listener:simple:# 消费者失败重试机制retry:enabled: true# 初始的失败等待时长为1秒initial-interval: 1000ms# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmultiplier: 1# 最大重试次数max-attempts: 3# true无状态;false有状态。如果业务中包含事务,这里改为falsestateless: true

2.3.3 失败处理策略

java">package com.qiangesoft.rabbitmq.consumer;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息消费失败配置* ps:配置处理失败消息的交换机和队列** @author qiangesoft* @date 2024-05-08*/
@Configuration
public class ErrorMessageConfig {public static final String EXCHANGE = "error.exchange";public static final String QUEUE = "error.queue";public static final String ROUTING_KEY = "error";@Beanpublic DirectExchange errorMessageExchange() {return new DirectExchange(EXCHANGE);}@Beanpublic Queue errorQueue() {return new Queue(QUEUE, true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(ROUTING_KEY);}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE, ROUTING_KEY);}}

http://www.ppmy.cn/embedded/37956.html

相关文章

小麦穗检测数据集VOC+YOLO格式6508张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;6508 标注数量(xml文件个数)&#xff1a;6508 标注数量(txt文件个数)&#xff1a;6508 标注…

点云成图原理

点成图&#xff08;Point Cloud&#xff09;是指由一组离散的点构成的图形&#xff0c;它们在空间中没有任何连接关系。点成图通常是由激光雷达、相机或其他传感器获取的三维数据&#xff0c;用于表示现实世界中的物体或场景。 三角成图&#xff08;Triangulation&#xff09;…

Android 网络请求 实现

Android 网络请求 实现 一、背景 在Android开发中,网络请求是一个非常常见的需求。应用程序可能需要与远程服务器通信来获取数据、上传文件、验证用户身份等等。背景下,Android应用通常会面临以下几个主要情况和挑战: ①数据交互: 许多应用程序需要从服务器获取数据,例…

《Fundamentals of Power Electronics》——正则电路模型

所有PWM CCM dc-dc转换器执行类似的基本功能。第一&#xff0c;它们转换电压和电流水平&#xff0c;理想状态下效率为100%。第二&#xff0c;它们包含波形的低通滤波功能。虽然有必要去除高频开关纹波&#xff0c;但这种滤波也影响低频电压和电流的变化。第三&#xff0c;转换器…

浙大×移动云,携手点亮AI新时代

近年来&#xff0c;中国移动依托强大的算网资源优势&#xff0c;围绕大模型训练、推理和应用三大场景&#xff0c;打造了一站式智算产品体系。该体系旨在为客户提供覆盖资源、平台、应用的AI全链路服务。目前&#xff0c;一站式智算产品体系已在浙江大学智算中心和许昌中原智算…

西红柿叶病害检测(yolov8模型,从图像、视频和摄像头三种路径识别检测,包含登陆页面、注册页面和检测页面)

1.基于最新的YOLOv8训练的西红柿病害检测模型&#xff0c;和基于PyQt5制作的可视西红柿病害系统&#xff0c;包含登陆页面、注册页面和检测页面&#xff0c;该系统可自动检Bacterial Spot, Early_Blight, Healthy, Late_blight, Leaf Mold, Target_Spot, black spot&#xff0c…

Java反射机制与动态代理解析与应用

Java反射机制与动态代理是Java语言中强大的特性和功能。它们可以使程序在运行时动态地获取和操作类的信息&#xff0c;甚至可以在运行时生成和修改类的代码。本文将对Java反射机制和动态代理进行解析&#xff0c;并探讨它们在实际项目中的应用。 一、Java反射机制 Java反射机…

vue 开发环境的搭建

一、整个流程&#xff1a; 安装nodejs >> 安装vue >> 安装vue-cli >> 初始化 webpack(生成代码) >> 安装依赖 >> 运行vue程序 二、详细安装流程&#xff1a; 1.安装nodejs 下载&#xff1a;https://nodejs.org/dist/v12.18.3/node-v12.18.3-x…