🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
目录
- 1. 延迟队列
- 1.1 概念
- 1.2 TTL+死信队列实现
- 1.3 延迟队列插件
- 1.3.1 安装延迟队列
- 1.3.2 基于插件延迟队列实现
- 1.4 常见面试题
- 2. 事务
- 2.1 配置事务
- 2.2 配置队列
- 2.3 生产者
- 3. 消息分发
- 3.1 概念
- 3.2 应用场景
- 3.2.1 限流
- 3.2.2 负载均衡
1. 延迟队列
1.1 概念
延迟队列就是在消息发送以后,并不想让消费者立刻拿到消息,而是等待特定的时间之后,消费者才可以拿到消息进行消费.
RabbitMQ本身并没有直接支持延迟队列的功能,但是可以通过TTL+死信队列的方式结合模拟出延迟队列的功能.
假设一个应用中需要每条消息都为10s延迟,生产者通过normal_exchange
这个交换器将发送的消息存储在normal_queue
这个队列,之后为这个队列或者队列中的消息的ttl设置为10s.但是消费者订阅的队列并不是normal_queue
这个队列,而是dlx_queue
这个队列,当消息从normal_queue
这个队列中的消息经历10s过期之后存入dlx_queue
这个队列中,消费者就恰好消费到了延迟10s之后的消息.
1.2 TTL+死信队列实现
代码实现:
- 先看TTL+死信队列实现延迟队列
定义正常队列和死信队列,绑定正常队列和死信交换机.
@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").build();
}
@Bean
public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).durable(true).build();
}
@Bean
public Binding normalBinding(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") DirectExchange exchange
){return BindingBuilder.bind(queue).to(exchange).with("normal");
}
@Bean
public Queue dlxQueue(){return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
@Bean
public DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(Constant.DLX_EXCHANGE).durable(true).build();
}
@Bean
public Binding dlxBinding(@Qualifier("dlxQueue") Queue queue,@Qualifier("dlxExchange") DirectExchange exchange
){return BindingBuilder.bind(queue).to(exchange).with("dlx");
}
编写生产者:
发送一条10s过期的消息,再发送一条20s过期的消息.
@RequestMapping("/delay")
public String delay(){Message message1 = new Message("发送delay消息10s".getBytes(StandardCharsets.UTF_8));message1.getMessageProperties().setExpiration("10000");rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message1);Message message2 = new Message("发送delay消息20s".getBytes(StandardCharsets.UTF_8));message2.getMessageProperties().setExpiration("20000");rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message2);return "发送成功";
}
消费者:
@Component
public class DelayListener {@RabbitListener(queues = Constant.DLX_QUEUE)public void listener(Message message){long deliveryTag = message.getMessageProperties().getDeliveryTag();String msg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+msg);}
}
调用接口之后观察控制台接收消息的结果:等待10s和20s之后分别接收到消息
延迟队列希望达到的效果就是延迟一定的时间之后才收到消息,TTL刚好给消息设置延迟时间,成为死信,成为死信之后就会被投递到死信队列中,这样消费者就可以一直消费死信队列的消息就可以了.
但是这样的模式也会存在一定的问题
我们可以先发送20s的数据,再发送10s的数据:
@RequestMapping("/delay")
public String delay(){Message message2 = new Message("发送delay消息20s".getBytes(StandardCharsets.UTF_8));message2.getMessageProperties().setExpiration("20000");rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message2);Message message1 = new Message("发送delay消息10s".getBytes(StandardCharsets.UTF_8));message1.getMessageProperties().setExpiration("10000");rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal",message1);return "发送成功";
}
通过控制台观察死信队列消费情况:
我们发现10s过期的消息和20s过期的消息同时被消费者收到.10s过期的消息和20s过期的消息同时进入了死信队列.
这是由于在消息过期之后,消息不会被马上丢弃,消息只在消息被消费者消费的时候,即出队列的时候检测消息是否过期(扫描队头的消息是否过期),由于20s的消息在10s消息的前面,队列会优先扫描20s过期的消息,10s过期的消息还暂时不会被扫描到,当队列扫描到20s的消息过期的时候,10s的消息才会被扫描到,队列这才会认为10s的这条消息已经过期了,所以他和20s的消息便同时进入了死信队列中.
所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是⼀致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每⼀种不同延迟时间的消息建立单独的消息队列.
1.3 延迟队列插件
RabbitMQ官方也提供了一个延迟的插件来实现延迟队列的功能.
1.3.1 安装延迟队列
- 下载并上传插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载ez文件.下载到Windows环境之后通过Xshell上传到服务器.(这里需要注意的是,我们下载的插件版本需要和我们操作系统上安装的RabbitMQ的版本一致)
我们在上传到服务器中的时候,我们需要把该文件上传到/usr/lib/rabbitmq/plugins
目录中,RabbitMQ本身不会在此安装任何内容,如果没有这个路径,可以自己进行创建.
- 启动插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 验证插件
在RabbitMQ管理平台查看,新建交换机的时候是否有延迟消息的选项,如果有就说明延迟消息插件已经正常运行了.
1.3.2 基于插件延迟队列实现
- 声明交换机和队列
在交换机的声明之后加上delay()
选项,这里需要注意的是,虽然我们叫的是延迟队列,但是我们是在交换机上声明延迟的.
public static final String DELAY_EXCHANGE = "delay_exchange";
public static final String DELAY_QUEUE = "delay_queue";
@Bean
public DirectExchange delayExchange(){return ExchangeBuilder.directExchange(Constant.DELAY_EXCHANGE).delayed().durable(true).build();
}
@Bean
public Queue delayQueue(){return QueueBuilder.durable(Constant.DELAY_QUEUE).build();
}
@Bean
public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange,@Qualifier("delayQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("delay");
}
- 生产者
发送两条消息,并设置延迟时间,这里我们先设置20s的,再设置10s的,看看上面的问题有没有得到解决.我们前面使用ttl+死信队列的方式实现消息延迟的时候,我们设置消息设置的是过期时间(setExpiration),我们在这里设置的时候设置的是延迟时间(setDelayLong).
@RequestMapping("/delay")
public String delay(){Message message2 = new Message("发送delay消息20s".getBytes(StandardCharsets.UTF_8));message2.getMessageProperties().setDelayLong(20000L);rabbitTemplate.convertAndSend(Constant.DELAY_EXCHANGE,"delay",message2);Message message1 = new Message("发送delay消息10s".getBytes(StandardCharsets.UTF_8));message1.getMessageProperties().setDelayLong(10000L);rabbitTemplate.convertAndSend(Constant.DELAY_EXCHANGE,"delay",message1);return "发送成功";
}
- 消费者
@Component
public class DelayListener {@RabbitListener(queues = Constant.DLX_QUEUE)public void listener(Message message){long deliveryTag = message.getMessageProperties().getDeliveryTag();String msg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+msg);}
}
- 运行程序,观察控制台日志和RabbitMQ管理界面
我们看到delay_exchange的交换机类型是"x-delay-message".
调用接口,发送消息,观察控制台日志:
我们发现我们上述的问题得到了解决,我们首先收到了延迟10s的消息,后收到了延迟20s的消息.
1.4 常见面试题
延迟队列作为RabbitMQ的高级特性,也是面试的一大重点
- 介绍一下RabbitMQ的延迟队列
延迟队列就是在消息发送以后,并不想让消费者立刻拿到消息,而是等待特定的时间之后,消费者才可以拿到消息进行消费.
但是RabbitMQ本身并没有直接实现延迟队列,有以下的两种方法来实现:- 通过ttl+死信队列的方式来实现
- 但是通过这种方式实现存在一定的问题,如果延迟时间长的消息先到达队列,延迟时间短的后到达队列,延迟时间短的不会即时被消费者收到.
- 可以通过官方提供的延迟插件实现延迟功能.
- 应用场景
- 订单在10min之内未支付自动取消.
- 用户在注册成功之后,3天之后发起调查问卷
- 用户发起退款,24小时之内商家未处理,则默认同意退款.
- 二者对比
- 基于死信实现的延迟队列
优点就是灵活,不需要额外的插件来支持,缺点就是存在消息顺序问题,需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性. - 基于插件实现的延迟队列
优点就是通过插件可以直接创建延迟队列,简化延迟消息的实现,避免了DLX存在消息顺序问题.
缺点就是需要依赖特定的插件,有运维的工作,其次RabbitMQ的版本必须和插件的版本对应.
- 基于死信实现的延迟队列
2. 事务
RabbitMQ是基于AMQP协议实现的,该协议实现了事务的机制,因此RabbitMQ也支持事务的机制,Spring AMQP也提供了对事务的额相关操作.RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败.
2.1 配置事务
配置事务管理器的时候,分为两步,首先创建RabbitTemplate,使用setChannelTransacted(true)
开启RabbitTemplate的信道事务.之后创建事务管理器RabbitTransactionManager
.
@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate transactionTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}
2.2 配置队列
@Bean
public Queue tansactionQueue(){return QueueBuilder.durable("trans_queue").build();
}
2.3 生产者
在生产者中,我们需要在方法之上加上@Transactional
才可以生效.我们在异常发生之前发送一条消息,在异常发生之后发送一条消息,查看数据是否会被回滚.
@RequestMapping("/trans")
@Transactional
public String trans(){transactionTemplate.convertAndSend("","trans_queue","trans1...");int i = 5/0;transactionTemplate.convertAndSend("","trans_queue","trans2...");return "发送成功";
}
测试:
在发送消息之后,报出了500的错误码.
我们看到trans_queue中没有接收到消息,说明第一条消息被回滚了.
3. 消息分发
3.1 概念
RabbitMQ队列拥有多个消费者的时候,队列会把收到的消息分派给不同的消费者,每条消息只会发送给订阅列表里的一个消费者.这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者处理消息即可.
默认的情况下,消费者是轮训进行分发的,而不管消费者是否已经消费并已经确认了消息.这种方式是不太合理的,试想一下,如果某些消费者的消费速度较慢,而某些消费者的消费速度很快,这就会导致某些消费者的消息发生积压,某些消费者则很空闲,进而导致应用的整体吞吐量下降.所以我们就可以使用消息分发来解决问题.
3.2 应用场景
消息分发的常见应用场景如下:
- 限流
- 非公平分发
3.2.1 限流
RabbitMQ提供了限流的机制,可以控制消费端一次只能拉取N个请求.
通过设置prefetchCount参数,同时也必须设置消息应答方式为手动应答.
prefetchCount: 控制消费者从队列中预取消息的数量,以此来实现流控制和负载均衡.
代码示例:
- 首先我们需要在配置文件中加入
prefetch
参数
listener:simple:acknowledge-mode: manual #需要设置为手动应答retry:initial-interval: 1000msenabled: truemax-attempts: 5prefetch: 5 # 表示一个队列最多可以有5条未确认的消息
- 配置队列和交换机
@Bean
public DirectExchange QOSExchange(){return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE).durable(true).build();
}
@Bean
public Queue QOSQueue(){return QueueBuilder.durable(Constant.QOS_QUEUE).build();
}
@Bean
public Binding QOSBinding(@Qualifier("QOSQueue") Queue queue,@Qualifier("QOSExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("qos");
}
- 消费者监听队列
首先我们先不对消息进行手动ack
@Component
public class QOSListener {@RabbitListener(queues = Constant.QOS_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+new String(message.getBody(),"UTF-8"));
// channel.basicAck(deliveryTag,true);}
}
- 生产者发送消息,一次性发送20条消息
@RequestMapping("/qos")
public String qos(){for (int i = 0;i < 20 ;i++){rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","message"+i);}return "发送成功";
}
- 测试
调用接口,发送消息:
我们看到,由于没有对消息进行手动应答,我们控制台只收到了5条消息.
由于5条消息还没有ack掉,所以剩下的15条消息就在队列中发生了堆积.
之后我们对消息进行手动ack.观察控制台:
我们看到消息全部被应答了.
3.2.2 负载均衡
我们也可以使用此配置来实现负载均衡.
如图所示的情况下,一个消费者处理任务非常快,另一个非常慢,就会造成一个消费者很忙,一个消费者很闲.这是因为RabbitMQ只是在消息进入队列的时候分派消息,它不考虑消费者未确认消息的数量.
我们可以设置prefetch=1
的方式来实现负载均衡.告诉RabbitMQ一次只给一个消费者发送消息,也就是说,在一个消费者对前一条消息进行确认之前,不会对该消费者发送新的消息,相反,它会将它分配给一个不处在繁忙阶段的消息队列.
代码示例:
- 配置prefetch参数为1,将消息应答机制设置为手动应答
listener:simple:acknowledge-mode: manual #需要设置为手动应答retry:initial-interval: 1000msenabled: truemax-attempts: 5prefetch: 1 # 表示一个队列最多可以有1条未确认的消息
- 设置两个消费者,其中消费较慢的消费者使用
Thread.sleep(100)
来模拟消费慢.
@Component
public class QOSListener {@RabbitListener(queues = Constant.QOS_QUEUE)public void listener(Message message, Channel channel) throws IOException, InterruptedException {Thread.sleep(100);long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+new String(message.getBody(),"UTF-8"));channel.basicAck(deliveryTag,true);}@RabbitListener(queues = Constant.QOS_QUEUE)public void listener2(Message message,Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者2接收到消息,deliveryTag:"+deliveryTag+",消息内容:"+new String(message.getBody(),"UTF-8"));channel.basicAck(deliveryTag,true);}
}
- 测试
调用接口,向两个消费者发送消息
我们可以很明显的看到,消费者2消费消息的速度比消费者1快很多.
deliveryTag有重复是因为两个消费者使用的是不同的Channel,每个Channel上的deliveryTag 是独立计数的.