[RabbitMQ] 保证消息可靠性的三大机制------消息确认,持久化,发送方确认

devtools/2024/11/26 21:17:19/

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 消息确认
    • 1.1 消息确认机制
    • 1.2 手动确认方法
    • 1.3 代码示例
      • 1.3.1 NONE
      • 1.3.2 AUTO
      • 1.3.3 MANUAL
  • 2. 持久性
    • 2.1 交换机持久化
    • 2.2 队列持久化
    • 2.3 消息持久化
  • 3. 发送方确认
    • 3.1 confirm确认模式
    • 3.2 return回退模式
  • 4. 常见面试题

1. 消息确认

1.1 消息确认机制

生产者发送消息之后,到达消费端之后,可能会有以下情况:一种是消息处理成功,一种是消息处理异常.
在这里插入图片描述
如果消息处理异常的情况下,这条消息就会被删除,此时就会造成消息的丢失.此时我们就需要保证消息可靠地到达消费者,RabbitMQ为此提供了消息确认的机制.
消费者在订阅队列的时候,可以之地宁autoAck参数,根据这个参数,消息确认的机制可以分为一下的两种:

  • 自动确认,autoAck参数为true.RabbitMQ的队列会把发送出去的消息自动置为确认,然后自动从内存或者硬盘中删除,而不管消费者是否真正地消费到了这些消息.
  • 手动确认,autoAck参数为false.RabbitMQ的队列会等待消费者显示调用Basic.Ack命令,回复确认信号之后才从内存或者硬盘中进行删除.
    对于手动确认的队列中的消息而言,队列中的消息分为两部分:
    ⼀是等待投递给消费者的消息.
    二是已经投递给消费者,但是还没有收到消费者确认信号的消息.
    如果RabbitMQ一直没有收到消费者的确认消息信号,则RabbitMQ会安排这个消息重新进入队列,等待投递个下一个消费者,当然下一个消费者还是有可能是原来的消费者.
    在这里插入图片描述

1.2 手动确认方法

消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ也提供了不同的确认应答的方式.一共有以下三种:

  1. 肯定确认: Channel.basicAck(long deliveryTag, boolean multiple)
    这种方法表示的是,消费者已经成功接收到消息,可以将其丢弃了.参数分别表示的是:
    • deliveryTag: 消息的id,是消息的唯一标识.
    • multiple: 是否批量确认,值为true则会一次性ack所有小于或等于指定deliveryTag的消息.值为false,则只确认当前指定deliveryTag的消息.这种方式可以在一定程度上减少网络的开销.
      在这里插入图片描述
  2. 否定确认: Channel.basicReject(long deliveryTag, boolean requeue)
    消费者拒绝接收这个消息.参数说明:
    • requeue: 表示在拒绝这条消息之后,这条消息会不会被放回到队列中.
  3. 否定确认: Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
    次方法可以让消费者批量拒绝消息,参数说明:
    • multiple: 设置为true表示拒绝deliveryTag编号之前的所有未被消费消息.其他的参数和上面的类似.

1.3 代码示例

下面我们通过SpringBoot来演示消息确认的机制.
Spring-AMQP对消息确认的机制提供了三种策略.

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
  1. NONE: 在这种模式之下,不管消费者是否成功处理了消息,RabbitMQ会自动确认消息,从RabbitMQ的队列中自动移除消息,如果消息处理失败,消息会发生丢失.
  2. AUTO: 如果不对这个配置项进行配置的话,这个就是默认的确认机制,在这种模式之下,消费者会自动确认消息,但是如果在消费者处理消息的过程中抛出了异常,该消息不会被确认.
  3. MANUAL: 手动确认,消费者必须在成功处理之后显示调用basicAck方法来确认消息,如果消息未被确认
    ,RabbitMQ会认为消息没有被处理成功,并且会在消费者可用的时候重新投递该消息.如果消息被拒绝,可以按照自己的需求来处理消息是否重新进入队列中.

1.3.1 NONE

  • 配置确认机制
spring:application:name: rabbitmq-springrabbitmq:host: 39.105.137.64port: 5672username: jiangruijiapassword: ******virtual-host: /listener:simple:acknowledge-mode: none
  • 发送消息
    首先配置交换机和队列
public static final String ACK_QUEUE = "ack_queue";
public static final String ACK_EXCHANGE = "ack_exchange";
@Bean
public DirectExchange ackExchange(){return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).durable(true).build();
}
@Bean
public Queue ackQueue(){return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
@Bean
public Binding ackBinding(@Qualifier("ackExchange") DirectExchange exchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ack");
}

通过接口发送消息:

@RequestMapping("/ack")
public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE,"ack","Ack消息");return "发送成功";
}
  • 使用Postman发送消息
    我们先把消费者注释掉,不要运行消费者.
    在发送消息之后,我们看到队列中的消息有一条未消费的消息.
    在这里插入图片描述
    消费者消费者之后:
    在这里插入图片描述
    在这里插入图片描述
    我们看到控制台中接收到了消息,处理异常,但是队列中的消息已经被标记为了确认.

1.3.2 AUTO

  1. 配置消息确认机制
spring:application:name: rabbitmq-springrabbitmq:host: 182.92.204.253port: 5672username: jiangruijiapassword: *****virtual-host: /listener:simple:acknowledge-mode: auto
  1. 运行程序
    我们发现消息在不停地重发,并且消息的tag值在不停地增加.在这里插入图片描述
    在这里插入图片描述
    在管理界面上,我们发现未确认的消息一直为1,这是由于在消费者处理信息的时候出现了异常,这时候RabbitMQ会不断地对消息进行重发,消息不会被确认.导致了消息一直都是unacked状态.

1.3.3 MANUAL

  1. 修改确认机制
    listener:simple:acknowledge-mode: manual
  1. 修改消费者逻辑为手动确认
    首先我们正常处理消息,先不要抛出异常
@Component
public class AckListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException{System.out.printf("接收到消息:%s,消息tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());System.out.println("处理消息");
//            int i = 3/0;//消息处理异常channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

调用接口发送消息:
在这里插入图片描述
在这里插入图片描述

我们看到运行结果是正常的.队列中的消息成功被消费.
接下来我们把异常放开:

@Component
public class AckListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException{System.out.printf("接收到消息:%s,消息tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());System.out.println("处理消息");int i = 3/0;//消息处理异常channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

在这里插入图片描述
在这里插入图片描述
我们发现队列中的消息发生了积压,消息直接被退回的队列,控制台抛出了异常,消息没有被处理成功.
之后我们可以把代码改为出现异常的时候拒绝接收,其中basicRejectrequeue属性配置为true,让消息拒绝之后重新入队.

@Component
public class AckListener {@RabbitListener(queues = Constant.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException{System.out.printf("接收到消息:%s,消息tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {System.out.println("处理消息");Thread.sleep(1000);int i = 3/0;//消息处理异常channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch (Exception e){channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}
}

运行结果: 消费异常会被捕捉,拒绝接收回到队列之后,会不停进行重试.
在这里插入图片描述
在这里插入图片描述

这里与直接发生异常回到队列的结果不一样,发生异常回到队列不会反复重新处理,如果捕捉到异常之后拒绝接收,后续还是会不停地重新处理,与atuo的结果差不多.

2. 持久性

前面的消息确认机制是保证了消息从队列中到达消费者的过程中不发生丢失.但是如果RabbitMQ由于某种异常情况以外退出或者崩溃,交换机,队列或者消息可能会发生丢失.
RabbitMQ中的持久化分为三个部分: 交换机持久化,队列持久化,消息持久化.

2.1 交换机持久化

我们之前在进行RabbitMQ配置的时候,就曾经反复使用交换机的持久化.他是在声明交换机的时候,通过durable方法的参数设置为true来实现的.
相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建立交换机,交换机会自动建立,相当于一直存在.

public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constant.DIRECT_EXCHANGE).durable(true).build();
}

2.2 队列持久化

和交换机持久化一样,我们在之前也一直使用queue的持久化.持久化是通过在声明队列的时候,使用durable方法把其中的属性设置为队列名来实现的.

public Queue directQueue1(){return QueueBuilder.durable(Constant.DIRECT_QUEUE1).build();
}

当我们在查看队列持久化的源码的时候,我们发现队列的durable属性默认为true.

public static QueueBuilder durable(String name) {return (new QueueBuilder(name)).setDurable();
}
private QueueBuilder setDurable() {this.durable = true;return this;
}

我们也可以使用nonDurable方法来创建非持久化的队列.

public Queue directQueue1(){return QueueBuilder.nonDurable(Constant.DIRECT_QUEUE1).build();
}

2.3 消息持久化

消息实现持久化,需要把消息的投递模式设置为2(也就是MessageProperties中的deliveryMode设置为MessageDeliveryMode.PERSISTENT),MessageDeliveryMode为一个枚举类.

public enum MessageDeliveryMode {NON_PERSISTENT,//⾮持久化 PERSISTENT;//持久化 

只有我们同时设置了队列和消息的持久化,才可以保证RabbitMQ服务在重启之后,消息依然是存在的.如果只是设置了队列的持久化或者是消息的持久化,重启之后消息依然会丢失.
接下来我们让生产者来发送一条持久化的消息:
交换机和队列的配置还是我们之前的配置

@RequestMapping("/ack")
public String ack(){String s = "hello ack";Message message = new Message(s.getBytes(StandardCharsets.UTF_8),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE,"ack",message);return "发送成功";
}

在默认的条件下,消息是持久化,除非队列被声明为非持久化或者是在发送消息的时候消息被标记为持久化.

如果所有的消息都被标记为了持久化,会严重影响RabbitMQ的性能.这是因为写硬盘会拖慢速度.对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量.在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做⼀个权衡.

如果把交换机,队列,消息都设置为持久化,就可以保证百分之百数据不丢失了吗?答案是不是的.

  1. 如果消费者在消费队列消息的时候,设置的是自动确认,那么当消费者还没有来得及处理消息,消费者就宕机了,就会造成消息丢失,这种情况就需要我们把消息的确认机制设置为手动确认即可.
  2. 如果消息在RabbitMQ中进行持久化的时候,在存硬盘的过程中发生了宕机,消息还没有来得及落盘,那么这些消息也会消失.
    这种情况如何解决呢?第一种方法就是使用仲裁队列,我们后面说明.第二种方法就是发送端引入发送方确认的机制来保证消息的可靠性.下面我们就来详细介绍发送方确认机制.

3. 发送方确认

在使用RabbitMQ的时候,可以通过持久化来解决RabbitMQ宕机而导致的消息丢失的问题,如果有一种情况,消息在到达服务器之前就已经发生了丢失,这时候消息根本没有到达RabbitMQ,持久化也解决不了这个问题.
RabbitMQ为我们提供了两种方案,一种是事务机制,一种是发送方确认机制.
由于事务机制比较消耗性能,一般情况下,我们不会使用事务.我们主要介绍发送方确认的机制.
RabbitMQ为我们提供了两个方式来控制消息可靠性的投递:

  1. confirm确认模式
  2. return退回模式

3.1 confirm确认模式

  1. 配置RabbitMQ
rabbitmq:host: 182.92.204.253port: 5672username: jiangruijiapassword: ******virtual-host: /listener:simple:acknowledge-mode: manualpublisher-confirm-type: correlated #消息发送确认
  1. 设置确认回调逻辑并发送消息
    无论消息发送是成功还是失败,都会调用ConfirmCallback中的confirm方法.如果消息成功发送到Broker,则ack为true.如果发送失败,ack为false,并且cause提供失败的原因.
    首先配置带有发送方确认机制的RabbitTemplate,使用setConfirmCallback来设置发送方确认,其中使用匿名内部类的方式重写confirm方法,在其中编写确认成功和确认失败的逻辑.
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {//设置消息队列的确认机制@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){//如果确认成功,ack为trueSystem.out.println("发送方确认成功:"+correlationData.getId());//correlationData中包含消息的id}else {System.out.println("发送方确认失败:"+correlationData.getId()+",原因:"+cause);}}});return rabbitTemplate;
}

confirm中的三个参数的意思分别是:
- correlationData:消息发送时候的一些附加信息,其中包含一个id属性,通常用于在确认回调中识别特定的消息.
- ack: 交换机是否收到消息发送方的信息,收到为true,未收到为false.
- cause: 当消息确认失败的时候,这个字符串会提供失败的原因.

注意: 在我们在SpringIoC容器中自定义一个RabbitTemplate的Bean对象的时候,在我们对RabbitTemplate对象进行DI注入的时候,由于Spring的Bean的优先级管理机制,Spring不再会调用RabbitMQ原生的Bean对象,而是调用我们自定义的Bean对象.
配置交换机和队列

@Bean
public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(Constant.CONFIRM_EXCHANGE).durable(true).build();
}
@Bean
public Queue confirmQueue(){return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();
}
@Bean
public Binding confirmBinding(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("confirm");
}

之后在生产者使用@Resource注解注入我们写好的具有发送者确认机制的RabbitTemplate.之后进行消息发送.

@Resource
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm(){CorrelationData correlationData = new CorrelationData("1");//指定消息id,用于生产者进行消息确认confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"confirm","confirm消息",correlationData);return "发送成功";
}

在发送消息的时候.我们需要指定消息的correlationData中的id,这个id可以让生产者进行对应消息的确认.
我们在之前的文章中也提到过发布确认模式,其中包含了一个确认监听器,叫做ConfirmListener接口,其中提供了handleAckhandleNack,一个用于处理消息确认成功时候的业务逻辑,一个是消息否定确认时候的业务逻辑.和ConfirmCallback.confirm方法中的ack参数类似.
3. 测试
我们使用Postman对接口进行调用,观察控制台.
在这里插入图片描述
我们看到发送方成功确认了消息的接收,说明了消息已经成功到达了交换机.
如果我们把交换机的名称改掉.

@RequestMapping("/confirm")
public String confirm(){CorrelationData correlationData = new CorrelationData("1");//指定消息id,用于生产者进行消息确认confirmRabbitTemplate.convertAndSend("Constant.CONFIRM_EXCHANGE","confirm","confirm消息",correlationData);return "发送成功";
}

在这里插入图片描述
我们看到发送方消息确认失败了,说明消息没有正确地到达交换机.

3.2 return回退模式

消息到达exchange之后,会根据路由规则进行匹配,把消息放入Queue中.Exchange到Queue的过程,如果一条消息服务被任何队列消费,可以选择把消息发回给生产者,我们可以设置一个一个返回回调方法,对消息进行处理.

  1. 配置RabbitMQ
    和confirm模式相同
  2. 设置返回回调逻辑并发送消息
    配置具有return退回模式的RabbitTemplate.
@Bean
public RabbitTemplate returnRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回:"+returned);}});return rabbitTemplate;
}

这里在使用RabbitTemplate中的setMandatory方法的时候,如果设置为true,这个属性就是告诉Broker,如果一条消息没有被任何一条队列消费,那么就触发ReturnsCallback.
其中ReturnedMessage中有以下属性:

public class ReturnedMessage {private final Message message;private final int replyCode;private final String replyText;private final String exchange;private final String routingKey;

message表示返回消息的对象,包含了消息体和消息的属性.
replyCode表示Broker提供的回复码,表示消息无法路由的原因,有点类似与错误码.
replyText表示无法路由消息的额外信息和错误描述.
exchange,routingKey分别表示交换机的名称和路由键.
发送消息

@Resource
private RabbitTemplate returnRabbitTemplate;
@RequestMapping("/return")
public String confirmReturn(){CorrelationData correlationData = new CorrelationData("2");returnRabbitTemplate.convertAndSend(Constant.RETURN_EXCHANGE,"return","return消息",correlationData);return "发送成功";
}
  1. 测试
    使用Postman调用接口,观察控制台日志.
    当发送成功的时候,我们看到控制台上什么都没有打印,而且我们查看管理界面,发现队列中已经有消息准备被消费了.
    在这里插入图片描述
    如果发送的路由关键字配置错误.
@RequestMapping("/return")
public String confirmReturn(){CorrelationData correlationData = new CorrelationData("2");returnRabbitTemplate.convertAndSend(Constant.RETURN_EXCHANGE,"return1","return消息",correlationData);return "发送成功";
}

我们再次进行测试,发现消息被回退:
在这里插入图片描述

4. 常见面试题

这一个板块,会涉及到一个非常常见的面试题.就是如何保证RabbitMQ消息的可靠性.
在这里插入图片描述
我们可以根据消息可能丢失的场景来解决:

  1. 消息从生产者到交换机期间发生丢失
    • 出现原因: 网络问题等
    • 解决办法: confirm确认模式
  2. 消息从交换机无法路由到指定队列
    • 出现原因: 交换机与队列配置错误
    • 解决办法: return确认模式
  3. 消息队列自身数据发生丢失
    • 出现原因: RabbitMQ服务器宕机
    • 解决办法: 消息,队列,交换机持久化
  4. 消费者异常,导致消息丢失
    • 出现原因: 消费者宕机,消费者业务逻辑异常
    • 解决办法: 消息确认.
      对于解决办法,我们需要根据上述的讲解进行具体描述

http://www.ppmy.cn/devtools/137221.html

相关文章

AIX下crs-5005 ip address is aready in use in the network的解决办法

某业务生产系统中,三节点的rac数据库中3号节点因故障停机后,进行crs的重启。重启完成后,发现数据库的监听未起来,启动的过程中并提示crs-5005错误。 一、问题过程 查看监听,发现监听no service ywdb03/oracle/grid/c…

《硬件架构的艺术》笔记(七):处理字节顺序

介绍 本章主要介绍字节顺序的的基本规则。(感觉偏软件了,不知道为啥那么会放进《硬件架构的艺术》这本书)。 定义 字节顺序定义数据在计算机系统中的存储格式,描述存储器中的MSB和LSB的位置。对于数据始终以32位形式保存在存储器…

# [Unity] 【游戏开发】Unity开发基础2-Unity脚本编程基础详解

Unity脚本编程是创建互动式游戏体验的核心技能之一。本文将详细讲解Unity脚本编程的基础知识,包括变量和数据类型、程序逻辑、方法等方面,并通过实例展示如何使用这些基本知识完成简单功能的实现。 1. 新建Unity脚本的基本结构 当在Unity中创建一个脚本时,Unity会生成如下基…

本地可运行,jar包运行错误【解决实例】:通过IDEA的maven package打包多模块项目

这是一个排错的经验分享贴子。 最近在跟着知识星球做项目,到最后是部署到服务器,使用的方式是打包jar包上传到服务器运行。 但在实际打包的过程中,出现了本地IDEA可运行,jar包不可运行的错误。 具体错误截图如下: …

vue 预览pdf 【@sunsetglow/vue-pdf-viewer】开箱即用,无需开发

sunsetglow/vue-pdf-viewer 开箱即用的pdf插件sunsetglow/vue-pdf-viewer, vue3 版本 无需多余开发,操作简单,支持大文件 pdf 滚动加载,缩放,左侧导航,下载,页码,打印,文本复制&…

CodiMD导出pdf失败或无中文

CodiMD导出pdf失败,弹出文件保存窗口,有个pdf文件能下载,但是保存的时候提示“网站出问题了”,实际到服务器上看会发现docker崩溃了。 解决办法: 使用最新的CodiMD镜像,如nabo.codimd.dev/hackmdio/hackmd:…

计算机网络:应用层知识点概述及习题

网课资源: 湖科大教书匠 1、概述 习题1 1 在计算机网络体系结构中,应用层的主要功能是 A. 实现进程之间基于网络的通信 B. 通过进程之间的交互来实现特定网络应用 C. 实现分组在多个网络上传输 D. 透明传输比特流 2 以下不属于TCP/IP体系结构应用层范畴…

通过轻易云平台实现聚水潭数据高效集成到MySQL的技术方案

聚水潭数据集成到MySQL的技术案例分享 在本次技术案例中,我们将详细探讨如何通过轻易云数据集成平台,将聚水潭的数据高效、可靠地集成到MySQL数据库中。具体方案为“聚水谭-店铺查询单-->BI斯莱蒙-店铺表”。这一过程不仅需要处理大量数据的快速写入…