[RabbitMQ] 延迟队列+事务+消息分发

news/2024/12/5 0:57:59/

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 延迟队列
    • 1.1 概念
    • 1.2 TTL+死信队列实现
    • 1.3 延迟队列插件
      • 1.3.1 安装延迟队列
      • 1.3.2 基于插件延迟队列实现
    • 1.4 常见面试题
  • 2. 事务
    • 2.1 配置事务
    • 2.2 配置队列
    • 2.3 生产者
  • 3. 消息分发
    • 3.1 概念
    • 3.2 应用场景
      • 3.2.1 限流
      • 3.2.2 负载均衡

1. 延迟队列

1.1 概念

延迟队列就是在消息发送以后,并不想让消费者立刻拿到消息,而是等待特定的时间之后,消费者才可以拿到消息进行消费.
RabbitMQ本身并没有直接支持延迟队列的功能,但是可以通过TTL+死信队列的方式结合模拟出延迟队列的功能.
假设一个应用中需要每条消息都为10s延迟,生产者通过normal_exchange这个交换器将发送的消息存储在normal_queue这个队列,之后为这个队列或者队列中的消息的ttl设置为10s.但是消费者订阅的队列并不是normal_queue这个队列,而是dlx_queue这个队列,当消息从normal_queue这个队列中的消息经历10s过期之后存入dlx_queue这个队列中,消费者就恰好消费到了延迟10s之后的消息.
在这里插入图片描述

1.2 TTL+死信队列实现

代码实现:

  1. 先看TTL+死信队列实现延迟队列
    定义正常队列和死信队列,绑定正常队列和死信交换机.
@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").build();
}
@Bean
public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).durable(true).build();
}
@Bean
public Binding normalBinding(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") DirectExchange exchange
){return BindingBuilder.bind(queue).to(exchange).with("normal");
}
@Bean
public Queue dlxQueue(){return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
@Bean
public DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(Constant.DLX_EXCHANGE).durable(true).build();
}
@Bean
public Binding dlxBinding(@Qualifier("dlxQueue") Queue queue,@Qualifier("dlxExchange") DirectExchange exchange
){return BindingBuilder.bind(queue).to(exchange).with("dlx");
}

编写生产者:
发送一条10s过期的消息,再发送一条20s过期的消息.

@RequestMapping("/delay")
public String delay(){Message message1 = new Message("发送delay消息10s".getBytes(StandardCharsets.UTF_8));message1.getMessageProperties().setExpiration("10000");rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message1);Message message2 = new Message("发送delay消息20s".getBytes(StandardCharsets.UTF_8));message2.getMessageProperties().setExpiration("20000");rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message2);return "发送成功";
}

消费者:

@Component
public class DelayListener {@RabbitListener(queues = Constant.DLX_QUEUE)public void listener(Message message){long deliveryTag = message.getMessageProperties().getDeliveryTag();String msg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+msg);}
}

调用接口之后观察控制台接收消息的结果:等待10s和20s之后分别接收到消息
在这里插入图片描述
延迟队列希望达到的效果就是延迟一定的时间之后才收到消息,TTL刚好给消息设置延迟时间,成为死信,成为死信之后就会被投递到死信队列中,这样消费者就可以一直消费死信队列的消息就可以了.
但是这样的模式也会存在一定的问题
我们可以先发送20s的数据,再发送10s的数据:

@RequestMapping("/delay")
public String delay(){Message message2 = new Message("发送delay消息20s".getBytes(StandardCharsets.UTF_8));message2.getMessageProperties().setExpiration("20000");rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message2);Message message1 = new Message("发送delay消息10s".getBytes(StandardCharsets.UTF_8));message1.getMessageProperties().setExpiration("10000");rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message1);return "发送成功";
}

通过控制台观察死信队列消费情况:
在这里插入图片描述
我们发现10s过期的消息和20s过期的消息同时被消费者收到.10s过期的消息和20s过期的消息同时进入了死信队列.
这是由于在消息过期之后,消息不会被马上丢弃,消息只在消息被消费者消费的时候,即出队列的时候检测消息是否过期(扫描队头的消息是否过期),由于20s的消息在10s消息的前面,队列会优先扫描20s过期的消息,10s过期的消息还暂时不会被扫描到,当队列扫描到20s的消息过期的时候,10s的消息才会被扫描到,队列这才会认为10s的这条消息已经过期了,所以他和20s的消息便同时进入了死信队列中.
所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是⼀致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每⼀种不同延迟时间的消息建立单独的消息队列.

1.3 延迟队列插件

RabbitMQ官方也提供了一个延迟的插件来实现延迟队列的功能.

1.3.1 安装延迟队列

  1. 下载并上传插件
    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
    在这里插入图片描述
    下载ez文件.下载到Windows环境之后通过Xshell上传到服务器.(这里需要注意的是,我们下载的插件版本需要和我们操作系统上安装的RabbitMQ的版本一致)
    我们在上传到服务器中的时候,我们需要把该文件上传到/usr/lib/rabbitmq/plugins目录中,RabbitMQ本身不会在此安装任何内容,如果没有这个路径,可以自己进行创建.
    在这里插入图片描述
  2. 启动插件:
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    在这里插入图片描述
  3. 验证插件
    在RabbitMQ管理平台查看,新建交换机的时候是否有延迟消息的选项,如果有就说明延迟消息插件已经正常运行了.
    在这里插入图片描述

1.3.2 基于插件延迟队列实现

  1. 声明交换机和队列
    在交换机的声明之后加上delay()选项,这里需要注意的是,虽然我们叫的是延迟队列,但是我们是在交换机上声明延迟的.
public static final String DELAY_EXCHANGE = "delay_exchange";
public static final String DELAY_QUEUE = "delay_queue";
@Bean
public DirectExchange delayExchange(){return ExchangeBuilder.directExchange(Constant.DELAY_EXCHANGE).delayed().durable(true).build();
}
@Bean
public Queue delayQueue(){return QueueBuilder.durable(Constant.DELAY_QUEUE).build();
}
@Bean
public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange,@Qualifier("delayQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("delay");
}
  1. 生产者
    发送两条消息,并设置延迟时间,这里我们先设置20s的,再设置10s的,看看上面的问题有没有得到解决.我们前面使用ttl+死信队列的方式实现消息延迟的时候,我们设置消息设置的是过期时间(setExpiration),我们在这里设置的时候设置的是延迟时间(setDelayLong).
@RequestMapping("/delay")
public String delay(){Message message2 = new Message("发送delay消息20s".getBytes(StandardCharsets.UTF_8));message2.getMessageProperties().setDelayLong(20000L);rabbitTemplate.convertAndSend(Constant.DELAY_EXCHANGE,"delay",message2);Message message1 = new Message("发送delay消息10s".getBytes(StandardCharsets.UTF_8));message1.getMessageProperties().setDelayLong(10000L);rabbitTemplate.convertAndSend(Constant.DELAY_EXCHANGE,"delay",message1);return "发送成功";
}
  1. 消费者
@Component
public class DelayListener {@RabbitListener(queues = Constant.DLX_QUEUE)public void listener(Message message){long deliveryTag = message.getMessageProperties().getDeliveryTag();String msg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+msg);}
}
  1. 运行程序,观察控制台日志和RabbitMQ管理界面
    在这里插入图片描述
    我们看到delay_exchange的交换机类型是"x-delay-message".
    调用接口,发送消息,观察控制台日志:
    在这里插入图片描述
    我们发现我们上述的问题得到了解决,我们首先收到了延迟10s的消息,后收到了延迟20s的消息.

1.4 常见面试题

延迟队列作为RabbitMQ的高级特性,也是面试的一大重点

  1. 介绍一下RabbitMQ的延迟队列
    延迟队列就是在消息发送以后,并不想让消费者立刻拿到消息,而是等待特定的时间之后,消费者才可以拿到消息进行消费.
    但是RabbitMQ本身并没有直接实现延迟队列,有以下的两种方法来实现:
    • 通过ttl+死信队列的方式来实现
    • 但是通过这种方式实现存在一定的问题,如果延迟时间长的消息先到达队列,延迟时间短的后到达队列,延迟时间短的不会即时被消费者收到.
    • 可以通过官方提供的延迟插件实现延迟功能.
  2. 应用场景
    • 订单在10min之内未支付自动取消.
    • 用户在注册成功之后,3天之后发起调查问卷
    • 用户发起退款,24小时之内商家未处理,则默认同意退款.
  3. 二者对比
    • 基于死信实现的延迟队列
      优点就是灵活,不需要额外的插件来支持,缺点就是存在消息顺序问题,需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性.
    • 基于插件实现的延迟队列
      优点就是通过插件可以直接创建延迟队列,简化延迟消息的实现,避免了DLX存在消息顺序问题.
      缺点就是需要依赖特定的插件,有运维的工作,其次RabbitMQ的版本必须和插件的版本对应.

2. 事务

RabbitMQ是基于AMQP协议实现的,该协议实现了事务的机制,因此RabbitMQ也支持事务的机制,Spring AMQP也提供了对事务的额相关操作.RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败.

2.1 配置事务

配置事务管理器的时候,分为两步,首先创建RabbitTemplate,使用setChannelTransacted(true)开启RabbitTemplate的信道事务.之后创建事务管理器RabbitTransactionManager.

@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate transactionTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}

2.2 配置队列

@Bean
public Queue tansactionQueue(){return QueueBuilder.durable("trans_queue").build();
}

2.3 生产者

在生产者中,我们需要在方法之上加上@Transactional才可以生效.我们在异常发生之前发送一条消息,在异常发生之后发送一条消息,查看数据是否会被回滚.

@RequestMapping("/trans")
@Transactional
public String trans(){transactionTemplate.convertAndSend("","trans_queue","trans1...");int i = 5/0;transactionTemplate.convertAndSend("","trans_queue","trans2...");return "发送成功";
}

测试:
在这里插入图片描述
在发送消息之后,报出了500的错误码.
在这里插入图片描述
我们看到trans_queue中没有接收到消息,说明第一条消息被回滚了.

3. 消息分发

3.1 概念

RabbitMQ队列拥有多个消费者的时候,队列会把收到的消息分派给不同的消费者,每条消息只会发送给订阅列表里的一个消费者.这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者处理消息即可.
默认的情况下,消费者是轮训进行分发的,而不管消费者是否已经消费并已经确认了消息.这种方式是不太合理的,试想一下,如果某些消费者的消费速度较慢,而某些消费者的消费速度很快,这就会导致某些消费者的消息发生积压,某些消费者则很空闲,进而导致应用的整体吞吐量下降.所以我们就可以使用消息分发来解决问题.

3.2 应用场景

消息分发的常见应用场景如下:

  1. 限流
  2. 非公平分发

3.2.1 限流

RabbitMQ提供了限流的机制,可以控制消费端一次只能拉取N个请求.
通过设置prefetchCount参数,同时也必须设置消息应答方式为手动应答.
prefetchCount: 控制消费者从队列中预取消息的数量,以此来实现流控制和负载均衡.
代码示例:

  1. 首先我们需要在配置文件中加入prefetch参数
    listener:simple:acknowledge-mode: manual #需要设置为手动应答retry:initial-interval: 1000msenabled: truemax-attempts: 5prefetch: 5 # 表示一个队列最多可以有5条未确认的消息
  1. 配置队列和交换机
@Bean
public DirectExchange QOSExchange(){return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE).durable(true).build();
}
@Bean
public Queue QOSQueue(){return QueueBuilder.durable(Constant.QOS_QUEUE).build();
}
@Bean
public Binding QOSBinding(@Qualifier("QOSQueue") Queue queue,@Qualifier("QOSExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("qos");
}
  1. 消费者监听队列
    首先我们先不对消息进行手动ack
@Component
public class QOSListener {@RabbitListener(queues = Constant.QOS_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+new String(message.getBody(),"UTF-8"));
//        channel.basicAck(deliveryTag,true);}
}
  1. 生产者发送消息,一次性发送20条消息
@RequestMapping("/qos")
public String qos(){for (int i = 0;i < 20 ;i++){rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","message"+i);}return "发送成功";
}
  1. 测试
    调用接口,发送消息:
    在这里插入图片描述
    在这里插入图片描述
    我们看到,由于没有对消息进行手动应答,我们控制台只收到了5条消息.
    在这里插入图片描述
    由于5条消息还没有ack掉,所以剩下的15条消息就在队列中发生了堆积.
    之后我们对消息进行手动ack.观察控制台:
    在这里插入图片描述
    我们看到消息全部被应答了.

3.2.2 负载均衡

我们也可以使用此配置来实现负载均衡.
如图所示的情况下,一个消费者处理任务非常快,另一个非常慢,就会造成一个消费者很忙,一个消费者很闲.这是因为RabbitMQ只是在消息进入队列的时候分派消息,它不考虑消费者未确认消息的数量.
在这里插入图片描述
我们可以设置prefetch=1的方式来实现负载均衡.告诉RabbitMQ一次只给一个消费者发送消息,也就是说,在一个消费者对前一条消息进行确认之前,不会对该消费者发送新的消息,相反,它会将它分配给一个不处在繁忙阶段的消息队列.
代码示例:

  1. 配置prefetch参数为1,将消息应答机制设置为手动应答
    listener:simple:acknowledge-mode: manual #需要设置为手动应答retry:initial-interval: 1000msenabled: truemax-attempts: 5prefetch: 1 # 表示一个队列最多可以有1条未确认的消息
  1. 设置两个消费者,其中消费较慢的消费者使用Thread.sleep(100)来模拟消费慢.
@Component
public class QOSListener {@RabbitListener(queues = Constant.QOS_QUEUE)public void listener(Message message, Channel channel) throws IOException, InterruptedException {Thread.sleep(100);long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+new String(message.getBody(),"UTF-8"));channel.basicAck(deliveryTag,true);}@RabbitListener(queues = Constant.QOS_QUEUE)public void listener2(Message message,Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者2接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+new String(message.getBody(),"UTF-8"));channel.basicAck(deliveryTag,true);}
}
  1. 测试
    调用接口,向两个消费者发送消息
    在这里插入图片描述
    我们可以很明显的看到,消费者2消费消息的速度比消费者1快很多.

deliveryTag有重复是因为两个消费者使用的是不同的Channel,每个Channel上的deliveryTag 是独立计数的.


http://www.ppmy.cn/news/1552423.html

相关文章

无人机主控芯片技术与算法详解!

一、无人机主控芯片核心技术 高性能CPU&#xff1a; 无人机需要高性能的CPU来处理复杂的飞行控制算法、图像处理和数据传输等任务。目前&#xff0c;无人机的CPU主要有大疆自研的飞控系统、高通提供的无人机设计平台Snapdragon Flight&#xff0c;以及基于开源平台APM、Px4等…

HCIA笔记6--路由基础与静态路由:浮动路由、缺省路由、迭代查找

文章目录 0. 概念1.路由器工作原理2. 跨网访问流程3. 静态路由配置4. 静态路由的应用场景4.1 路由备份4.2 浮动路由4.3 缺省路由 5. 迭代路由6 问题6.1 为什么路由表中有的下一跳的地址有接口&#xff1f;6.2 个人电脑的网关本质是什么&#xff1f; 0. 概念 自治系统&#xff…

故障诊断 | Transformer-LSTM组合模型的故障诊断(Matlab)

效果一览 文章概述 故障诊断 | Transformer-LSTM组合模型的故障诊断(Matlab) 源码设计 %% 初始化 clear close all clc disp(此程序务必用2023b及其以上版本的MATLAB!否则会报错!) warning off %

【iOS】设计模式的六大原则

【iOS】设计模式的六大原则 文章目录 【iOS】设计模式的六大原则前言开闭原则——OCP单一职能原则——SRP里氏替换原则——LSP依赖倒置原则——DLP接口隔离原则——ISP迪米特法则——LoD小结 前言 笔者这段时间看了一下有关于设计模式的七大原则&#xff0c;下面代码示例均为OC…

【Oracle11g SQL详解】INSERT INTO 的用法及插入数据注意事项

INSERT INTO 的用法及插入数据注意事项 在 Oracle 11g 中&#xff0c;INSERT INTO 语句用于向表中插入数据&#xff0c;是数据写入操作中最常用的 SQL 语句之一。本文将详细介绍 INSERT INTO 的基本语法、常见场景、注意事项及常见错误处理。 一、INSERT INTO 的基本语法 INS…

详解Vue设计模式

详解 vue 设计模式 ​ Vue.js 作为一个流行的前端框架&#xff0c;拥有许多设计模式&#xff0c;这些设计模式帮助开发者更好地组织和管理代码&#xff0c;提升代码的可维护性、可扩展性和可读性。Vue 设计模式主要体现在以下几个方面&#xff1a; 1. 组件化设计模式 (Compon…

MATLAB不动点迭代法求单变量非线性方程的根程序加实例

不动点迭代法用于单变量线性方程近似根&#xff0c;首先确定一个方程根附近的近似初始值&#xff0c;采用逐次逼近的方法&#xff0c;使用迭代公式不断地更新这个初始值&#xff0c;使这个初始值不断趋近于准确值。 1.不动点迭代法自定义函数 fixed_point.m是一个MATLAB函数&a…

redis都有哪些用法

1. 缓存&#xff08;Caching&#xff09;&#xff1a; • Redis常被用作缓存层&#xff0c;存储那些频繁访问但更新不频繁的数据&#xff0c;以减少数据库的访问压力&#xff0c;提高数据读取速度。 • LRU&#xff08;Least Recently Used&#xff09;淘汰策略&#xff1a;Red…