目录
为什么需要消息队列?
什么是消息队列?
如何技术选型?
WorkQueues模型
Fanout交换机
Direct交换机
Topic交换机
声明队列交换机
消息转换器
消息可靠性问题
1.发送者的可靠性
生产者重连
生产者确认
Spring AMQP生产者消费确认的几种返回情况:
如何处理消费者确认信息?
2.MQ的可靠性
数据持久化
LazyQueue
3.消费者的可靠性
消费者确认机制
消费失败处理
业务幂等性
4.延迟消息
死信交换机
延迟消息插件
取消超时订单
为什么需要消息队列?
可以利用RabbitMQ将同步的任务改成异步的操作
在同步模式下,执行下单业务会将保存订单,减少库存,扣减金额,增加积分操作串行执行。如果其中一个环节变慢,或者失败,那么会影响最终的结果。
如果利用消息队列实现异步模式,那么在保存订单成功后就可以向用户发送下单成功,减少用户等待时间,减少库存,扣减金额,增加积分的后台操作就发送给消息队列,让它异步处理,实现解耦。
并且还有一个好处就是限流,在高并发请求下,服务器压力过大,处理的请求过多,此时可以利用消息队列,将部分请求发送到rabbitmq的消息队列中,然后减少峰值,利用峰值后的时间进行处理剩下的业务,用时间换空间,保证请求平稳
什么是消息队列?
消息队列就是应用程序相互通信的中间件
如何技术选型?
RabbitMQ内部的体系结构
WorkQueues模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
比如面试问如何解决消息的堆积问题,就可以说用prefetch实现消息能者多劳的消费模式。或者用代码,池化、缓存、异步处理。
Fanout交换机
比如一条消息被一个消费者消费了之后就会消失,如果我想让多个服务都对这个消息处理,那么就用交换机将消息发送到不同的队列中。
Direct交换机
通过那么如果我有一个订单服务下单之后,需要有后续的操作,比如增加积分,扣款,减少库存,那么如果都放在同一个队列供多个消费者消费,那么如果我积分服务获取了那么其他服务就不用获取不到这个消息那么此时就出现一个direct交换机,通过一个交换机绑定指定队列,然后同一条消息通过交换机发送到指定的队列中。
这种形式叫做定向路由
Topic交换机
这种类似于direct交换机,只不过routingkey是可以是列表的方式设置,比如XXX.xxx.sss这种类型。还可以使用通配符,比如#代表0个单词或者多个单词,*单独代表一个单词。
声明队列交换机
消息转换器
就比如我要传一个Map的一个对象,通过Spring AMQP它会先数据转换成jdk的序列化后存入队列中。
消息可靠性问题
如果在消息发送中,支付服务向消息服务发送过程失败了,或者消息这个服务挂了,到不了交易服务,或者是交易服务挂了,处理不了消息。
实际上就三种,发送者消息丢了,MQ挂了,消费者消费失败。
1.发送者的可靠性
生产者重连
生产者确认
消息投递到MQ,但是路由失败,但是还是会返回ACK因为成功投递到MQ但是没有路由到指定队列里。
消息投递到队列中,如果是临时的队列那么就返回ACK,如果是持久化队列,它会等持久化后再返回ACK。
否则就会返回投递失败NACK
开启publisher-returns是开启路由失败的消息
future是异步接收结果,执行一个步骤接受一个future,对象执行成功才会从future获取到结果。
Spring AMQP生产者消费确认的几种返回情况:
消息投递到MQ,但是路由原因产生异常,返回ACK
临时消息投递到MQ入队成功返回ACK
持久消息投递到MQ入队成功,吃就会成功返回ACK
其他情况都返回NACK
如何处理消费者确认信息?
生产者确认机制会保证可靠性,但是会造成额外开销影响性能,尽量不要使用,如果使用不要开启publisher-retrurn 因为一般路由失败的原因都是每写对路由的地址。对于NACK可以限制重试次数。
2.MQ的可靠性
数据持久化
用MessageBuilder设置消息并配置消息是持久化的,因为普通直接发送是非持久化的消息,然后像MQ发送100w条消息,因为MQ性能比较强大,能承载每秒10W的QBS。
因为MQ为了让MQ处理消息新能快一点,所以把消息存到内存中,如果一次性接收到大量的消息的时候就会出现pageOUT,意思就是MQ接受消息的内存满了,要等待一会儿,等消费者消费了一部分之后才会继续接受消息,此时消息会短暂停止接受消息。
开启了持久化消息机制后,接收消息的时候就会向硬盘中存入消息,等内存满了就会直接清空消息。就不太会出现消息pageOUT情况,因为每个消息都要存到磁盘中,所以最快的速度可能会慢一点。
LazyQueue
接收到消息之后会直接存入磁盘中,消息会存储最新的2048条消息,读取消息的时候会读取磁盘到内存。
在3.12版本之后所有队列都认为是lazyqueue
3.消费者的可靠性
消费者确认机制
reject是消息本身有问题才使用
消费失败处理
如果执行nack将会重新投递,那么这个重新入队再重新发送给消费者,然后再次出异常返回nack,无限循环导致mq消息处理飙升带来服务器压力。所以可以用失败重试机制Spring的retry如果消费者出现异常利用本地重试机制。
此时本地重试失败后,消息就会消失
于是
编写类的时候要在失败重试机制之后才生效,此时才会将消息发送到指定交换机里
所以要在类上加入@ConditionOnProperty prefix 和 name是文件指定属性的前缀和名字,havingValue等于true的时候才生效
业务幂等性
唯一ID
有写入数据库的操作性能有影响,而且要用数据库中的id和消息的id对比,会对原有的代码有侵入性。
通过业务判断幂等性
4.延迟消息
如果使用定时任务,就会不断扫描支付状态,订单数量过大,这样会把数据库压力激增。如果使用MQ延迟发送消息,用户下单后会向MQ发送消息,30分之后消费消息进行判断订单状态,如果还是未支付就取消订单,减轻数据库压力。
死信交换机
消息过期后在第一个队列中就会变成死信,那么此时就会投递到死信交换机中然后供后面的消费者消费,间接的视线一个延迟消息的功能。实现繁琐,只是一种实现方案,它最主要是用来实现消息安全的兜底方案。
延迟消息插件
配置交换机的时候要加一个delay参数配置延迟时间
每有一个定时任务,就会内部维护一个定时时钟,会对CPU造成压力,所以只适合定时时间较短的任务。并发很高,那么很快队列就会满掉。