消息队列RabbitMQ

server/2025/1/8 21:00:23/

目录

为什么需要消息队列?

什么是消息队列?

如何技术选型?

WorkQueues模型

Fanout交换机

Direct交换机

Topic交换机

声明队列交换机       

消息转换器  

消息可靠性问题

        1.发送者的可靠性

生产者重连

生产者确认

        Spring AMQP生产者消费确认的几种返回情况:

        如何处理消费者确认信息?

        2.MQ的可靠性

         数据持久化

         LazyQueue

        3.消费者的可靠性

消费者确认机制

消费失败处理

业务幂等性

        4.延迟消息

        死信交换机

        延迟消息插件

        取消超时订单 



为什么需要消息队列?

        可以利用RabbitMQ将同步的任务改成异步的操作

        在同步模式下,执行下单业务会将保存订单,减少库存,扣减金额,增加积分操作串行执行。如果其中一个环节变慢,或者失败,那么会影响最终的结果。

        如果利用消息队列实现异步模式,那么在保存订单成功后就可以向用户发送下单成功,减少用户等待时间,减少库存,扣减金额,增加积分的后台操作就发送给消息队列,让它异步处理,实现解耦。

        并且还有一个好处就是限流,在高并发请求下,服务器压力过大,处理的请求过多,此时可以利用消息队列,将部分请求发送到rabbitmq的消息队列中,然后减少峰值,利用峰值后的时间进行处理剩下的业务,用时间换空间,保证请求平稳

        什么是消息队列?

                消息队列就是应用程序相互通信的中间件

        如何技术选型?

        RabbitMQ内部的体系结构

WorkQueues模型

Work queues,任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

        比如面试问如何解决消息的堆积问题,就可以说用prefetch实现消息能者多劳的消费模式。或者用代码,池化、缓存、异步处理。

Fanout交换机

        比如一条消息被一个消费者消费了之后就会消失,如果我想让多个服务都对这个消息处理,那么就用交换机将消息发送到不同的队列中。

Direct交换机

        通过那么如果我有一个订单服务下单之后,需要有后续的操作,比如增加积分,扣款,减少库存,那么如果都放在同一个队列供多个消费者消费,那么如果我积分服务获取了那么其他服务就不用获取不到这个消息那么此时就出现一个direct交换机,通过一个交换机绑定指定队列,然后同一条消息通过交换机发送到指定的队列中。

        这种形式叫做定向路由

Topic交换机

        这种类似于direct交换机,只不过routingkey是可以是列表的方式设置,比如XXX.xxx.sss这种类型。还可以使用通配符,比如#代表0个单词或者多个单词,*单独代表一个单词。

声明队列交换机       

     

消息转换器  

        就比如我要传一个Map的一个对象,通过Spring AMQP它会先数据转换成jdk的序列化后存入队列中。

消息可靠性问题

        

        如果在消息发送中,支付服务向消息服务发送过程失败了,或者消息这个服务挂了,到不了交易服务,或者是交易服务挂了,处理不了消息。

        实际上就三种,发送者消息丢了,MQ挂了,消费者消费失败。

        1.发送者的可靠性

                生产者重连

                        

                生产者确认

                        消息投递到MQ,但是路由失败,但是还是会返回ACK因为成功投递到MQ但是没有路由到指定队列里。

                        消息投递到队列中,如果是临时的队列那么就返回ACK,如果是持久化队列,它会等持久化后再返回ACK。

                        否则就会返回投递失败NACK

                

        开启publisher-returns是开启路由失败的消息

                future是异步接收结果,执行一个步骤接受一个future,对象执行成功才会从future获取到结果。

        Spring AMQP生产者消费确认的几种返回情况:

        消息投递到MQ,但是路由原因产生异常,返回ACK

        临时消息投递到MQ入队成功返回ACK

        持久消息投递到MQ入队成功,吃就会成功返回ACK

        其他情况都返回NACK

        如何处理消费者确认信息?

        生产者确认机制会保证可靠性,但是会造成额外开销影响性能,尽量不要使用,如果使用不要开启publisher-retrurn 因为一般路由失败的原因都是每写对路由的地址。对于NACK可以限制重试次数。

        2.MQ的可靠性

                数据持久化

                        用MessageBuilder设置消息并配置消息是持久化的,因为普通直接发送是非持久化的消息,然后像MQ发送100w条消息,因为MQ性能比较强大,能承载每秒10W的QBS。

       因为MQ为了让MQ处理消息新能快一点,所以把消息存到内存中,如果一次性接收到大量的消息的时候就会出现pageOUT,意思就是MQ接受消息的内存满了,要等待一会儿,等消费者消费了一部分之后才会继续接受消息,此时消息会短暂停止接受消息。

        开启了持久化消息机制后,接收消息的时候就会向硬盘中存入消息,等内存满了就会直接清空消息。就不太会出现消息pageOUT情况,因为每个消息都要存到磁盘中,所以最快的速度可能会慢一点。

                LazyQueue

                        接收到消息之后会直接存入磁盘中,消息会存储最新的2048条消息,读取消息的时候会读取磁盘到内存。

                        在3.12版本之后所有队列都认为是lazyqueue

        3.消费者的可靠性

                        消费者确认机制

        reject是消息本身有问题才使用

                        消费失败处理

                如果执行nack将会重新投递,那么这个重新入队再重新发送给消费者,然后再次出异常返回nack,无限循环导致mq消息处理飙升带来服务器压力。所以可以用失败重试机制Spring的retry如果消费者出现异常利用本地重试机制。

此时本地重试失败后,消息就会消失

于是

        编写类的时候要在失败重试机制之后才生效,此时才会将消息发送到指定交换机里

所以要在类上加入@ConditionOnProperty prefix 和 name是文件指定属性的前缀和名字,havingValue等于true的时候才生效

        业务幂等性

                唯一ID

有写入数据库的操作性能有影响,而且要用数据库中的id和消息的id对比,会对原有的代码有侵入性。

        通过业务判断幂等性

        4.延迟消息

                如果使用定时任务,就会不断扫描支付状态,订单数量过大,这样会把数据库压力激增。如果使用MQ延迟发送消息,用户下单后会向MQ发送消息,30分之后消费消息进行判断订单状态,如果还是未支付就取消订单,减轻数据库压力。

        死信交换机

        消息过期后在第一个队列中就会变成死信,那么此时就会投递到死信交换机中然后供后面的消费者消费,间接的视线一个延迟消息的功能。实现繁琐,只是一种实现方案,它最主要是用来实现消息安全的兜底方案。

        延迟消息插件


配置交换机的时候要加一个delay参数配置延迟时间

每有一个定时任务,就会内部维护一个定时时钟,会对CPU造成压力,所以只适合定时时间较短的任务。并发很高,那么很快队列就会满掉。

        取消超时订单 


http://www.ppmy.cn/server/156881.html

相关文章

网络安全、Web安全、渗透测试之笔经面经总结

本篇文章涉及的知识点有如下几方面: 1.什么是WebShell? 2.什么是网络钓鱼? 3.你获取网络安全知识途径有哪些? 4.什么是CC攻击? 5.Web服务器被入侵后,怎样进行排查? 6.dll文件是什么意思,有什么…

Ruby语言的数据库编程

Ruby语言的数据库编程 引言 随着互联网的发展和数据量的急剧增加,数据库的应用已经成为软件开发中必不可少的一部分。Ruby语言因其简洁优雅的语法和强大的框架(如Ruby on Rails)而在开发者中备受欢迎。本文将深入探讨Ruby语言中的数据库编程…

中建海龙:科技助力福城南产业片区绿色建筑发展

在快速发展的城市化进程中,绿色建筑以其环保、节能、可持续的特点日益受到重视。作为建筑工业化领域的领军企业,中建海龙科技有限公司(简称“中建海龙”)凭借其卓越的科技实力和创新举措,在推动绿色建筑发展方面做出了…

LeetCode 力扣 热题 100道(二十八)矩阵置零(C++)

给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 class Solution { public:void setZeroes(vector<vector<int>>& matrix) {int m matrix.size();int n matrix[0].size();// 标记第…

JVM vs JDK vs JRE

JVM是Java虚拟机的缩写&#xff0c; 用于实现Java的一次编译&#xff0c;处处运行。 Java代码写成.class后&#xff0c;由本地的虚拟机运行。 JDK&#xff08;Java Development Kit&#xff09;是一个功能齐全的 Java 开发工具包&#xff0c;供开发者使用。 JDK包含了JRE。…

009:传统计算机视觉之边缘检测

本文为合集收录&#xff0c;欢迎查看合集/专栏链接进行全部合集的系统学习。 合集完整版请参考这里。 本节来看一个利用传统计算机视觉方法来实现图片边缘检测的方法。 什么是边缘检测&#xff1f; 边缘检测是通过一些算法来识别图像中物体之间或者物体与背景之间的边界&…

Spring Cloud Security集成JWT 快速入门Demo

一、介绍 JWT (JSON Web Token) 是一种带有绑实和信息的简单标准化机制&#xff0c;在信息通信中用于验证和信息传递。尤其在应用中使用Spring Cloud实现分布式构建时&#xff0c;JWT可以作为一种无状态验证原理的证明。 本文将进一步描述如何在Spring Cloud Security中集成JW…

WebSocket底层原理及 java 应用

WebSocket 底层原理 1. WebSocket 协议的基本原理 WebSocket 是一个在客户端和服务器之间建立持久、全双工的连接的协议。与传统的 HTTP 请求/响应模型不同&#xff0c;WebSocket 允许客户端和服务器双方通过一个持久的连接进行双向通信。 1.1 WebSocket 握手过程 WebSocke…