RabbitMq
使用mq的三个作用
第一个 异步操作
第二个 应用解耦
第三关 流量削峰
同步异步
同步调用就像是openfeign调用
一旦响应立即看到结果
异步通信就像是调用的时候可以干别的事,可以不用等待
开闭原则
面向修改关闭,面向拓展打开
mq用于流量削峰,多个业务之间的调用
镜像
请使用 m.daocloud.io/docker.io/rabbitmq:management 替代源镜像
MQ技术选项
先进的消息先出,先进先出
日志使用kafka
日志收集
RabbitMq快速入门
首先添加队列
想先消息发给交换机,那么查看消息是否能到达队列
发送消息
消息发送先到达交换机
交换机路由到队列
交换机是负责路由和转发消息的
他没有存储消息的能力
步骤 消息队列进行绑定,交换机进行绑定
交换机绑定了队列
新建两个队列,发送消息,到达队列
RabbitMq数据隔离
添加用户后没有可以访问的虚拟主机
所以需要添加虚拟主机
给每一个项目创建自己的用户
给每一个用户创建自己的virtual host
测试不同virtual host之间的数据隔离现象进行测试
首先创建用户
然后创建虚拟目录
java客户端-快速入门
AMQP
高级消息队列
advanced message queuing protocol
Spring AMQP
就是
发送的是有queue 还有交换机
快速入门
首先是引入了依赖
在父工程中引入了 amqp的依赖
包含rabbitmq
org.springframework.boot
spring-boot-starter-amqp
然后还有单元测试的依赖
org.springframework.boot
spring-boot-starter-test
还有jackson的依赖
com.fasterxml.jackson.dataformat
jackson-dataformat-xml
写完之后就直接写生产者 还有消费者的代码模块的功能的模块
然后再接收者里面写监听器
在监听器中写监听代码
写
@RabbitListener(queue=“topic.queue3”)
public void listeneTopicQueue3(String msg) {
sout(msg)
}
在配置文件中写
mq的地址文件信息
spring:
rabbitmq:
host: 39.101.68.162
port: 5672
virtual-host: /ces
username: ces
password: ces
listener:
simple:
prefetch: 1
下面的是负载的逻辑
然后写发送者代码信息
消息提供
使用rabbitTemplate
队列模式
@Test
void testSendMessage2Queue() {
String queueName = “simple.queue”;
String msg = “hello, amqp!”;
rabbitTemplate.convertAndSend(queueName, msg);
}
将对象转化并且发送给队列
fanout交换机
@Test
void testSendMessage2FanoutQueue() throws InterruptedException {
String exchangeName = “ces.fanout”;
// for (int i = 1; i <= 60; i++) {
String msg = “hello,everyone”;
// rabbitTemplate.convertAndSend(queueName, msg);
rabbitTemplate.convertAndSend(exchangeName, “”, msg);
Thread.sleep(20);
// }
}
topic交换机
@Test
void testSendMessage2TopicQueue() throws InterruptedException {
String exchangeName = “ces.topic”;
String msg = “测试”;
rabbitTemplate.convertAndSend(exchangeName, “china.news”, msg);
}
Work Queues
任务模型,简单的说就是让多个消费者绑定到一个队列,共同消费队列中的消息
rabbitTemplate.convertAndSend(queueName, “message” + i) 是一个在 Spring AMQP (RabbitMQ) 框架中提供的方法,用于简化消息发送的过程。这个方法结合了对象转换和消息发送两个步骤。
当你使用 convertAndSend() 方法时,Spring 会自动将你传递给该方法的对象转换为适合发送的消息格式,并将其发送到指定的交换机(Exchange)或队列(Queue)。这对于快速实现消息生产者非常有用,因为它避免了手动序列化对象和配置消息属性的需要。
信息只消费一次
测试案例
在监听器中使用的是
@RabbitListener(queue=“work.queue”)
public void listenWorkQueue1(String msg) {
System.err.println(“消费者1接收到了work.queue的消息:[” + msg + “]”);
}
然后发送发送者,使用的是rabbitTemplate
默认情况轮询
每次读取之后一条数据后才能获取另外一条
work模型是一种用法
多个消费者绑定到一个队列,可以加快消息处理速度
同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量,处理完后再处理另外一条,能者多劳
work工作模型
fanout交换机
刚才的测试都是简单测试
正在生产环境都是经过交换机exchange进行发送消息,而不是直接发送到队列
交换机的类型有三种
fanout 广播
direct 定向
topic 话题
因为交换机本身就具备路由功能的
消息要发送给多个微服务,微服务有多少个队列不管
这样每个微服务都能收到消息
现在发送消息到交换机
如果使用原始的操作命令,就是先写交换机,然后写队列
然后再交换机上面发送消息,之后队列就可以收到
direct交换机
direct 被称为定点路由
使用直连交换机进行测试
首先可以在操作页面上面写
发送之后
就可以在java代码中写了
在代码中写就是需要使用 exchange
还有key 还有msg
topic交换机
绑定的时候使用通配符
测试案例
首先是发送的消息者
@RabbitListener(queues = “topic.queue1”)
public void listenTopicQueue1(String msg) throws InterruptedException {
System.out.println(“消费者1接收到了topic.queue1的消息:[” + msg + “]”);
}
@RabbitListener(queues = “topic.queue2”)
public void listenTopicQueue2(String msg) throws InterruptedException {
System.out.println(“消费者2接收到了topic.queue2的消息:[” + msg + “]”);
}
就是先在mq页面创建交换机还有队列
交换机创建之后就是开始创建队列
direct交换机的key只能写完整的key
而 topic交换机的key可以写匹配模式,可以使用通配符
队列是不分种类的
交换机是分种类的
声明一个fanout类型的交换机
并且创建队列与其绑定
发送发不需要关心交换机的声明,只是关系交换机
队列交换机的声明,都是在消费者这边写
因为发送发不需要关心这些
就是写 交换机 交换机有3种类型
然后是写队列类型,队列只有一种
然后写绑定信息
@Bean
public Binding fanoutQueue3Binding(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
由于是fanout类型所以可以不用写指定
就是基于代码进行绑定交换机与队列
就是需要写configuration,就是需要写
交换机还有队列
就是需要写exchange 还有要写queue
然后写binding
使用BindingBuilder
基于注解声明
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = “direct.queue1”, durable = “true”), exchange = @Exchange(name = “direct.exchange”, type = ExchangeTypes.DIRECT), key = {“red”, “blue”}))
public void listenDirectQueue1(String msg) throws InterruptedException {
System.out.println(“消费者1接收到了direct.queue1的消息:[” + msg + “]”);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = “direct.queue1”, durable = “true”), exchange = @Exchange(name = “direct.exchange”, type = ExchangeTypes.DIRECT), key = {“red”, “yellow”}))
public void listenDirectQueue2(String msg) throws InterruptedException {
System.out.println(“消费者2接收到了direct.queue2的消息:[” + msg + “]”);
}
消息转化器书写
默认使用的是jdk的序列化所以,会乱码,所以使用json序列化
MQ高级
企业开发
消息可靠性问题
消息可靠性要保证一个消息被发送以后,至少被消费一次
可靠性问题就是有三方面
第一个是 发送者把消息发送给mq的过程中,把消息弄丢了
第二个是 mq自己把消息弄丢了
第三个是 mq中的消息被消费者消费时把消息弄丢了
生产者可靠性问题也就是消息的安全性
有两个解决方案
第一个生产者重连
第二个生产者确认
生产者重连信息
connection-timeout首先是连接的超时时间,就是发送消息时多少时间算超时,1s
retry 然后设置重试
enable开启超时重试机制
initial-interval :然后设置初始等待时间,就是间隔一定时间再试 ,就是间隔1000ms,就是1s时间
连接超时配置200ms就够了
超时等待时间 1s
生产者确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
只要收到的是ack
消息以及投递
生产者确认代码实现
在消息发送者的配置文件
通过applicationContextAwarea获取
config.MqConfirmConfig
开启消息生产者确认信息
生产者确认
RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开机确认机制后,在MQ成功收到消息后会返回确认消息给生产者,返回的结果有以下几种情况:
消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
其他情况都会返回NACK,告知投递失败
这里的publisher-confirm-type 有三种模式可选:
异步回调方式:
我们完成一个任务将消息交由消息队列中,就进行别的任务了,当消息队列返回异常问题,在过来进行对应的处理
我们需要调用ReturnCallback函数完成消息失败后的操作:
在使用之前需要配置ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback
springamqp生产者消息确认的几种返回情况
消息成功到达交换机,但是没有路由 ,返回的是ack,但是带着return的消息
消息到达mq,但是
ack不用管,那么在nack就进行消息重发就行了
如何处理生产者的确认消息?
配置重连机制
生产者确认需要额外的开销
尽量不要使用
首先,我们可以在rabbitmq中配置生产者的重连机制,也就是在连接mq的网络波动时候可以进行重新连接
这样可以避免因为网络波动导致消息发送
如果是其他原因,rabbitmq还支持生产者确认机制,只要开启了生产者确认机制,那么发送消息
到mq时 ,mq就会给一个回执
如果说发送,返回nack的回执
这样可以基于回执的情况去判断
如果失败可以重发消息
通过以上手段可以保证生产者消息的可靠性
但是以上手段会增加
额外的资源的开销
所以不用开启生产者确认机制
MQ的可靠性-数据持久化
解决了生产者的可靠性问题了
确保了消息一定能到达mq
到达mq之后,那么mq本身把消息弄丢了呢
到达mq,mq重启了那么消息可能丢了
比如先向队列中发送消息
之后然后重启mq,使用docker重启mq
重启之后刷新,队列中的消息没有了,丢失了
默认情况下mq把消息保存在内存中
保存内存中可以降低消息收发的延迟
就像redis,使用的全内存
如果队列满了,那么mq就会进入阻塞,再来消息mq就进不去了,会卡,然后才能进入,继续去处理消息
20w进行一次pageout
所以内存存储消息,一方面可能导致消息丢失,另一方面可能导致消息阻塞
那么解决方案数据持久化与改变队列的模式
在早期 mq3.6之前,我们要解决这种问题会采用这种方式,采用数据持久化
而在mq3.6以后,mq提出了新的功能,lazy queue ,他可以解决消息丢失的问题
数据持久化信息
rabbitMq实现数据持久化有3个方面
交换机持久化
队列持久化
在spring中 我们创建 交换机还有队列都是默认持久化
消息持久化
第一,mq重启后消息不会丢失
第二,消息保存到磁盘
那么,我们就不会有频繁的pageout
我们大量的消息往内存中放,之后放置磁盘
那么这个过程中整个mq是阻塞状态是不可访问的
一旦进入pageout,mq就会阻塞,之后pageout结束,才能接受请求
默认情况,mq发送的消息不是持久化消息,因为convertAndSend方法没有提供消息持久化属性的选项
如果想要发送持久化消息,那么需要创建一个Message对象,并设置他的持久化属性然后再发送
向mq中发送100w条消息
mq请求松松每秒处理几万数据还是可以
需要关闭生产者确认
100w条消息 30s发送
使用临时消息进行测试
那么mq就会发送pageout 就是把内存中的数据读到磁盘中
使用持久化消息进行测试
就是直接向磁盘中写进去
但是需要保存性能,所以内存中也有
这样持久化可以进行避免pageout
内存中的数据会进行清空,但是清空的那一瞬间不会向磁盘中写
在内存清空的那一瞬间有一点点的性能下降
但是很快就恢复了
相较于直接停止了,效果要好一些
最后 先磁盘持久化了100w条,内存中还有10w条
使用持久化进行
100w慢了几秒
但是不会出现阻塞的情况
Lazy Queue
数据持久化解决了内存存储的安全问题
同时解决了mq的阻塞问题
但是整体性能上不是特别的好
所以使用第二种方案 lazy queue
传统的mq接受到消息后,天生保存在内存中,读取消息是直接从内存中读取
延迟队列在写,在写入磁盘的时候,有优化同时在,消息在内存中的数量有一定的限制,不是说无限多,默认是2048条
在3.12版本以后,没有纯内存模式了
使用控制台创建队列
接受到消息后直接存入磁盘而非内存,内存中只保留最近的数据,默认2048条
发送消息先写磁盘,读了再从磁盘中读出来,然后加载到内存中
支持数百万条数据的存储
使用控制台是这样创建的
那么使用java代码进行创建
lazy_queue 在写磁盘的时候有优化
mq的持久化的安全性就搞定了
消费者的可靠性
消费者确认机制
为了确认消费者接受到消息,rabbitmq提供了消费者确认机制,当消费者处理消息结束后,应该向rabbitmq发送一个回执,告知rabbitmq自己的消息处理状态,回执有三种状态可以选值
测试:
首先是在监听器中写队列
然后发送消息
none : 在消息投递到消费者的那一刻,会立刻返回ack,默认自动返回,这样消息就丢失了
放行那么就是一直投递一直投递,直到成功为止
如果服务宕机了,那么数据就会恢复,因为没有ack的动作
那么就会
防止无限投递
一般把消费者模式手动改为auto模式
该为auto模式以后,消息可靠性就增强了
只要消费者消费消息成功,消息才会被移除
如果消息处理失败,消息会一直被投递
当抛出消息转换异常的时候,会reject
然后消息就会被移除
消息失败处理策略
默认情况下是重新入队,消息不断的的重新入队到队列,然后在发送个给消费者,然后再次异常
再次入队requeue,导致mq的消息处理飙升,带来不必要的压力
我们可以使用spring的retry机制,在消费者出现异常时调用本地重试,而不是无限的requeue到mq队列
第一种不能取
第二种是可选方案,但是可能发频繁投递
第三张,把消息扔到制定交换机
测试
首先定义一个交换机还有队列,以及绑定关系
测试
消费者如何保证消息一定被消费呢
如何保证消息的可靠性
那么就可以从三个方面去说了
生产者的可靠性
消息可靠性-消息幂等性
经过以上的方案可以确保消息至少被消费一次
比如因为网络波动,收到了消息但是没有被消费,消息投递成功了,但是一直没有收到ack ,可能会认为自动超时
当消息重复消费时,我们要保证幂等性
抢购商品的时候,狂刷商品
这个时候就是表单重复提交
但是不管提交多少次,最后只执行一次
业务幂等性是我们必须保证的
那么对于天生幂等的业务不用管
对应非幂等的业务必须通过某种方保证幂等性
如果保证业务幂等性
生产者确认机制不要开启,因为非常消耗性能,非常慢,每秒中消息的处理数量只有几千个,qps太低了,去掉生产者确认机制,每秒的qps可以达到几万
删除购物车不需要进行幂等性操作,因为删除操作天生支持幂等性
所以mq通知让删除购物车,那么什么都不用做,直接删除就行了
重复通知没有什么意义
那么支付服务还有交易服务,订单状态怎么保证一致呢
首先,支付服务会在用户支付成功之后 ,利用mq消息通知交易服务,完成订单状态同步
其次,我们为了保证消息的可靠性,我们采用了生产者确认机制,消费者确认,确保消息投递和处理的可靠性,同时也开启了MQ的持久性,避免因服务宕机导致消息丢失
最后,我们还在交易服务更新订单状态时做了业务幂等性判断,避免了因为消息重复消费导致订单状态异常
我们能保证到 消息百分之99可以到达交易服务
延迟消息
延迟消息是指 生产者发送消息时指定一个时间,消费者不会立刻接收到消息,而是在指定时间之后才能收到消息
延迟消息有多钟解决方案
第一种是死信交换机
第二种是延迟消息插件
使用死信交换机实现延迟消息
需要先找一组正常的交换机队列,然后找一组死信交换机队列
然后利用过期消息最终绕了一圈,实现消息
延迟消息-延迟消息插件
设计死信交换机的作用是用于解决,专门接收死信消息的,然后人工去做处理
延迟队列使用方式
首先下载插件
然后赋值到rabbitmq中
6.4.2安装插件并启用
dockercp /opt/rabbitmq/rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins
进入 Docker 容器
dockerexec -it rabbitmq /bin/bash
在plugins内启用插件
#先执行,解除防火墙限制,增加文件权限umask 0022rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器
exit
重启 RabbitMQ
docker restart rabbitmq
所以的定时功能都有一定损耗
取消超时订单功能