【SpringBoot整合RabbitMQ(下)】

news/2025/4/2 3:57:22/

八、死信队列

        先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了, consumer queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列。

8.1、死信的三大来源

代码结构图:

 架构图逻辑说明:

生产者发送消息至交换机(正常交换机),由交换机根据routing-key决定发送到哪个队列(正常队列),此时触发以下三种条件之一,正常队列将消息发送到死信交换机,并指明routing-key发往死信队列,再由死信队列消费者进行消费。

8.1.1、消息 TTL 过期

生产者:

    private static final String EXCHANGE_NAME = "normal_exchange";/*** 设置过期时间导致消息死信* @param args* @throws Exception*/public static void main1(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//死信消息 设置TTL时间 -》time to live 单位是msAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//发消息for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(EXCHANGE_NAME,"zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));}}

正常消费者:

    /*** 正常交换机名称*/private static final String NORMAL_EXCHANGE = "normal_exchange";/*** 死信交换机名称*/private static final String DEAD_EXCHANGE = "dead_exchange";/*** 交换机类型*/private static final String TYPE = "direct";/*** 普通队列名称*/private static final String NORMAL_QUEUE = "c1";/*** 死信队列名称*/private static final String DEAD_QUEUE = "c2";
    public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,TYPE);channel.exchangeDeclare(DEAD_EXCHANGE,TYPE);//声明普通队列Map<String,Object> argument = new HashMap<>();//正常的队列设置消息变成死信之后发送给哪个死信交换机argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信rountingKEY-》即死信消息发送给死信交换机后发往哪个死信队列argument.put("x-dead-letter-routing-key","lisi");channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//绑定死信交换机与队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("C1等待接收消息......");//声明 接收消息DeliverCallback deliverCallback = (consumeTag, message) ->{String msg = new String(message.getBody(),"UTF-8");System.out.println(msg);};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};//开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);}

 生产者和消费者线程启动后将消费者线程杀死,即消息无法被消费,等待消息过期后查看死信队列当中的消息数量或死信消费者的控制台输出。

注意:消息过期时间也可以由队列来决定,argument集合中添加如下参数即可:

        //过期时间-》10sargument.put("x-message-ttl",10_000);

8.1.2、队列达到最大长度(队列满了,无法再添加数据到 mq )

生产者:

public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();for (int i = 0; i < 10; i++) {String message = "info" + i;channel.basicPublish(EXCHANGE_NAME,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));}}

消费者:

声明队列时添加参数(其余代码与情况1代码一致):

        //设置正常队列的最大容量argument.put("x-max-length",6);

先启动消费者线程,由消费者线程声明完交换机和队列后将消费者线程杀死,此时再启动生产者线程发送消息,此处发送十条消息,由于队列最多存储6条消息,其余4条则发往死信队列,生产者程序执行完成后查看WEB端两个队列当中的消息条数即可。

8.1.3、消息被拒绝(basic.reject basic.nack)并且 requeue=false.

消费者(整体代码与情况1一致,区别在于接收到消息时的回调函数):

        DeliverCallback deliverCallback = (consumeTag, message) ->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){//拒绝该消息接收System.out.println("拒绝该消息:" + msg);channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("Consumer1接收的消息是:" + new String(message.getBody(),"UTF-8"));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};

 生产者和消费者线程一并启动,生产者代码与情况2当中的代码一致,当发送消息为:“info5”时正常消费者拒绝该消息,此时正常队列将该消息发送到死信队列进行处理。

8.1.4、死信消费者

    private static final String DEAD_QUEUE = "c2";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明 接收消息DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println("Consumer2接收的消息是:" + new String(message.getBody(),"UTF-8"));};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}

九、整合SpringBoot

添加依赖:

<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

配置文件:

spring:rabbitmq:host: 43.138.78.150port: 5672username: adminpassword: 123virtual-host: /testpublisher-confirm-type: CORRELATEDpublisher-returns: true

十、延迟队列

延迟队列是死信队列的第一种情况(消息TTL过期)进一步演化而来的,原因如下:

在我们上述死信队列代码演示当中说明了两种设置消息过期的方法,一种是发送消息时指明消息多久后过期,另外一种是设置某队列当中消息的过期时间,这两种方式是有差异的,如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

10.1、简单延迟队列实现

实现逻辑:和上述的死信队列实现方式类似,区别在于我们不再需要正常消费者,只需要设置死信消费者即可。

代码结构图:创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

生产者:

由于整合了SpringBoot,我们将不再使用main函数的形式发送消息,而是采用web界面的输入来发送消息。
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {//发消息@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:"+message);}
}

队列声明、交换机声明、绑定关系:

@Configuration
public class QueueTtlConfig {//普通交换机的名称private static final String NORMAL_EXCHANGE = "X";//死信交换机的名称private static final String DEAD_EXCHANGE = "Y";//普通队列1的名称private static final String NORMAL_QUEUE_1 = "QA";//普通队列2的名称private static final String NORMAL_QUEUE_2 = "QB";//死信队列的名称private static final String DEAD_QUEUE = "QD";//声明X 交换机@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(NORMAL_EXCHANGE);}//声明Y 交换机@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(DEAD_EXCHANGE);}//声明普通队列A TTL为10s@Bean("queueA")public Queue queueA(){Map<String,Object> arguments = new HashMap<>(3);//过期时间-》10sarguments.put("x-message-ttl",10_000);//正常的队列设置消息变成死信之后发送给哪个死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列arguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(NORMAL_QUEUE_1).withArguments(arguments).build();}//声明普通队列B TTL为40s@Bean("queueB")public Queue queueB(){Map<String,Object> arguments = new HashMap<>(3);//过期时间-》10sarguments.put("x-message-ttl",40_000);//正常的队列设置消息变成死信之后发送给哪个死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列arguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(NORMAL_QUEUE_2).withArguments(arguments).build();}//声明死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_QUEUE).build();}//队列QA和交换机X绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("XA");}//队列QB和交换机X绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("XB");}//队列QD和交换机Y绑定@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("YD");}
}

 消费者:

@Component
@Slf4j
public class Consumer {//接收消息@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMsg(Message message, Channel channel){String msg = new String(message.getBody());log.info("接收到的队列confirm_queue消息:{}",msg);}
}

 执行测试:

浏览器输入:http://localhost:8080/ttl/sendMsg/你好

 可以看到死信消费者消费了两个队列发送给死信队列的消息。

10.2、延迟队列优化

在上面实现的延迟队列当中,我们的发送消息的延迟时间是固定的,只有10s和40s两种,但是在实际应用场景当中,我们的延迟时间是不固定的,换句话说,延迟时间是由用户的需求所决定的。因此,我们需要再生产者这边就决定消息的延迟时间。

代码架构图:在上面实现的架构当中添加队列QC,绑定关系如下,不设置其TTL时间。

生产者:

    //发消息-自定义TTL@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")public void sendExpirationMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条过期时长为{}毫秒的消息给随机TTL队列:{}",new Date(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC","消息来自TTL为自定义的队列:"+message,msg ->{//发送消息的时候 延迟时长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}

消费者:消费者没有变化

队列声明、交换机声明、绑定关系:在上面的声明配置类当中添加如下代码

    //普通队列3的名称private static final String NORMAL_QUEUE_3 = "QC";//声明普通队列C@Bean("queueC")public Queue queueC(){Map<String,Object> arguments = new HashMap<>(3);//正常的队列设置消息变成死信之后发送给哪个死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列arguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(NORMAL_QUEUE_3).withArguments(arguments).build();}//队列QC和交换机X绑定@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queue,@Qualifier("xExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("XC");}

 测试:

浏览器输入:

http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000
控制台输出:

 观察控制台我们发现就有问题了,20s时我们发送“你好1”,延迟时间20s,41s时死信队列收到该消息,有1s的误差是可以接受的,因为网络传输也需要时间,但是,我们30s时发送“你好2”,延迟时间2s,按道理来说是需要在32s左右时收到该消息,可以看到,收到该消息的时间与消息1一致,这就是我们上面所说的在发送消息时设置TTL属性的弊端。究其原因:因为RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

10.3、解决延迟队列优化后的弊端

10.3.1、安装延迟队列插件

在官网上下载 https://www.rabbitmq.com/community-plugins.html
下载 rabbitmq_delayed_message_exchange 插件,
然后解压放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

10.3.2、安装完成后

10.3.3、代码架构图

在这里新增了一个队列 delayed.queue, 一个自定义交换机 delayed.exchange ,绑定关系如下 :

10.3.4、代码实现

        在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia( 一个分布式数据系统 ) 表中,当达到投递时间时,才投递到目标队列中。

声明:

@Configuration
public class DelayedQueueConfig {//交换机private static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";//队列private static final String DELAYED_QUEUE_NAME = "delayed_queue";//routing-keyprivate static final String DELAYED_ROUTING_KEY = "delayed_routingKey";//声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}//声明交换机@Beanpublic CustomExchange delayedExchange(){Map<String,Object>  arguments = new HashMap<>();arguments.put("x-delayed-type","direct");/*** 1.交换机的名称* 2.交换机的类型* 3.是否需要持久化* 4.是否需要自动删除* 5.其他参数*/return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}//队列和交换机绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchange customExchange){return BindingBuilder.bind(queue).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者:

    //发消息-基于插件 发送消息+时间@GetMapping("/sendDelayMsg/{message}/{delayTime}")public void sendDelayedMsg(@PathVariable String message,@PathVariable Integer delayTime){log.info("当前时间:{},发送一条过期时长为{}毫秒的消息给随机TTL队列:{}",new Date(),delayTime,message);rabbitTemplate.convertAndSend("delayed_exchange","delayed_routingKey","消息来自TTL为自定义的队列:"+message,msg ->{//发送消息的时候 延迟时长msg.getMessageProperties().setDelay(delayTime);return msg;});}

消费者:

@Slf4j
@Component
public class DelayQueueConsumer {//接收消息@RabbitListener(queues = "delayed_queue")public void receiveD(Message message, Channel channel) throws Exception{String msg = new String(message.getBody(),"UTF-8");log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);}
}

 10.3.5、测试验证

浏览器输入:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
控制台效果:

十一、发布确认高级模式

在上一篇中我们将了发布确认模式的必要性,我们此时考虑一个复杂的场景,例如,生产者发送消息时,MQ此时刚好宕机重启了,甚至整个MQ集群都不可用了,那重启期间生产者发送的消息就会丢失,需要我们手动处理和恢复。此时我们思考一个问题,如何保证消息可靠投递呢?换句话说,极端情况下,无法投递的消息我们该如何处理?

11.1、基础逻辑代码

配置类:

@Configuration
public class ConfirmConfig {//交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";//队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";//routingKeypublic static final String ROUTING_KEY = "key1";//声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}//声明队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//绑定@Beanpublic Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,@Qualifier("confirmQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);}
}

生产者:

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8));log.info("发送消息内容:{}",message);}

消费者:

@Component
@Slf4j
public class Consumer {//接收消息@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMsg(Message message, Channel channel){String msg = new String(message.getBody());log.info("接收到的队列confirm_queue消息:{}",msg);}
}

11.2、消息发送失败以及消息回退处理(交换机未收到、队列未收到) 

配置文件添加如下配置:

    publisher-confirm-type: CORRELATED//发布消息成功到交换器后会触发回调方法

11.2.1、消息未被交换机或队列收到时的回调函数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的此时就需要添加如下配置:

    publisher-returns: true//消息回退
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}//交换机确认回调方法/*** 1.发消息 交换机收到了-》回调*  1.1 correlationData 保存了回调消息的ID及相关信息*  1.2 交换机是否收到消息 true*  1.3 交换机没有收到消息的原因-》null* 2.发消息 交换机接收失败了-》回调*  2.1 correlationData 保存了回调消息的ID及相关信息*  2.2 交换机是否收到消息 false*  2.3 交换机没有收到消息的原因-》reason*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id = correlationData != null ? correlationData.getId() : "0";if (b){log.info("交换机已经收到了ID为:{}的消息",id);}else {log.info("交换机没有收到ID为:{}的消息,原因为:{}",id,s);}}//当生产者成功发送消息至交换机后但未发送至队列时的回调函数//当消息不可达队列时才会触发此函数@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息{}被交换机{}退回,退回的原因:{},路由key:{}",returnedMessage.getMessage().getBody().toString(),returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());}}

 验证:

①、交换机错误

 将发送消息时的交换机名称进行修改,即可观察到所触发的回调函数。

 rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME + "123",ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8),correlationData);

②、路由错误

  生产者代码修改:

    @GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){String message1 = message + "key1";CorrelationData correlationData = new CorrelationData();String id = String.valueOf(UUID.randomUUID());correlationData.setId(id);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8),correlationData);log.info("发送消息1内容:{}",message + "key1");CorrelationData correlationData2 = new CorrelationData();String id2 = String.valueOf(UUID.randomUUID());correlationData2.setId(id2);String message2 = message + "key12";rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY + "2",message2.getBytes(StandardCharsets.UTF_8),correlationData2);log.info("发送消息2内容:{}",message + "key12");}

十二、备份交换机 

        有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

 12.1、代码结构图及其逻辑

1.Web端输入消息内容

2.控制器将消息封装成两份,message1和message2,message1使用正确的routing-key路由到队列当中,message2使用错误的routing-key路由不到队列中。

3.message1由正常消费者进行消费,message2回退至交换机,由交换机发往备份交换机进行处理,最后由报警队列的消费者进行消费。

12.2、代码

1.生产者:生产者代码使用发布确认高级模式当中的生产者代码。

2.普通交换机及其队列声明:

声明同样采用发布确认高级模式当中的代码,只需要将交换机声明的代码修改为如下代码:

    //声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){Map<String,Object> arguments = new HashMap<>();arguments.put("alternate-exchange",MsgFailSendConfig.BACKUP_EXCHANGE_NAME);return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArguments(arguments).build();}

 使用如上配置后,消息未路由到队列时,将消息发送至备份交换机。 

3.备份交换机及其队列声明:

@Configuration
public class MsgFailSendConfig {//备份交换机public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";//备份队列public static final String BACKUP_QUEUE_NAME = "backup_queue";//报警队列(或者二次处理队列)public static final String WARNING_QUEUE_NAME = "warning_queue";//声明备份交换机@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明备份队列和报警队列@Bean("backupQueue")public Queue backupQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}//绑定@Beanpublic Binding backupBindingExchange(@Qualifier("backupQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}@Beanpublic Binding warningBindingExchange(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}}

12.3、效果验证

 浏览器输入:

http://localhost:8080/confirm/sendMsg/你好

控制台效果:

十三、优先级队列

13.1、使用场景

在我们系统中有一个 订单催付 的场景,我们的客户在天猫下的订单 , 淘宝会及时将订单推送给我们,如 果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创 造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景, 所以订单量大了后采用 RabbitMQ 进行改造和优化 , 如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

13.2、如何添加

a.控制台页面添加

b.代码层面添加

        Map<String,Object> arguments = new HashMap<>();arguments.put("x-max-priority",10);//设置最大优先级 取值范围0-255 优先值越大,优先级越高/*** 创建一个队列* 1.队列名称* 2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)* 3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);

13.3、实战

生产者:

public class Producer {//队列名称private static final String QUEUE_NAME = "hello";//发消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();Map<String,Object> arguments = new HashMap<>();arguments.put("x-max-priority",10);//设置最大优先级 取值范围0-255 优先值越大,优先级越高/*** 创建一个队列* 1.队列名称* 2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)* 3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);for (int i = 11; i < 21; i++) {String message = "info:" + i;AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(i-10).build();//设置消息优先级/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的key值是哪个 本次是队列名称* 3.其他参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,properties,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息体:" + message + " 优先级:" + (i - 10));}}
}

消费者:

public class Consumer {//队列的名称public static final String QUEUE_NAME = "hello";//接收消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();/*** 声明 接收消息*/DeliverCallback deliverCallback = (consumeTag, message) ->{System.out.println(new String(message.getBody()));};//取消消费的回调CancelCallback cancelCallback = (consumeTag) -> {System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否自动应答 true代表的是自动应答 false代表的是手动应答* 3.当一个消息发送过来后的回调接口* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

 结果验证:

 

 

十四、惰性队列

14.1、使用场景

        RabbitMQ 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因( 比如消费者下线、宕机亦或者是由于维护而关闭等 ) 而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

14.2、两种模式

队列具备两种模式: default lazy
代码设置:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);


http://www.ppmy.cn/news/71762.html

相关文章

在博客逮到一个阿里8年测试开发,聊过之后我悟了....

老话说的好&#xff0c;这人呐&#xff0c;一旦在某个领域鲜有敌手了&#xff0c;就会闲得疋虫疼。前几天我在上班摸鱼刷社区的时候认识了一位腾讯测试开发大佬&#xff0c;在阿里工作了8年&#xff0c;因为本人天赋比较高&#xff0c;平时工作也兢兢业业&#xff0c;现在企业内…

Git安装应用

Git版本控制 1. 概述 1.1 什么是版本控制 开发中实际场景 场景一&#xff1a;备份 小明负责的模块就要完成了&#xff0c;就在即将Release之前的一瞬间&#xff0c;电脑突然蓝屏&#xff0c;硬盘光荣牺牲&#xff01;几个月以来的努力付之东流 ​ 场景二&#xff1a;代码还…

OpenGL之创建窗口

目录 什么是OpenGL&#xff1f; 核心模式与立即渲染模式 立即渲染模式 (Immediate mode) 核心模式(Core-profile) 状态机 对象 创建窗口 配置环境 什么是OpenGL&#xff1f; 一般它被认为是一个API(Application Programming Interface, 应用程序编程接口)&#xff0c;…

【Linux内核解析-linux-5.14.10-内核源码注释】Linux系统关于多进程和多线程相关知识点

Linux系统关于多进程和多线程的问题&#xff0c;常见的有以下几个&#xff1a; 什么是进程和线程&#xff1f; 进程是操作系统资源分配的最小单位。它包括程序、数据和进程控制块&#xff08;PCB&#xff09;&#xff0c;是一个正在运行中的程序实例。每个进程都有自己的内存…

Redis(二)对事务进行操作及Jedis

系列文章目录 Redis入门笔记&#xff08;一&#xff09;&#xff1a;Redis在Linux下安装和八大数据类型 文章目录 系列文章目录前言Redis对事务进行操作开启事务&#xff1a;multi执行事务&#xff1a;exec放弃事务&#xff1a;discard编译型异常&#xff08;命令错误&#xf…

media设备节点初始化与Video4Linux初始化

media设备节点初始化与Video4Linux初始化 文章目录 media设备节点初始化与Video4Linux初始化media设备节点初始化Video4Linux初始化 media设备节点初始化 media_devnode_init函数是一个内核初始化函数&#xff0c;用于在Linux内核启动期间进行设备节点初始化。 函数的主要作用…

Python入门(十二)while循环(二)

while循环&#xff08;二&#xff09; 1.使用while循环处理列表和字典2.在列表之间移动元素3.删除为特定值的所有列表元素4.使用用户输入来填充字典 作者&#xff1a;xiou 1.使用while循环处理列表和字典 到目前为止&#xff0c;我们每次都只处理了一项用户信息&#xff1a;获…

LeetCode_数据结构设计_困难_295.数据流的中位数

目录 1.题目2.思路3.代码实现&#xff08;Java&#xff09; 1.题目 中位数是有序整数列表中的中间值。如果列表的大小是偶数&#xff0c;则没有中间值&#xff0c;中位数是两个中间值的平均值。 例如 arr [2,3,4] 的中位数是 3 。例如 arr [2,3] 的中位数是 (2 3) / 2 2.…