有需要的直接看狂神的视频,讲得很好
简介
RabbitMQ 是一个开源的 消息队列中间件,实现了 AMQP(Advanced Message Queuing Protocol,先进消息队列协议)。它允许 应用程序、服务、系统之间异步地传递消息,并通过消息队列来实现消息的 存储、转发、处理。
它的核心功能是解耦,异步,削峰
常用的消息队列:
activemq(基于JMS协议,基本不用了)
rabbitmq(基于AMQP协议,主要是Exchange组件)
kafka(kafka协议,主要是Topic和partition组件)
rocketmq(自定义的 RPC 协议)
rabbitmq_kafka_13">rabbitmq 和kafka的异同点
1.kafka消费者是拉取kafka中的数据,rabbitmq 是推送数据到消费者
rabbitMQ核心组成
组件
produce/chnanle/broker/exchange/key/bondings/queue/consumer
核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力),这里我理解和kafka的topic类似,都是处理和隔离不同的数据
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
Exchange(核心)
因为rabbitmq是基于AMQP协议的,所有最重要的第一点就是Excahnge(交换机)
这里有不同种类的交换机来处理不同的业务场景
fanout模式
Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。
这种模式的特点是不需要执行key,生产者会将生产的数据转发到该Exchange的所有queue中
direct模式
Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
什么意思呢,意思是之前fanout模式不需要指定RoutingKey,而direct模式是需要指定Routingkey,对应每一个队列
例如3个queue,配置Routingkey为key1,key2,key3,然后生成的时候讲routingkey带上key1就会只向queue1发送数据
topic模式
Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。
意思是routingkey可以进行模糊匹配
消费者端消费模式-轮询模式
上面的的模式可以说是生成者端生成数据到queue的策略
这里模式是指多个消费者消费queue是怎么消费的
轮询模式是指多个消费者,消费一个queue,会公平的分发相同数量的数据到不同的消费者,让他们消费
简单来说就是一个消费者消费一条数据
公平分发模式
这里就是按照各自的处理速度来分发数据到不同的消费者
根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
实现(fanout举例)
配置yml文件
# 服务端口
server:port: 8080
# 配置rabbitmq服务
spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 47.104.141.27port: 5672
配置生产者
@Component
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate; //创建了rbbitmq对象// 1: 定义交换机private String exchangeName = "direct_order_exchange";// 2: 路由keyprivate String routeKey = "";public void makeOrder(Long userId, Long productId, int num) {// 1: 模拟用户下单String orderNumer = UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num > numstore ){ return "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQ fanoutrabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}
}
创建配置类绑定交换机队列关系
@Configuration
public class DirectRabbitConfig {//队列 起名:TestDirectQueue@Beanpublic Queue emailQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("email.fanout.queue", true);}@Beanpublic Queue smsQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("sms.fanout.queue", true);}@Beanpublic Queue weixinQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("weixin.fanout.queue", true);}//Direct交换机 起名:TestDirectExchange@Beanpublic DirectExchange fanoutOrderExchange() {// return new DirectExchange("TestDirectExchange",true,true);return new DirectExchange("fanout_order_exchange", true, false);}//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@Beanpublic Binding bindingDirect1() {return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");}@Beanpublic Binding bindingDirect2() {return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");}@Beanpublic Binding bindingDirect3() {return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");}
}
消费者消费
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。value = @Queue(value = "email.fanout.queue",autoDelete = "false"),// order.fanout 交换机的名字 必须和生产者保持一致exchange = @Exchange(value = "fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublic void messagerevice(String message){// 此处省略发邮件的逻辑System.out.println("email-------------->" + message);}
}
过期时间TTL、超长处理
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
过期的时候怎么处理,可以直接抛弃或者加入死信队列
rabbitmq还可以设置queue长度,超长的也是抛出到死信队列
死信队列
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
消息被拒绝、消息过期、超出队列长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。
归纳:
简单来说就是咱们正常queue的数据由于过期,例如一个订单,长时间未支付一直存在queue中就会占用额外的内存空间,且无法得到处理,这时候就定义一个死信队列来存放过期的数据,然后这个死信队列也是正常的Exchange+queue,我们可以定义一个消费者来接受这些异常的消息,例如刚才订单超时就可以返回超时的处理
MQ解决分布式事务
分布式系统中,使用到了不同数据库,不同数据库之间事务不互通,因此需要使用分布式事务,
我们业务中应该尽量避免核心业务使用分布式事务,这样会导致响应速度慢,进行在非核心业务上使用
解决分布式事务的方法有很多例如
1.使用seata管理分布式事务
2.使用TCC方式
3.最大努力通知
一般建议使用最大努力通知
生产者可靠
当生产者发送消息到Exchange-》queue之后可能出现网络波动等因素,导致消息没有应答,获取消息根本没有发送到rabbitmq中去
方案 try-catch+冗余表
声明rabbitmq的回调函数,是否正确响应,如果没有则放入冗余表(存放为正确进入queue中的数据),然后用定时任务重新执行冗余表
消费者可靠
一般我们会基于try-catch+手动ack+死信队列来保证消息的可靠
例如当我们的消费者为正常消费到数据,报错了,这时候rabbitmq会干啥,会一直重新发送数据,导致内存崩溃
这时候就需要关闭重试机制,手动ack,然后让未能正确处理的数据进入死信队列,然后又后续的程序处理死信队列的数据
如果死信队列处理消息也失败,就像进行人工处理了
消费重复问题
幂等性解决重复消费问题
问题
消费者消费应答机制ACK改为false
使用手动应答机制否则会造成死循环