死信队列
概览
死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种特殊队列,用于存储无法被消费者正常处理的消息。当消息被发送到普通队列时,如果满足一定的条件,比如消息过期、被拒绝、队列长度达到上限等,这些消息就会被标记为死信,并被发送到死信队列中。
死信队列的主要作用包括:
-
消息处理失败处理:当消息无法被正常处理时,将其发送到死信队列中,以便进一步分析处理失败的原因,进行错误恢复或补救措施。
-
延迟消息处理:可以将消息发送到带有过期时间的队列中,在消息过期后自动发送到死信队列,实现延迟消息处理功能。
-
异常消息处理:可以将无法被正常处理的异常消息发送到死信队列中,避免影响正常消息的处理流程。
配置死信队列的步骤包括:
-
声明普通队列:首先需要声明一个普通队列,并设置相关的参数,如过期时间、最大长度等。
-
声明死信交换机和死信队列:然后声明一个死信交换机和一个死信队列,并将它们绑定在一起。
-
设置普通队列的死信参数:将普通队列的
x-dead-letter-exchange
和x-dead-letter-routing-key
参数设置为死信交换机和死信队列的名称。
当满足死信条件时,消息就会被发送到死信交换机,并路由到死信队列中。开发者可以通过监控死信队列中的消息,分析处理失败的原因,并根据需要进行后续的处理。使用死信队列可以提高消息系统的稳定性和可靠性,降低消息处理失败带来的影响。
使用
1.创建交换机和队列
为了方便区分,这里重新创建配置文件DeadLetterConfig,创建一个正常交换机,一个死信交换机,队列和消费者同理,这里采用的是路由模式,路由 key为dead
package com.model.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: Haiven* @Time: 2024/4/23 16:35* @Description: 用于测试死信队列的配置文件*/
@Configuration
public class DeadLetterConfig {/*** 创建普通交换机* @return 交换机*/@Bean("normalExchange")public Exchange getNormalExchange(){return ExchangeBuilder.directExchange("exchange_normal").build();}/*** 创建普通交换机* @return 交换机*/@Bean("deadExchange")public Exchange getDeadExchange(){return ExchangeBuilder.directExchange("exchange_dead").build();}/*** 普通队列* @return queue*/@Bean("normalQueue")public Queue getNormalQueue(){return QueueBuilder.durable("queue_normal")//设置队列的死信交换机。当消息被标记为死信时,会被发送到指定的死信交换机中。.deadLetterExchange("exchange_dead")//队列的最大长度.maxLength(3)//设置队列的死信路由键。当消息被发送到死信交换机时,会根据该路由键路由到指定的死信队列中。.deadLetterRoutingKey("dead")//设置队列的过期时间(毫秒)。队列在没有被使用,且过期时间到达后,会自动被删除。.expires(30000).build();}/*** 死信队列* @return queue*/@Bean("deadQueue")public Queue getDeadQueue(){return QueueBuilder.durable("queue_dead").build();}@Beanpublic Binding getNormalBinding(){return BindingBuilder.bind(getNormalQueue()).to(getNormalExchange()).with("normal").noargs();}@Beanpublic Binding getDeadBinding(){return BindingBuilder.bind(getDeadQueue()).to(getDeadExchange()).with("dead").noargs();}
}
QueueBuilder.deadLetterExchange("exchange_dead");配置此参数将交换机与死信交换机绑定,当消息消费失败后会推送到死信交换机中
2.创建消费者
package com.model.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.rmi.ServerException;
import java.util.concurrent.TimeUnit;/*** @Author: Haiven* @Time: 2024/4/24 9:55* @Description: TODO*/
@Component
@Slf4j
public class DeadConsumer {@RabbitListener(queues = {"queue_normal"})public void consumeNormal(String msg, Message message, Channel channel) throws IOException {try {log.debug("消费者 - 普通队列 - 接收到消息:" + msg);//模拟消息处理, 三秒种TimeUnit.SECONDS.sleep(2);//在 RabbitMQ 中,channel.basicAck() 方法用于向 RabbitMQ 发送消息应答,以确认消息已经被成功处理。//deliveryTag:表示消息的唯一标识符,用于标识需要确认的消息。// 每个消息都有一个唯一的 deliveryTag,由 RabbitMQ 自动生成。// 在确认消息时,需要指定对应消息的 deliveryTag。//multiple: 表示是否批量确认消息。如果将 multiple 参数设置为 false,// 则只确认指定 deliveryTag 对应的单个消息;如果将 multiple// 参数设置为 true,则表示确认所有 deliveryTag 小于或等于指定值的消息。// 通常情况下,建议将 multiple 参数设置为 false,以避免误操作导致确认了未处理的消息。//requeue:表示是否重新将消息放入队列中。如果将 requeue 参数设置为 true,// 则表示消息将被重新放入队列中,等待被重新消费;如果将 requeue// 参数设置为 false,则表示消息将被丢弃,不会重新放入队列中。通常情况下,// 在确认消息时应将 requeue 参数设置为 false,以确保消息不会被重复消费。int a = 1/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.debug("消费者 - 普通队列 - 消息处理完毕:" + msg);} catch (Exception e) {//channel.basicNack() 方法用于向 RabbitMQ 发送拒绝消息应答,表示消息未被成功处理。与 basicAck() 方法类似,basicNack() 方法也有三个参数,分别是 deliveryTag、multiple 和 requeuechannel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);throw new ServerException("普通队列消费消息失败!");}}@RabbitListener(queues = {"queue_dead"})public void consumeDead(String msg){log.debug("消费者 - 死信队列 - 接收消息:" + msg);}
}
3.生产者发送消息
package com.model.controller;import com.code.domain.Response;
import com.model.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Author: Haiven* @Time: 2024/4/19 9:46* @Description: TODO*/
@RestController
@RequestMapping("/producer")
@Slf4j
public class ProducerController {@Resourceprivate RabbitService rabbitService;@GetMapping("/simple")public Response<Void> simple(String msg){boolean res = rabbitService.simple(msg);return res ? Response.success() : Response.fail();}@GetMapping("/work")public Response<Void> work(String msg){boolean res = rabbitService.work(msg);return res ? Response.success() : Response.fail();}@GetMapping("/sub")public Response<Void> sub(String msg){boolean res = rabbitService.sub(msg);return res ? Response.success() : Response.fail();}@GetMapping("/routing")public Response<Void> routing(String msg, String type){boolean res = rabbitService.routing(msg, type);return res ? Response.success() : Response.fail();}@GetMapping("/topic")public Response<Void> topic(String msg, String type){boolean res = rabbitService.topic(msg, type);return res ? Response.success() : Response.fail();}@GetMapping("/confirm")public Response<Void> confirm(String msg, String type){boolean res = rabbitService.confirm(msg, type);return res ? Response.success() : Response.fail();}@GetMapping("/normal")public Response<Void> normal(String msg, String rout){boolean res = rabbitService.normal(msg, rout);return res ? Response.success() : Response.fail();}
}
package com.model.service.impl;import com.model.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** @Author: Haiven* @Time: 2024/4/19 10:51* @Description: TODO*/
@Service
@Slf4j
public class RabbitServiceImpl implements RabbitService {@Resourceprivate RabbitTemplate rabbitTemplate;@Value("${rabbitmq.simple.queue}")private String simpleQueue;@Value("${rabbitmq.work.queue}")private String workQueue;@Overridepublic boolean simple(String msg) {try {rabbitTemplate.convertAndSend(simpleQueue, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean work(String msg) {try {rabbitTemplate.convertAndSend(workQueue, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean sub(String msg) {try {//路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""rabbitTemplate.convertAndSend("exchange_sub","", msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean routing(String msg, String type) {log.debug("理由模式发送消息:msg="+msg+",type="+type+"");try {//路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""rabbitTemplate.convertAndSend("exchange_routing",type, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean topic(String msg, String type) {log.debug("主题模式发送消息:msg="+msg+",type="+type+"");try {//主题模式会根据 type的通配符进行分发rabbitTemplate.convertAndSend("exchange_topic",type, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean confirm(String msg, String type) {log.debug("发布确认模式发送消息:msg="+msg+",type="+type+"");try {rabbitTemplate.convertAndSend("exchange_confirm",type, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean normal(String msg, String rout) {log.debug("发送消息到普通队列中(normal):msg={},type={}", msg, rout);try {rabbitTemplate.convertAndSend("exchange_normal",rout, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}
}
向普通交换机(normal)发送消息
后台接收
当消费者消费时,int a = 1/0;会报错,导致手动签收失败,此时消息被死信队列接收,普通队列此时还是会继续重试,这是由于消费者的重试机制会默认开启,重试次数为3次
延迟队列
概览
延迟队列是指消息在进入队列后,并不立即被消费,而是在一定的延迟时间后才能被消费。在 RabbitMQ 中,并没有原生支持延迟队列的功能,但可以通过一些技术手段来实现延迟队列的效果。
一种常见的实现方式是使用 RabbitMQ 的插件或者结合其他组件来实现延迟队列,其中比较常用的方法是使用死信队列和 TTL(Time-To-Live)。
步骤
-
定义死信队列(DLX)和延迟队列:首先,定义一个死信队列(DLX),用于接收延迟消息。然后,定义一个延迟队列,将消息发送到延迟队列中,并设置消息的 TTL,使消息在一定的延迟时间后成为死信,进入死信队列。
-
配置延迟队列:在延迟队列的声明中,需要设置
x-dead-letter-exchange
参数为死信队列的交换机,以及x-dead-letter-routing-key
参数为死信队列的路由键。 -
发送延迟消息:将消息发送到延迟队列中,并设置消息的 TTL,使消息在一定的延迟时间后成为死信。
-
消费死信队列:消费死信队列中的消息,并进行相应的处理。通常情况下,可以将死信队列中的消息重新发送到其他队列中进行处理,或者进行一些其他的处理逻辑。
通过上述步骤,可以实现延迟队列的效果。消息在发送到延迟队列后,并不会立即被消费,而是在一定的延迟时间后成为死信,进入死信队列,然后再被消费。这样就实现了延迟队列的效果。
实现
这里我们使用上面的死信队列进行测试,新建延迟队列queue_delay
1.创建交换机和队列
package com.model.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: Haiven* @Time: 2024/4/24 15:58* @Description: TODO*/
@Configuration
public class DelayConfig {@Bean("delayExchange")public Exchange getDelayExchange(){return ExchangeBuilder.directExchange("exchange_delay").build();}/*** 普通队列* @return queue*/@Bean("delayQueue")public Queue getDelayQueue(){return QueueBuilder.durable("queue_delay")//设置队列的死信交换机。当消息被标记为死信时,会被发送到指定的死信交换机中。.deadLetterExchange("exchange_dead")//设置队列的死信路由键。当消息被发送到死信交换机时,会根据该路由键路由到指定的死信队列中。.deadLetterRoutingKey("dead")//队列中的每条消息在被投递到队列时,都会被赋予一个过期时间,如果消息在队列中的时间超过了设定的过期时间,则会被标记为过期消息.ttl(3000).build();}@Bean("delayBinding")public Binding getDelayBinding(){return BindingBuilder.bind(getDelayQueue()).to(getDelayExchange()).with("delay").noargs();}
}
- 由于是延迟队列,里面的消息主要是延迟后由死信队列消费,所以这里不需要消费者
x-message-ttl
是 RabbitMQ 中用于设置队列中消息的过期时间的参数。具体来说,它是队列级别的属性,用于设置队列中消息的生存时间,单位为毫秒。当设置了
x-message-ttl
参数后,队列中的每条消息在被投递到队列时都会被赋予一个过期时间,如果消息在队列中的时间超过了设定的过期时间,则会被标记为过期消息,并在一段时间后被自动删除。如果队列中的某条消息在被消费者消费之前就已经过期了,那么这条消息将被立即丢弃。另外,如果消息已经被消费者接收并消费,那么即使消息过期了,也不会对消费者产生影响。
使用
x-message-ttl
参数可以实现一些场景,比如:
- 对于一些临时性的消息,可以设置消息的过期时间,确保消息不会在队列中长时间堆积。
- 可以配合死信队列一起使用,将过期的消息发送到死信队列中进行处理。
2.生产者发送消息
package com.model.controller;import com.code.domain.Response;
import com.model.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Author: Haiven* @Time: 2024/4/19 9:46* @Description: TODO*/
@RestController
@RequestMapping("/producer")
@Slf4j
public class ProducerController {@Resourceprivate RabbitService rabbitService;@GetMapping("/simple")public Response<Void> simple(String msg){boolean res = rabbitService.simple(msg);return res ? Response.success() : Response.fail();}@GetMapping("/work")public Response<Void> work(String msg){boolean res = rabbitService.work(msg);return res ? Response.success() : Response.fail();}@GetMapping("/sub")public Response<Void> sub(String msg){boolean res = rabbitService.sub(msg);return res ? Response.success() : Response.fail();}@GetMapping("/routing")public Response<Void> routing(String msg, String type){boolean res = rabbitService.routing(msg, type);return res ? Response.success() : Response.fail();}@GetMapping("/topic")public Response<Void> topic(String msg, String type){boolean res = rabbitService.topic(msg, type);return res ? Response.success() : Response.fail();}@GetMapping("/confirm")public Response<Void> confirm(String msg, String type){boolean res = rabbitService.confirm(msg, type);return res ? Response.success() : Response.fail();}@GetMapping("/normal")public Response<Void> normal(String msg, String rout){boolean res = rabbitService.normal(msg, rout);return res ? Response.success() : Response.fail();}@GetMapping("/delay")public Response<Void> delay(String msg, String rout){boolean res = rabbitService.delay(msg, rout);return res ? Response.success() : Response.fail();}
}
package com.model.service.impl;import com.model.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** @Author: Haiven* @Time: 2024/4/19 10:51* @Description: TODO*/
@Service
@Slf4j
public class RabbitServiceImpl implements RabbitService {@Resourceprivate RabbitTemplate rabbitTemplate;@Value("${rabbitmq.simple.queue}")private String simpleQueue;@Value("${rabbitmq.work.queue}")private String workQueue;@Overridepublic boolean simple(String msg) {try {rabbitTemplate.convertAndSend(simpleQueue, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean work(String msg) {try {rabbitTemplate.convertAndSend(workQueue, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean sub(String msg) {try {//路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""rabbitTemplate.convertAndSend("exchange_sub","", msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean routing(String msg, String type) {log.debug("理由模式发送消息:msg="+msg+",type="+type+"");try {//路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""rabbitTemplate.convertAndSend("exchange_routing",type, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean topic(String msg, String type) {log.debug("主题模式发送消息:msg="+msg+",type="+type+"");try {//主题模式会根据 type的通配符进行分发rabbitTemplate.convertAndSend("exchange_topic",type, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean confirm(String msg, String type) {log.debug("发布确认模式发送消息:msg="+msg+",type="+type+"");try {rabbitTemplate.convertAndSend("exchange_confirm",type, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean normal(String msg, String rout) {log.debug("发送消息到普通队列中(normal):msg={},type={}", msg, rout);try {rabbitTemplate.convertAndSend("exchange_normal",rout, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean delay(String msg, String rout) {log.debug("发送消息到 --延迟队列-- 中(delay):msg={},rout={}", msg, rout);try {rabbitTemplate.convertAndSend("exchange_delay",rout, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}
}
发送消息
后台接收
发送到延迟队列的消息在过期后被死信队列接收