MQ高级:RabbitMQ小细节

news/2024/10/8 18:39:24/

在之前的学习中,我们只介绍了消息的发送,但是没有考虑到异常的情况,今天我们就介绍一些异常情况,和细节的部分。

目录

生产者可靠性

生产者重连

生产者确认

MQ可靠性

持久化

Lazy Queue

消费者可靠性

消费者确认机制

失败重试机制

业务幂等性

延迟消息

死信交换机

延迟消息插件


生产者可靠性

生产者重连

有时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长=initial-interval * multipliermax-attempts: 3 # 最大重试次数

在停止mq服务之后,运行代码,会发现测试失败,因为连接失败。最大重试次数设置的是3,此处就重试了3次再停止。

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMOP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,
会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认

在开启了生产者确认机制后,在MQ成功收到消息后会返回确认消息ACK给生产者,如果有异常,会返回NACK。

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

spring:rabbitmq:publisher-confirm-type: correlated# 开启publisher confirm机制,并设置confirm类型为correlatedpublisher-returns: true# 开启publisher return机制

 这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MO的回执消息
  • correlated:MO异步回调方式返回回执消息

但是!生产者确认需要额外的网络和系统资源开销,尽量不要使用。
如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题。
对于nack消息可以有限次数重试,依然失败则记录异常消息。

MQ可靠性

解决了生产者可靠性,还需要解决MQ的可靠性。

通过生产者发送的消息被MQ存放到内存中,经过某些特殊情况或者MQ重启后,这部分数据会丢失。并且内存空间是有限的,当消费者故障或者处理太慢时,会导致消息积压,导致MQ阻塞。

持久化

为了解决这个问题,MQ引入了数据持久化,包括交换机持久化、队列持久化、消息持久化。

前两者只需要在创建的时候设置成Durable(默认)即可。

消息持久化默认是不开启的,要手动开启。

当不开启磁盘持久化,消息会全部存放在内存中。但是发送消息过多,会占满内存。之后多出来的消息会存放到Paged out中,也就是磁盘中。等待内存中的消息被处理完后,会再把磁盘中的消息加载到内存中,再继续处理。

当开启了磁盘持久化,接收到的消息会在内存和磁盘中都存一份。此时处理的消息是从内存中处理。内存会在将要满的时候清理一次,再继续完成消息处理。磁盘则会把所有消息都保存下来。

Lazy Queue

Lazy Queue惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

并且惰性队列的性能很高,比之前的几种性能都会好一些。

消费者可靠性

消费者确认机制

当不开启消费者确认机制,生产者投递了一条消息,不管消费者是否处理完了,会马上被RabbitMQ删除,当做已经处理完了。但是如果消费者出现网络波动或者其他异常情况,会导致没有接收到这条消息,生产者这边还会认为消费者已经接收到消息了。告知RabbitMQ自己消息处理状态。处理消息结束后,应该向RabbitMQ发送一个回执,

回执有三种可选值:

  • ack:成功处理消息,RabbitMO从队列中删除该消息
  • nack:消息处理失败,RabbitMO需要再次投递消息
  • reiect:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理消息确认。消息投递给消费者后立刻ack,消息会立刻从MQ删除。这种方式非常不安全,不建议使用。

  • manual:手动模式。需要在业务代码中手动调用API发送ack或reject。虽然存在业务侵入,但提供了更大的灵活性。

  • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强。当业务正常执行时,自动返回ack。当业务出现异常时,根据异常类型自动返回不同的结果:
    1.如果是业务异常,自动返回nack。
    2.如果是消息处理或校验异常,自动返回reject。

spring:rabbitmq:listener:simple:acknowledge-mode: auto  # 可以设置为 none, manual, auto

失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,形成无限循环。这会导致MQ的消息处理量飙升,给系统带来不必要的压力。

我们可以利用Spring的retry机制,在消费者出现异常时进行本地重试,而不是无限制地requeue到MQ队列。并且指定最大重试次数。

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true  # 开启消费者失败重试initial-interval: 1000ms  # 初始的失败等待时长为1秒multiplier: 1  # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3  # 最大重试次数stateless: true  # true无状态; false有状态。如果业务中包含事务,这里改为false

  

在开启重试模式后,如果重试次数耗尽且消息依然失败,则需要有MessageRecoverer接口来处理。MessageRecoverer包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

当通过最后一种方式重试耗尽后,我们可以额外设置一个队列,比如error.queue,当发送失败的消息进入这个队列后,再通过邮件强提醒这样的机制推送给工作人员,可以有效解决消息发送失败的极端情况。

业务幂等性

在程序开发中,业务幂等性指的是同一个业务,执行一次或者执行多次对业务的状态是没有影响的。

唯一消息id
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

业务判断

方案二,是结合业务逻辑,基于业务本身做判断。以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。

如何保证支付服务与交易服务之间的订单状态一致性?

为确保支付服务与交易服务之间的订单状态一致性,我们采取了以下措施:

  1. 消息通知

    • 支付服务在用户支付成功后,通过MQ(消息队列)发送消息通知交易服务,以完成订单状态的同步。
  2. 消息可靠性策略

    • 采用生产者确认机制、消费者确认和消费者失败重试等策略,确保消息的可靠投递和处理。
    • 开启MQ的持久化功能,避免因服务宕机导致消息丢失。
  3. 业务幂等性

    • 在交易服务更新订单状态时进行业务幂等性判断,防止因消息重复消费导致订单状态异常。

如果交易服务消息处理失败,有什么兜底方案?

  • 定时任务
    • 在交易服务中设置定时任务,定期查询订单的支付状态。
    • 即使MQ通知失败,定时任务也可以作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

延迟消息是消息队列中的一种重要功能,它允许消息在被发送到消息队列后并不会立即被消费者消费,而是在经过特定的时间延迟后才能被消费者获取和处理。这种特性在很多业务场景中都非常有用,比如订单处理超时、定时提醒等。

比如,用户A下单某商品的最后一件,订单确认后,迟迟不支付,但是又占用着这个名额。等到很久以后取消这个订单,此时想买的人没买到,商家没有卖掉,而这个人又没有买。这种情况用延迟消息就能很好的解决这个问题。当用户下单商品后,会设置一个延迟消息,假设30分钟内没有下单,这个延迟消息就会被发送到MQ,提醒数据库这个人订单超时了,强制让这个人取消订单。

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

死信消息,经过死信交换机可以变成延迟消息。

当publisher发布一个消息后,通过交换机进入队列。通过手动设置一个过期时间,让消息变成死信消息,此时消息会自动进入通过dead-letter-exchang设置的交换机dlx.direct,再一步步的进入到consumer。

延迟消息插件


RabbitMO的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机
当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

@RabbitListener(bindings=@QueueBinding(
value=@Queue(name="delay.queue", durable="true"),
exchange=@Exchange(name="delay.direct",delayed="true"),
key="delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}",msg);
}
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct")
.delayed()//设置delay的属性为true
.durable(true)//持久化
.build();
}

发送消息时需要通过消息头x-delay来设置过期时间:

@Test
void testPublisherDelayMessage() {//1.创建消息String message = "hello, delayed message";//2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}


http://www.ppmy.cn/news/1536241.html

相关文章

chatGPT对我学术写作的三种帮助

chatGPT对我学术写作的三种帮助 概述提高学术写作水平大模型选择概述上下文以提供精确的指令 提升同行评审优化编辑反馈 概述 从生成式人工智能中获得的价值并非来自于技术本身盲目地输出文本,而是来自于与工具的互动,并利用自身的专业知识来完善它所生…

Prompt 初级版:构建高效对话的基础指南

Prompt 初级版:构建高效对话的基础指南 文章目录 Prompt 初级版:构建高效对话的基础指南一 “标准”提示二 角色提示三 多范例提示四 组合提示五 规范化提示 本文介绍了提示词的基础概念与不同类型,帮助用户更好地理解如何在对话中构建有效的…

第三十九章 创建安全对话

文章目录 第三十九章 创建安全对话概述开始安全对话 第三十九章 创建安全对话 IRIS 支持安全对话&#xff0c;遵循 WS-SecureConversation 1.3 规范。本页介绍如何手动创建安全对话。 概述 在安全对话中&#xff0c;Web 客户端向 Web 服务发出初始请求并接收包含 <Securi…

【机器学习-无监督学习】降维与主成分分析

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科&#xff0c;通过算法和模型让计算机从数据中学习&#xff0c;进行模型训练和优化&#xff0c;做出预测、分类和决策支持。Python成为机器学习的首选语言&#xff0c;…

MySQL存储过程原理、实现及优化

目录 第一章 存储过程概述 1.1 存储过程定义与作用 1.2 存储过程的优点与缺点 1.2.1 优点 1.2.2 缺点 1.3 MySQL中的存储过程 第二章 存储过程的原理 2.1 存储过程的执行流程 2.1.1 编译阶段 2.1.2 存储阶段 2.1.3 执行阶段 2.2 存储过程的存储机制 2.3 存储过程与…

Linux bash脚本 远程开发环境配置

参考资料 太香了&#xff0c;VSCode远程开发插件&#xff0c;值得一试Visual Studio Code で Remote SSH する。Managing extensions 目录 一. 远程开发必备二. 连接远程开发服务器三. 安装远程开发插件 一. 远程开发必备 ⏹ VSCode插件 Remote - SSH 通过使用 SSH 链接虚拟…

15分钟学 Python 第36天 :Python 爬虫入门(二)

Python 爬虫入门&#xff1a;环境准备 在进行Python爬虫的学习和实践之前&#xff0c;首先需要准备好合适的开发环境。本节将详细介绍Python环境的安装、必要库的配置、以及常用工具的使用&#xff0c;为后续的爬虫编写奠定坚实的基础。 1. 环境准备概述 1.1 为什么环境准备…

方法重写与多态

方法重写 1.在子类和父类直接 2.方法名相同 3.参数个数和类型相同 4.返回类型相同或是其父类 5.访问权限不能严于父类 package com.hz.ch04.test01;public abstract class Pet {private String name;private int love;private int health;public String getName() {retur…