【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体系

news/2024/11/14 0:40:48/

作者:后端小肥肠

创作不易,未经允许禁止转载。

1. 前言

RabbitMQ,作为一款高性能、可靠的消息队列软件,已经成为许多企业和开发团队的首选之一。它的灵活性和可扩展性使得它适用于各种应用场景,从简单的任务队列到复杂的分布式系统。本文将深入探讨RabbitMQ的应用场景以及如何在实际项目中构建可靠的RabbitMQ架构体系。

2. RabbitMQ应用场景

2.1 异步处理

在现代应用中,异步消息处理是提升用户体验和系统效率的关键。RabbitMQ可以有效地用于多种异步处理任务,例如:

  • 用户注册后的邮件发送:用户注册后,通过RabbitMQ发送一个消息到队列中,由后台服务监听并处理发送邮件的任务,从而不会延迟用户的注册过程。
  • 订单处理:在电商平台中,订单处理包括库存管理、支付确认等多个步骤,RabbitMQ可以用来在这些服务间异步传递订单信息,确保处理流程的连续性和效率。

2.2 应用解耦

RabbitMQ支持多种通信模式,如点对点、发布/订阅等,这些模式帮助系统各部分保持低耦合度,便于独立扩展和维护。例如:

  • 微服务架构中的服务通信:在微服务架构中,RabbitMQ允许各个微服务之间通过消息进行交互,而不是直接调用对方的API,这种方式减少了服务间的直接依赖。

2.3 流量削峰

在流量高峰期,如促销或大型活动期间,系统可能会遭遇巨大的访问压力。RabbitMQ可以用来缓冲入站消息,如订单或请求,从而保护后端服务不被过载:

  • 秒杀活动中的订单处理:在秒杀活动中,大量的购买请求可以先进入RabbitMQ队列,系统根据处理能力逐步从队列中取出并处理这些请求,有效避免了系统崩溃。

2.4 通信与集成

RabbitMQ提供了一个灵活的消息传递系统,可以集成复杂的企业系统。它支持多种协议和广泛的开发语言库,适用于:

  • 跨平台通信:在不同操作系统和不同编程语言编写的应用之间,RabbitMQ可以作为消息传递中间件,实现这些系统的有效通信。

2.5 日志处理和应用监控

RabbitMQ也常用于系统日志处理和监控。它可以聚合各服务产生的日志信息,并传输到日志分析系统:

  • 集中式日志管理:通过RabbitMQ,各个系统和应用的日志可以被统一收集至一个中央处理位置,便于进行日志分析、监控和报警。

2.6 数据同步

RabbitMQ 在数据同步中扮演着重要的角色,特别是在分布式系统中,它能够确保数据在多个系统或组件之间保持一致性和最新状态。这对于维护数据的完整性和及时性至关重要。例如:

  • 数据库同步:在多地数据中心运营的情况下,RabbitMQ 可以用来同步不同地点的数据库。通过消息队列,当一个数据中心的数据库更新时,相应的变更可以通过 RabbitMQ 发送到其他数据中心,从而保证所有地点的数据一致。

  • 实时数据复制:在金融服务或电子商务平台,实时数据复制是保证高可用性和灾难恢复的关键。使用 RabbitMQ,可以实现高效的数据复制策略,如将交易数据从主系统复制到备份系统或分析数据库。

  • 缓存刷新:在使用缓存提高应用性能的情况下,RabbitMQ 可以用来在数据更新时自动通知系统刷新缓存。这样,用户总是能够获取到最新的数据,而不是过时的缓存数据。

通过这些应用场景,可以看出RabbitMQ在现代软件架构中扮演的多样化角色,不仅增强了系统的可靠性和伸缩性,还提高了开发和运维的效率。

3. 在项目中如何搭建稳定RabbitMQ架构体系

3.1. RabbitMQ安装

网上RabbitMQ安装教程很多,本文只简述基于docker安装的核心步骤:

1. 环境准备,准备Cenos虚拟机,我的是7.x版本:

2. 拉取或解压RabbitMQ镜像:

3. 运行docker容器:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /home/docker/rabbitmq/rabbitmq:/var/lib/rabbitmq -v /home/docker/rabbitmq/rabbitmq_conf:/etc/rabbitmq   -e RABBITMQ_DEFAULT_VHOST=km_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:latest

4. 进入容器 :

 docker exec -it 容器id /bin/bash

5. 运行rabbitmq-plugins enable rabbitmq_management(解决无法访问网页端15672端口问题),即可完成RabbitMQ安装。

3.2. 总体技术流程

本文以异步处理应用场景为例,展示如何构建稳定可靠的RabbitMQ架构体系:

上述流程为异步消息通信的技术流程,在异步消息通信中当消息投递后就立刻返回了结果,我们无法获取消息消费的具体过程,这就导致了虽然我们可以即刻获取程序返回状态,但是程序执行细节或是否失败无法通过程序响应返回的方式获取。

基于以上RabbitMQ异步通信的优缺点,我们要搭建一个可靠的RabbitMQ架构需要从以下几个方面入手:

生产者稳定架构

1. 消息投递回调监听。创建消息投递回调监听函数,监听生产者投递的消息是否投递成功。

2. 消息确认表创建。创建消息确认表message_confirmation,记录消息投递状态,其中字段status反应了是否投递成功(0为为投递成功,1为投递成功)。

java">CREATE TABLE "public"."message_confirmation" ("id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,"status" int4,"create_time" timestamp(6),"update_time" timestamp(6),"message" varchar(255) COLLATE "pg_catalog"."default",CONSTRAINT "message_confirmation_pkey" PRIMARY KEY ("id")
)
;ALTER TABLE "public"."message_confirmation" OWNER TO "postgres";

3. 创建定时任务监听消息投递确认表。每隔一段时间遍历消息确认表,筛选出status为0的消息数据,进行重复投递动作。

消费者稳定架构

1. 死信队列运用。由于网络或外部因素导致消息消费失败,可将消息投递至死信队列进行二次消费。

2. 日志表记录。如死信队列也消费失败,可将消息写入日志表(message_error)后进行手动消费,由技术人员获取日志表中消费失败记录,排查消费失败原因。

java">CREATE TABLE "public"."message_error" ("id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,"message_id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,"error_log" text COLLATE "pg_catalog"."default","create_time" timestamp(6),"update_time" timestamp(6),CONSTRAINT "message_error_pkey" PRIMARY KEY ("id")
)
;ALTER TABLE "public"."message_error" OWNER TO "postgres";

3.3. 实战讲解

3.3.1. 环境配置
3.3.1.1. 所需版本工具
3.3.1.2. pom依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId></dependency>
</dependencies>
3.3.2. 生产者核心代码讲解

3.3.2.1. yml配置
java">server:port: 8873
spring:datasource:url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_producerusername: postgrespassword: postgresdriver-class-name: org.postgresql.Driverrabbitmq:port: 5672host: 192.168.10.11username: adminpassword: adminvirtual-host: my_vhostpublisher-confirm-type: correlatedlistener:simple:acknowledge-mode: manual
3.3.2.2. 编写回调函数
java"> @PostConstructpublic void regCallback() {// 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("cause:"+cause);// 如果ack为true代表消息已经收到String messageId = correlationData.getId();if (!ack) {// 这里可能要进行其他的方式进行存储log.error("MQ队列应答失败,messageId是:" + messageId);return;}try {MessageConfirmation messageConfirmation = messageConfirmationMapper.selectById(messageId);messageConfirmation.setStatus(1);int count=messageConfirmationMapper.updateById(messageConfirmation);if (count == 1) {log.info("本地消息状态修改成功,消息成功投递到消息队列中...");}} catch (Exception ex) {log.error("本地消息状态修改失败,出现异常:" + ex.getMessage());}}});}

上述回调函数主要用于监听生产者发送的消息是否发送成功,并将消息发送状态更新至消息确认表中。

3.3.2.3. 编写定时任务监听消息确认表
java">@Configuration
@EnableScheduling
@Slf4j
public class confirmMessageTaskService {@Autowiredprivate RabbitTemplate rabbitTemplate;@AutowiredMessageConfirmationMapper messageConfirmationMapper;@Scheduled(cron = "0 */1 * * * ?")public void sendMessage(){// 把消息为0的状态消息重新查询出来,投递到MQ中。LambdaQueryWrapper<MessageConfirmation> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(MessageConfirmation::getStatus, 0);List<MessageConfirmation> noConfirmMessages = messageConfirmationMapper.selectList(queryWrapper).stream().collect(Collectors.toList());noConfirmMessages.forEach((noConfirmMessage)->{rabbitTemplate.convertAndSend("xz_push_exchange","", JsonUtil.obj2String(noConfirmMessage),new CorrelationData(noConfirmMessage.getId()));});}
}

 上述定时任务为每分钟遍历消息确认表,将status=0的消息筛选出来进行消息投递。

3.3.2.4. 消息投递
java">    public void sendMessage(MessageConfirmation messageConfirmation) {messageConfirmationMapper.insert(messageConfirmation);rabbitTemplate.convertAndSend("xfc_fanout_exchange","", JsonUtil.obj2String(messageConfirmation),new CorrelationData(messageConfirmation.getId()));}

3.4. 消费者核心代码讲解

3.4.1. yml配置
java">server:port: 8872
spring:datasource:url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_consumerusername: postgrespassword: postgresdriver-class-name: org.postgresql.Driverrabbitmq:port: 5672host: 192.168.10.11username: adminpassword: adminvirtual-host: my_vhostlistener:simple:acknowledge-mode: manual
mybatis-plus:typeAliasesPackage: com.xfc.consumer.entitiesmapper-locations: classpath:mapper/*.xml
3.4.2. RabbitMQ配置类
java">@Configuration
public class RabbitMQConfig {/*** 死信队列* @return*/@Beanpublic FanoutExchange deadExchange() {return new FanoutExchange("dead_xfc_fanout_exchange", true, false);}@Beanpublic Queue deadXfcQueue() {return new Queue("dead.xfc.queue", true);}@Beanpublic Binding bindDeadXfc() {return BindingBuilder.bind(deadXfcQueue()).to(deadExchange());}/*** 队列* @return*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("xfc_fanout_exchange", true, false);}@Beanpublic Queue xfcQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dead_xfc_fanout_exchange");return new Queue("xfc.queue", true, false, false, args);}@Beanpublic Binding bindXfc() {return BindingBuilder.bind(xfcQueue()).to(fanoutExchange());}
}

上述代码为RabbitMQ配置类,用于在项目初始化时生成相应的交换机和队列。 

3.4.3. 队列消费
java">@Service
@Slf4j
public class XfcMqConsumer {@RabbitListener(queues = {"xfc.queue"})public void messageconsumer(String message, Channel channel,CorrelationData correlationData,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {MessageConfirmation messageConfirmation=null;try {log.info("收到MQ的消息是: " + message );messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);/*** 编写业务逻辑*/} catch (Exception e) {e.printStackTrace();log.error("消息投放到死信队列"+e.getMessage(),e);channel.basicNack(tag,false,false);// 死信队列}}
}
3.4.4. 死信队列消费
java">@Service
@Slf4j
public class DeadMqConsumer {@AutowiredMessageErrorMapper messageErrorMapper;@RabbitListener(queues = {"dead.xfc.queue"})public void messageconsumer(String message, Channel channel,CorrelationData correlationData,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {MessageConfirmation messageConfirmation=null;try {log.info("收到MQ的消息是: " + message );messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);/*** 编写业务逻辑*/} catch (Exception e) {e.printStackTrace();/*** 写入message_error*/messageErrorMapper.insert(new MessageError(messageConfirmation.getId(),e.getMessage(),new Date()));channel.basicNack(tag,false,false);// 死信队列}}
}

3.5 效果测试

以上代码编写完成后需要进行架构效果测试,其步骤如下:

1. 消息投递测试

上图调用了消息投递接口。

在消息确认表中,新增了一条消息且status=1,代表该条消息已投递成功。

2. 消费者正常消费测试

3. 消费异常测试

上图可看出消息消费异常投入到了死信队列。

在死信队列中依然消费失败。

消费失败后成功写入了日志表。

4. 结语

本文讲解了RabbitMQ应用场景以及在异步处理场景中如何搭建稳定的RabbitMQ架构体系,逐步详细的给出了生产者及消费者端代码并在文章最后对架构效果进行了测试,感兴趣的同学可根据代码进行实操,有疑问和其他见解也可在评论区留言,我看到都会回复。


http://www.ppmy.cn/news/1453016.html

相关文章

brpc profiler

cpu profiler cpu profiler | bRPC MacOS的额外配置 在MacOS下&#xff0c;gperftools中的perl pprof脚本无法将函数地址转变成函数名&#xff0c;解决办法是&#xff1a; 安装standalone pprof&#xff0c;并把下载的pprof二进制文件路径写入环境变量GOOGLE_PPROF_BINARY_PA…

记录vue报错问题 in ./node_modules/axios/lib/platform/index.js

今天这个问题困扰了我许久 报错内容如下&#xff1a; 最初一直以为是我没装axios&#xff0c;又重新装了一次&#xff0c;后面才发现是axios版本原因&#xff0c;真的总是被版本的原因困住真的很烦 解决方法如下&#xff1a; 将axios的版本改为1.5.0 1、打开项目的文件夹“…

数据结构与算法-单向环形链表与约瑟夫问题(续思路与代码)

上一篇写的单向环形链表与约瑟夫问题简介和举例&#xff0c;这篇写思路和代码~ 目录 3.思路 3.1创建环形链表&#xff1a; 3.2遍历环形链表&#xff1a; 3.3产生出圈序号&#xff1a; 4.代码 4.1在构建环形链表时添加节点&#xff1a; 4.2遍历环形链表&#xff1a; 4.3产…

c语言刷题——输出图案

1.输出用“*”组成的X形图案 题目&#xff1a;请打印用“*”组成的X形图案 描述&#xff1a; 多组输入&#xff0c;一个整数&#xff08;2~20&#xff09;&#xff0c;表示输出的行数&#xff0c;也表示组成“X”的反斜线和正斜线的长度。 输出描述&#xff1a; 针对每行输…

eNSP-浮动静态路由配置

ip route-static 192.168.1.0 24 192.168.3.2 preference 60 #设置路由 目标网络地址 和 下一跳地址 preference值越大 优先级越低 一、搭建拓扑结构 二、主机配置 pc1 pc2 三、配置路由器 1.AR1路由器配置 <Huawei>sys #进入系统视图 [Huawei]int g0/0/0 #进入接…

Kafka的优点和缺点,以及适用场景

Kafka作为一个开源的分布式流处理平台&#xff0c;在大数据和实时处理领域具有广泛的应用。以下是Kafka的优点、缺点以及适用场景&#xff1a; 一、Kafka的优点 高吞吐量和低延迟&#xff1a;Kafka能够处理每秒数百万条消息&#xff0c;具有极低的延迟&#xff0c;使得它非常…

EPAI手绘建模APP颜色、贴图、材质、样式

⑦ 颜色选择页面 1) 颜色环选色。 图 65 颜色选择器-颜色环 2) RGB选色。 图 66 颜色选择器-RGB 3) HSL选色。 图 67 颜色选择器-HSL 4) 国风颜色库选色。 图 68 颜色选择器-国风 5) CSS颜色库选色。 图 69 颜色选择器-CSS 6) 历史颜色&#xff1a;保存最近使用的多个颜色&…

SpringBoot之Zuul服务

概述 Spring Cloud Netflix zuul组件是微服务架构中的网关组件,其核心功能是路由和过滤,目前公司业务就是基于Zuul搭建的网关服务,本文主要内容&#xff1a;Zuul的执行流程、Zuul过滤实现、Zuul安全配置 Zuul服务 Zuul执行流程 接收请求 客户端发送请求到Zuul服务器(前置经过…