使用RabbitMQ实现延迟消息的完整指南

devtools/2024/10/22 6:08:37/

在分布式系统中,消息队列通常用于解耦服务,RabbitMQ是一个广泛使用的消息队列服务。延迟消息(也称为延时队列或TTL消息)是一种常见的场景应用,特别适合处理某些任务在一段时间后执行的需求,如订单超时处理、延时通知等。

本文将以具体代码为例,展示如何使用RabbitMQ来实现延迟消息处理,涵盖队列和交换机的配置、消息的发送与接收以及死信队列的处理。

什么是延迟消息?

延迟消息是指消息在发送到队列后,经过设定的时间延迟再被消费。RabbitMQ 本身没有直接支持延迟队列的功能,但可以通过 TTL(Time To Live)+ 死信队列(Dead Letter Queue, DLQ) 的组合来实现。当消息超过TTL(消息存活时间)后,不会被立即消费,而是会被转发到绑定的死信队列,从而实现延迟处理。

RabbitMQ中的延迟消息原理

在RabbitMQ中,我们可以通过以下几个概念来实现延迟消息:

  1. TTL(Time To Live):可以为队列设置TTL,消息超过该时间后会被标记为“死信”。
  2. 死信队列(Dead Letter Queue):当消息在正常队列中过期或处理失败时,RabbitMQ可以将它们路由到一个死信队列,死信队列可以用来处理这些过期或未处理的消息。
  3. x-dead-letter-exchangex-dead-letter-routing-key:可以通过配置队列的参数,将过期消息发送到一个专门的死信交换器,并根据指定的路由键转发到死信队列。

 

 消息来到ttl.queue消息队列,过期时间内无人消费,消息来到死信交换机hmall.direct,在direct.queue消息队列无需等待。

1. RabbitMQ的配置

首先,我们需要配置两个队列和两个交换机:一个用于存放延时消息,另一个用于处理超时的死信消息。

java">package com.heima.stroke.configuration;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 延迟时间 单位:毫秒 (这里设为30秒)private static final long DELAY_TIME = 1000 * 30;// 行程超时队列public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE";// 行程死信队列public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE";// 行程超时队列交换机public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE";// 行程死信队列交换机public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE";// 行程超时交换机 Routing Keypublic static final String STROKE_OVER_KEY = "STROKE_OVER_KEY";// 行程死信交换机 Routing Keypublic static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY";/*** 声明行程超时队列,并设置其参数* x-dead-letter-exchange:绑定的死信交换机* x-dead-letter-routing-key:死信路由Key* x-message-ttl:消息的过期时间*/@Beanpublic Queue strokeOverQueue() {Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE);args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY);args.put("x-message-ttl", DELAY_TIME); // 设置TTL为30秒return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();}@Beanpublic DirectExchange strokeOverQueueExchange() {return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);}@Beanpublic Binding bindingStrokeOverDirect() {return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);}
}

解释:

TTL设置:我们通过x-message-ttl设置消息的过期时间为30秒。

死信队列绑定:通过x-dead-letter-exchangex-dead-letter-routing-key设置,当消息过期时,它会被转发到死信交换机,再路由到死信队列。

2. 生产者发送延迟消息

接下来,我们通过生产者向超时队列发送消息,这些消息将在TTL过期后转发到死信队列。

java">package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MQProducer {private final static Logger logger = LoggerFactory.getLogger(MQProducer.class);@AutowiredRabbitTemplate rabbitTemplate;/*** 发送延时消息到行程超时队列** @param strokeVO 消息体*/public void sendOver(StrokeVO strokeVO) {String mqMessage = JSON.toJSONString(strokeVO);logger.info("send timeout msg:{}", mqMessage);rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);}
}

解释:

sendOver 方法将消息发送到超时队列,消息将在超时后进入死信队列。生产者不需要额外处理TTL或死信的配置,只需发送消息即可。

3. 消费者监听死信队列

当消息超过TTL后,将会被转发到死信队列。消费者需要监听死信队列并处理这些消息。

j

java">package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import com.heima.stroke.handler.StrokeHandler;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@Component
public class MQConsumer {private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class);@Autowiredprivate StrokeHandler strokeHandler;/*** 监听死信队列** @param message 消息体* @param channel RabbitMQ的Channel* @param tag 消息的Delivery Tag*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"),exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),key = RabbitConfig.STROKE_DEAD_KEY)})@RabbitHandlerpublic void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class);logger.info("get dead msg:{}", message.getBody());if (strokeVO == null) {return;}try {// 处理超时的行程消息strokeHandler.timeoutHandel(strokeVO);// 手动确认消息channel.basicAck(tag, false);} catch (Exception e) {e.printStackTrace();}}
}

解释:

@RabbitListener 注解绑定了死信队列的监听器。当消息被转发到死信队列时,该消费者会接收到消息。

使用 channel.basicAck(tag, false) 手动确认消息处理成功,确保消息不会重复消费。

4. 处理超时业务逻辑

在我们的业务中,当消息超时未处理时,将其状态设置为超时。

java">public void timeoutHandel(StrokeVO strokeVO) {// 获取司机行程ID和乘客行程IDString inviterTripId = strokeVO.getInviterTripId();String inviteeTripId = strokeVO.getInviteeTripId();// 检查邀请状态是否为未确认String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) &&String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {// 更新为超时状态redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));}
}


http://www.ppmy.cn/devtools/127753.html

相关文章

tkintrt.Button位置试炼——计算器“键盘”

pack、grid、place由你摔摆&#xff0c;pack简单易用&#xff1b;grid网格安排&#xff1b;place最最随意&#xff0c;位置最细粒度随你捏拿。 (笔记模板由python脚本于2024年10月21日 19:21:44创建&#xff0c;本篇笔记适合喜欢python喜欢GUI的coder翻阅) 【学习的细节是欢悦的…

理解ES6中的Generator

Generator是ES6引入的一种特殊的函数&#xff0c;允许函数执行过程可以暂停和恢复&#xff0c;具有异步编程的优势。通过function*声明生成器函数&#xff0c;使用yield关键字来暂停函数执行&#xff0c;并通过next()方法来恢复执行。 特点与机制&#xff1a; 暂停执行&#…

工程师 - Toradex公司简介

官网&#xff1a;Single Board Computers (SBCs), Computer on Modules, System on Modules Toradex是一家专注于嵌入式计算解决方案的全球性科技公司。作为嵌入式计算机模块和单板计算机的领先供应商&#xff0c;Toradex致力于为客户提供高品质、可靠的产品和服务。 公司背景 …

Word、PDF转换为图片Java

Word、PDF转换为图片Java 需求要在小程序端展示文档内容&#xff0c;所以将文档每页转换为图片后显示 参考和其他等方案&#xff1a; https://blog.csdn.net/strggle_bin/article/details/140599514 https://www.modb.pro/db/566986 https://blog.csdn.net/spring_is_comin…

处理Hutool的Http工具上传大文件报OOM

程序环境 JDK版本&#xff1a; 1.8Hutool版本&#xff1a; 5.8.25 问题描述 客服端文件上传主要代码&#xff1a; HttpRequest httpRequest HttpUtil.createPost(FILE_UPLOAD_URL); Resource urlResource new UrlResource(url, fileName); httpRequest.form("file&q…

每日一题|910.最小差值II|数组排序思路、单调性

显然&#xff0c;较小的值增加&#xff0c;较大的值减小&#xff0c;能够得到较小的差值。 为此&#xff0c;我们用一个i把当前的排序后的数组分成两个部分&#xff0c;i以前的部分是较小值&#xff0c;i以后是较大值。 前一个部分增加k&#xff0c;后一个部分减小k。我们思考…