在消息队列(MQ)中实现延迟队列有几种常见方法。以下是两种常见的实现方式:
1. 使用死信队列(DLQ)
这种方法利用了消息的死信特性:
- 消息过期时间:为消息设置一个TTL(Time-To-Live)。
- 死信交换器(DLX):当消息在队列中过期后,将其转发到一个专门的死信交换器。
- 延迟消费:在死信交换器中将消息路由到延迟队列,消费者从延迟队列中消费消息。
适用场景:适合需要较长延迟的场景。
优点:
- 简单易用,不需要额外的插件或复杂配置。
缺点:
- 精度可能不高,取决于TTL设置和消息处理时间。
2. 使用定时队列插件(如RabbitMQ的延迟插件)
通过RabbitMQ的延迟插件可以实现:
- 消息头设置:发送消息时,通过设置消息头的
x-delay
属性来指定延迟时间。 - 延迟交换器:消息被投递到延迟交换器,延迟时间到达后再转发到目标队列。
适用场景:需要精确控制延迟时间的场景。
优点:
- 支持毫秒级别的延迟精度。
- 配置简单,支持动态调节延迟时间。
缺点:
- 需要安装额外的插件,增加了系统复杂性。
插件地址:
链接:https://pan.baidu.com/s/1IUMEk832ymPR6aqqRj0Y_g?pwd=04gk
提取码:04gk
下面是两种方法的代码示例:
1.使用死信队列
生产者:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class ProducerDLQ {private final static String NORMAL_QUEUE = "normal_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 设置死信队列参数Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dlx_exchange");args.put("x-message-ttl", 10000); // 消息存活时间,单位为毫秒channel.queueDeclare(NORMAL_QUEUE, false, false, false, args);String message = "Hello World!";channel.basicPublish("", NORMAL_QUEUE, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
消费者:
import com.rabbitmq.client.*;public class ConsumerDLQ {private final static String DLX_QUEUE = "dlx_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare("dlx_exchange", "direct");channel.queueDeclare(DLX_QUEUE, false, false, false, null);channel.queueBind(DLX_QUEUE, "dlx_exchange", "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(DLX_QUEUE, true, deliverCallback, consumerTag -> { });}}
}
2.使用延迟插件
插件地址文章首部给出
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.AMQP;import java.util.HashMap;
import java.util.Map;public class ProducerDelay {private final static String DELAYED_EXCHANGE = "delayed_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 设置延迟交换器类型Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");channel.exchangeDeclare(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);String message = "Hello World!";Map<String, Object> headers = new HashMap<>();headers.put("x-delay", 10000); // 延迟时间AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();props.headers(headers);channel.basicPublish(DELAYED_EXCHANGE, "", props.build(), message.getBytes());System.out.println(" [x] Sent '" + message + "' with delay");}}
}
消费者
import com.rabbitmq.client.*;public class ConsumerDelay {private final static String DELAYED_QUEUE = "delayed_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(DELAYED_QUEUE, false, false, false, null);channel.queueBind(DELAYED_QUEUE, "delayed_exchange", "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(DELAYED_QUEUE, true, deliverCallback, consumerTag -> { });}}
}