RabbitMQ实现延迟消息的两种方法(提供延迟插件)

devtools/2024/11/13 5:31:16/

在消息队列(MQ)中实现延迟队列有几种常见方法。以下是两种常见的实现方式:

1. 使用死信队列(DLQ)

这种方法利用了消息的死信特性:

  1. 消息过期时间:为消息设置一个TTL(Time-To-Live)。
  2. 死信交换器(DLX):当消息在队列中过期后,将其转发到一个专门的死信交换器。
  3. 延迟消费:在死信交换器中将消息路由到延迟队列,消费者从延迟队列中消费消息。

适用场景:适合需要较长延迟的场景。

优点

  • 简单易用,不需要额外的插件或复杂配置。

缺点

  • 精度可能不高,取决于TTL设置和消息处理时间。

2. 使用定时队列插件(如RabbitMQ的延迟插件)

通过RabbitMQ的延迟插件可以实现:

  1. 消息头设置:发送消息时,通过设置消息头的x-delay属性来指定延迟时间。
  2. 延迟交换器:消息被投递到延迟交换器,延迟时间到达后再转发到目标队列。

适用场景:需要精确控制延迟时间的场景。

优点

  • 支持毫秒级别的延迟精度。
  • 配置简单,支持动态调节延迟时间。

缺点

  • 需要安装额外的插件,增加了系统复杂性。

插件地址:

链接:https://pan.baidu.com/s/1IUMEk832ymPR6aqqRj0Y_g?pwd=04gk 
提取码:04gk

下面是两种方法的代码示例:

1.使用死信队列

生产者:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class ProducerDLQ {private final static String NORMAL_QUEUE = "normal_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 设置死信队列参数Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dlx_exchange");args.put("x-message-ttl", 10000); // 消息存活时间,单位为毫秒channel.queueDeclare(NORMAL_QUEUE, false, false, false, args);String message = "Hello World!";channel.basicPublish("", NORMAL_QUEUE, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}

消费者:

 

import com.rabbitmq.client.*;public class ConsumerDLQ {private final static String DLX_QUEUE = "dlx_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare("dlx_exchange", "direct");channel.queueDeclare(DLX_QUEUE, false, false, false, null);channel.queueBind(DLX_QUEUE, "dlx_exchange", "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(DLX_QUEUE, true, deliverCallback, consumerTag -> { });}}
}

2.使用延迟插件

插件地址文章首部给出

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.AMQP;import java.util.HashMap;
import java.util.Map;public class ProducerDelay {private final static String DELAYED_EXCHANGE = "delayed_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 设置延迟交换器类型Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");channel.exchangeDeclare(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);String message = "Hello World!";Map<String, Object> headers = new HashMap<>();headers.put("x-delay", 10000); // 延迟时间AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();props.headers(headers);channel.basicPublish(DELAYED_EXCHANGE, "", props.build(), message.getBytes());System.out.println(" [x] Sent '" + message + "' with delay");}}
}

消费者

import com.rabbitmq.client.*;public class ConsumerDelay {private final static String DELAYED_QUEUE = "delayed_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(DELAYED_QUEUE, false, false, false, null);channel.queueBind(DELAYED_QUEUE, "delayed_exchange", "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(DELAYED_QUEUE, true, deliverCallback, consumerTag -> { });}}
}


http://www.ppmy.cn/devtools/98146.html

相关文章

windows C++-windows C++/CX简介(三)

^类型 (^) 是 C/CX 最突出的功能之一——当人们第一次看到 C/CX 代码时&#xff0c;很难不注意到它。那么&#xff0c;^ 类型到底是什么&#xff1f;这是类型是一种智能指针类型&#xff0c;它自动管理 Windows 运行时对象的生命周期&#xff0c;也 提供自动类型转换功能以简化…

ECMAScript性能优化技巧与陷阱

ECMAScript&#xff08;JavaScript&#xff09;的性能优化是一个复杂而重要的话题&#xff0c;尤其是在现代Web开发中。以下是一些常见的性能优化技巧和需要避免的陷阱&#xff1a; 性能优化技巧 避免全局变量&#xff1a; 全局变量会增加查找时间&#xff0c;尽量将变量的作…

Java面试题--JVM大厂篇之高并发Java应用的秘密武器:深入剖析GC优化实战案例

引言: 晚上好,Java开发者们!在高并发的现代应用中,垃圾回收器(GC)是Java性能优化的重要环节。尤其在CMS(Concurrent Mark-Sweep)GC曾经担任主角的日子里,适当的调优和优化措施至关重要。本篇文章将通过三个实际案例,探讨如何在不同场景中优化CMS GC,为你揭示Java性能…

【hot100篇-python刷题记录】【爬楼梯】

R5-真正的动态规划 动态规划核心&#xff1a; 第i步是怎么来的&#xff08;即动态规划公式&#xff09; 走到第i步阶梯的总方法数sum(走到第i-1步阶梯的总方法数&#xff0c;走到第i-2步阶梯的总方法数) class Solution:def climbStairs(self, n: int) -> int:if n<2:r…

电源噪声对高分辨率ADC影响

1 简介 ADC本身需要外部电源供电&#xff0c;而且&#xff0c;与混合信号数据采集系统中的任何其他组件一样&#xff0c;电源也会产生噪声。 电源噪声与其他噪声类似&#xff0c;对系统性能的影响主要取决于电源噪声的级别和类型。例如便携式应用的3V锂离子电池通常比用于测…

Vault密钥管理的基本概述

Vault密钥管理是一种加密密钥管理解决方案&#xff0c;旨在帮助组织管理其加密密钥&#xff0c;并确保这些密钥的安全性和可用性。以下是关于Vault密钥管理的详细介绍&#xff1a; 一、Vault的基本概述 Vault由HashiCorp开发&#xff0c;是一种用于保护、存储和严格控制对令牌、…

Linux 操作系统 --- 信号

序言 在本篇内容中&#xff0c;将为大家介绍在操作系统中的一个重要的机制 — 信号。大家可能感到疑惑&#xff0c;好像我在使用 Linux 的过程中并没有接触过信号&#xff0c;这是啥呀&#xff1f;其实我们经常遇到过&#xff0c;当我们运行的进程当进程尝试访问非法内存地址时…

计算机毕业设计选题推荐-养老院管理系统-Java/Python项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…