RabbitMQ介绍与使用

ops/2025/1/12 18:14:45/

RabbitMQ官网

RabbitMQ 介绍

RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(高级消息队列协议)标准,使用 Erlang 编程语言构建。它是消息队列(MQ)的一种,广泛应用于分布式系统中,用于实现应用程序之间的异步消息传递。RabbitMQ 具有高可靠性、易扩展、高可用和功能丰富的特点,支持多种编程语言客户端,如 Java、Python、Ruby、C# 等。

RabbitMQ 的核心概念

  • Producer(生产者):消息的生产者,负责将消息发送到 RabbitMQ 中的 Exchange。
  • Consumer(消费者):消息的消费者,负责从队列中获取并处理消息。
  • Connection:生产者/消费者和 Broker 之间的 TCP 连接。
  • Channel:在 Connection 内部建立的逻辑连接,用于减少操作系统建立 TCP 连接的开销。
  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
  • Virtual Host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中。
  • Exchange:消息到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到队列中去。常用的类型有 direct、topic 和 fanout。
  • Queue:消息最终被送到这里等待消费者取走。

RabbitMq的交换机类型

1. Direct Exchange

  • 描述:Direct 交换机是最简单的交换机类型。它根据消息的 routing key 将消息路由到一个特定的队列。如果队列的 binding key 与消息的 routing key 完全匹配,则消息会被路由到该队列。
  • 特点
    • 一对一匹配:消息的 routing key 必须与队列的 binding key 完全相同。
    • 简单直接:适用于一对一的消息传递场景。
  • 示例
    • 生产者发送消息时指定 routing key 为 info
    • 队列 A 绑定到交换机时,binding key 也为 info
    • 消息将被路由到队列 A。

2. Topic Exchange

  • 描述:Topic 交换机允许更复杂的路由模式。消息的 routing key 和队列的 binding key 可以包含通配符,从而实现更灵活的路由规则。
  • 特点
    • 模式匹配:支持通配符 *(匹配一个单词)和 #(匹配多个单词)。
    • 灵活多变:适用于多对多的消息传递场景,可以实现复杂的路由逻辑。
  • 示例
    • 生产者发送消息时指定 routing key 为 user.info
    • 队列 A 绑定到交换机时,binding key 为 user.*
    • 队列 B 绑定到交换机时,binding key 为 *.info
    • 消息将被路由到队列 A 和队列 B。

3. Fanout Exchange

  • 描述:Fanout 交换机是最简单的广播交换机。它不关心消息的 routing key,将消息广播到所有绑定到该交换机的队列。
  • 特点
    • 广播消息:消息会被发送到所有绑定的队列,无论队列的 binding key 是什么。
    • 简单高效:适用于需要将消息广播到多个消费者的情况。
  • 示例
    • 生产者发送消息时,不指定 routing key。
    • 队列 A、队列 B 和队列 C 都绑定到该交换机。
    • 消息将被路由到队列 A、队列 B 和队列 C。

RabbitMQ 的主要特点

  • 可靠性:使用消息确认机制,确保消息的可靠传递。生产者在发送消息后会收到一个确认,消费者在处理完消息后会发送一个确认。如果消息发送或处理失败,RabbitMQ 会重新发送消息,直到确认为止。
  • 灵活性:支持多种消息传递模式,包括点对点、发布/订阅和消息路由等。
  • 可扩展性:可以通过添加更多的节点来实现水平扩展,以处理更大的消息负载。它还支持集群和镜像队列,提供高可用性和负载均衡。
  • 多语言支持:提供了多种编程语言的客户端库,包括 Java、Python、Ruby、C# 等。

MQ选型对比

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
可用性高,基于主从架构实现高可用高,基于主从架构实现高可用非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
单机吞吐量万级万级十万级十万级以上
消息延迟微秒级毫秒级毫秒级毫秒级以内
消息可靠性较高,基本不丢较低,有丢大概率经过参数优化配置,可以做到 0 丢失经过参数优化配置,可以做到 0 丢失

RabbitMQ安装

环境:Centos7.9,基于docker安装

1.使用docker run命令创建容器并安装mq

docker run \-e RABBITMQ_DEFAULT_USER=mqadmin \-e RABBITMQ_DEFAULT_PASS=mqadmin \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network mq-net\-d \rabbitmq:3.8-management

2.开放端口,或关闭防火墙(如果访问不了)

方法1:开放端口

#1.开放mq端口
firewall-cmd --zone=public --add-port=15672/tcp --add-port=5672/tcp --permanent
#2.重新加载防火墙配置
firewall-cmd --reload

方法2:临时关闭防火墙

systemctl stop firewalld

3.访问RabbitMQ控制台并登录

账号密码就是创建mq容器时指定的RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS

访问IP地址:主机ip:15672

 

RabbitMQ控制台使用

1.收发消息

1.1创建消息队列

 1.2创建一个交换机

 1.3讲交换机与队列绑定

 1.4发送消息

1.5查看消息

2.数据隔离

当我们只部署了一个mq的话,当多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。

实现步骤:

1.在我们的用户管理创建一个新用户

可以看到我们的用户创建成功,但是刚创建的用户是没有虚拟主机的

2.登录新创建的用户,配置虚拟主机

我们可以通过右上角选择自己的虚拟主机

 可以看到,在我们选择我们当前用户的虚拟机主机之后,就看不到我们之前用/创建的队列了

SpringAMQP使用

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

Spring AMQPSpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

1.创建一个Maven的mqDemo项目

2.创建两个子模块publisher(消息的发送者)、consumer(消息的消费者)

3.在父模块的pom.xml中导入以下配置:

    <groupId>cn.mq.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency></dependencies>

4.在子模块pom.xml中分别导入以下配置:

publisher

    <parent><artifactId>mq-demo</artifactId><groupId>cn.mq.demo</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>publisher</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties>

consumer

    <parent><artifactId>mq-demo</artifactId><groupId>cn.mq.demo</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>consumer</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties>

5.在两个子模块的application.yaml中加入以下配置:

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.181.32 # 你的虚拟机IPport: 5672 # 端口virtual-host: /test # 虚拟主机username: testuser # 用户名password: testuser # 密码

6.创建交换机、队列,并监听消息

方式1基于配置类创建交换机、队列并绑定:

在consumer下创建一个configuration类

@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange() {//创建交换机return new FanoutExchange("test.fanout");}@Beanpublic Queue fanoutQueue1() {//创建队列return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2() {//创建队列return new Queue("fanout.queue2");}@Beanpublic Binding binDingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {//将队列与交换机进行绑定return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding binDingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {//将队列与交换机进行绑定return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

然后创建一个监听类,监听消息

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")//监听的队列public void listenerFanoutQueue1(String message) throws InterruptedException {System.out.println("消费者1接收到test.fanout消息:" + message + "," + LocalTime.now());}@RabbitListener(queues = "fanout.queue2")//监听的队列public void listenerFanoutQueue2(String message) throws InterruptedException {System.out.println("消费者2接收到test.fanout消息:" + message + "," + LocalTime.now());}
}

方式2基于注解创建交换机、队列并绑定:

@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.quque1"),exchange = @Exchange(name = "test.fanout", type = ExchangeTypes.FANOUT)))public void listenerFanoutQueue1(String message) throws InterruptedException {System.err.println("消费者1接收到test.fanout消息:" + message + "," + LocalTime.now());}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "fanout.queue1"),exchange = @Exchange(name = "test.fanout", type = ExchangeTypes.FANOUT)))public void listenerFanoutQueue2(String message) throws InterruptedException {System.err.println("消费者2接收到test.fanout消息:" + message + "," + LocalTime.now());}
}

 启动ConsumerApplication类,查看rabbitmq控制台查看是否已经创建交换机和队列成功,并且正确绑定(上面的方式实现一种即可)

 6.发送消息

在publisher创建测试类发送消息

@Slf4j
@SpringBootTest
class SpringAmqpTest {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void testFanoutQueue() {String exchangeName = "test.fanout";//交换机名称String message = "hello,everyone!";//发送的消息rabbitTemplate.convertAndSend(exchangeName, "", message);}
}

8.测试

1.运行ConsumerApplication启动类,保持运行状态

2.运行testFanoutQueue测试类的方法

3.查看控制台输出

正确接收到消息! 


http://www.ppmy.cn/ops/149520.html

相关文章

ThreadLocal 的使用场景

在现代电商平台中&#xff0c;ThreadLocal 常用于以下场景&#xff0c;特别是与线程隔离相关的业务中&#xff0c;以提高性能和简化上下文传递。 1. 用户上下文信息管理 场景&#xff1a;在用户发起的每次请求中&#xff0c;需要携带用户 ID、角色、权限等信息&#xff0c;而这…

移动支付安全:五大威胁及防护策略

随着移动支付的普及和便利&#xff0c;越来越多的用户选择通过支付应用进行日常交易。根据艾利德市场研究公司&#xff08;Allied Market Research&#xff09;的报告&#xff0c;全球移动支付市场预计到2027年将超过12万亿美元。然而&#xff0c;随着市场的增长&#xff0c;移…

蓝桥杯嵌入式速通(1)

1.工程准备 创建一文件夹存放自己的代码&#xff0c;并在mdk中include上文件夹地址 把所有自身代码的头文件都放在headfile头文件中&#xff0c;之后只需要在新的文件中引用headfile即可 headfile中先提前可加入 #include "stdio.h" #include "string.h"…

STM32使用ITM调试_通过仿真器实现串口打印

IDE&#xff1a;CLion MCU: STM32F407VET6 工具&#xff1a;OpenOCD Telnet 一、简介 调试单片机时&#xff0c;如果要打印数据往往需要另接一根线通过USB转TTL接到电脑上。但这样做往往并不方便&#xff0c;尤其是身边没有USB转TTL工具时。这时可以使用单片机自带的ITM单元…

axios的替代方案onion-middleware

onion-middleware的由来 嗯。。。闲来无事瞎搞的&#xff01;&#xff01;&#xff01;&#xff01;主要用来实现请求/相应拦截&#xff0c;当然队列性的数据操作都是可以的 直接上使用教程 安装 npm install onion-middleware使用 import { OnionMiddleware } from onion…

《拉依达的嵌入式\驱动面试宝典》—操作系统篇(七)

《拉依达的嵌入式\驱动面试宝典》—操作系统篇(七) 你好,我是拉依达。 感谢所有阅读关注我的同学支持,目前博客累计阅读 27w,关注1.5w人。其中博客《最全Linux驱动开发全流程详细解析(持续更新)-CSDN博客》已经是 Linux驱动 相关内容搜索的推荐首位,感谢大家支持。 《拉…

多租户系统的实现方案

多租户架构&#xff08;Multi-Tenant Architecture&#xff09;是一种在单个系统中支持多个独立租户&#xff08;客户或公司&#xff09;的设计模式。每个租户可以拥有自己独立的数据、业务逻辑、用户界面等。多租户架构通常被应用于SaaS&#xff08;Software as a Service&…

1.微服务

商城项目源码地址 https://gitee.com/huyi612/hmall 使用jmeter测试高并发 传统单体项目的弊端 案例&#xff1a;如果某一个请求耗时太长会把tomcat的资源给占完了&#xff0c;导致其他请求进来耗时更长&#xff0c;甚至无法进入。 RestController RequestMapping("h…