10.RabbitMQ集群

embedded/2025/3/6 16:31:25/

十、集群与高可用

RabbitMQ 的集群分两种模式,一种是默认集群模式,一种是镜像集群模式;

在RabbitMQ集群中所有的节点(一个节点就是一个RabbitMQ的broker服务器) 被归为两类:一类是磁盘节点,一类是内存节点;

磁盘节点会把集群的所有信息(比如交换机、绑定、队列等信息)持久化到磁盘中,而内存节点只会将这些信息保存到内存中,如果该节点宕机或重启,内存节点的数据会全部丢失,而磁盘节点的数据不会丢失;

1、默认集群模式

默认集群模式也叫 普通集群模式、或者 内置集群模式;

10.1.1、默认集群简介

(1)、元数据

队列元数据:队列名称和属性(是否可持久化,是否自动删除)

交换器元数据:交换器名称、类型和属性

绑定元数据:交换器和队列的绑定列表

vhost元数据:vhost内的相关属性,如安全属性等;

当用户访问其中任何一个RabbitMQ节点时,查询到的queue/user/exchange/vhost等信息都是相同的;

(2)、数据同步特点

RabbitMQ默认集群模式只会把交换机、队列、虚拟主机等元数据信息在各个节点同步,而具体队列中的消息内容不会在各个节点中同步,队列的具体信息数据只在队列的拥有者节点保存,其他节点只知道队列的元数据和指向该节点的指针,所以其他节点接收到不属于该节点队列的消息时会将该消息传递给该队列的拥有者节点上;

集群不复制队列内容和状态到所有节点原因

1)节省存储空间;

2)提升性能;

如果消息需要复制到集群中每个节点,网络开销不可避免,持久化消息还需要写磁盘,占用磁盘空间。

在这里插入图片描述

(3)、数据访问过程

如果有一个消息生产者或者消息消费者通过amqp-client的客户端连接到节点1进行消息的发送或接收,那么此时集群中的消息收发只与节点1相关,这个没有任何问题;

如果消息生产者所连接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程中这两个节点主要起了一个路由转发作用,根据这两个节点上的元数据(也就是指向queue的owner node的指针)转发至节点1上,最终发送的消息还是会存储至节点1的队列1上;

同样,如果消息消费者所连接的节点2或者节点3,那这两个节点也会作为路由节点起到转发作用,将会从节点1的队列1中获取消息进行消费;

10.1.2、安装

1、安装三台RabbitMQ机器

安装过程参考前面

注意:这里先安装一台MQ,然后克隆两台就可以了

2、设置IP地址

启动并设置三台机器的IP

修改配置文件方式

图形界面方式

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、修改主机名
sudo hostnamectl set-hostname rabbit11
4、修改/etc/hosts 文件

首先需要配置一下hosts文件,因为RabbitMQ集群节点名称是读取hosts文件得到的;

注意三台机机器都需要配置

vim /etc/hosts
192.168.1.11 rabbit11
192.168.1.12 rabbit12
192.168.1.13 rabbit13

在这里插入图片描述

5、重启网络

三台机器均重启网络,使节点名生效

sudo systemctl restart NetworkManager

低版本CentOS使用如下命令

systemctl restart network
6、重新连接xshell

重启后三台机器的xshell均退出,然后再重新连接,这样才能刷新主机的名字

7、关闭防火墙

三台机器均需关闭

systemctl stop firewalld  ##关闭防火墙
systemctl disable firewalld  ##开机不启动防火墙
systemctl status firewalld	##查看防火墙状态

在这里插入图片描述

8、修改.erlang.cookie文件

三台机器 .erlang.cookie文件保持一致

由于是clone出的三台机器,所以肯定是一样的

  • 如果使用解压缩方式安装的RabbitMQ,那么该文件会在 用户名目录下 , 也就是 {用户名}目录下,也就是 用户名目录下,也就是{用户名}/.erlang.cookie;

    在这里插入图片描述

  • 如果使用rpm安装包方式进行安装,那么这个文件会在/var/lib/rabbitmq目录下;

注意 .erlang.cookie的权限为400,目前已经是400

9、启动MQ

分别启动三台机器上的rabbitmq

rabbitmq-server -detached
10、查看集群状态
11、构建集群
  • 加入节点1

    rabbitmq12机器上执行命令,让12的rabbitmq加入集群

    注意:一定要先停止节点,将节点重置之后才能加入集群,否则数据同步会出现混乱

    ## 先停止rabbitmq
    rabbitmqctl stop_app
    ## 重置rabbitmq
    rabbitmqctl reset
    ## 节点加入集群:rabbit@rabbit11是主节点的节点名,在集群状态中可以查看到节点名称
    rabbitmqctl join_cluster rabbit@rabbit11 --ram
    ## 启动节点
    rabbitmqctl start_app
    

    –ram 参数表示让rabbitmq12成为一个内存节点,如果不带参数默认为disk磁盘节点;

    在这里插入图片描述

  • 添加节点2

    在rabbit13节点上也执行同样的命令,使rabbit13节点也加入到集群中

    ## 先停止rabbitmq
    rabbitmqctl stop_app
    ## 重置rabbitmq
    rabbitmqctl reset
    ## 节点加入集群:rabbit@rabbit11是主节点的节点名,在集群状态中可以查看到节点名称
    rabbitmqctl join_cluster rabbit@rabbit11 --ram
    ## 启动节点
    rabbitmqctl start_app
    

    当然也可以让rabbit13作为一个磁盘节点

12、添加用户和权限

操作一个节点,添加用户和权限等

#列出用户
rabbitmqctl list_users
# 添加用户
rabbitmqctl add_user admin 123456
#查看权限
rabbitmqctl list_permissions
#设置权限
rabbitmqctl set_permissions admin ".*" ".*" ".*"
#设置角色
rabbitmqctl set_user_tags admin administrator

在这里插入图片描述

13、启动web控制台

启动web控制台插件

注意:三台机器都要启动,因为插件不属于元数据,因此需要分别启动

#进入插件目录
cd /usr/local/rabbitmq_server-4.0.7/plugins/
#启动web端插件
rabbitmq-plugins enable rabbitmq_management 

在这里插入图片描述

14、创建虚拟主机

使用web浏览器添加一个虚拟主机:longdidi

在这里插入图片描述

15、再次查看集群状态

当执行完操作以后在浏览器访问web管控台来看看效果;

随便在哪个节点打开web管控台都能看到集群环境各节点的信息;

也可以使用"rabbitmqctl cluster_status"查看集群状态;

在这里插入图片描述

以上就是RabbitMQ默认集群模式(普通集群模式)的搭建;

16、验证集群
  1. 创建队列

    在这里插入图片描述

  2. 创建交换机

    在这里插入图片描述

  3. 绑定交换机与队列

    • 进入交换机

      在这里插入图片描述

    • 绑定交换机与队列

      在这里插入图片描述

  4. 发布消息

    在这里插入图片描述

  5. 查看消息

    在任意节点查看消息

    在这里插入图片描述

  6. 停止主节点rabbit@rabbit11节点

    在这里插入图片描述

  7. 再在其它节点查看消息

    在这里插入图片描述

17、删除节点

在这里插入图片描述

10.1.3、节点原理

RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证;

2、镜像集群模式

10.2.1、镜像模式简介

镜像模式是基于默认集群模式加上一定的配置得来的;

在默认模式下的RabbitMQ集群,它会把所有节点的交换机、绑定、队列的元数据进行复制确保所有节点都有一份相同的元数据信息

但是队列数据分为两种

  • 一种是队列的元数据信息(比如队列的最大容量,队列的名称等配置信息)
  • 一种是队列里面的消息

镜像模式则是把所有的队列数据完全同步,包括元数据信息和消息数据信息,当然这对性能肯定会有一定影响,当对数据可靠性要求较高时,可以使用镜像模式;

10.2.2、镜像模式配置

实现镜像模式也非常简单,它是在普通集群模式基础之上搭建而成的

3.X版本设置

镜像队列配置语法:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
  • rabbitmqctl set_policy:固定写法

  • -p Vhost: 可选参数,设置虚拟主机的名字(针对指定vhost下的queue进行设置)

  • Name: 设置策略的名称(自己取个名字就可以)

  • Pattern: queue的匹配模式(正则表达式);^表示所有的队列都是镜像队列

  • Definition:镜像定义(json格式),包括三个部分ha-mode、ha-params、ha-sync-mode

    • ha-mode

      指明镜像队列的模式,有效值为 all/exactly/nodes

      all:表示在集群中所有的节点上进行镜像
      exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
      nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
      
    • ha-params

      ha-mode模式需要用到的参数

    • ha-sync-mode

      队列中消息的同步方式,有效值为automatic(自动向master同步数据)和manual(手动向master同步数据)

  • priority:可选参数,指的是policy策略的优先级;

在默认集群模式的基础上执行上面这个命令就可以把一个默认的集群模式变成镜像集群模式

比如想配置所有名字开头为policy_的队列进行镜像,镜像数量为2,那么命令如下(在任意节点执行如下命令):

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

rabbitmqctl set_policy -p longdidi my_policy "^policy_"  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'  
(1)、同步所有数据

所有节点、所有虚拟主机、所有队列 都进行镜像

如果要在所有节点所有队列上进行镜像则在任意节点执行如下命令

语法:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

rabbitmqctl set_policy my-all "^"  '{"ha-mode":"all"}'   
(2)、同步指定数据

针对某个虚拟主机进行镜像

语法:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

rabbitmqctl set_policy -p longdidi my-all "^" '{"ha-mode": "exactly", "ha-params": 2, "ha-sync-mode": "automatic"}'

在默认集群模式的基础上执行上面这个命令就可以把一个默认的集群模式变成镜像集群模式

4.X版本设置

在这里插入图片描述

3、SpringBoot集成集群

重点连接配置

spring:rabbitmq:# 连接单台rabbitmq 服务器的地址# host: 192.168.1.101# 连接单台rabbitmq 服务器的端口# port: 5672username: adminpassword: 123456virtual-host: longdidipublisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式publisher-returns: true #开启return模式# 开启消费者手动确认listener:simple:acknowledge-mode: manualaddresses: 192.168.1.11:5672,192.168.1.12:5672,192.1.13:5672

测试模块:rabbitmq-10-cluster-01

引入依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies>

配置MQ

server:port: 8080spring:application:name: cluster-learn01rabbitmq:# 连接单台rabbitmq 服务器的地址# host: 192.168.1.101# 连接单台rabbitmq 服务器的端口# port: 5672username: adminpassword: 123456virtual-host: longdidipublisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式publisher-returns: true #开启return模式# 开启消费者手动确认listener:simple:acknowledge-mode: manualaddresses: 192.168.1.11:5672,192.168.1.12:5672,192.1.13:5672data:redis:host: 192.168.1.4port: 6379#password: 123456database: 0 # 0号数据库

定义MQ队列

package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {/*** 正常交换机* 使用durable()方法设置持久化** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).durable(true).build();}/*** 正常队列* durable()方法就是持久化** @return*/@Beanpublic Queue normalQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}}

生产者

package com.longdidi.service;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.vo.Orders;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import java.math.BigDecimal;
import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Resourceprivate RabbitTemplate rabbitTemplate;//这个对象可以进行序列化和反序列化(json格式)@Resourceprivate ObjectMapper objectMapper;/*** 构造方法执行后自动执行*/@PostConstructpublic void init() {//开启生产者的确定模式rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (!ack) {log.error("消息没有到达交换机,原因为:{}", cause);//TODO 重发消息或者记录错误日志}});rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}", returnedMessage.getReplyText());//TODO 记录错误日志,给程序员发短信或者或者邮件});}public void sendMsg() throws JsonProcessingException {{//创建订单Orders orders1 = Orders.builder().orderId("order_12345").orderName("买的手机").orderMoney(new BigDecimal(2356)).orderTime(new Date()).build();//转成jsonString strOrders1 = objectMapper.writeValueAsString(orders1);MessageProperties messageProperties = new MessageProperties();//设置单条消息的持久化,默认就是持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(strOrders1.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);}{Orders orders2 = Orders.builder().orderId("order_12345").orderName("买的手机").orderMoney(new BigDecimal(2356)).orderTime(new Date()).build();String strOrders2 = objectMapper.writeValueAsString(orders2);MessageProperties messageProperties = new MessageProperties();//设置单条消息的持久化,默认就是持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(strOrders2.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);}log.info("消息发送完毕,发送时间为:{}", new Date());}
}

消费者

package com.longdidi.service;import com.fasterxml.jackson.databind.ObjectMapper;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.vo.Orders;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;import com.rabbitmq.client.Channel;@Component
@Slf4j
public class ReceiveMessageService {@Resourceprivate ObjectMapper objectMapper;@Resourceprivate StringRedisTemplate stringRedisTemplate;@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})public void receiveMsg(Message message, Channel channel) throws IOException {//获取消息的唯一标识long deliveryTag = message.getMessageProperties().getDeliveryTag();//使用objectmapper把字节数组反序列化成对象Orders orders = objectMapper.readValue(message.getBody(), Orders.class);try {log.info("接收到的消息为:{}", orders.toString());//如果不存在就在redis中存储Boolean setResult = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getOrderId(), orders.getOrderId());if (setResult) {// TODO 向数据库插入订单等log.info("向数据库插入订单");}//手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("消息处理出现问题");try {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {throw new RuntimeException(ex);}throw new RuntimeException(e);}}
}

定义常量

package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.idempotent.normal.1";// 队列public static final String QUEUE_NAME1 = "queue.idempotent.normal.1";// 路由keypublic static final String ROUTING_NAME1 = "key.idempotent.normal.1";
}

定义实体类

package com.longdidi.vo;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {private String orderId;private String orderName;private BigDecimal orderMoney;private Date orderTime; //下单时间
}

发送消息

package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq10Cluster01Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq10Cluster01Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}

测试

e;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq10Cluster01Application implements ApplicationRunner {

public static void main(String[] args) {SpringApplication.run(Rabbitmq10Cluster01Application.class, args);
}@Resource
private SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/
@Override
public void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();
}

}


`测试`![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/a8dccf837b71414bb0083c7ce2b12822.png)

http://www.ppmy.cn/embedded/170522.html

相关文章

vscode远程连接ubuntu/Linux(虚拟机同样适用)

前言 在现代开发环境中&#xff0c;远程工作和跨平台开发变得越来越普遍。Visual Studio Code&#xff08;VSCode&#xff09;作为一个流行的代码编辑器&#xff0c;提供了强大的远程开发功能&#xff0c;使得开发者能够高效地连接和管理远程 Linux 服务器上的项目。通过 VSCod…

【MySQL】索引|作用|底层数据结构|常见问题

目录 1.概念 2.为何引入 3.使用 &#xff08;1&#xff09;查看索引 &#xff08;2&#xff09;创建索引&#xff08;危险操作&#xff09; &#xff08;3&#xff09;删除索引&#xff08;危险操作&#xff09; 4.使用场景 &#x1f525;5.底层数据结构&#xff08;核…

使用300M带宽是否可以流畅地玩原神

本文来自腾讯元宝 ps&#xff1a;搬家了&#xff0c;需要装个路由器打游戏。 根据搜索结果&#xff0c;300M的网络带宽完全可以满足《原神》的流畅游玩需求。以下是具体分析及优化建议&#xff1a; 一、带宽需求与300M网络的适配性 ​带宽要求较低​ 《原神》作为一款开放世界…

《Python百练成仙》31-40章(不定时更新)

第卅一章 函数结丹def开紫府 罗酆山的鬼门关吞吐着猩红的变量阴风&#xff0c;每个风眼都涌动着作用域混乱的灵力乱流。叶军手握薛香遗留的丹田玉简&#xff0c;玉简表面浮现出残缺的函数符文&#xff1a; def 凝聚金丹(灵气):道基 灵气 * 0.618print(金丹品质) # 作用域外变…

iOSUITableVIewCell 自动化点击埋点

iOS 中&#xff0c;经常要实现UITableVIewCell 点击埋点&#xff0c;这里通过自动化埋点的方式进行实现。 思路&#xff1a;通过运行时hook tableViewCell的 setSelected:animated:方法&#xff0c; 在交换的方法中实现埋点逻辑&#xff0c;并调用原来的实现 cell分类 propert…

【摸鱼指南】--- VSCode 使用 Thief-Book 隐形阅读模式配置教程 程序员必备插件

在代码的理性森林里&#xff0c;摸鱼是调试生活的快捷键 —— 我们用Coffee Break的灵感碎片&#xff0c;编译出更高效率的人生程序真正的效率大师&#xff0c;从不在单一线程里耗尽人生 —— 我们在主进程敲打代码&#xff0c;却在后台线程编译星辰大海 【摸鱼指南】--- VSCod…

【踩坑随笔】`npm list axios echarts`查看npm依赖包报错

npm list axios echarts查看npm依赖包出现以下报错&#xff0c;原因就是包的版本匹配问题&#xff0c;按照提示降axios版本或者自己升找合适的got版本&#xff0c;我这里是选择了降版本。本文记录仅做解决思路参考不一定适配大家的实际情况。 weed-detection-system1.0.0 E:\P…

​DeepSeek:如何通过自然语言生成HTML文件与原型图?

在当今快节奏的开发与设计环境中&#xff0c;快速生成HTML文件或原型图是每个开发者与设计师的迫切需求。虽然DeepSeek无法直接生成图片&#xff0c;但它却能够通过自然语言生成流程图、原型图以及交互式页面&#xff0c;甚至可以直接输出HTML代码。本文将详细介绍如何与DeepSe…