RabbitMQ保证消息的可靠性

server/2024/10/18 14:03:54/

一、背景

消息丢失:下图是消息从生产者发送到消费者接收的关系图。通过图片可以看出,消息在生产者、MQ、消费者这三个环节都有可能丢失。
在这里插入图片描述

1.1 生产者丢失

  • 生产者发送消息时连接MQ失败
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • 消息到达MQ后,处理消息的进程发生异常

1.2 MQ丢失

  • 消息到达MQ,保存到队列后,尚未消费就突然宕机

1.3 消费者丢失

  • 消息接收后尚未处理突然宕机
  • 消息接收后处理过程中抛出异常

1.4 总结(三方面入手)

  • 确保生产者成功把消息发送到MQ
  • 确保MQ不会丢失消息
  • 确保消费者成功处理消息

二、解决方案

配置

java">package com.qiangesoft.rabbitmq.producer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息发送配置** @author qiangesoft* @date 2024-05-08*/
@Configuration
public class MessageConfig {public static final String EXCHANGE = "simple.exchange";public static final String QUEUE = "simple.queue";public static final String ROUTING_KEY = "simple";@Beanpublic DirectExchange simpleExchange() {return ExchangeBuilder.directExchange(EXCHANGE)// 持久化交换机.durable(true).build();}@Beanpublic Queue simpleQueue() {return QueueBuilder// 持久化队列.durable(QUEUE)// 避免消息堆积、懒加载.lazy().build();}@Beanpublic Binding simpleBinding(Queue simpleQueue, DirectExchange simpleExchange) {return BindingBuilder.bind(simpleQueue).to(simpleExchange).with(ROUTING_KEY);}}

2.1 生产者

2.1.1 生产者重试机制

背景:生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
解决方案:配置连接超时时间、重试机制。

spring:rabbitmq:    # 设置MQ的连接超时时间connection-timeout: 1stemplate:# 连接重试机制retry:enabled: true# 失败后的初始等待时间initial-interval: 1000ms# 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermultiplier: 1# 最大重试次数max-attempts: 3

2.1.2 生产者确认机制

背景:

  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
  • 消息到达MQ后,处理消息的进程发生异常

解决方案:配置Publisher Confirm机制、Publisher Return机制。

spring:rabbitmq:# 开启publisher confirm机制,并设置confirm类型,确保消息到达交换机publisher-confirm-type: correlated# 开启publisher return机制,确保消息到达队列publisher-returns: true
定义ConfirmCallback
java">package com.qiangesoft.rabbitmq.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.UUID;/*** 生产者** @author qiangesoft* @date 2024-05-08*/
@Slf4j
@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredpublic RabbitTemplate rabbitTemplate;@GetMapping("/send")public void send(String content) {CorrelationData correlation = getCorrelationData();Message message = MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8)).setMessageId(UUID.randomUUID().toString())// 消息持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 正常发送rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);}private static CorrelationData getCorrelationData() {// 异步回调返回回执,开启publisher confirm机制【确保消息到达交换机】CorrelationData correlation = new CorrelationData();correlation.getFuture().addCallback(new ListenableFutureCallback<>() {@Overridepublic void onFailure(Throwable ex) {log.error("消息发送异常,ID:{},原因:{}", correlation.getId(), ex.getMessage());}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.info("触发【publisher confirm】机制");if (result.isAck()) {log.info("消息发送成功到达交换机,ID:{}", correlation.getId());} else {// 消息发送失败log.error("消息发送失败未到达交换机,ID:{},原因:{}", correlation.getId(), result.getReason());}}});return correlation;}}
定义ReturnCallback
java">package com.qiangesoft.rabbitmq.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 消息路由失败回退配置** @author qiangesoft* @date 2024-05-08*/
@Slf4j
@Configuration
public class ReturnsCallbackConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 消息路由失败退回,设置ReturnsCallback【消息到达交换机没有达到队列】rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("触发【publisher return】机制");log.error("消息投递失败未到达队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}", returned.getReplyCode(), returned.getReplyText(),returned.getExchange(), returned.getRoutingKey(), returned.getMessage());}});}}

2.2 MQ

  • Exchange持久化
  • Queue持久化
  • Message持久化

2.2.1 Exchange

java">@Bean
public DirectExchange simpleExchange() {return ExchangeBuilder.directExchange(EXCHANGE)// 持久化交换机.durable(true).build();
}

2.2.2 Queue

java">@Bean
public Queue simpleQueue() {return QueueBuilder// 持久化队列.durable(QUEUE)// 避免消息堆积、懒加载.lazy().build();
}

2.2.3 Message

java">Message message = MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8)).setMessageId(UUID.randomUUID().toString())// 消息持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 发送
rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);

2.3 消费者

2.3.1 消费者确认机制

spring:rabbitmq:listener:simple:# 自动ackacknowledge-mode: auto

2.3.2 消费者重试机制

spring:rabbitmq:listener:simple:# 消费者失败重试机制retry:enabled: true# 初始的失败等待时长为1秒initial-interval: 1000ms# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmultiplier: 1# 最大重试次数max-attempts: 3# true无状态;false有状态。如果业务中包含事务,这里改为falsestateless: true

2.3.3 失败处理策略

java">package com.qiangesoft.rabbitmq.consumer;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息消费失败配置* ps:配置处理失败消息的交换机和队列** @author qiangesoft* @date 2024-05-08*/
@Configuration
public class ErrorMessageConfig {public static final String EXCHANGE = "error.exchange";public static final String QUEUE = "error.queue";public static final String ROUTING_KEY = "error";@Beanpublic DirectExchange errorMessageExchange() {return new DirectExchange(EXCHANGE);}@Beanpublic Queue errorQueue() {return new Queue(QUEUE, true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(ROUTING_KEY);}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE, ROUTING_KEY);}}

http://www.ppmy.cn/server/38266.html

相关文章

机器视觉_联合编程(二)

链接相机,加载tb,检测 FrameGrabber链接相机拍照 using System; using System.Collections; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tas…

代码随想录算法训练营第六十二天| 503.下一个更大元素II,42. 接雨水

题目与题解 503.下一个更大元素II 题目链接&#xff1a;503.下一个更大元素II 代码随想录题解&#xff1a;503.下一个更大元素II 视频讲解&#xff1a;单调栈&#xff0c;成环了可怎么办&#xff1f;LeetCode&#xff1a;503.下一个更大元素II_哔哩哔哩_bilibili 解题思路&…

如何调用Java接口中默认方法?

从JDK8开始&#xff0c;接口支持默认方法实现&#xff0c;即在接口中可以有具体的实现&#xff0c;仅需使用关键字 default修饰方法即可&#xff0c;如&#xff1a; public interface MyInterface {default void call(String methodName) {System.out.println("MethodHan…

3、Qt--配置文件的使用

开发平台&#xff1a;Win10 64位 开发环境&#xff1a;Qt Creator 13.0.0 构建环境&#xff1a;Qt 5.15.2 MSVC2019 64位 一、需求及方案 实际开发过程中&#xff0c;我们需要根据本地的配置文件&#xff0c;去配置我们的程序&#xff0c;比如数据库地址、网络地址等信息&…

AI终端设备的自动化分级

摘要&#xff1a; 大语言模型&#xff08;LLM&#xff09;被认为是通用人工智能&#xff08;AGI&#xff09;的潜在火花&#xff0c;为构建通用人工智能代理带来了希望。在此基础上&#xff0c;客户端设备在人工智能的帮助下不断发展&#xff0c;从基于应用程序&#xff08;AP…

【微服务】配置管理

Nacos配置管理 配置管理配置共享配置热更新 配置管理 将微服务集群中常用&#xff0c;经常变化的配置都写到一个独立的配置文件微服务中进行统一管理 配置共享 在Nacos的界面当中进行配置管理&#xff0c;在配置列表中添加配置 比如各个服务中的jdbc的连接配置&#xff1a; …

数据库系统理论——关系数据库

文章目录 一、关系&#xff08;数据结构&#xff09;1、概述2、名词解释3、关系模式、关系数据库、关系数据库模式4、基本关系的性质 二、关系操作&#xff08;数据操作&#xff09;三、关系的完整性1、实体完整性2 、参照完整性3、用户自定义的完整性 四、关系代数五、习题 前…

【编程题-错题集】非对称之美(找规律 / 贪心)

牛客对应题目链接&#xff1a;非对称之美 (nowcoder.com) 一、分析题目 找规律&#xff1a; 判断是否全都是相同字符。判断本身是否是回文。 如果这个字符串每个字符相同&#xff0c;不存在非回文子串&#xff0c;直接返回 0。如果这个字符串不是回文&#xff0c;输出字符串长度…