SpringBoot 整合 RabbitMQ 实现延迟消息

embedded/2024/10/16 2:27:30/

一、业务场景说明

用于解决用户下单以后,订单超时如何取消订单的问题。

  • 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
  • 生成订单,获取订单的id;
  • 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
  • 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
  • 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。

相关概念:(死信队列与延迟队列)

        死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列

        死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景。

死信队列(Dead Letter Queue, DLQ)

目的: 处理无法成功消费的消息。

特点:

  1. 目的: 用于接收无法成功处理的消息。这些消息可能因为多种原因无法被正常消费,例如消息处理失败、消息超时、队列满等。
  2. 触发条件: 当消息在主队列中无法被正常消费,并且达到一定的失败次数或过期时间后,会被转发到死信队列。
  3. 用途: 通过分析死信队列中的消息,开发者可以排查和解决系统中的问题,也可以进行后续的审计和处理。

实现方式:

  • 需要配置主队列和死信队列之间的关系。
  • 配置消息的重试策略和死信转发条件(如重试次数、过期时间等)。

延迟队列(Delayed Queue)

目的: 控制消息的处理时间。

特点:

  1. 目的: 在特定的时间点或延迟一段时间后再将消息发送到主队列进行处理。用于在系统中实现消息的延迟处理。
  2. 触发条件: 当消息被发送到延迟队列时,它会根据设定的延迟时间在未来某个时刻被转发到主队列。
  3. 用途: 常用于实现任务的延迟执行、定时任务、超时重试等功能。

实现方式:

  • 使用专门支持延迟队列的消息队列系统,或者通过设置消息的过期时间和重试机制来实现。
  • 可以配置延迟时间,确保消息在设定的时间后进入主队列。

总结

  • 死信队列: 处理无法消费的消息,通常用于异常处理和问题排查。
  • 延迟队列: 控制消息的处理时间,允许消息在未来某个时间点才被处理。

二、整合RabbitMQ实现延迟消息

整合依赖及配置

●在pom.xml中添加AMQP相关依赖;

<!--Spring AMQP相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 修改application.yml文件,在spring节点下添加RabbitMQ相关配置。
spring:rabbitmq:host: localhostport: 5672virtual-host: /mallusername: mallpassword: mallpublisher-returns: true #消息发送到队列确认publisher-confirm-type: simple #消息发送到交换器确认

实现延迟消息

  • 添加消息队列的枚举配置类QueueEnum,用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称;
java">/*** @description 消息队列枚举配置*/
@Getter
public enum QueueEnum {/*** 消息通知队列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl队列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交换机名称*/private String exchange;/*** 队列名称*/private String name;/*** 路由键*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}
  • 添加RabbitMQ的Java配置,用于配置交换机、队列及队列与交换机的绑定关系;
java">/*** @description 消息队列配置*/
@Configuration
public class RabbitMqConfig {/*** 订单消息实际消费队列所绑定的交换机*/@BeanDirectExchange orderDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单延迟队列所绑定的交换机*/@BeanDirectExchange orderTtlDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单实际消费队列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 订单延迟队列(死信队列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键.build();}/*** 将订单队列绑定到交换机*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 将订单延迟队列绑定到交换机*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}
  • 此时登录,在RabbitMQ管理页面可以看到以下交换机和队列;

  • 交换机及队列说明如下:mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。
  • 添加延迟消息的发送者CancelOrderSender,用于向订单延迟消息队(mall.order.cancel.ttl)里发送消息;
java">/*** @description 取消订单消息的发出者*/
@Component
public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//给延迟队列发送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//给消息设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);}
}
  • 添加取消订单消息的接收者CancelOrderReceiver,用于从取消订单的消息队列(mall.order.cancel)里接收消息;
java">/*** @description 取消订单消息的处理者*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);}
}
  • 添加OmsPortalOrderService接口;
java">/*** @description 前台订单管理Service*/
public interface OmsPortalOrderService {/*** 根据提交信息生成订单*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消单个超时订单*/@Transactionalvoid cancelOrder(Long orderId);
}
  • 添加OmsPortalOrderService的实现类OmsPortalOrderServiceImpl;
java">/*** @description 前台订单管理Service*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 执行一系类下单操作,具体参考mall项目LOGGER.info("process generateOrder");//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下单成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 执行一系类取消订单操作,具体参考mall项目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//获取订单超时时间,假设为60分钟long delayTimes = 30 * 1000;//发送延迟消息cancelOrderSender.sendMessage(orderId, delayTimes);}}
  • 添加OmsPortalOrderController定义接口;
java">/*** @description 订单管理Controller*/
@Controller
@Api(tags = "OmsPortalOrderController")
@Tag(name = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根据购物车信息生成订单")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);}
}

延迟消息功能演示

  • 运行项目,访问Swagger API文档,访问地址:http://localhost:8080/swagger-ui/

可以看到,经过了30秒后延迟消息发送出去了 


http://www.ppmy.cn/embedded/96935.html

相关文章

leetcode 885. Spiral Matrix III

题目链接 You start at the cell (rStart, cStart) of an rows x cols grid facing east. The northwest corner is at the first row and column in the grid, and the southeast corner is at the last row and column. You will walk in a clockwise spiral shape to visi…

优先级队列的实现

什么是优先级队列 优先级队列是一种特殊的数据结构&#xff0c;它类似于队列或栈&#xff0c;但是每个元素都关联有一个优先级或权重。在优先级队列中&#xff0c;元素的出队顺序不是简单地按照它们进入队列的先后顺序&#xff08;先进先出&#xff0c;FIFO&#xff09;&#…

【安卓】多线程编程

文章目录 线程的简单应用解析异步消息处理机制使用AsyncTask 线程的简单应用 新建一个AndroidThreadTest项目&#xff0c;然后修改activity_main.xml中的代码。 <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width…

Pytorch:加载断点(pth)权重参数

一、保存的模型参数及权重 #保存模型 torch.save(model_object,resnet.pth) #加载模型 modeltorch.load(resnet.pth)二、仅保存模型的权重 torch.save(my_resnet.state_dict(),"resnet.pth")resnet_model.load_state_dict(torch.load("resnet.pth"))三、仅…

ilo地址是什么

ilo地址是什么&#xff1f; iLO 地址一般是服务器的专用网络接口的 IP 地址&#xff0c;用于在服务器本地控制台不可用时对服务器进行远程管理和监控&#xff0c;例如进行远程开机、关机、安装操作系统、查看硬件状态等操作。 要获取 iLO 地址&#xff0c;通常可以在服务器的…

Redis的缓存淘汰策略

1. 查看Redis 最大的占用内存 打开redis配置文件, 设置maxmemory参数&#xff0c;maxmemory 是bytes字节类型, 注意转换 2. Redis默认内存多少可以用 注意: 在64bit系统下&#xff0c; maxmemory 设置为 0 表示不限制Redis内存使用 3. 一般生产上如何配置 一般推荐Redis 设置内…

IP基础(通俗易懂版)

IP 位于 TCP/IP 参考模型的第三层&#xff0c;也就是⽹络层。 ⽹络层的主要作⽤是&#xff1a;实现主机与主机之间的通信&#xff0c;也叫点对点通信。 1 、网络层&#xff08; IP) 与数据链路层 (MAC) 有什么关系呢&#xff1f; MAC 的作用&#xff1a; 实现【直连】的两个…

设计模式 - 状态模式

目录 1. 前言 2. 基本原理 3. UML模型 4. 例程 1. 前言 状态模式作为设计模式的一种&#xff0c;主要用于根据状态的改变执行不同的动作,它允许一个对象在其内部状态改变时改变它的行为。状态模式主要解决的是当控制一个对象状态转换的条件表达式过于复杂时的情况。把状态的…