RabbitMQ消息的可靠性

news/2025/3/23 22:41:48/

RabbitMQ消息的可靠性

一 生产者的可靠性

  1. 生产者重试
    有时候由于网络问题,会出现连接MQ失败的情况,可以配置重连机制
    注意:SpringAMQP的重试机制是阻塞式的,重试等待的时候,当前线程会等待。
spring:rabbitmq:connection-timout: 1s #设置MQ的连接超时时间templete:retry:enabled: true #开启超时重试机制initial-interval: 100ms #失败后的初始等待时间multipier: 1 #失败后下次的等待时长倍数, 下次等待时长=initial-interval*multipiermax-attempts: 3 #最大重试次数
  1. 生产者确认

    (1)在生产者服务的yaml文件中配置一下内容
spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制,并设置为MQ异步回调方式返回回执信息publisher-returns: true #开启publisher return机制

(2)配置return-callback

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContexAware{@Overridepublic void setApplicationContext(ApplicationContent applicationContext){// 获取MQRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置returnCallbackrabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{log.info("消息发送失败,应答码:{},原因:{},交换机:{},路由键:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());
});}
}

(3)发送消息,指定消息ID,消息的ConfirmCallback
相比于发布消息,多了消息的confirm

@Test
public void testPubliserConfir()throw InterupteDException{// 创建correlationDataCorrelationData cd = new CorrelationData(UUID.randowUUID().toString());// 给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCllback<CorelationData.Confirm>(){@Overridepublic void onFailure(Throwable ex){// Future发生异常时的处理逻辑,一般不触发log.error("handle message ack fail",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result){// Future接收到回执的处理逻辑if(result.isAck()){log.debug("发送消息成功,收到ACK");}else{log.error("发送消息失败,收到NACK,reason:{}",result.getReson());}}
});
// 发送消息
rabbitTemplate.coverAndSend("hmall.direct","red","hello",cd);
}

二 MQ的可靠性

MQ的持久化可以使用Lazy Queue
(1)通过配置类

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue")// 队列名称.lazy()//开启lazybulid();
}

(1)基于注解

@RabbitListener(queuesToDeclare = @Queue(name="lazy.queue",durable="ture",arguments=@Argument(name="x-queue-mode",value="lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}",msg);
}

三 消费者确认

  1. 消费者确认机制
    在这里插入图片描述
    可以通过配置来进行确认
spring:rabbitmq:listenner:simple:prefetch: 1acknowledgs-mode: auto #确认机制 none-关闭ack,manual-手动ack,auto-自动
  1. 消费失败处理
    重试机制
spring:rabbitmq:listenner:simple:prefetch: 1retry:enabled: true #开启超时重试机制initial-interval: 100ms #失败后的初始等待时间multipier: 1 #失败后下次的等待时长倍数, 下次等待时长=initial-interval*multipiermax-attempts: 3 #最大重试次数stateless: true #true为无状态,若业务包含事务,则使用false

失败处理策略
在这里插入图片描述
在这里插入图片描述
代码实现

@Slf4j
@Configureation
@ConditionalOnProperty(prefix="spring.rabbitmq.listenner.simple.retry",name="enable",havingValue="true")// 只有重试机制是true才生效
public class ErrorConfiguration{@Bbeanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorBinding(DirectExchange errorExchange,Queue errorQueue){return BindingBuilder.bind(errorQueue).to(errorExchange).with("eooro");}/*** 重试失败处理策略* RepublishMessageRecoverer:重试失败后,将消息发送到指定的队列中*/@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){log.info("MessageRecoverer 重试失败处理策略配置");return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}

四 业务幂等性

  1. 消息唯一id
  2. 业务判断
    在这里插入图片描述

http://www.ppmy.cn/news/1227430.html

相关文章

springBoot 配置druid多数据源 MySQL+SQLSERVER

1:pom 文件引入数据 <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency>…

程序设计实践学习笔记

第1题 题目描述 创建一个返回四舍五入到最接近整数的分数之和的函数。在矩阵中有每行的第一个数字表示分子&#xff0c;第二个数子表示分母,挑战者需要将该分数的结果进行四舍五入并将矩阵中所有分数结果总和进行返回。 输入输出格式 输入格式 数字 N 表示的是矩阵的行数。…

springboot项目yml文件中使用${}配置

1、传统写法 &#xff08;1&#xff09;配置服务启动端口 # 服务端口 server:port: 9898 &#xff08;2&#xff09;使用idea启动 &#xff08;3&#xff09;使用jar包启动 2、使用${}写法 格式&#xff1a;${自定义参数名:默认值} 作用&#xff1a; 项目启动时动态配置变量…

Transformer笔记

Transformer encoder-decoder架构 Encoder&#xff1a;将输入序列转换为一个连续向量空间中的表示。Encoder通常是一个循环神经网络&#xff08;RNN&#xff09;或者卷积神经网络&#xff08;CNN&#xff09;&#xff0c;通过对输入序列中的每个元素进行编码&#xff0c;得到…

基于单片机的温度控制器系统设计

**单片机设计介绍&#xff0c; 基于单片机的温度控制器系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的温度控制器系统是一种利用单片机来检测环境温度并控制温度的系统。它通常由以下几个部分组成&#xff…

【Spring总结】注解开发

本篇讲的内容主要是基于Spring v2.5的注解来完成bean的定义 之前都是使用纯配置的方式来定义的bean 文章目录 前言1. Spring v2.5 注解开发定义bean第一步&#xff1a;在需要定义的类上写上注解Component第二步&#xff1a;在Spring Config中定义扫描包第三步&#xff1a;主方法…

Java Swing猜单词游戏

内容要求 1&#xff09; 本次程序设计是专门针对 Java 课程的,要求使用 Java 语言进行具有一定代码量的程序开发。程序的设计要结合一定的算法&#xff0c;在进行代码编写前要能够设计好自己的算法。 2&#xff09;本次程序设计涉及到 Java 的基本语法&#xff0c;即课堂上所…

【心得】Python基础梳理个人笔记

python 特点&#xff1a; 1 解释性语言 2 交互式语言 3 支持面向对象编程 4 初学者语言 基本语法 # -*- coding: utf-8 -*- #!/usr/bin/python3 #!/bin/sh python payload.py chmod x ./payload.py ./payload.py 直接python xxx.py 不需要声明#!/usr/bin/pytho…