第十六章 RabbitMQ延迟消息之延迟插件优化

news/2024/10/17 18:49:08/

目录

一、引言

二、优化方案 

三、核心代码实现

3.1. 生产者代码

3.2. 消息处理器 

3.3. 自定义多延迟消息封装类 

3.4. 订单实体类 

3.5. 消费者代码 

四、运行效果


一、引言

上一章节我们提到,直接使用延迟插件,创建一个延迟指定时间的消息(如10分钟),并不是最好的解决方案,因为假如我们的订单是在5分钟支付的,那么剩余的5分钟时间,RabbitMQ中延迟消息时钟还是一直占用着资源。如果有大量的延迟消息,那么对于服务来说压力是很大的,同时会耗费庞大昂贵的资源。因此,本章节我们就来近一步对延迟插件的消息进行优化。

我们通过下面的流程图来做近一步分析:

1. 用户下单完成后,发送15分钟延迟消息,在15分钟后接收消息,检查支付状态:

2. 已支付:更新订单状态为已支付

3. 未支付:更新订单状态为关闭订单,恢复商品库存

常规延迟插件消息使用的弊端总结:

1. 设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

2. 如果并发较高,30分钟可能堆积消息过多,对MQ压力很大

3. 大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源

二、优化方案 

如下图所示,我们可以将10分钟甚至30分钟拆分成多份零散的较短的时间。

消息初次发送的延迟时间设定为10s,10s过后如果订单还是未支付状态,我们判断延迟时间数组里还有没有剩余延迟时间,如果有则继续发送延迟消息,时间设定为数组中的第二个时间10s,直到订单支付成功终止循环,或是最后一份时间消耗完依然未支付,我们取消订单。

三、核心代码实现

3.1. 生产者代码

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {Order order = Order.builder().orderId(1L).content("生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!").build();MultiDelayMessage<Order> msg = MultiDelayMessage.of(order, 1000L, 5000L, 2000L, 10000L);rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(msg.removeNextDelay());return message;}});}
}

3.2. 消息处理器 

package com.example.publisher;import lombok.AllArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;/*** 消息请求处理器*/
@AllArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final Long delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(delay);return message;}
}

3.3. 自定义多延迟消息封装类 

package com.example.publisher;import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.util.CollectionUtils;import java.io.Serializable;
import java.util.List;/*** 自定义的多延时消息封装类* @param <T>*/
@Data
@NoArgsConstructor
public class MultiDelayMessage<T> implements Serializable {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long...delayMillis) {return new MultiDelayMessage<>(data, (List<Long>) CollectionUtils.arrayToList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay() {return delayMillis.remove(0);}/*** 是否还有下一个延迟时间* @return*/public boolean hasNextDelay() {return !delayMillis.isEmpty();}
}

3.4. 订单实体类 

package com.example.publisher;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 订单类 * 此处为了演示,将真实业务中的订单类做了简化* 只包含一个订单ID和自定义消息内容*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order {private Long orderId;private String content;
}

3.5. 消费者代码 

package com.example.consumer;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 消费者* 因为作为演示,所以商城支付、订单、及扣减库存的业务代码已注释* 注释中保留了整个商城下单支付扣减库存的流程步骤*/
@Slf4j
@Component
public class SimpleListener {@Resourceprivate RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listener(MultiDelayMessage<Order> msg) throws Exception {System.out.println(((Order)msg.getData()).getContent());// 1. 查询订单状态// Order order = orderService.getById(msg.getData())// 2. 判断是否已支付
//        if (Order == null || order.status == 2) {
//            订单不存在或者已处理则直接返回
//            return;
//        }// 主动去支付服务查询真正的支付状态
//        PayOrder payOrder = payService.getById(order.getId());// 2.1. 已支付,则标记订单为已支付
//        if (payOrder.isPay()) {
//            orderService.markOrderPaySuccess(order.getId());
//            return;
//        }// 2.2. 未支付,获取下次订单延迟时间// 3. 判断是否存在延迟时间if (msg.hasNextDelay()) {// 3.1 存在,重发延迟消息Long nextDelay = msg.removeNextDelay();rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(nextDelay);return message;}});return;}// 3.2 不存在,取消订单
//        orderService.lambdaUpdate()
//                .set(Order::getStatus, 5);
//                .set(Order::getCloseTime, LocalDateTime.now());
//                .eq(Order::getId, order.getId())
//                .update();// 4. 恢复库存}
}

四、运行效果

最终我们会看到每间隔一段时间消费者就会消费一条消息,这个间隔时间就是我们设定的分段时间数组,这么做就能极大地减少资源消耗和服务的压力:


http://www.ppmy.cn/news/1539777.html

相关文章

【优选算法】(第三十七篇)

目录 在每个树⾏中找最⼤值&#xff08;medium&#xff09; 题目解析 讲解算法原理 编写代码 最后⼀块⽯头的重量&#xff08;easy&#xff09; 题目解析 讲解算法原理 编写代码 在每个树⾏中找最⼤值&#xff08;medium&#xff09; 题目解析 1.题目链接&#xff1a;…

leetcode 刷题day44动态规划Part13( 647. 回文子串、516.最长回文子序列)

647. 回文子串 动规五部曲&#xff1a; 1、确定dp数组&#xff08;dp table&#xff09;以及下标的含义 按照之前做题的惯性&#xff0c;定义dp数组的时候很自然就会想题目求什么&#xff0c;就如何定义dp数组。但是对于本题来说&#xff0c;这样定义很难得到递推关系&#x…

【原创】java+springboot+mysql智能农村管理系统设计与实现

个人主页&#xff1a;程序猿小小杨 个人简介&#xff1a;从事开发多年&#xff0c;Java、Php、Python、前端开发均有涉猎 博客内容&#xff1a;Java项目实战、项目演示、技术分享 文末有作者名片&#xff0c;希望和大家一起共同进步&#xff0c;你只管努力&#xff0c;剩下的交…

利用sessionStorage收集用户访问信息,然后传递给后端

这里只是简单的收集用户的停留时间、页面加载时间、当前页面URL及来源页面&#xff0c;以做示例 <html><head><meta http-equiv"content-type" content"text/html; charsetUTF-8"/><title>测试sessionStorage存储用户访问信息<…

面试经典150题刷题记录

数组部分 1. 合并两个有序的子数组 —— 倒序双指针避免覆盖 88. 合并两个有序数组 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使…

高级算法设计与分析 学习笔记14 FFT

​ 本章我们研究多项式乘法。 我们直接乘&#xff0c;时间复杂度是n^2。使用FFT则可以变成nlgn ​编辑 可以看到两个n的多项式&#xff0c;我们直接乘&#xff0c;每种组合都要试一遍&#xff0c;就会要是n^2遍 ​编辑 那么要怎么加速呢&#xff1f; ​编辑 首先多项式可…

数据结构——八大排序(下)

数据结构中的八大排序算法是计算机科学领域经典的排序方法&#xff0c;它们各自具有不同的特点和适用场景。以下是这八大排序算法的详细介绍&#xff1a; 五、选择排序&#xff08;Selection Sort&#xff09; 核心思想&#xff1a;每一轮从未排序的元素中选择最小&#xff0…

SparkSQL介绍及使用

文章目录 1. SparkSQL介绍及使用1.1 SparkSQL介绍1.2 数据结构的形式1.3 Spark SQL 特点1.4 Spark SQL 和 Hive SQL关系 1. SparkSQL介绍及使用 1.1 SparkSQL介绍 Spark SQL是Apache Spark 用于处理结构化数据&#xff08;DataFrame和Datasets&#xff09;的模块。 在Spark1.0…