RabbitMQ死信队列

news/2025/2/15 4:46:21/

目录

一、概念

二、出现死信的原因

三、实战

(一)代码架构图

(二)消息被拒

(三)消息TTL过期

(四)队列达到最大长度


一、概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

二、出现死信的原因

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq )
  • 消息被拒绝(basic.reject basic.nack)并且 requeue=false.

三、实战

(一)代码架构图

 

(二)消息被拒

生产者

public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}
C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 设置队列最大长度arguments.put("x-max-length", 6);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());if (msg.equals("info5")) {System.out.println("Consumer01接收到消息" + message + "并拒绝签收该消息");channel.basicReject(message.getEnvelope().getDeliveryTag(), false);} else {System.out.println("consumer01接收到消息:" + msg);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}

C2 消费者代码不变
启动消费者 1 然后再启动消费者 2

 

 

(三)消息TTL过期

生产者代码
public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());}}
}
消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println("consumer01接收到消息:" + msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}

 

消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收死信队列消息.....");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("Consumer02 接收死信队列的消息:" + new String(message.getBody()));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});}
}

 

 

(四)队列达到最大长度

我们在声明普通队列时添加一个参数x-max-length即可

生产者

public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}
C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息),这里设置了队列最多容纳6条消息,此时由于生产者发送了10条消息,所以有4条会进入死信队列。
public class Consumer01 {public static final String NORMAL_QUEUE = "normal_queue";public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_QUEUE = "dead_queue";public static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");Map<String, Object> arguments = new HashMap<>();// 普通队列设置对应的交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put("x-dead-letter-routing-key", "lisi");// 设置队列最大长度arguments.put("x-max-length", 6);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println("consumer01接收到消息:" + msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
}
注意此时需要把原先队列删除 因为参数改变了
C2 消费者代码不变(启动 C2 消费者)

 

总结

死信队列可能出现的三种情况为:

①消息被拒(消费者方拒绝签收该消息)

②消息设置的TTL过期(生产者方设置的过期时间)

③消息投放的队列内消息已经满了,放不进入时消息会进入死信队列


http://www.ppmy.cn/news/28624.html

相关文章

【项目实战】从0开始入门JDK源码 - LinkedList源码

一、源码位置 一般来说IDEA配置好JDK以后 ,JDK的源码其实也配置好了,本文是基于JDK1.8的源码说明 rt - java - util - LinkedList 二、 继承关系图 LinkedList public class LinkedList<E>extends AbstractSequentialList<E>implements

Redis常见的数据类型命令

文章目录Redis 常见的数据类型及命令一、常见的NoSQL二、Redis 简介三、key 键的一些操作命令四、Redis的五种基本数据结构1、String&#xff08;字符串&#xff09;介绍常用命令1.1 set/get1.2 append1.3 strlen1.4 setex1.5 mset/mget1.6 setrange/getrange1.7 setnx1.8 incr…

【2363. 合并相似的物品】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给你两个二维整数数组 items1 和 items2 &#xff0c;表示两个物品集合。每个数组 items 有以下特质&#xff1a; items[i] [valuei, weighti] 其中 valuei 表示第 i 件物品的 价值 &#xff0c;we…

C语言之练习题合集

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; 文章目录leetcode 题号&#xff1a;728. 自除数leetcode 题号&#xff1a;238.…

zabbix部署

文章目录前言一、zabbix简介二、zabbix下载与部署三、部署完成、访问前端测试前言 一、zabbix简介 Zabbix 是一个企业级分布式开源监控解决方案。Zabbix 软件能够监控众多网络参数和服务器的健康度、完整性。Zabbix 使用灵活的告警机制&#xff0c;允许用户为几乎任何事件配置…

Qt之QTableView自定义排序/过滤(QSortFilterProxyModel实现,含源码+注释)

一、效果示例图 1.1 自定义表格排序示例图 本文过滤条件为行索引取余2等于0时返回true&#xff0c;且从下图中可以看到&#xff0c;奇偶行是各自挨在一起的。 1.2 自定义表格过滤示例图 下图添加两列条件&#xff08;当前数据大于当前列条件才返回true&#xff0c;且多个列…

学习 Python 之 Pygame 开发魂斗罗(五)

学习 Python 之 Pygame 开发魂斗罗&#xff08;五&#xff09;继续编写魂斗罗1. 加载地图2. 修改角色尺寸和地面高度3. 添加玩家镜头移动4. 修改子弹的发射位置继续编写魂斗罗 在上次的博客学习 Python 之 Pygame 开发魂斗罗&#xff08;四&#xff09;中&#xff0c;我们完成…

Allegro如何删除铜皮上多余的空洞操作指导

Allegro如何删除铜皮上多余的空洞操作指导 在做PCB设计的时候,设计铜皮的时候是不希望铜皮上有多余的空洞的,设计完成前需要把多余的空洞删除,如下图 如何删除,具体操作如下 点击Shape点击Manual Void/Cavity