【RabbitMQ】快速上手

embedded/2024/12/22 15:36:33/

目 录

  • 一. RabbitMQ 安装
  • 二. RabbitMQ 核心概念
    • 2.1 Producer 和 Consumer
    • 2.2 Connection 和 Channel
    • 2.3 Virtual host
    • 2.4 Queue
    • 2.5 Exchange
    • 2.6 RabbitMQ 工作流程
  • 三. AMQP
  • 四. web界面操作
    • 4.1 用户相关操作
    • 4.2 虚拟主机相关操作
  • 五. RabbitMQ 快速入门
    • 5.1 引入依赖
    • 5.2 编写生产者代码
      • 5.2.1 创建连接
      • 5.2.2 创建 Channel
      • 5.2.3 声明⼀个队列 Queue
      • 5.2.4 发送消息
      • 5.2.5 释放资源
      • 5.2.6 运行代码, 观察结果
    • 5.3 编写消费者代码
      • 5.3.1 消费当前队列
      • 5.3.2 释放资源
      • 5.3.3 运行代码, 观察结果
    • 5.4 附源码

一. RabbitMQ 安装

我们对于 RabbitMQ 已经有了简单的了解, 接下来进行 RabbitMQ 的安装

RabbitMQ 是⼀套开源的消息队列服务软件, 基于 Erlang 语言编写, 所以安装RabbitMQ 之前, 需要先安装部署 Erlang 环境, 再安装 RabbitMQ 环境.

RabbitMQ 大多部署在 Linux 操作系统

此处不做详细安装教程~

二. RabbitMQ 核心概念

在安装完 RabbitMQ 之后, 接下来学习如何去使用 RabbitMQ

来到 RabbitMQ 管理界面

在这里插入图片描述

界面上的导航栏共分6部分, 这6部分分别是什么意思呢, 我们先看看 RabbitMQ 的工作流程

在这里插入图片描述

RabbitMQ 是⼀个消息中间件, 也是⼀个生产者消费者模型. 它负责接收, 存储并转发消息

消息传递的过程类似邮局.
 
当你要发送⼀个邮件时,你把你的邮件放到邮局,邮局接收邮件, 并通过邮递员送到收件人的手上
 在这里插入图片描述
按照这个逻辑, Producer 就类似邮件发件人. Consumer 就是收件人, RabbitMQ 就类似于邮局

2.1 Producer 和 Consumer

  • Producer: 生产者, 是 RabbitMQ Server 的客户端, 向 RabbitMQ 发送消息
  • Consumer: 消费者, 也是 RabbitMQ Server 的客户端, 从 RabbitMQ 接收消息
  • Broker:其实就是 RabbitMQ Server, 主要是接收和收发消息
  1. 生产者 (Producer) 创建消息, 然后发布到 RabbitMQ 中. 在实际应用中, 消息通常是⼀个带有⼀定业务逻辑结构的数据, 比如 JSON 字符串. 消息可以带有⼀定的标签, RabbitMQ 会根据标签进行路由, 把消息发送给感兴趣的消费者(Consumer).
  2. 消费者连接到 RabbitMQ 服务器, 就可以消费消息了, 消费的过程中, 标签会被丢掉. 消费者只会收到消息, 并不知道消息的生产者是谁, 当然消费者也不需要知道.
  3. 对于 RabbitMQ 来说,⼀个 RabbitMQ Broker 可以简单地看作⼀个 RabbitMQ 服务节点, 或者 RabbitMQ 服务实例. 大多数情况下也可以将⼀个 RabbitMQ Broker 看作⼀台 RabbitMQ 服务器

在这里插入图片描述

2.2 Connection 和 Channel

Connection: 连接. 是客户端和 RabbitMQ 服务器之间的⼀个 TCP 连接. 这个连接是建立消息传递的基础, 它负责传输客户端和服务器之间的所有数据和控制信息.

Channel: 通道, 信道. Channel 是在 Connection 之上的⼀个抽象层. 在 RabbitMQ 中, ⼀个 TCP 连接可以有多个 Channel, 每个 Channel 都是独立的虚拟连接. 消息的发送和接收都是基于 Channel 的

通道的主要作用是将消息的读写操作复用到同⼀个 TCP 连接上,这样可以减少建立和关闭连接的开销,提高性能

在这里插入图片描述

2.3 Virtual host

Virtual host: 虚拟主机. 这是⼀个虚拟概念. 它为消息队列提供了⼀种逻辑上的隔离机制. 对于 RabbitMQ 而言, ⼀个 BrokerServer 上可以存在多个 Virtual Host. 当多个不同的用户使用同⼀个RabbitMQ Server 提供的服务时,可以虚拟划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

类似 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 MySQL 服务器可以有多个 database

2.4 Queue

Queue: 队列, 是 RabbitMQ 的内部对象, 用于存储消息

在这里插入图片描述

多个消费者, 可以订阅同⼀个队列

在这里插入图片描述

2.5 Exchange

Exchange: 交换机. message 到达 broker 的第⼀站, 它负责接收生产者发送的消息, 并根据特定的规则把这些消息路由到⼀个或多个 Queue 列中.

Exchange 起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息

类似于发快递之后, 物流公司怎么处理呢, 根据咱们的地址来分派这个快递到不同的站点, 然后再送到收件人手里. 这个分配的工作,就是交换机来做的
 
在这里插入图片描述

2.6 RabbitMQ 工作流程

理解了上面的概念之后, 再来回顾⼀下这个图, 来看 RabbitMQ 的工作流程

在这里插入图片描述

  1. Producer 生产了⼀条消息
  2. Producer 连接到 RabbitMQBroker, 建立⼀个连接 (Connection),开启⼀个信道(Channel)
  3. Producer 声明⼀个交换机 (Exchange), 路由消息
  4. Producer 声明⼀个队列 (Queue), 存放信息
  5. Producer 发送消息至 RabbitMQ Broker
  6. RabbitMQ Broker 接收消息, 并存入相应的队列 (Queue) 中, 如果未找到相应的队列, 则根据生产者的配置, 选择丢弃或者退回给生产者

如果我们把 RabbitMQ 比作⼀个物流公司,那么它的⼀些核心概念可以这样理解:
 

  1. Broker就类似整个物流公司的总部, 它负责协调和管理所有的物流站点, 确保包裹安全、⾼效地送达.
  2. Virtual Host 可以看作是物流公司为不同的客户或业务部门划分的独立运营中⼼. 每个运营中心都有自己的仓库(Queue), 分拣规则(Exchange)和运输路线(Connection和Channel), 这样可以确保不同客户的包裹处理不会相互干扰, 同时提供定制化的服务
  3. Exchange 就像是站点里的分拣中心. 当包裹到达时, 分拣中心会根据包裹上的标签来决定这个包裹应该送往哪个目的地(队列). 快递站点可能有不同类型的分拣中心, 有的按照具体地址分拣, 有的将包裹复制给多个收件人等.
  4. Queue 就是快递站点里的⼀个个仓库, 用来临时存放等待派送的包裹. 每个仓库都有⼀个或多个快递员(消费者)负责从仓库中取出包裹并派送给最终的收件人.
  5. Connection 就像是快递员与快递站点之间的通信线路. 快递员需要通过这个线路来接收派送任务(消息).
  6. Channel 就像是快递员在执行任务时使用的多个并行的通信线路. 这样,快递员可以同时处理多个包裹, 比如⼀边派送包裹, ⼀边接收新的包裹

三. AMQP

AMQP(Advanced Message Queuing Protocol)是⼀种高级消息队列协议, AMQP 定义了⼀套确定的消息交换功能, 包括交换器(Exchange), 队列(Queue) 等. 这些组件共同工作, 使得生产者能够将消息发送到交换器. 然后由队列接收并等待消费者接收. AMQP 还定义了⼀个网络协议, 允许客户端应用通过该协议与消息代理和 AMQP 模型进行交互通信

RabbitMQ 是遵从 AMQP 协议的,换句话说,RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2, MQTT2 等协议). AMQP 的模型结构和 RabbitMQ 的模型结构是⼀样的

在这里插入图片描述

四. web界面操作

RabbitMQ 管理界面上的 Connections,Channels, Exchange, Queues 就是和上面流程图的概念是⼀样的, Overview 就是视图的意思, Admin 是用户管理.

我们在操作 RabbitMQ 前, 需要先创建 Virtual host

接下来看具体操作:

4.1 用户相关操作

添加用户

a) 点击 Admin -> Add user

在这里插入图片描述

b) 设置账号密码及权限

在这里插入图片描述

①: 设置账号
②: 设置密码
③: 确认密码
④: 设置权限
添加完成后, 点击[Add user]

c) 观察用户是否添加成功

在这里插入图片描述

用户相关操作

a) 点击要删除的用户, 查看用户详情

在这里插入图片描述

b) 在用户详情页面,进行更新或删除操作

  • 设置对虚拟机的操作权限

在这里插入图片描述

  • 更新/删除用户

在这里插入图片描述

退出当前用户

在这里插入图片描述

4.2 虚拟主机相关操作

创建虚拟主机

在 Admin 标签页下, 点击右侧 Virtual Hosts -> Add a new virtual host

设置虚拟主机名称

在这里插入图片描述

观察设置结果

在这里插入图片描述

此操作会为当前登录用户设置虚拟主机

五. RabbitMQ 快速入门

步骤:

  1. 引入依赖
  2. 编写生产者代码
  3. 编写消费者代码

5.1 引入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

5.2 编写生产者代码

5.2.1 创建连接

RabbitMQ 默认的用于客户端连接的 TCP 端口号是 5672, 需要提前进行开放

此处修改了端口号为15673

// 1. 创建连接⼯⼚
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("110.41.51.65");//ip 默认值localhost
factory.setPort(15673); //默认值5672 
factory.setVirtualHost("bite");//虚拟机名称, 默认 /factory.setUsername("study");//⽤⼾名,默认guest
factory.setPassword("study");//密码, 默认guest
//3. 创建连接Connection
Connection connection = factory.newConnection();

5.2.2 创建 Channel

生产者和消费者创建的 channel 并不是同⼀个

//4. 创建channel通道
Channel channel = connection.createChannel();

5.2.3 声明⼀个队列 Queue

/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)1.queue: 队列名称2.durable: 是否持久化.true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。3.exclusive:* 是否独占, 只能有⼀个消费者监听队列* 当 Connection 关闭时, 是否删除队列4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉5.arguments: ⼀些参数
*/
//如果没有⼀个hello_world 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare("hello",true,false,false,null);

5.2.4 发送消息

当⼀个新的 RabbitMQ 节点启动时, 它会预声明(declare)几个内置的交换机,内置交换机名称是空字符串(“”). 生产者发送的消息会根据队列名称直接路由到对应的队列.

在这里插入图片描述

例如: 如果有⼀个名为 “hello” 的队列, 生产者可以直接发送消息到 “hello” 队列, 而消费者可以从"hello" 队列中接收消息, 而不需要关心交换机的存在. 这种模式非常适合简单的应用场景,其中生产者和消费者之间的通信是⼀对⼀的

在这里插入图片描述

//6. 通过channel发送消息到队列中
/*basicPublish(String exchange, String routingKey,AMQP.BasicProperties props, byte[] body)1.exchange: 交换机名称, 简单模式下, 交换机会使⽤默认的""2.routingKey: 路由名称, routingKey = 队列名称3.props: 配置信息4.body: 发送消息的数据
*/
String msg = "Hello World";
//使⽤的是内置交换机. 使⽤内置交换机时, routingKey要和队列名称⼀样, 才可以路由到对应的队列上去
channel.basicPublish("","hello",null,msg.getBytes());
System.out.println(msg + "消息发送成功");

5.2.5 释放资源

//显式地关闭Channel是个好习惯, 但这不是必须的, Connection关闭的时候,Channel也会⾃动关闭.
channel.close();
connection.close();

5.2.6 运行代码, 观察结果

运行之前

在这里插入图片描述

运行之后, 队列中就已经有了hello这个队列的信息

右上角需要选择虚拟机

在这里插入图片描述

如果在代码中注掉资源释放的代码, 在 Connections 和 Channels 也可以看到相关信息

在这里插入图片描述

Queue 也可以配置显示 Consumer 相关信息

在这里插入图片描述

5.3 编写消费者代码

消费者代码和生产者前3步都是⼀样的, 第4步改为消费当前队列

  1. 创建连接
  2. 创建 Channel
  3. 声明⼀个队列 Queue
  4. 消费消息
  5. 释放资源

5.3.1 消费当前队列

basicConsume

/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue: 队列名称2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认3. callback: 回调对象
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throwsIOException;

Consumer

Consumer 用于定义消息消费者的行为. 当我们需要从 RabbitMQ 接收消息时, 需要提供⼀个实现了 Consumer 接口的对象.

DefaultConsumer 是 RabbitMQ 提供的⼀个默认消费者, 实现了 Consumer 接口

核心方法:

  1. handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) : 从队列接收到消息时, 会自动调用该方法.

在这个方法中, 我们可以定义如何处理接收到的消息, 例如打印消息内容, 处理业务逻辑或者将消息存储到数据库等.

参数说明如下:

  • consumerTag : 消费者标签, 通常是消费者在订阅队列时指定的.
  • envelope : 包含消息的封包信息,如队列名称, 交换机等.
  • properties : ⼀些配置信息
  • body : 消息的具体内容
//6. 接收消息, 并消费
/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue: 队列名称2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认3. callback: 回调对象
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法1. consumerTag: 标识2. envelope: 获取⼀些信息, 交换机, 路由key3. properties:配置信息4. body:数据
*/
@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume("hello", true, consumer);

5.3.2 释放资源

//等待回调函数执⾏完毕之后, 关闭资源
TimeUnit.SECONDS.sleep(5);
//7. 释放资源 消费者相当于是⼀个监听程序, 不需要关闭资源
channel.close();
connection.close();

实际上消费者相当于是⼀个监听程序, 不需要关闭资源

5.3.3 运行代码, 观察结果

运行程序, 我们刚才发送的消息, 就收到了

接收到消息: Hello World

如果我们不释放资源, 可以看到响应的 Connection, channel

在这里插入图片描述

在这里插入图片描述

5.4 附源码

生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitProducer {public static void main(String[] args) throws Exception {// 1. 创建连接⼯⼚ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("110.41.51.65");//ip 默认值localhostfactory.setPort(15673); //默认值5672factory.setVirtualHost("bite");//虚拟机名称, 默认 factory.setUsername("study");//⽤⼾名,默认guestfactory.setPassword("study");//密码, 默认guest//3. 创建连接ConnectionConnection connection = factory.newConnection();//4. 创建channel通道Channel channel = connection.createChannel();//5. 声明队列/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)1.queue: 队列名称2.durable: 是否持久化, 当mq重启之后, 消息还在3.exclusive:* 是否独占, 只能有⼀个消费者监听队列* 当Connection关闭时, 是否删除队列4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉5.arguments: ⼀些参数*///如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建channel.queueDeclare("hello", true, false, false, null);//6. 通过channel发送消息到队列中/*basicPublish(String exchange, String routingKey,AMQP.BasicProperties props, byte[] body)1. exchange: 交换机名称, 简单模式下, 交换机会使⽤默认的""2.routingKey: 路由名称, routingKey = 队列名称3.props: 配置信息4.body: 发送消息的数据*/String msg = "Hello World";//使⽤的是内置交换机. 使⽤内置交换机时, routingKey要和队列名称⼀样, 才可以路由到对应的队列上去channel.basicPublish("", "hello", null, msg.getBytes());//7.释放资源System.out.println(msg + "消息发送成功");channel.close();connection.close();}
}

消费者代码

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class RabbitmqConsumer {public static void main(String[] args) throws Exception {// 1. 创建连接⼯⼚ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("110.41.51.65");//ip 默认值localhostfactory.setPort(15673); //默认值5672factory.setVirtualHost("bite");//虚拟机名称, 默认 /factory.setUsername("study");//⽤⼾名,默认guestfactory.setPassword("study");//密码, 默认guest//3. 创建连接ConnectionConnection connection = factory.newConnection();//4. 创建channel通道Channel channel = connection.createChannel();//5. 声明队列/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)1.queue: 队列名称2.durable: 是否持久化, 当mq重启之后, 消息还在3.exclusive:* 是否独占, 只能有⼀个消费者监听队列* 当Connection关闭时, 是否删除队列4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉5.arguments: ⼀些参数*///如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建channel.queueDeclare("hello", true, false, false, null);//6. 接收消息, 并消费/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue: 队列名称2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认3. callback: 回调对象*/DefaultConsumer consumer = new DefaultConsumer(channel) {/*回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法1. consumerTag: 标识2. envelope: 获取⼀些信息, 交换机, 路由key3. properties:配置信息4. body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume("hello", true, consumer);//等待回调函数执⾏完毕之后, 关闭资源TimeUnit.SECONDS.sleep(5);//7. 释放资源 消费者相当于是⼀个监听程序, 不需要关闭资源//顺序不可改变// channel.close();// connection.close();}
}

http://www.ppmy.cn/embedded/102116.html

相关文章

DDOS攻击学习-渗透测试-域名信息收集

文章目录 wordpress漏洞利用域名信息收集域名介绍域名分类 whoiswhois反查子域名收集子域名发现网络空间安全搜索引擎SSL证书查询js文件发现子域名 wordpress漏洞利用 这个一般都需要安装wordpress服务使用wpscan扫描&#xff0c;但现在一般很少人知道或者使用wordpress所以这个…

大模型RAG应用开发之PDF解析工具对比

一 汇总 类型名称地址OCR提取表格内容保留文本顺序提取图片保存成md格式其他特性传统PDF解析库pymupdfhttps://github.com/pymupdf/PyMuPDF❌✔️✔️✔️❌● 表格提取● 自定义字体传统PDF解析库pdfminerhttps://github.com/pdfminer/pdfminer.six❌❌✔️❌❌● 版面分析传…

EtherCAT 转 ModbusTCP 网关

设备简介 本产品是 EtherCAT 和 Modbus TCP 网关&#xff0c;使用数据映射方式工作。 本产品在 EtherCAT 侧作为 EtherCAT 从站&#xff0c;接 TwinCAT 、 CodeSYS 、 PLC等&#xff1b;在 ModbusTCP 侧做为 ModbusTCP 主站&#xff08; Client &#xff09;或从站…

HTML静态网页成品作业(HTML+CSS)——西点蛋糕介绍(5个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有5个页面。 二、作品演示 三、代…

【M-LAG 跨设备链路聚合技术】

M-LAG&#xff08;Multichassis Link Aggregation Group&#xff09;即跨设备链路聚合组&#xff0c;是一种实现跨设备链路聚合的机制。 M-LAG将一台设备与另外两台设备进行跨设备链路聚合&#xff0c;从而把链路可靠性从单板级提高到了设备级&#xff0c;组成双活系统。其具备…

MYSQL集群技术

---------------第一部分---------------------- 一.mysql源码部署 环境&#xff1a;rhel7.9 1.1.下载安装包 官网&#xff1a;http://www.mysql.com 1.2.在linux下部署mysql 1.创建登录用户和数据目录&#xff0c;并给数据目录赋权&#xff0c;因为配置文件读取需要权限&…

代码随想录算法训练营第13天 |二叉树的学习

目录 二叉树 理论基础 二叉树的分类 1. 满二叉树 (Full Binary Tree) 2. 完全二叉树 (Complete Binary Tree) 3. 平衡二叉树 (Balanced Binary Tree) 5. 二叉搜索树 (Binary Search Tree, BST) 二叉树的存储 1. 链式存储 (Linked Representation) 2. 顺序存储 (Sequent…

钉钉的未读真的是对方未读吗?

背景 远程沟通工具今年换成了钉钉&#xff0c;说实话&#xff0c;钉钉比飞书好用的地方是图片前后翻&#xff0c;跟微信一样&#xff0c;能对聊天记录中所有的图片进行前后翻找。 还有一个就是消息撤回功能&#xff0c;好几天前的消息&#xff0c;也能撤回。 过程 略…… …