rabbitmq详解

embedded/2025/2/13 9:57:41/

有需要的直接看狂神的视频,讲得很好

简介

RabbitMQ 是一个开源的 消息队列中间件,实现了 AMQP(Advanced Message Queuing Protocol,先进消息队列协议)。它允许 应用程序、服务、系统之间异步地传递消息,并通过消息队列来实现消息的 存储、转发、处理。

它的核心功能是解耦,异步,削峰

常用的消息队列
activemq(基于JMS协议,基本不用了)
rabbitmq(基于AMQP协议,主要是Exchange组件)
kafka(kafka协议,主要是Topic和partition组件)
rocketmq(自定义的 RPC 协议)

rabbitmq_kafka_13">rabbitmq 和kafka的异同点

1.kafka消费者是拉取kafka中的数据,rabbitmq 是推送数据到消费者

rabbitMQ核心组成

组件

produce/chnanle/broker/exchange/key/bondings/queue/consumer
在这里插入图片描述
核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力),这里我理解和kafka的topic类似,都是处理和隔离不同的数据
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

Exchange(核心)

因为rabbitmq是基于AMQP协议的,所有最重要的第一点就是Excahnge(交换机)
这里有不同种类的交换机来处理不同的业务场景

fanout模式

在这里插入图片描述
Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。
这种模式的特点是不需要执行key,生产者会将生产的数据转发到该Exchange的所有queue中

direct模式

在这里插入图片描述
Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
什么意思呢,意思是之前fanout模式不需要指定RoutingKey,而direct模式是需要指定Routingkey,对应每一个队列
例如3个queue,配置Routingkey为key1,key2,key3,然后生成的时候讲routingkey带上key1就会只向queue1发送数据

topic模式

在这里插入图片描述
Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。

意思是routingkey可以进行模糊匹配

消费者端消费模式-轮询模式

上面的的模式可以说是生成者端生成数据到queue的策略
这里模式是指多个消费者消费queue是怎么消费的

轮询模式是指多个消费者,消费一个queue,会公平的分发相同数量的数据到不同的消费者,让他们消费
简单来说就是一个消费者消费一条数据

公平分发模式

这里就是按照各自的处理速度来分发数据到不同的消费者
根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

实现(fanout举例)

配置yml文件

# 服务端口
server:port: 8080
# 配置rabbitmq服务
spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 47.104.141.27port: 5672

配置生产者

@Component
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate; //创建了rbbitmq对象// 1: 定义交换机private String exchangeName = "direct_order_exchange";// 2: 路由keyprivate String routeKey = "";public void makeOrder(Long userId, Long productId, int num) {// 1: 模拟用户下单String orderNumer = UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQ fanoutrabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}
}

创建配置类绑定交换机队列关系

@Configuration
public class DirectRabbitConfig {//队列 起名:TestDirectQueue@Beanpublic Queue emailQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("email.fanout.queue", true);}@Beanpublic Queue smsQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("sms.fanout.queue", true);}@Beanpublic Queue weixinQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("weixin.fanout.queue", true);}//Direct交换机 起名:TestDirectExchange@Beanpublic DirectExchange fanoutOrderExchange() {//  return new DirectExchange("TestDirectExchange",true,true);return new DirectExchange("fanout_order_exchange", true, false);}//绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@Beanpublic Binding bindingDirect1() {return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");}@Beanpublic Binding bindingDirect2() {return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");}@Beanpublic Binding bindingDirect3() {return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");}
}

消费者消费

// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。value = @Queue(value = "email.fanout.queue",autoDelete = "false"),// order.fanout 交换机的名字 必须和生产者保持一致exchange = @Exchange(value = "fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublic void messagerevice(String message){// 此处省略发邮件的逻辑System.out.println("email-------------->" + message);}
}

过期时间TTL、超长处理

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息队列设置TTL。目前有两种方法可以设置。

过期的时候怎么处理,可以直接抛弃或者加入死信队列

rabbitmq还可以设置queue长度,超长的也是抛出到死信队列

死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
消息被拒绝、消息过期、超出队列长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

归纳:
简单来说就是咱们正常queue的数据由于过期,例如一个订单,长时间未支付一直存在queue中就会占用额外的内存空间,且无法得到处理,这时候就定义一个死信队列来存放过期的数据,然后这个死信队列也是正常的Exchange+queue,我们可以定义一个消费者来接受这些异常的消息,例如刚才订单超时就可以返回超时的处理

MQ解决分布式事务

分布式系统中,使用到了不同数据库,不同数据库之间事务不互通,因此需要使用分布式事务,
我们业务中应该尽量避免核心业务使用分布式事务,这样会导致响应速度慢,进行在非核心业务上使用

解决分布式事务的方法有很多例如
1.使用seata管理分布式事务
2.使用TCC方式
3.最大努力通知
一般建议使用最大努力通知

生产者可靠

当生产者发送消息到Exchange-》queue之后可能出现网络波动等因素,导致消息没有应答,获取消息根本没有发送到rabbitmq中去

方案 try-catch+冗余表
声明rabbitmq的回调函数,是否正确响应,如果没有则放入冗余表(存放为正确进入queue中的数据),然后用定时任务重新执行冗余表

消费者可靠

一般我们会基于try-catch+手动ack+死信队列来保证消息的可靠
例如当我们的消费者为正常消费到数据,报错了,这时候rabbitmq会干啥,会一直重新发送数据,导致内存崩溃
这时候就需要关闭重试机制,手动ack,然后让未能正确处理的数据进入死信队列,然后又后续的程序处理死信队列的数据

如果死信队列处理消息也失败,就像进行人工处理了

消费重复问题

幂等性解决重复消费问题

问题

消费者消费应答机制ACK改为false
使用手动应答机制否则会造成死循环


http://www.ppmy.cn/embedded/161845.html

相关文章

蓝桥杯试题:归并排序

一、问题描述 在一个神秘的岛屿上,有一支探险队发现了一批宝藏,这批宝藏是以整数数组的形式存在的。每个宝藏上都标有一个数字,代表了其珍贵程度。然而,由于某种神奇的力量,这批宝藏的顺序被打乱了,探险队…

HiveQL命令(二)- 数据表操作

文章目录 前言一、数据表操作1. 创建表1.1 语法及解释1.2 内部表1.2.1 创建内部表示例 1.3 外部表1.3.1 创建外部表示例 2. 查看表2.1 查看当前数据库中所有表2.2 查看表信息2.2.1 语法及解释2.2.2 查看表信息示例 3. 修改表3.1 重命名表3.1.1 语法3.1.2 示例 3.2 修改表属性3.…

[css] 黑白主题切换

link动态引入 类名切换 css滤镜 var 类名切换 v-bind css预处理器mixin类名切换 【前端知识分享】CSS主题切换方案

Unity使用iTextSharp导出PDF-03显示文本内容

文本内容自动排布 类似GUILayout Chunk 最简单的文本对象 显示文本设置使用的字体,不设置字体,默认使用英文字体设置下划线设置背景色文档中只使用Chunk,文本内容不会自动换行 换行:Chunk.NEWLINE或者"\n" doc.Add…

MATLAB电机四阶轨迹规划考虑jerk、Djerk

1、内容简介 略 126-可以交流、咨询、答疑 2、内容说明 略 在电机控制中,轨迹规划是一个重要的环节,它决定了电机如何从一个状态平滑地过渡到另一个状态。四阶轨迹规划考虑了位置、速度、加速度和加加速度(jerk),有…

Leetcode - 周赛435

目录 一、3442. 奇偶频次间的最大差值 I二、3443. K 次修改后的最大曼哈顿距离三、3444. 使数组包含目标值倍数的最少增量四、3445. 奇偶频次间的最大差值 II 一、3442. 奇偶频次间的最大差值 I 题目链接 本题使用数组统计字符串 s s s 中每个字符的出现次数,然后…

git 提示 fatal: The remote end hung up unexpectedly

我在 git push 的时候遇到报错 fatal: The remote end hung up unexpectedly 解决方法如下: 1. 调整缓存限制(大文件推送) git config --global http.postBuffer 524288000 # 设置缓存为500MB git config --global https.postBuffer 52428…

ClickHouse的前世今生

ClickHouse是一款由Yandex开发的高性能列式存储数据库管理系统,专为在线分析处理(OLAP)设计,适用于实时数据分析、大规模数据处理和复杂查询场景。以下是关于ClickHouse的安装、使用及应用场景的详细介绍: 一、ClickHouse的安装 ClickHouse支持多种操作系统,包括Linux、…