10.RabbitMQ集群

news/2025/3/10 2:42:56/

十、集群与高可用

RabbitMQ 的集群分两种模式,一种是默认集群模式,一种是镜像集群模式;

在RabbitMQ集群中所有的节点(一个节点就是一个RabbitMQ的broker服务器) 被归为两类:一类是磁盘节点,一类是内存节点;

磁盘节点会把集群的所有信息(比如交换机、绑定、队列等信息)持久化到磁盘中,而内存节点只会将这些信息保存到内存中,如果该节点宕机或重启,内存节点的数据会全部丢失,而磁盘节点的数据不会丢失;

1、默认集群模式

默认集群模式也叫 普通集群模式、或者 内置集群模式;

10.1.1、默认集群简介

(1)、元数据

队列元数据:队列名称和属性(是否可持久化,是否自动删除)

交换器元数据:交换器名称、类型和属性

绑定元数据:交换器和队列的绑定列表

vhost元数据:vhost内的相关属性,如安全属性等;

当用户访问其中任何一个RabbitMQ节点时,查询到的queue/user/exchange/vhost等信息都是相同的;

(2)、数据同步特点

RabbitMQ默认集群模式只会把交换机、队列、虚拟主机等元数据信息在各个节点同步,而具体队列中的消息内容不会在各个节点中同步,队列的具体信息数据只在队列的拥有者节点保存,其他节点只知道队列的元数据和指向该节点的指针,所以其他节点接收到不属于该节点队列的消息时会将该消息传递给该队列的拥有者节点上;

集群不复制队列内容和状态到所有节点原因

1)节省存储空间;

2)提升性能;

如果消息需要复制到集群中每个节点,网络开销不可避免,持久化消息还需要写磁盘,占用磁盘空间。

在这里插入图片描述

(3)、数据访问过程

如果有一个消息生产者或者消息消费者通过amqp-client的客户端连接到节点1进行消息的发送或接收,那么此时集群中的消息收发只与节点1相关,这个没有任何问题;

如果消息生产者所连接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程中这两个节点主要起了一个路由转发作用,根据这两个节点上的元数据(也就是指向queue的owner node的指针)转发至节点1上,最终发送的消息还是会存储至节点1的队列1上;

同样,如果消息消费者所连接的节点2或者节点3,那这两个节点也会作为路由节点起到转发作用,将会从节点1的队列1中获取消息进行消费;

10.1.2、安装

1、安装三台RabbitMQ机器

安装过程参考前面

注意:这里先安装一台MQ,然后克隆两台就可以了

2、设置IP地址

启动并设置三台机器的IP

修改配置文件方式

图形界面方式

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、修改主机名
sudo hostnamectl set-hostname rabbit11
4、修改/etc/hosts 文件

首先需要配置一下hosts文件,因为RabbitMQ集群节点名称是读取hosts文件得到的;

注意三台机机器都需要配置

vim /etc/hosts
192.168.1.11 rabbit11
192.168.1.12 rabbit12
192.168.1.13 rabbit13

在这里插入图片描述

5、重启网络

三台机器均重启网络,使节点名生效

sudo systemctl restart NetworkManager

低版本CentOS使用如下命令

systemctl restart network
6、重新连接xshell

重启后三台机器的xshell均退出,然后再重新连接,这样才能刷新主机的名字

7、关闭防火墙

三台机器均需关闭

systemctl stop firewalld  ##关闭防火墙
systemctl disable firewalld  ##开机不启动防火墙
systemctl status firewalld	##查看防火墙状态

在这里插入图片描述

8、修改.erlang.cookie文件

三台机器 .erlang.cookie文件保持一致

由于是clone出的三台机器,所以肯定是一样的

  • 如果使用解压缩方式安装的RabbitMQ,那么该文件会在 用户名目录下 , 也就是 {用户名}目录下,也就是 用户名目录下,也就是{用户名}/.erlang.cookie;

    在这里插入图片描述

  • 如果使用rpm安装包方式进行安装,那么这个文件会在/var/lib/rabbitmq目录下;

注意 .erlang.cookie的权限为400,目前已经是400

9、启动MQ

分别启动三台机器上的rabbitmq

rabbitmq-server -detached
10、查看集群状态
11、构建集群
  • 加入节点1

    rabbitmq12机器上执行命令,让12的rabbitmq加入集群

    注意:一定要先停止节点,将节点重置之后才能加入集群,否则数据同步会出现混乱

    ## 先停止rabbitmq
    rabbitmqctl stop_app
    ## 重置rabbitmq
    rabbitmqctl reset
    ## 节点加入集群:rabbit@rabbit11是主节点的节点名,在集群状态中可以查看到节点名称
    rabbitmqctl join_cluster rabbit@rabbit11 --ram
    ## 启动节点
    rabbitmqctl start_app
    

    –ram 参数表示让rabbitmq12成为一个内存节点,如果不带参数默认为disk磁盘节点;

    在这里插入图片描述

  • 添加节点2

    在rabbit13节点上也执行同样的命令,使rabbit13节点也加入到集群中

    ## 先停止rabbitmq
    rabbitmqctl stop_app
    ## 重置rabbitmq
    rabbitmqctl reset
    ## 节点加入集群:rabbit@rabbit11是主节点的节点名,在集群状态中可以查看到节点名称
    rabbitmqctl join_cluster rabbit@rabbit11 --ram
    ## 启动节点
    rabbitmqctl start_app
    

    当然也可以让rabbit13作为一个磁盘节点

12、添加用户和权限

操作一个节点,添加用户和权限等

#列出用户
rabbitmqctl list_users
# 添加用户
rabbitmqctl add_user admin 123456
#查看权限
rabbitmqctl list_permissions
#设置权限
rabbitmqctl set_permissions admin ".*" ".*" ".*"
#设置角色
rabbitmqctl set_user_tags admin administrator

在这里插入图片描述

13、启动web控制台

启动web控制台插件

注意:三台机器都要启动,因为插件不属于元数据,因此需要分别启动

#进入插件目录
cd /usr/local/rabbitmq_server-4.0.7/plugins/
#启动web端插件
rabbitmq-plugins enable rabbitmq_management 

在这里插入图片描述

14、创建虚拟主机

使用web浏览器添加一个虚拟主机:longdidi

在这里插入图片描述

15、再次查看集群状态

当执行完操作以后在浏览器访问web管控台来看看效果;

随便在哪个节点打开web管控台都能看到集群环境各节点的信息;

也可以使用"rabbitmqctl cluster_status"查看集群状态;

在这里插入图片描述

以上就是RabbitMQ默认集群模式(普通集群模式)的搭建;

16、验证集群
  1. 创建队列

在这里插入图片描述

  1. 创建交换机

    在这里插入图片描述

  2. 绑定交换机与队列

    • 进入交换机

      在这里插入图片描述

    • 绑定交换机与队列

      在这里插入图片描述

  3. 发布消息

    在这里插入图片描述

  4. 查看消息

    在任意节点查看消息

    在这里插入图片描述

  5. 停止主节点rabbit@rabbit11节点

    在这里插入图片描述

  6. 再在其它节点查看消息

    在这里插入图片描述

17、删除节点

在这里插入图片描述

10.1.3、节点原理

RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证;

2、镜像集群模式

10.2.1、镜像模式简介

镜像模式是基于默认集群模式加上一定的配置得来的;

在默认模式下的RabbitMQ集群,它会把所有节点的交换机、绑定、队列的元数据进行复制确保所有节点都有一份相同的元数据信息

但是队列数据分为两种

  • 一种是队列的元数据信息(比如队列的最大容量,队列的名称等配置信息)
  • 一种是队列里面的消息

镜像模式则是把所有的队列数据完全同步,包括元数据信息和消息数据信息,当然这对性能肯定会有一定影响,当对数据可靠性要求较高时,可以使用镜像模式;

10.2.2、镜像模式配置

实现镜像模式也非常简单,它是在普通集群模式基础之上搭建而成的

3.X版本设置

镜像队列配置语法:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
  • rabbitmqctl set_policy:固定写法

  • -p Vhost: 可选参数,设置虚拟主机的名字(针对指定vhost下的queue进行设置)

  • Name: 设置策略的名称(自己取个名字就可以)

  • Pattern: queue的匹配模式(正则表达式);^表示所有的队列都是镜像队列

  • Definition:镜像定义(json格式),包括三个部分ha-mode、ha-params、ha-sync-mode

    • ha-mode

      指明镜像队列的模式,有效值为 all/exactly/nodes

      all:表示在集群中所有的节点上进行镜像
      exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
      nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
      
    • ha-params

      ha-mode模式需要用到的参数

    • ha-sync-mode

      队列中消息的同步方式,有效值为automatic(自动向master同步数据)和manual(手动向master同步数据)

  • priority:可选参数,指的是policy策略的优先级;

在默认集群模式的基础上执行上面这个命令就可以把一个默认的集群模式变成镜像集群模式

比如想配置所有名字开头为policy_的队列进行镜像,镜像数量为2,那么命令如下(在任意节点执行如下命令):

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

rabbitmqctl set_policy -p longdidi my_policy "^policy_"  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'  
(1)、同步所有数据

所有节点、所有虚拟主机、所有队列 都进行镜像

如果要在所有节点所有队列上进行镜像则在任意节点执行如下命令

语法:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

rabbitmqctl set_policy my-all "^"  '{"ha-mode":"all"}'   
(2)、同步指定数据

针对某个虚拟主机进行镜像

语法:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

rabbitmqctl set_policy -p longdidi my-all "^" '{"ha-mode": "exactly", "ha-params": 2, "ha-sync-mode": "automatic"}'

在默认集群模式的基础上执行上面这个命令就可以把一个默认的集群模式变成镜像集群模式

4.X版本设置

在这里插入图片描述

3、SpringBoot集成集群

重点连接配置

spring:rabbitmq:# 连接单台rabbitmq 服务器的地址# host: 192.168.1.101# 连接单台rabbitmq 服务器的端口# port: 5672username: adminpassword: 123456virtual-host: longdidipublisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式publisher-returns: true #开启return模式# 开启消费者手动确认listener:simple:acknowledge-mode: manualaddresses: 192.168.1.11:5672,192.168.1.12:5672,192.1.13:5672

测试模块:rabbitmq-10-cluster-01

引入依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies>

配置MQ

server:port: 8080spring:application:name: cluster-learn01rabbitmq:# 连接单台rabbitmq 服务器的地址# host: 192.168.1.101# 连接单台rabbitmq 服务器的端口# port: 5672username: adminpassword: 123456virtual-host: longdidipublisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式publisher-returns: true #开启return模式# 开启消费者手动确认listener:simple:acknowledge-mode: manualaddresses: 192.168.1.11:5672,192.168.1.12:5672,192.1.13:5672data:redis:host: 192.168.1.4port: 6379#password: 123456database: 0 # 0号数据库

定义MQ队列

package com.longdidi.config;import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {/*** 正常交换机* 使用durable()方法设置持久化** @return*/@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).durable(true).build();}/*** 正常队列* durable()方法就是持久化** @return*/@Beanpublic Queue normalQueue() {return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).build();}/*** 正常交换机和正常队列绑定** @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);}}

生产者

package com.longdidi.service;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.vo.Orders;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import java.math.BigDecimal;
import java.util.Date;@Service
@Slf4j
public class SendMessageService {@Resourceprivate RabbitTemplate rabbitTemplate;//这个对象可以进行序列化和反序列化(json格式)@Resourceprivate ObjectMapper objectMapper;/*** 构造方法执行后自动执行*/@PostConstructpublic void init() {//开启生产者的确定模式rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (!ack) {log.error("消息没有到达交换机,原因为:{}", cause);//TODO 重发消息或者记录错误日志}});rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}", returnedMessage.getReplyText());//TODO 记录错误日志,给程序员发短信或者或者邮件});}public void sendMsg() throws JsonProcessingException {{//创建订单Orders orders1 = Orders.builder().orderId("order_12345").orderName("买的手机").orderMoney(new BigDecimal(2356)).orderTime(new Date()).build();//转成jsonString strOrders1 = objectMapper.writeValueAsString(orders1);MessageProperties messageProperties = new MessageProperties();//设置单条消息的持久化,默认就是持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(strOrders1.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);}{Orders orders2 = Orders.builder().orderId("order_12345").orderName("买的手机").orderMoney(new BigDecimal(2356)).orderTime(new Date()).build();String strOrders2 = objectMapper.writeValueAsString(orders2);MessageProperties messageProperties = new MessageProperties();//设置单条消息的持久化,默认就是持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(strOrders2.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);}log.info("消息发送完毕,发送时间为:{}", new Date());}
}

消费者

package com.longdidi.service;import com.fasterxml.jackson.databind.ObjectMapper;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.vo.Orders;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;import com.rabbitmq.client.Channel;@Component
@Slf4j
public class ReceiveMessageService {@Resourceprivate ObjectMapper objectMapper;@Resourceprivate StringRedisTemplate stringRedisTemplate;@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})public void receiveMsg(Message message, Channel channel) throws IOException {//获取消息的唯一标识long deliveryTag = message.getMessageProperties().getDeliveryTag();//使用objectmapper把字节数组反序列化成对象Orders orders = objectMapper.readValue(message.getBody(), Orders.class);try {log.info("接收到的消息为:{}", orders.toString());//如果不存在就在redis中存储Boolean setResult = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getOrderId(), orders.getOrderId());if (setResult) {// TODO 向数据库插入订单等log.info("向数据库插入订单");}//手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("消息处理出现问题");try {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {throw new RuntimeException(ex);}throw new RuntimeException(e);}}
}

定义常量

package com.longdidi.constants;public class RabbitMQConstant {// 正常交换机public static final String EXCHANGE_NAME = "exchange.idempotent.normal.1";// 队列public static final String QUEUE_NAME1 = "queue.idempotent.normal.1";// 路由keypublic static final String ROUTING_NAME1 = "key.idempotent.normal.1";
}

定义实体类

package com.longdidi.vo;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {private String orderId;private String orderName;private BigDecimal orderMoney;private Date orderTime; //下单时间
}

发送消息

package com.longdidi;import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Rabbitmq10Cluster01Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Rabbitmq10Cluster01Application.class, args);}@Resourceprivate SendMessageService sendMessageService;/*** 程序一启动就会运行该方法** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {sendMessageService.sendMsg();}
}

测试

在这里插入图片描述


http://www.ppmy.cn/news/1577958.html

相关文章

计算机三级网络技术知识点汇总【7】

第七章 路由器配置及使用 1. 路由器的基础知识 1.1 路由器的基本概念 路由器是工作在网络层的设备&#xff0c;负责将数据分组从源端主机经最佳路径传送到目的端主机&#xff0c;实现在网络层的互联。路由器工作在 TCP/IP 网络模型的网络层&#xff0c;对应于 OSI 网络参考模…

RuoYi框架添加自己的模块(学生管理系统CRUD)

RuoYi框架添加自己的模块&#xff08;学生管理系统&#xff09; 框架顺利运行 首先肯定要顺利运行框架了&#xff0c;这个我不多说了 设计数据库表 在ry数据库中添加表tb_student 表字段如图所示 如图所示 注意id字段是自增的 注释部分是后面成功后前端要展示的部分 导入…

【后端开发】go-zero微服务框架实践(goland框架对比,go-zero开发实践,文件上传问题优化等等)

【后端开发】go-zero微服务框架实践&#xff08;goland框架对比&#xff0c;go-zero开发实践&#xff0c;文件上传问题优化等&#xff09; 文章目录 1、go框架对比介绍2、go-zero 微服务开发实践3、go-zero 文件上传问题优化 1、go框架对比介绍 国内开源goland框架对比 1 go-…

电商行业门店管理软件架构设计与数据可视化实践

一、行业痛点与核心诉求 在电商多平台运营成为主流的背景下,企业普遍面临三大管理难题: ​数据碎片化:某头部服饰品牌2023年运营报告显示,其分布在8个平台的162家门店,日均产生23万条订单数据,但财务部门需要5个工作日才能完成跨平台利润核算。​成本核算失真:行业调研…

Linux网络环境配置及常用命令

一、Linux下网络配置的几种方式 在Linux中配置网络的方式有以下几种&#xff1a; 图形界面配置&#xff0c;操作方式如window系统配置IP&#xff0c;但这种方式会影响服务器的安全性和稳定性。Ifconfig命令临时配置IP地址&#xff0c;在我们重启计算机或重启网络服务后&#…

工程化与框架系列(19)--前端安全防护

前端安全防护 &#x1f512; 引言 随着Web应用的普及&#xff0c;前端安全问题日益突出。本文将深入探讨前端安全的各种威胁及其防护措施&#xff0c;帮助开发者构建更加安全的Web应用。在当今复杂的网络环境中&#xff0c;理解并实施有效的安全策略已经成为前端开发者的必备…

uniapp实现微信小程序一键登录

一、复制 AppID 和 AppSecret 去 微信公众平台 -> 开发与服务 -> 开发管理

六十天前端强化训练之第十四天之深入理解JavaScript异步编程

欢迎来到编程星辰海的博客讲解 目录 一、异步编程的本质与必要性 1.1 单线程的JavaScript运行时 1.2 阻塞与非阻塞的微观区别 1.3 异步操作的性能代价 二、事件循环机制深度解析 2.1 浏览器环境的事件循环架构 核心组件详解&#xff1a; 2.2 执行顺序实战分析 2.3 Nod…