分布式中间件:RabbitMQ死信队列和延迟队列

news/2025/3/28 9:34:17/

分布式中间件:RabbitMQ死信队列和延迟队列

引言

分布式系统的开发中,消息队列是一种常用的通信机制,它可以帮助我们实现系统之间的解耦、异步处理和流量削峰等功能。RabbitMQ 作为一款功能强大的消息队列中间件,提供了许多高级特性,其中死信队列和延迟队列在实际应用中非常实用。本文将详细介绍 RabbitMQ 的死信队列和延迟队列,并结合具体的代码示例进行讲解。

RabbitMQ 配置类

首先,我们来看一个 RabbitMQ 的配置类,这个配置类定义了一些基本的 RabbitMQ 配置,包括单一消费者实例配置、多个消费者实例配置以及自定义的 RabbitMQ 发送消息组件。

@Configuration
public class RabbitmqConfig {@Autowiredprivate Environment environment; // 注入rabbitmq环境@Autowiredprivate CachingConnectionFactory connectionFactory; // 注入rabbitmq连接工厂@Autowiredprivate SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; // 注入rabbitmq配置// 单一消费者实例配置@Bean("singleListenerContainer")public SimpleRabbitListenerContainerFactory singleListenerContainer() {// 创建一个监听容器工厂SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 设置连接工厂factory.setConnectionFactory(connectionFactory);// 设置消息转换器factory.setMessageConverter(new Jackson2JsonMessageConverter());// 设置并发消费factory.setConcurrentConsumers(1);// 设置最大并发消费factory.setMaxConcurrentConsumers(1);// 设置最大单条消费消息factory.setPrefetchCount(1);return factory;}// 多个消费者实例@Bean("multiListenerContainer")public SimpleRabbitListenerContainerFactory multiListenerContainer() {// 创建一个监听容器工厂SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 设置连接工厂factoryConfigurer.configure(factory, connectionFactory);// 设置消息转换器factory.setMessageConverter(new Jackson2JsonMessageConverter());// 设置手动提交factory.setAcknowledgeMode(AcknowledgeMode.NONE);// 设置并发消费factory.setConcurrentConsumers(10);// 设置最大并发消费factory.setMaxConcurrentConsumers(20);// 设置最大单条消费消息factory.setPrefetchCount(10);return factory;}// 自定义RabbitMQ发送消息组件@Beanpublic RabbitTemplate rabbitTemplate() {// 设置消息发送确认connectionFactory.setPublisherConfirms(true);// 设置消息发送返回connectionFactory.setPublisherReturns(true);// 创建rabbitTemplateRabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置消息发送确认回调rabbitTemplate.setMandatory(true);// 设置消息发送确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息发送成功");} else {System.out.println("消息发送失败:" + cause + correlationData.toString());}});// 设置消息发送返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("消息丢失:exchange:" + exchange + ",route:" + routingKey + ",replyCode:" + replyCode + ",replyText:" + replyText);});// 设置消息转换器rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}

代码解释

  1. 单一消费者实例配置(singleListenerContainer

    • 创建了一个 SimpleRabbitListenerContainerFactory 实例,用于处理单一消费者的消息监听。
    • 设置了连接工厂、消息转换器、并发消费者数量、最大并发消费者数量和最大单条消费消息数量。
  2. 多个消费者实例配置(multiListenerContainer

    • 同样创建了一个 SimpleRabbitListenerContainerFactory 实例,用于处理多个消费者的消息监听。
    • 配置了手动提交模式,设置了更大的并发消费者数量和最大并发消费者数量。
  3. 自定义 RabbitMQ 发送消息组件(rabbitTemplate

    • 设置了消息发送确认和返回机制。
    • 定义了消息发送确认回调和返回回调,用于处理消息发送成功和失败的情况。
    • 设置了消息转换器,使用 Jackson2JsonMessageConverter 进行消息的序列化和反序列化。

死信队列

什么是死信队列

死信队列(Dead Letter Queue)是一种特殊的队列,当消息满足某些条件时,会被发送到死信队列中。这些条件包括:

  • 消息被拒绝(basic.rejectbasic.nack)并且 requeue 参数设置为 false
  • 消息过期(TTL)。
  • 队列达到最大长度。

死信队列的应用场景

  • 消息重试:当消息处理失败时,可以将消息发送到死信队列,经过一定的处理后再重新发送到原队列进行重试。
  • 记录失败消息:将失败的消息存储在死信队列中,方便后续的排查和处理。

代码示例

以下是一个简单的死信队列配置示例:

@Configuration
public class DeadLetterConfig {// 正常交换机public static final String NORMAL_EXCHANGE = "normal.exchange";// 正常队列public static final String NORMAL_QUEUE = "normal.queue";// 死信交换机public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";// 死信队列public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";// 路由键public static final String ROUTING_KEY = "normal.routing.key";// 声明正常交换机@BeanDirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}// 声明死信交换机@BeanDirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 声明正常队列并绑定死信交换机@BeanQueue normalQueue() {Map<String, Object> args = new HashMap<>();// 设置死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// 设置死信路由键args.put("x-dead-letter-routing-key", ROUTING_KEY);return new Queue(NORMAL_QUEUE, true, false, false, args);}// 声明死信队列@BeanQueue deadLetterQueue() {return new Queue(DEAD_LETTER_QUEUE, true);}// 绑定正常队列和正常交换机@BeanBinding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(ROUTING_KEY);}// 绑定死信队列和死信交换机@BeanBinding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);}
}

代码解释

  1. 交换机和队列的声明

    • 声明了正常交换机、正常队列、死信交换机和死信队列。
    • 在正常队列的声明中,通过 args 参数设置了死信交换机和死信路由键。
  2. 绑定关系

    • 将正常队列绑定到正常交换机,将死信队列绑定到死信交换机。

延迟队列

什么是延迟队列

延迟队列是一种可以让消息在指定的时间后才被消费的队列。在 RabbitMQ 中,可以通过消息的 TTL(Time To Live)和死信队列来实现延迟队列的功能。

延迟队列的应用场景

  • 订单超时处理:当用户下单后,如果在一定时间内没有支付,可以使用延迟队列来处理订单超时的情况。
  • 缓存刷新:定期刷新缓存,可以使用延迟队列来实现定时任务。

代码示例

以下是一个简单的延迟队列配置示例:

@Configuration
public class DelayQueueConfig {// 延迟交换机public static final String DELAY_EXCHANGE = "delay.exchange";// 延迟队列public static final String DELAY_QUEUE = "delay.queue";// 死信交换机public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";// 死信队列public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";// 路由键public static final String ROUTING_KEY = "delay.routing.key";// 声明延迟交换机@BeanDirectExchange delayExchange() {return new DirectExchange(DELAY_EXCHANGE);}// 声明死信交换机@BeanDirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 声明延迟队列并绑定死信交换机@BeanQueue delayQueue() {Map<String, Object> args = new HashMap<>();// 设置死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// 设置死信路由键args.put("x-dead-letter-routing-key", ROUTING_KEY);// 设置消息的 TTLargs.put("x-message-ttl", 5000); // 5 秒return new Queue(DELAY_QUEUE, true, false, false, args);}// 声明死信队列@BeanQueue deadLetterQueue() {return new Queue(DEAD_LETTER_QUEUE, true);}// 绑定延迟队列和延迟交换机@BeanBinding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(ROUTING_KEY);}// 绑定死信队列和死信交换机@BeanBinding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);}
}

代码解释

  1. 交换机和队列的声明

    • 声明了延迟交换机、延迟队列、死信交换机和死信队列。
    • 在延迟队列的声明中,通过 args 参数设置了死信交换机、死信路由键和消息的 TTL。
  2. 绑定关系

    • 将延迟队列绑定到延迟交换机,将死信队列绑定到死信交换机。

总结

本文介绍了 RabbitMQ 的死信队列和延迟队列的概念、应用场景,并结合具体的代码示例进行了详细的讲解。死信队列和延迟队列在分布式系统中非常实用,可以帮助我们处理消息失败、实现定时任务等功能。通过合理使用这些特性,可以提高系统的可靠性和稳定性。

希望本文对你理解 RabbitMQ 的死信队列和延迟队列有所帮助。如果你有任何疑问或建议,欢迎在评论区留言。


http://www.ppmy.cn/news/1580836.html

相关文章

LLaMA-Factory多机多卡训练实战

https://www.dong-blog.fun/post/1999 参考资料&#xff1a;https://llamafactory.readthedocs.io/zh-cn/latest/advanced/distributed.html 以训练qwen2.5vl 7b 为例子。 创建空间 创建数据集 如果数据集文件非常多,可以选择上tar.gz包,然后再数据集页面面,点击终端进入到…

Git——分布式版本控制工具使用教程

本文主要介绍两种版本控制工具——SVN和Git的概念&#xff0c;接着会讲到Git的安装&#xff0c;Git常用的命令&#xff0c;以及怎么在Vscode中使用Git。帮助新手小白快速上手Git。如果想直接上手用Vscode操作远程仓库则直接看7和9即可&#xff01; 目录 1. SVN和Git介绍 1.1 …

uni-app——网络API

uni-app 网络API 在 uni-app 开发中&#xff0c;网络请求是获取数据与和服务器交互的重要手段。以下介绍 uni-app 中常见的网络 API&#xff0c;包括发起请求、上传和下载以及 WebSocket、UDP 通信等方面。 发起请求 在 uni-app 里&#xff0c;使用uni.request(OBJECT)来发起…

注意力机制之MQA模型与MLA模型

《DeepSeek大模型高性能核心技术与多模态融合开发&#xff08;人工智能技术丛书&#xff09;》(王晓华)【摘要 书评 试读】- 京东图书 高性能注意力机制崛起&#xff1a;GQA与MLA 在人工智能的壮阔征途中&#xff0c;高性能注意力机制的崛起如同一股不可阻挡的浪潮&#xff0…

《Linux系统编程篇》Linux目录动态检测应用 (inotify递归子目录)——工具篇

文章目录 引言利用 Linux inotify 机制实现文件系统监控一、inotify 简介inotify 的主要特点 二、inotify 的工作原理三、使用 inotify 的示例代码代码说明 四、inotify 的实际应用场景五、注意事项与局限性RegisterServerCFD_JJ 结束 引言 笔者在做云相框的缩略图功能时候&am…

Linux C/C++编程——线程

线程是允许应用程序并发执行多个任务的一种机制&#xff0c;线程参与系统调度。 系统调度的最小单元是线程、而并非进程。 线程包含在进程之中&#xff0c;是进程中的实际运行单位。一个线程指的是进程中一个单一顺序的控制流&#xff08;或者说是执行路线、执行流&#xff09;…

ESP32 BLE 初步学习笔记

前言 蓝牙作为一个庞大的知识体系&#xff0c;其学习和运用对于初学者来说显得有些复杂且凌乱。我整理了这段时间的学习笔记&#xff0c;涵盖了协议栈、工作流程、参数等内容。在实际应用中&#xff0c;我们主要使用 GAP 和 GATT&#xff0c;协议栈中的其他部分只需了解即可。…

AI小白的第七天:必要的数学知识(四)

概率 Probability 1. 概率的定义 概率是一个介于 0 和 1 之间的数&#xff0c;表示某个事件发生的可能性&#xff1a; 0&#xff1a;事件不可能发生。1&#xff1a;事件必然发生。0 到 1 之间&#xff1a;事件发生的可能性大小。 例如&#xff0c;掷一枚公平的硬币&#xf…