RabbitMQ之死信队列

news/2025/1/31 7:19:21/

1 概念

​ 死信,就是无法被消费的消息。

​ 一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,如果没有后续的处理,就变成了死信。

​ 应用场景:为了保证订单业务的数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。或者用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

2 来源

  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false

3 实战

3.1 代码架构图

在这里插入图片描述

3.2 消息TTL过期

import com.lv.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;/*** @Author:晓风残月Lx* @Date: 2022/10/25 20:31**      死信队列 之 生产者*/
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置TTL时间  10sAMQP.BasicProperties properties =new AMQP.BasicProperties().builder().expiration("10000").build();}
}
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}
}

消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)在这里插入图片描述

import com.lv.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Author:晓风残月Lx* @Date: 2022/10/25 20:53*      死信队列*      消费者02**/
public class Consumer02 {public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息。。。。");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("Consumer02接受的消息是:"+ new String(message.getBody(),"UTF-8"));};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});}
}

在这里插入图片描述

6.3.3 队列达到最大长度

  • 消息生产者代码去掉TTL属性

    import com.lv.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;/*** @Author:晓风残月Lx* @Date: 2022/10/25 20:31**      死信队列 之 生产者*/
    public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置TTL时间  10s
    //        AMQP.BasicProperties properties =
    //                new AMQP.BasicProperties()
    //                    .builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
    }
    
  • C1 消费者修改后的代码(启动后关闭消费者,模拟其接收不到消息)

    import com.lv.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
    import java.util.Map;/*** @Author:晓风残月Lx* @Date: 2022/10/25 19:39* <p>* 死信队列* 消费者 1*/public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列的名称private static final String NORMAL_QUEUE = "normal_queue";// 死信队列的名称private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] argv) throws Exception {Channel channel = RabbitUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");// 设置正常队列的长度的限制params.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}
    }

    注意此时需要把原先队列删除 因为参数改变了

  • C2 不变

    import com.lv.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;/*** @Author:晓风残月Lx* @Date: 2022/10/25 20:53*      死信队列*      消费者02**/
    public class Consumer02 {public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息。。。。");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("Consumer02接受的消息是:"+ new String(message.getBody(),"UTF-8"));};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});}
    }
    

在这里插入图片描述

6.3.4 消息被拒

  • 消息生产者代码不变

    import com.lv.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;/*** @Author:晓风残月Lx* @Date: 2022/10/25 20:31**      死信队列 之 生产者*/
    public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置TTL时间  10s
    //        AMQP.BasicProperties properties =
    //                new AMQP.BasicProperties()
    //                    .builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
    }
    
  • C1消费者代码(启动后关闭该消费者 模拟其接收不到消息)

    import com.lv.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
    import java.util.Map;/*** @Author:晓风残月Lx* @Date: 2022/10/25 19:39* <p>* 死信队列* 消费者 1*/
    public class Consumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();// 声明死信和普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> params = new HashMap<>();// 过期时间 10s//arguments.put("x-message-ttl",10000);  由生产者设置过期时间 更加灵活// 普通队列设置死信交换机params.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信RoutingKeyparams.put("x-dead-letter-routing-key", "lisi");// 设置正常队列的长度的限制
    //        params.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与普通的队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接收消息。。。。。");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody(), "UTF-8");if (msg.equals("info5")) {System.out.println("Consumer01接收的消息是:" + msg + ":此消息是被C1拒绝的");// 被拒到死信队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);}System.out.println("Consumer01接收的消息是:" + msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}
    }
    

在这里插入图片描述

  • C2不变(先启动 1 在启动 2)

    import com.lv.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;/*** @Author:晓风残月Lx* @Date: 2022/10/25 20:53*      死信队列*      消费者02**/
    public class Consumer02 {public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息。。。。");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("Consumer02接受的消息是:"+ new String(message.getBody(),"UTF-8"));};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});}
    }
    

在这里插入图片描述


http://www.ppmy.cn/news/78316.html

相关文章

定了 香港新政6月1日生效 散户交易加密货币正式合法化!

如今&#xff0c;香港虚拟资产交易的各项准备工作已准备就绪。5月23日&#xff0c;香港证监会&#xff08;SFC&#xff09;详细介绍了各界参与虚拟资产交易的咨询总结文件&#xff0c;宣布《适用于虚拟资产交易平台营运者的指引》将于2023年6月1日生效。 SFC行政总裁梁凤仪表示…

「实在RPA·电力数字员工」助推电力行业提质增效

一、电力行业数字化转型的重要性: 电力行业作为节能减排的关键&#xff0c;其数字化转型是推动碳达峰、碳中和目标如期实现的重要一环。实现“双碳”目标&#xff0c;能源是主战场&#xff0c;电力是主力军。对此&#xff0c;国家有关部门出台了一系列引导相关产业数字化发展的…

jar命令打包java应用和java打jar包的几种方式详解

一、Jar命令打包java应用的用法 jar是标准的java打包命令&#xff0c;位于JAVA_HOME/bin/目录下面。主要功能是将多个文件打包成一个单独的jar文件。 创建jar文件 jar c[v0Mmfe] [manifest] [jarfile] [entrypoint] [-C dir] inputfiles [-Joption] 更新jar文件 jar u[v0Mmfe…

自动化测试-DevOps如何实施?看看10年测试大佬的总结...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Selenium4自动化测…

算法题:20. 有效的括号

一、题目&#xff1a;20. 有效的括号 给定一个只包括 ‘(’&#xff0c;‘)’&#xff0c;‘{’&#xff0c;‘}’&#xff0c;‘[’&#xff0c;‘]’ 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。 左括…

【杂记】JUC高并发容器

1.java的基础容器有哪些&#xff1f;哪些是线程不安全的&#xff1f;哪些有是线程安全的&#xff1f; java的基础容器有四大主要类&#xff1a;List、Set、Queue、Map四个大类&#xff0c;其中ArrayList、LinkedLsit、HashMap都是线程不安全的&#xff1b;VerCtor、HashTbale、…

【P28】JMeter 测试活动(Flow Control Action)

文章目录 一、测试活动&#xff08;Flow Control Action&#xff09;参数说明二、测试计划设计2.1、Pause2.2、Break Current Loop2.3、Stop 一、测试活动&#xff08;Flow Control Action&#xff09;参数说明 控制取样器流程 选择线程组右键 >>> 添加 >>>…

Java之旅(一)

Java简介&#xff1a; Java是一种广泛使用的编程语言&#xff0c;由詹姆斯高斯林&#xff08;James Gosling&#xff09;在1991年开发&#xff0c;后由Sun Microsystems&#xff08;现为Oracle公司的一部分&#xff09;推广。Java的主要特点是跨平台性、面向对象、安全性和稳健…