RabbitMQ - 死信队列,延时队列

news/2024/11/13 4:06:41/

Time-To-Live and Expiration — RabbitMQ

一、死信队列

Dead Letter Exchanges — RabbitMQ

死信队列:

DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

死信消息:

  • 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
  • 消息过期(消息TTL过期。TTL:Time To Live的简称,即过期时间)
  • 队列达到最大的长度

过期消息:

在 rabbitmq 中存在2种方法可设置消息的过期时间:

  • 第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间
  • 第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样

如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息

队列设置:在队列申明的时候使用** x-message-ttl **参数,单位为 毫秒;

  • 队列中这个属性的设置要在第一次声明队列的时候设置才有效,如果队列一开始已存在且没有这个属性,则要删掉队列再重新声明才可以。
  • 队列的 TTL 只能被设置为某个固定的值,一旦设置后则不能更改,否则会抛出异常

单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒。

说明:

对于第一种设置队列属性的方法,一旦消息过期,就会从队列中抹去;而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的

 1. 生产者:
  声明队列的时候用属性指定其死信队列交换机名称。

测试:

package rabbitmq;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static ConnectionFactory getConnectionFactory() {// 创建连接工程,下面给出的是默认的caseConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.99.100");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");return factory;}public static void main(String[] args) throws IOException, TimeoutException  {ConnectionFactory connectionFactory = getConnectionFactory();Connection newConnection = null;Channel createChannel = null;try {newConnection = connectionFactory.newConnection();createChannel = newConnection.createChannel();// 声明一个正常的direct类型的交换机createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);// 声明死信交换机为===order.dead.exchangeString dlxName = "order.dead.exchange";createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);// 声明队列并指定死信交换机为上面死信交换机Map<String, Object> arg = new HashMap<String, Object>();arg.put("x-dead-letter-exchange", dlxName);createChannel.queueDeclare("myQueue", true, false, false, arg);String message = "测试消息";createChannel.basicPublish("order.exchange", "routing_key_myQueue", null, message.getBytes());System.out.println("消息发送成功");} catch (Exception e) {e.printStackTrace();} finally {if (createChannel != null) {createChannel.close();}if (newConnection != null) {newConnection.close();}}}
}

结果:

(1)生成两个Exchange

 (2)队列myQueue的死信队列有属性

2. 消费者: 
  一个消费者监听正常队列,一个消费者监听死信队列。(只是绑定的交换机不同)

消费者一:监听正常队列

package rabbitmq;import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;public class Consumer {public static ConnectionFactory getConnectionFactory() {// 创建连接工程,下面给出的是默认的caseConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.99.100");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");return factory;}public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = getConnectionFactory();Connection newConnection = null;Channel createChannel = null;try {newConnection = connectionFactory.newConnection();createChannel = newConnection.createChannel();// 队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])createChannel.queueBind("myQueue", "order.exchange", "routing_key_myQueue");createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {System.out.println("consumerTag: " + consumerTag);System.out.println("envelope: " + envelope);System.out.println("properties: " + properties);String string = new String(body, "UTF-8");System.out.println("接收到消息: -》 " + string);long deliveryTag = envelope.getDeliveryTag();Channel channel = this.getChannel();System.out.println("拒绝消息, 使之进入死信队列");System.out.println("时间: " + new Date());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {}// basicReject第二个参数为false进入死信队列或丢弃channel.basicReject(deliveryTag, false);}});} catch (Exception e) {e.printStackTrace();} finally {}}
}

消费者二:监听死信队列

package rabbitmq;import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;public class Consumer2 {public static ConnectionFactory getConnectionFactory() {// 创建连接工程,下面给出的是默认的caseConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.99.100");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");return factory;}public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = getConnectionFactory();Connection newConnection = null;Channel createChannel = null;try {newConnection = connectionFactory.newConnection();createChannel = newConnection.createChannel();// 队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])createChannel.queueBind("myQueue", "order.dead.exchange", "routing_key_myQueue");createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {System.out.println("时间: " + new Date());System.out.println("consumerTag: " + consumerTag);System.out.println("envelope: " + envelope);System.out.println("properties: " + properties);String string = new String(body, "UTF-8");System.out.println("接收到消息: -》 " + string);long deliveryTag = envelope.getDeliveryTag();Channel channel = this.getChannel();channel.basicAck(deliveryTag, true);System.out.println("死信队列中处理完消息息");}});} catch (Exception e) {e.printStackTrace();} finally {}}
}

结果: 消费者一先正常监听到,basicReject为false拒绝后进入死信队列;消费者二监听的死信队列收到消息。

消费者一打出的日志如下:

consumerTag: amq.ctag-0noHs24F0FsGe-dfwwqWNw
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.exchange, routingKey=routing_key_myQueue)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 测试消息
拒绝消息, 使之进入死信队列
时间: Sat Nov 07 12:18:44 CST 2020

消费者二打出的日志如下:

时间: Sat Nov 07 12:18:47 CST 2020
consumerTag: amq.ctag-ajYMpMFkXHDiYWkD3XFJ7Q
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.dead.exchange, routingKey=routing_key_myQueue)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=rejected, count=1, exchange=order.exchange, time=Sat Nov 07 01:52:19 CST 2020, routing-keys=[routing_key_myQueue], queue=myQueue}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 测试消息
死信队列中处理完消息息

注意:

  进入死信队列之后,headers 加了一些死信相关的信息,包括原队列以及进入死信的原因。

补充:在队列进入死信队列之前也可以修改其routingKey,而且只有在指定x-dead-letter-exchange的前提下才能修改下面属性,否则会报错

(1)修改生产者声明队列的方式,如下:

// 声明一个正常的direct类型的交换机createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);// 声明死信交换机为===order.dead.exchangeString dlxName = "order.dead.exchange";createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);// 声明队列并指定死信交换机为上面死信交换机Map<String, Object> arg = new HashMap<String, Object>();arg.put("x-dead-letter-exchange", dlxName);// 修改进入死信队列的routingkey,如果不修改会使用默认的routingKeyarg.put("x-dead-letter-routing-key", "routing_key_myQueue_dead");createChannel.queueDeclare("myQueue", true, false, false, arg);

(2)修改监听死信队列的消费者二:

// 队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])createChannel.queueBind("myQueue", "order.dead.exchange", "routing_key_myQueue_dead");

结果,收到消费者二收到的信息如下:

时间: Sat Nov 07 12:27:08 CST 2020
consumerTag: amq.ctag-THqpEdYH_-iNeCIccgpuaw
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.dead.exchange, routingKey=routing_key_myQueue_dead)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=rejected, count=1, exchange=order.exchange, time=Sat Nov 07 02:00:41 CST 2020, routing-keys=[routing_key_myQueue], queue=myQueue}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到消息: -》 测试消息
死信队列中处理完消息

二、延时队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费

RabbitMQ本身没提供延时队列,我们可以利用消息的生存时间和死信队列实现延时

典型的应用场景就是订单30分钟内未支付就关闭订单,还有一种场景,账单24小时未确认,就发送提醒消息

延时队列插件安装

2.1.1、yml配置

spring:rabbitmq:host: 192.168.99.12port: 5672username: guestpassword: guest# 发送确认publisher-confirms: true# 路由失败回调publisher-returns: truetemplate:# 必须设置成true 消息路由失败通知监听者,false 将消息丢弃mandatory: true#消费端listener:simple:# 每次从RabbitMQ获取的消息数量prefetch: 1default-requeue-rejected: false# 每个队列启动的消费者数量concurrency: 1# 每个队列最大的消费者数量max-concurrency: 1# 签收模式为手动签收-那么需要在代码中手动ACKacknowledge-mode: manual
#邮件队列
email:queue:name: demo.email#邮件交换器名称
exchange:name: demoTopicExchange#死信队列
dead:letter:queue:name: demo.dead.letterexchange:name: demoDeadLetterTopicExchange#延时队列
delay:queue:name: demo.delayexchange:name: demoDelayTopicExchange

2.1.2、延时队列配置

/*** rabbitmq 配置** @author DUCHONG* @since 2020-08-23 14:05**/
@Configuration
@Slf4j
public class RabbitmqConfig {@Value("${email.queue.name}")private String emailQueue;@Value("${exchange.name}")private String topicExchange;@Value("${dead.letter.queue.name}")private String deadLetterQueue;@Value("${dead.letter.exchange.name}")private String deadLetterExchange;@Value("${delay.queue.name}")private String delayQueue;@Value("${delay.exchange.name}")private String delayExchange;@Beanpublic Queue emailQueue() {Map<String, Object> arguments = new HashMap<>(2);// 绑定死信交换机arguments.put("x-dead-letter-exchange", deadLetterExchange);// 绑定死信的路由keyarguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");return new Queue(emailQueue,true,false,false,arguments);}@BeanTopicExchange emailExchange() {return new TopicExchange(topicExchange);}@BeanBinding bindingEmailQueue() {return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");}//私信队列和交换器@Beanpublic Queue deadLetterQueue() {return new Queue(deadLetterQueue);}@BeanTopicExchange deadLetterExchange() {return new TopicExchange(deadLetterExchange);}@BeanBinding bindingDeadLetterQueue() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");}//延时队列@Beanpublic Queue delayQueue() {return new Queue(delayQueue);}@BeanCustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "topic");//参数二为类型:必须是x-delayed-messagereturn new CustomExchange(delayExchange, "x-delayed-message", true, false, args);}@BeanBinding bindingDelayQueue() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue+".#").noargs();}
}

2.2、消息发送方

30分钟时间太久了,这里延时2分钟来看效果

@Configuration
@EnableScheduling
@Slf4j
public class ScheduleController {@AutowiredRabbitTemplate rabbitTemplate;@Value("${exchange.name}")private String topicExchange;@Value("${delay.exchange.name}")private String delayTopicExchange;@Scheduled(cron = "0 0/1 * * * ?")public void sendEmailMessage() {String msg = RandomStringUtils.randomAlphanumeric(8);JSONObject email=new JSONObject();email.put("content",msg);email.put("to","duchong@qq.com");CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);log.info("---发送 email 消息---{}---messageId---{}",email,correlationData.getId());}@Scheduled(cron = "0 0/1 * * * ?")public void sendDelayOrderMessage() throws Exception{//订单号 id实际是保存订单后返回的,这里用uuid代替String orderId = UUID.randomUUID().toString();// 模拟订单信息JSONObject order=new JSONObject();order.put("orderId",orderId);order.put("goodsName","vip充值");order.put("orderAmount","99.00");CorrelationData correlationData=new CorrelationData(orderId);MessageProperties messageProperties = new MessageProperties();messageProperties.setMessageId(orderId);//30分钟时间太长,这里延时120s消费messageProperties.setHeader("x-delay", 120000);Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);rabbitTemplate.convertAndSend(delayTopicExchange,"demo.delay.x",message,correlationData);log.info("---发送 order 消息---{}---orderId---{}",order,correlationData.getId());//睡一会,为了看延迟效果TimeUnit.MINUTES.sleep(10);}
}

2.3、消息消费方

@Component
@Slf4j
public class MessageHandler {/*** 邮件发送* @param message* @param channel* @param headers* @throws IOException*/@RabbitListener(queues ="demo.email")@RabbitHandlerpublic void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {try {String msg=new String(message.getBody(), CharEncoding.UTF_8);JSONObject jsonObject = JSON.parseObject(msg);jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));log.info("---接受到消息---{}",jsonObject);//主动异常int m=1/0;//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e) {log.info("handleEmailMessage捕获到异常,拒绝重新入队---消息ID---{}", headers.get("spring_returned_message_correlation"));//异常,ture 重新入队,或者false,进入死信队列channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}/*** 死信消费者,自动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack* @param message*/@RabbitListener(queues = "demo.dead.letter")public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {//可以考虑数据库记录,每次进来查数量,达到一定的数量,进行预警,人工介入处理log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));//回复ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}/*** 延时队列消费* @param message* @param channel* @param headers* @throws IOException*/@RabbitListener(queues ="demo.delay")@RabbitHandlerpublic void handleOrderDelayMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {try {String msg=new String(message.getBody(), CharEncoding.UTF_8);JSONObject jsonObject = JSON.parseObject(msg);log.info("---接受到订单消息---orderId---{}",message.getMessageProperties().getMessageId());log.info("---订单信息---order---{}",jsonObject);//业务逻辑,根据订单id获取订单信息,如果还未支付,设置关闭状态,如果已支付,不做任何处理//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e) {log.info("handleOrderDelayMessage捕获到异常,重新入队---orderId---{}", headers.get("spring_returned_message_correlation"));//异常,ture 重新入队,或者false,进入死信队列channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}}

2.4、结果

运行结果显示,同一个订单号的消息,发送过后2分钟,消费者才接受到,符合预期

https://www.cnblogs.com/geekdc/p/13550620.html

消息队列RabbitMQ(五):死信队列与延迟队列

rabbitmq的延迟队列和死信队列_死信队列和延时队列的区别_zhuwenaptx的博客-CSDN博客

RabbitMQ的死信队列和延时队列 - 简书

RabbitMQ死信队列与延迟队列_51CTO博客_rabbitmq延迟队列


http://www.ppmy.cn/news/331829.html

相关文章

stm32f4xx-ADC

文章目录 一、定义二、模数转换过程三、ADC1.精度的理解2.原理图&#xff1a;3.存储对齐方式&#xff1a;4.ADC采集时间5.stm32通道组6.ADC1的通道5&#xff08;PA5)进行单次转化 四、test 一、定义 ADC&#xff0c;Analog-to-Digital Converter的缩写&#xff0c;指模/数转换…

TMS320F28xx ADC转换图解

ADC模块框图ADC Module Block Diagram 说明&#xff1a; 左上角为ADC的核心模块&#xff0c;实现AD转换功能。左下角为参考电压选择模块右上角为模数封装逻辑。 其中&#xff1a; 核心部分为采样保持电路&#xff08;S/H Circuit&#xff09;和AD转换器。有16路启动转换&…

stm32+LCD12864+ADC实现将ADC采样的值实时显示

前言 这篇文章以上一篇文章为基础&#xff0c;着重讲如何将ADC采样得到的值显示在LCD12864上面&#xff0c;关于如何点亮LCD屏幕&#xff0c;以及实物硬件连线图、原理&#xff0c;请参考之前一篇文章。加上上一篇文章&#xff0c;这算是一个十分迷你的项目了~还有一个比较常用…

TI的DRV8841可以被国产电机驱动芯片TMI8263所取代

自2018年1月以来&#xff0c;电机驱动芯片处于持续涨价的状态&#xff0c;涨幅甚至高达15%-20%&#xff0c;并且供货期货期短则14周长则30周。为了降低设计成本&#xff0c;第一时间设计出优秀的产品占据市场高位。笔者推荐拓尔微使用国产电机驱动芯片进行替换设计。拓尔微电子…

TMS320F28374S之DAC

介绍 DAC 模块由一个内部 12 位 DAC 和一个能够驱动外部负载的模拟输出缓冲器组成。 DAC 输出上的集成下拉电阻有助于在禁用输出缓冲器时提供已知的引脚电压。 这个下拉电阻不能被禁用&#xff0c;并且仍然作为引脚上的无源组件&#xff0c;即使对于其他共享的 pinmux 功能也是…

震旦ad369s_震旦ad369s驱动

震旦 AD369s是一款集复印、打印、扫描为一体的多功能一体机&#xff0c;在使用打印机之前需要安装震旦 AD369s对应的驱动&#xff0c;小编提供的驱动能够为用户解决打印机无法被连接或者无法被识别等问题&#xff0c;安装后打印机就可以打印了! 安装说明 1、先选择解压路径&…

K_A11_001 基于STM32等单片机驱动DHT11 串口与OLED0.96双显示

K_A11_001 基于STM32等单片机驱动DHT11 串口与OLED0.96双显示 一、资源说明二、基本参数1.参数2.引脚说明 三、驱动说明时序对应程序: 四、部分代码说明1、接线说明1.1、STC89C52RCDHT11模块1.2、STM32F103C8T6DHT11模块 五、基础知识学习与相关资料下载六、视频效果展示与程序…

基于STM32单片机和AD9850的智能DDS函数信号发生器

CSDN话题挑战赛第2期 参赛话题&#xff1a;学习笔记 文章目录 1、整体设计2、硬件方案3、软件程序4、实物验证 1、整体设计 有一天&#xff0c;我在浏览CSDN时看到一篇关于 AD9850 的帖子。AD9850是一款可以产生1hz到40mhz左右正弦波的芯片。淘宝的产品经销商能够将芯片与提供…