RabbitMQ5-死信队列

news/2025/2/1 9:32:40/

目录

死信的概念

死信的来源

死信实战

死信之TTl

死信之最大长度

死信之消息被拒


死信的概念

死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信的来源

  • 消息 TTL 过期

    TTL是Time To Live的缩写, 也就是生存时间

  • 队列达到最大长度

    队列满了,无法再添加数据到 mq 中

  • 消息被拒绝

    (basic.reject 或 basic.nack) 并且 requeue=false

死信实战

死信之TTl

消费者 C1:

package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}}

生产者:

package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间 10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}

消费者 C2:

package com.qcby.sixin;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

启动 C1 ,之后关闭消费者,模拟其接收不到消息,再启动 Producer:

启动 C2 消费者,它消费死信队列里面的消息:

死信之最大长度

消费者 C1:

package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//设置正常队列的长度限制params.put("x-max-length",6);//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}}

生产者:

package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}

消费者 C2:

package com.qcby.sixin.two;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}

死信之消息被拒

拒收消息 "info5"

消费者 C1:

package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 死信队列 - 消费者1**/
public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定:队列、交换机、路由键(routingKey)channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常队列绑定死信队列信息Map<String, Object> params = new HashMap<>();//正常队列设置死信交换机,参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信 routing-key,参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");//正常队列String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");if (message.equals("info5")) {System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);} else {System.out.println("Consumer01 接收到消息" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//开启手动应答channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});}}

生产者:

package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//该信息是用作演示队列个数限制for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());System.out.println("生产者发送消息:" + message);}}
}

消费者 C2:

package com.qcby.sixin.three;import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收到消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}


http://www.ppmy.cn/news/1568392.html

相关文章

第26篇 基于ARM A9处理器用C语言实现中断<二>

Q&#xff1a;基于ARM A9处理器怎样编写C语言工程&#xff0c;使用按键中断将数字显示在七段数码管上呢&#xff1f; A&#xff1a;基本原理&#xff1a;主程序需要首先调用子程序set_A9_IRQ_stack()初始化IRQ模式的ARM A9堆栈指针&#xff1b;然后主程序调用子程序config_GIC…

控件【QT】

文章目录 控件QWidgetenabledgeometrysetGeometry qrcwindowOpacityQPixmapfonttoolTipfocusPolicystyleSheetQPushButtonRadio ButtionCheck Box显示类控件 控件 Qt中已经提供了很多内置的控件了(按钮,文本框,单选按钮,复选按钮&#xff0c;下拉框…&#xff09; Qt中的各种…

密码学的数学基础1-整数 素数 和 RSA加密

数学公式推导是密码学的基础, 故开一个新的课题 – 密码学的数学基础系列 素数 / 质数 质数又称素数。 一个大于1的自然数&#xff0c;除了1和它自身外&#xff0c;不能被其他自然数整除的数叫做质数&#xff1b;否则称为合数&#xff08;规定1既不是质数也不是合数&#xff0…

进程池的制作(linux进程间通信,匿名管道... ...)

目录 一、进程间通信的理解 1.为什么进程间要通信 2.如何进行通信 二、匿名管道 1.管道的理解 2.匿名管道的使用 3.管道的五种特性 4.管道的四种通信情况 5.管道缓冲区容量 三、进程池 1.进程池的理解 2.进程池的制作 四、源码 1.ProcessPool.hpp 2.Task.hpp 3…

vue框架技术相关概述以及前端框架整合

vue框架技术概述及前端框架整合 1 node.js 介绍&#xff1a;什么是node.js Node.js就是运行在服务端的JavaScript。 Node.js是一个事件驱动I/O服务端JavaScript环境&#xff0c;基于Google的V8引擎。 作用 1 运行java需要安装JDK&#xff0c;而Node.js是JavaScript的运行环…

Linux网络编程中的零拷贝:提升性能的秘密武器

在当今数字化时代&#xff0c;网络应用的性能至关重要。而在网络编程中&#xff0c;数据传输的效率直接影响着应用的整体性能。传统的数据传输方式往往涉及大量的数据拷贝和上下文切换&#xff0c;这在高并发、大数据量的场景下&#xff0c;会成为性能瓶颈。零拷贝技术的出现&a…

AI赋能未来:Agent能力与AI中间件平台对行业的深远影响

引言 随着人工智能技术的快速发展&#xff0c;AI正在逐步渗透到各行各业。尤其是在“Agent”能力的提升以及AI中间件平台&#xff08;如Dify&#xff09;开源生态的推动下&#xff0c;技术的入门门槛显著降低&#xff0c;越来越多的开发者、企业甚至非技术人员都能参与到AI的应…

Python 梯度下降法(三):Adagrad Optimize

文章目录 Python 梯度下降法&#xff08;三&#xff09;&#xff1a;Adagrad Optimize一、数学原理1.1 介绍1.2 符号定义1.3 实现流程 二、代码实现2.1 函数代码2.2 总代码 三、优缺点3.1 优点3.2 缺点 Python 梯度下降法&#xff08;三&#xff09;&#xff1a;Adagrad Optimi…