使用RabbitMQ

embedded/2024/12/26 22:12:16/

一、MQ是什么

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信,主要功能业务解耦。

二、市面上常见的MQ产品

RabbitMQ、RocketMQ(阿里的)、Kafka 、 ActiveMQ(很少用了)

三、为什么要用MQ

3.1、异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式

串行化方式:将用户信息注册到数据库以后再发送邮件然后再发送短信,这三步完成以后才返回给客户端。但是邮件短信这种东西不是必须立马发送给用户的,这样就会导致我们使用串行化会很慢

并行化方式:就是用户信息注册到数据库以后,发送邮件的同时发送信息,虽然比串行化快一点,但是依旧是要等待发送完邮箱和短信才能返回给客户端,依旧不够快

使用消息队列:

使用消息队列,用户只管发送请求,而写入数据库到写入消息队列这段时间交给MQ来处理,起两个消费者关注消息队列去消费消息;这样我们发现用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),而消费消息队列是异步处理的

3.2、应用解耦

场景说明:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

订单系统只负责写入消息,库存系统只负责消费消息,大大提高了效率并且也将我们的订单模块和库存模块解耦了,哪怕库存系统宕机了也不会影响到订单系统下单,只需要修复库存系统重新去消费消息队列的消息即可;

3.3、流量削峰

场景说明: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

1.可以控制活动人数,超过此一定阀值的订单直接丢弃.

2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

四、交换机类型

1、Fanout Exchange(广播交换机):

扇型(广播)交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

2、Direct Exchange(直连交换机):

直连型交换机,根据RoutingKey(路由键)路由到不同的队列

3、Topic Exchange (主题交换机):

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。(开始计算)

简单地介绍下规则:

* (星号) 用来表示一个单词 (必须出现的)

# (井号) 用来表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的,举个小例子

队列Q1 绑定键为 *.TT.* 队列Q2绑定键为 TT.#

如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;

如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

如果只有 # ,它就实现了扇形交换机的功能。

所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能

五、springboot整合RabbitMQ

1、使用前先引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

2、配置RabbitMQ连接

在application.properties文件里配置

3、使用直连交换机(其他两个使用方式一样,无非是路由键是否使用以及路由键的配置规则*,#等)

消费者:

java">@Configuration
public class DirectConsumer {//注册一个队列@Bean  //启动多次为什么不报错?启动的时候,它会根据这个名称Direct_Q01先去查找有没有这个队列,如果有什么都不做,如果没有创建一个新的public Queue queue(){return   QueueBuilder.durable("Direct_Q01").maxLength(100).build();}//注册交换机@Beanpublic DirectExchange exchange(){//1.启动的时候,它会根据这个名称Direct_E01先去查找有没有这个交换机,如果有什么都不做,如果没有创建一个新的return  ExchangeBuilder.directExchange("Direct_E01").build();}//绑定交换机与队列关系@Beanpublic Binding binding(Queue queue,DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("RK01");}//启动一个消费者@RabbitListener(queues = "Direct_Q01")public void receiveMessage(String msg){System.out.println("收到消息:"+msg);}}

注意,如果只设置了一个队列或者交换机,那么在rabbitmq的网页中是看不到的,因为需要有绑定关系,只有建立的绑定关系才能够看到相应的队列和交换机

生产者:

java">@Service
public class DirectProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(Object message) {rabbitTemplate.convertAndSend("Direct_E01", "RK01", message);}
}

当然我们在实际开发中不可能只传递字符串,也有可能传递对象,这时候需要定义一个消息转换器

java">package com.by.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

这个Bean 是一个 MessageConverter 类型的对象,这个Bean 将被自动注入到任何需要 MessageConverter 的地方,以确保所有消息处理都使用这个转换器;他的作用定义了一个消息转换器

messageConverter Bean 的作用:消息序列化、消息反序列化、确保一致性(所有的消息都遵循统一的json格式)

六、交换机、队列、消费者之间的关系

一个交换机对应多个队列,每个队列对应一个消费者的时候:

一个队列对应多个消费者的时候:

七、死信交换机

什么是死信

死信就是消息在特定场景下的一种表现形式,这些场景包括:

当消息队列满的时候,我们的默认策略是丢弃旧消息到死信队列,让新的消息插入进来

1. 消息被拒绝访问,即消费者返回 basicNack 的信号时 或者拒绝basicReject

2. 消费者发生异常,超过重试次数 。 (其实spring框架调用的就是 basicNack

3. 消息的Expiration 过期时长或队列TTL过期时间。.ttl(20*1000) 进入的是 先进业务队列的数据,超时之后送给死信交换机

4. 消息队列达到最大容量 .maxLength(5)

什么是死信队列

存储死信消息的队列

当有消息变成死信了,那么这个消息就会重新被死信交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预

死信队列的使用

他的使用方式和正常的队列一样,需要注意的是需要把创建的死信队列和死信交换机绑定到正常的业务队列上

java">@Slf4j
@Configuration
public class DeadConsumer {@Bean//死信队列public Queue deadQueue() {return QueueBuilder.durable("dead-q").build();}@Beanpublic Exchange deadExchange() {return ExchangeBuilder.fanoutExchange("dead-e").autoDelete().build();}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("").noargs();}@Beanpublic Queue fanoutQueue() {return QueueBuilder.durable("fanout-q").maxLength(10)//队列容积最大为10,超过的消息将发送到死信交换机.deadLetterExchange("dead-e").ttl(5000)//五秒没消费直接发送到死信交换机.build();}@Beanpublic Exchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("fanout-e").durable(true).build();}@Beanpublic Binding fanoutBinding() {return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange()).with("").noargs();}@RabbitListener(queues = "fanout-q")public void consume(Ordering ordering){log.debug("消费者->{}", JSONUtil.toJsonStr(ordering));
//消费者发生异常,不消费消息直接进入死信队列int i = 5/0;}}

八、应答模式

RabbitMQ 中的消息应答模式主要包括两种:自动应答(Automatic Acknowledgement)和手动应答(Manual Acknowledgement)。

自动应答:在这种模式下,一旦消息被传递给消费者,RabbitMQ 就认为这条消息已经被成功消费,并立即从队列中移除。这种方式简化了编程模型,因为它不需要消费者显式地发送确认。

手动应答:在这种模式下,消息不会被自动确认,而是等待消费者显式地发送一个确认信号。只有当消费者明确表示已经成功处理了消息之后,RabbitMQ 才会将这条消息从队列中移除。

三种应答方法:

方法

作用

Channel.basickAck

用于肯定确认

Channel.basickNack

用于否定确认(可以处理单个消息或多个消息)

Channel.basickReject

用于否定确认(只能处理单个消息)

自动应答实现:

# 自动应答模式
spring.rabbitmq.listener.simple.acknowledge-mode = auto
java">@Component
public class AutoAcknowledgementConsumer {@RabbitListener(queues = "yourQueue")public void consumeMessage(String message, Message amqpMessage) {// 处理消息...// 框架会在方法执行完成后自动发送ack确认消息}
}

手动应答实现:

# 手动应答模式
spring.rabbitmq.listener.simple.acknowledge-mode = manual

九、如何保证消息可靠性

从两方面考虑

1、消费者端

①消息确认:消费者在收到消息后默认情况下rabbitmq会自动应答(autoAck=true),为了保证消息可靠性可设置为手动应答,使得消费者在处理完消息后手动发送确认(basicAck),如果消费者在处理消息时发生异常,那么消息会被重新投递给其他消费者;缺点就是代码多,容易出现死循环

②死信队列:如果消息不能被正常消费,就将消息放入死信队列,由人工干预,后续来分析和处理

2、生产者端

①消息持久化:

当生产者发布消息时,可以选择将其标记为持久化(persistent).这意味着即使 RabbitMQ 服务器重启,消息也不会丢失,因为它们会被存储在磁盘上

②消息确认机制:

确认机制分为发布者确认机制和发布者退回机制

发布者确认机制:Publisher Confirm机制允许RabbitMQ服务器通知生产者一个消息是否已经被交换机正确接收。当publisher-confirm-type设置为CORRELATED时,RabbitMQ会向生产者发送确认或否定响应,确认消息已到达交换机,但不保证消息已被路由到至少一个队列中。

总结来说是生产者到交换机的确认

spring.rabbitmq.publisher-confirm-type = CORRELATED
//开启方式,在properties文件中配置

代码示例:

java">public class DirectProvider {
//在生产者端设置确认机制@PostConstructpublic void init() {//发布者确认机制的回调,肯定·要在发布之前设置回调,所以放在send方法上面//如果消息到达交换机就调用这个方法,如果没有到达也发送回调//这里只是注册一个回调方法rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack){log.debug("直连发送成功");}else {log.debug("直连发送失败");}});}@AutowiredRabbitTemplate rabbitTemplate;public void send(Ordering ordering) {CorrelationData correlData = new CorrelationData(ordering.getId()+"");rabbitTemplate.convertAndSend("direct", "key2", ordering, correlData);}
}

发布者退回:

Publisher Return机制用于当消息无法按照路由键规则路由到任何队列时,或者由于其他原因(例如队列满、消息过大等)而被交换机拒绝时,RabbitMQ将消息返回给生产者。

总结来说就是交换机到队列的消息确认(消息只要到达任何一个队列就算成功,并且只有消息没有路由到队列才会触发这个回调)

开启方式:

spring.rabbitmq.publisher-returns = true

代码示例:

java">public class DirectProvider {@PostConstructpublic void init() {//消息回退机制,监听消息是否到达队列rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("消息回退"+new String(message.getBody()));System.out.println("消息回退码"+replyCode);System.out.println("消息回退原因"+replyText);System.out.println("消息回退交换机"+exchange);System.out.println("消息回退路由键"+routingKey);});}@AutowiredRabbitTemplate rabbitTemplate;public void send(Ordering ordering) {CorrelationData correlData = new CorrelationData(ordering.getId()+"");rabbitTemplate.convertAndSend("direct", "key2", ordering, correlData);}
}

十、如何保证消息幂等性

在消费端做手脚,消费者消费的时候避免重复消费,加上一个BizId

BizId由业务标识加上数字组成,这样我们可以很清楚的知道是什么业务生成的BizId。


http://www.ppmy.cn/embedded/149007.html

相关文章

JVM 详解

一. JVM 内存区域的划分 1. 程序计数器 程序计数器是JVM中一块比较小的空间, 它保存下一条要执行的指令的地址. [注]: 与CPU的程序计数器不同, 这里的下一条指令不是二进制的机器语言, 而是Java字节码. 2. 栈 保存方法中的局部变量, 方法的形参, 方法之间的调用关系. 栈又…

深度学习中batch_size

Batch size调整和epoch/iteration的关系 训练数据集总共有1000个样本。若batch_size10&#xff0c;那么训练完全体样本集需要100次迭代&#xff0c;1次epoch。 训练样本10000条&#xff0c;batchsize设置为20&#xff0c;将所有的训练样本在同一个模型中训练5遍&#xff0c;则…

网络安全公司150强

本清单主要列出专门或主要专注于网络安全的公司。 公司名称 业务描述 1Password 企业密码安全管理 A10 Networks 安全云应用服务 Abnormal Security 云原生邮件安全 Absolute 终端防护平台 Agari 电子邮件和网络钓鱼威胁防护 Aqua Security 云原生应用保护 Arcse…

ECharts热力图-笛卡尔坐标系上的热力图,附视频讲解与代码下载

引言&#xff1a; 热力图&#xff08;Heatmap&#xff09;是一种数据可视化技术&#xff0c;它通过颜色的深浅变化来表示数据在不同区域的分布密集程度。在二维平面上&#xff0c;热力图将数据值映射为颜色&#xff0c;通常颜色越深表示数据值越大&#xff0c;颜色越浅表示数…

设计模式——组合模式

文章目录 1.定义2. 结构组成3. 组合模式结构4. 示例代码5. 模式优势6. 应用场景 1.定义 组合模式是一种设计模式&#xff0c;它允许将对象组合成树形结构来表示 “部分 - 整体” 的层次关系&#xff0c;使得客户端可以统一地处理单个对象和对象组合&#xff0c;而无需区分它们…

VS2022 无法使用GitHub账户登录/无法使用copilot 解决方案

无法登录github 账户 错误提示如下&#xff1a; We encountered an issue while adding your GitHub account. Exception Type: GitHubSignInException. Inner Exception: StreamJsonRpc.RemoteInvocationException. Error Message: The input is not a valid Base-64 str…

vscode+编程AI配置、使用说明

文章目录 [toc]1、概述2、github copilot2.1 配置2.2 使用文档2.3 使用说明 3、文心快码&#xff08;Baidu Comate&#xff09;3.1 配置3.2 使用文档3.3 使用说明 4、豆包&#xff08;MarsCode&#xff09;4.1 配置4.2 使用文档4.3 使用说明 5、通义灵码&#xff08;TONGYI Lin…

redis开发与运维-redis02-redis数据类型与命令总结

文章目录 【README】【1】redis通用命令与数据结构【1.1】通用命令【1.2】数据结构与内部编码【1.3】redis单线程架构【1.3.1】redis单线程优缺点 【2】字符串&#xff08;值的类型为字符串&#xff09;【2.1】常用命令【2.1.1】设置值【2.1.2】获取值【2.1.3】批量设置值【2.1…