MQ 架构设计原理与消息中间件详解(三)

news/2024/10/10 20:32:43/

RabbitMQ实战解决方案

RabbitMQ死信队列

死信队列产生的背景

RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

产生死信队列的原因
  1. 消息投递到MQ中存放 消息已经过期  消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
  2. 队列达到最大的长度 (队列容器已经满了)
  3. 消费者消费多次消息失败,就会转移存放到死信队列中

 

代码整合 参考 Boyatop-springboot-rabbitmq|#中order-dead-letter-queue项目

死信队列的架构原理

死信队列和普通队列区别不是很大

普通与死信队列都有自己独立的交换机和路由key、队列和消费者。

区别:

1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到

普通队列中缓存起来,普通队列对应有自己独立普通消费者。

2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费

的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机

对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

死信队列应用场景

1.30分钟订单超时设计

  1. Redis过期key :
  2. 死信延迟队列实现:

采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后

就会将该消息转移到死信备胎消费者实现消费。

备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下

则会开始回滚库存操作。

RabbitMQ消息幂等问题

RabbitMQ消息自动重试机制
  1. 当我们消费者处理执行我们业务代码的时候,如果抛出异常的情况下

在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。

需要人为指定重试次数限制问题

  1. 在什么情况下消费者需要实现重试策略?

A.消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?

该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。

B.消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?

该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。
可以将日志存放起来,后期通过定时任务或者人工补偿形式。

如果是重试多次还是失败消息,需要重新发布消费者版本实现消费

可以使用死信队列

Mq在重试的过程中,有可能会引发消费者重复消费的问题。

Mq消费者需要解决 幂等性问题

幂等性 保证数据唯一

方式1:

生产者在投递消息的时候,生成一个全局唯一id,放在我们消息中。

Msg id=123456

Msg id=123456

Msg id=123456

消费者获取到我们该消息,可以根据该全局唯一id实现去重复。

全局唯一id 根据业务来定的  订单号码作为全局的id

实际上还是需要再db层面解决数据防重复。

业务逻辑是在做insert操作 使用唯一主键约束

业务逻辑是在做update操作 使用乐观锁

  1. 当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试)
  2. 应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿
如何合理选择消息重试
  1. 消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试 ?
  2. 消费者获取消息后,应该代码问题抛出数据异常,是否需要重试?

总结:如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。

Rabbitmq如何开启重试策略

spring:
 
rabbitmq:
   
####连接地址
   
host: 127.0.0.1
   
####端口号
   
port: 5672
   
####账号
   
username: guest
   
####密码
   
password: guest
   
### 地址
   
virtual-host: /booya_rabbitmq
   
listener:
     
simple:
       
retry:
         
####开启消费者(程序出现异常的情况下会)进行重试
         
enabled: true
         
####最大重试次数
         
max-attempts: 5
         
####重试间隔次数
         
initial-interval: 3000

消费者重试过程中,如何避免幂等性问题

重试的过程中,为了避免业务逻辑重复执行,建议提前全局id提前查询,如果存在

的情况下,就无需再继续做该流程。

重试的次数最好有一定间隔次数,在数据库底层层面保证数据唯一性,比如加上唯一id

SpringBoot开启消息确认机制
配置文件新增
spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: /booyaVirtualHostslistener:simple:retry:####开启消费者(程序出现异常的情况下会)进行重试enabled: true####最大重试次数max-attempts: 5####重试间隔次数initial-interval: 3000acknowledge-mode: manualdatasource:url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8username: rootpassword: rootdriver-class-name: com.mysql.jdbc.Driver
消费者ack代码
@Slf4j@Component@RabbitListener(queues = "fanout_order_queue")public class FanoutOrderConsumer {@Autowiredprivate OrderManager orderManager;@Autowiredprivate OrderMapper orderMapper;@RabbitHandlerpublic void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {//        try {log.info(">>orderEntity:{}<<", orderEntity.toString());String orderId = orderEntity.getOrderId();if (StringUtils.isEmpty(orderId)) {log.error(">>orderId is null<<");return;}OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);if (dbOrderEntity != null) {log.info(">>该订单已经被消费过,无需重复消费!<<");// 无需继续重试channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}int result = orderManager.addOrder(orderEntity);log.info(">>插入数据库中数据成功<<");if (result >= 0) {// 开启消息确认机制      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}//        int i = 1 / 0;//        } catch (Exception e) {//            // 将失败的消息记录下来,后期采用人工补偿的形式//        }}}


http://www.ppmy.cn/news/1537108.html

相关文章

ubuntu 18.04虚拟机以太网网段与地平线J6板端连接配置

目录 1、板端IP地址确认 2、本机以太网端ip设置 3、虚拟机设置nat模式 4、虚拟机虚拟网络编辑 5、虚拟机端ip设置 6、虚拟机端验证是否可以ping通板端ip及是否可以联网 1、板端IP地址确认 ip&#xff1a;192.168.98.233 2、本机以太网端ip设置 ip&#xff1a; 3、虚拟…

android 全面屏最底部栏沉浸式

Activity的onCreate方法中添加 this.getWindow().addFlags(WindowManager.LayoutParams.FLAG_TRANSLUCENT_NAVIGATION); Android 系统 Bar 沉浸式完美兼容方案自 Android 5.0 版本&#xff0c;Android 带来了沉浸式系统 ba - 掘金 (juejin.cn)https://juejin.cn/post/7075578…

PHP魔幻(术)方法

PHP中的魔幻方法&#xff0c;也被称为魔术方法&#xff08;Magic Methods&#xff09;&#xff0c;是一组具有特殊功能的方法。这些方法在PHP中有固定的名称&#xff0c;并且会在特定的时机自动被PHP调用&#xff0c;而无需开发者显式调用。它们通常用于执行一些特殊的操作&…

4.资源《Arduino UNO R3 proteus 电机PID参数整定工程文件(含驱动代码)》说明。

资源链接&#xff1a; Arduino UNO R3 proteus 电机PID参数整定工程文件&#xff08;含驱动代码&#xff09; 1.文件明细&#xff1a; 2.文件内容说明 包含&#xff1a;proteus工程&#xff0c;内含设计图和工程代码。 3.内容展示 4.简述 工程功能可以看这个视频 PID仿真调…

npm的实现原理

npm&#xff08;Node Package Manager&#xff09;是Node.js的默认包管理器&#xff0c;用于管理JavaScript项目的依赖关系、脚本运行和包发布。理解npm的实现原理有助于更高效地使用它&#xff0c;优化项目的依赖管理&#xff0c;并在遇到问题时更快地定位和解决。以下将详细介…

Linux 查看当前正在使用的 Bash 版本

在 Linux 系统中&#xff0c;你可以通过几种不同的方法来查看当前正在使用的 Bash 版本。以下是一些常用的方法&#xff1a; 方法一&#xff1a;使用 bash --version 命令 打开终端&#xff0c;然后输入以下命令&#xff1a; bash --version这将显示 Bash 的版本信息&#x…

使用 Pandas 处理 .xlsx 文件的教程(Python)

使用 Pandas 处理 .xlsx 文件的教程 Pandas 是 Python 数据分析的核心库之一&#xff0c;它提供了丰富的数据处理功能&#xff0c;尤其在处理表格数据&#xff08;如 .xlsx 文件&#xff09;时非常强大。Pandas 结合了 Python 的灵活性和简洁性&#xff0c;让用户能够轻松地进…

NDC美国药品编码目录数据库查询方法

NDC&#xff08;National Drug Code&#xff09;翻译为“国家药品代码”&#xff0c;是美国食品药品监督管理局&#xff08;FDA&#xff09;制定的一种药品标识系统&#xff0c;用于唯一标识药品。这个编码系统主要目的是为精准识别和追踪不同药品而建设&#xff0c;行业人员和…