使用RabbitMQ实现流量削峰填谷

embedded/2025/3/14 17:00:45/

原理

流量削峰填谷是指在面对突发的高流量时,通过消息队列将瞬时大量请求暂时存储起来,并逐步处理这些请求,从而避免系统过载。RabbitMQ 作为消息中间件可以很好地支持这一需求,特别是结合其延时消息插件(rabbitmq_delayed_message_exchange),可以在处理消息时加入延时逻辑,进一步优化系统的负载。

  1. 生产者发送消息:当有大量请求涌入时,生产者将这些请求转化为消息并发送到 RabbitMQ 队列中。
  2. 消费者异步处理:消费者从队列中异步获取消息并进行处理。由于消息是逐步被消费的,因此即使短时间内有大量的请求进入系统,也不会导致系统崩溃。
  3. 延时消息处理:对于某些需要延时处理的消息,可以通过 RabbitMQ 的延时消息插件来设置消息的延时时间,使得这些消息在指定的时间后才被消费者处理。

详细步骤

一、环境准备

1. 安装 RabbitMQ 并启用延时消息插件

首先确保你已经安装了 RabbitMQ,并启用了管理插件以便通过 Web 界面进行管理。如果还没有安装 RabbitMQ,可以通过 Docker 快速启动一个 RabbitMQ 实例:

# 启动 RabbitMQ 容器
docker run -d --name rabbitmq 
-p 5672:5672 -p 15672:15672 
rabbitmq:3-management //指向特定的 3.x 版本并包含管理插件

访问 http://localhost:15672 进入 RabbitMQ 的管理界面,默认用户名和密码都是 guest

接下来,安装并启用 RabbitMQ 延时消息插件:

# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez -P /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 重启 RabbitMQ 服务
systemctl restart rabbitmq-server

二、RabbitMQ 配置与队列设置

1. 创建队列和交换机

我们使用延时插件的延时交换机来处理延时消息。以下是使用命令行工具 rabbitmqadmin 创建队列和交换机的示例:

# 使用 RabbitMQ 管理插件或命令行工具创建队列
rabbitmqadmin declare queue name=order_queue durable=true arguments='{"x-max-length": 10000, "x-overflow": "drop-head"}'# 创建延时交换机
rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message durable=true arguments='{"x-delayed-type": "direct"}'# 绑定队列到交换机
rabbitmqadmin declare binding source=delayed_exchange destination=order_queue routing_key=order_routing_key

三、Spring Boot 应用程序配置

1. pom.xml 添加依赖

在 Spring Boot 项目中添加 RabbitMQ 和相关依赖:

<dependencies><!-- Spring Boot Starter for AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Jackson for JSON processing --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Lombok for reducing boilerplate code --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>
2. application.yml 配置文件

src/main/resources/application.yml 中配置 RabbitMQ 连接信息:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /listener:simple:acknowledge-mode: manual  # 手动确认模式concurrency: 10           # 消费者并发数max-concurrency: 20       # 最大消费者并发数
3. RabbitMQConfig.java 配置类

定义 RabbitMQ 的配置类,用于声明队列、交换机和绑定关系:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic Queue orderQueue() {return QueueBuilder.durable("order_queue").withArgument("x-max-length", 10000)  // 设置最大长度为10000.withArgument("x-overflow", "drop-head")  // 当队列满时丢弃最早的未消费消息.build();}@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);}@Beanpublic Binding binding(Queue orderQueue, CustomExchange delayExchange) {return BindingBuilder.bind(orderQueue).to(delayExchange).with("order_routing_key").noargs();}
}

四、生产者发送消息

1. OrderProducer.java

编写生产者代码,将订单信息作为消息发送到 order_queue 中:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic OrderProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendOrder(String orderData) {System.out.println(" [x] Sent order: " + orderData);rabbitTemplate.convertAndSend("delayed_exchange", "order_routing_key", orderData);}public void sendDelayedOrder(String orderData, long delayTime) {System.out.println(" [x] Sent delayed order: " + orderData + " with delay: " + delayTime + " ms");// 设置消息后处理器,添加延迟时间MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setHeader("x-delay", delayTime);return message;};rabbitTemplate.convertAndSend("delayed_exchange", "order_routing_key", orderData, messagePostProcessor);}
}
2. OrderController.java

编写控制器以接收 HTTP 请求并调用生产者发送消息:

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;@RestController
public class OrderController {private final OrderProducer orderProducer;@Autowiredpublic OrderController(OrderProducer orderProducer) {this.orderProducer = orderProducer;}@PostMapping("/submitOrder")public ResponseEntity<String> submitOrder(@RequestBody String orderData) {orderProducer.sendOrder(orderData);return ResponseEntity.ok("Order submitted successfully!");}@PostMapping("/submitDelayedOrder")public ResponseEntity<String> submitDelayedOrder(@RequestBody String orderData, @RequestParam long delayTime) {orderProducer.sendDelayedOrder(orderData, delayTime);return ResponseEntity.ok("Delayed order submitted successfully!");}
}

五、消费者处理消息

1. OrderConsumer.java

编写消费者代码,从队列中获取消息并处理订单:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;@Service
public class OrderConsumer {private static final Logger logger = LoggerFactory.getLogger(OrderConsumer.class);@RabbitListener(queues = "order_queue")public void receiveOrder(String orderData) {logger.info("Received order: {}", orderData);processOrder(orderData);}private void processOrder(String orderData) {logger.info("Processing order: {}", orderData);try {// 模拟订单处理逻辑Thread.sleep(2000);  // 模拟处理时间} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

六、测试与验证

1. 启动 RabbitMQ 服务
docker start rabbitmq
2. 编译并启动 Spring Boot 应用程序
mvn spring-boot:run
3. 使用 Postman 或 curl 测试订单提交接口
提交普通订
提交延时订单
curl -X POST 
http://localhost:8080/submitDelayedOrder?delayTime=5000 
-H "Content-Type: application/json" 
-d '{"user_id": 12345, "product_id": 67890, "quantity": 2, "price": 199.99}'1. curl
curl 是一个用于在不同协议之间传输数据的命令行工具。它支持多种协议,包括HTTP、HTTPS、FTP等。在这个例子中,它被用来发送HTTP请求。
2. -X POST
-X 参数允许您指定HTTP请求的方法。这里使用的是POST方法,通常用于向服务器提交数据或更新资源。
3. http://localhost:8080/submitDelayedOrder?delayTime=5000
这是目标URL,表示请求将被发送到运行在本地机器(localhost)上的服务,监听端口为8080。路径/submitDelayedOrder指定了API的具体端点,而查询参数delayTime=5000可能指示该订单将会延迟5秒(5000毫秒)处理。
4. -H "Content-Type: application/json"
-H 标志用于添加HTTP头信息。这里的头信息指定了内容类型为application/json,意味着请求体中的数据将以JSON格式进行编码。
5. -d '{"user_id": 12345, "product_id": 67890, "quantity": 2, "price": 199.99}'
-d 参数用于指定HTTP请求的数据体。在这个例子中,数据是以JSON格式提供的,包含用户ID、产品ID、购买数量和单价的信息。

查看控制台输出,确认消息被发送到队列并由消费者处理。

七、优化与扩展

1. 动态调整消费者数量

为了动态调整消费者的数量,可以使用 Kubernetes 的 Horizontal Pod Autoscaler (HPA) 来根据队列长度自动扩展消费者实例。例如:

Kubernetes Deployment YAML 文件

apiVersion: apps/v1
kind: Deployment
metadata:name: order-consumer
spec:replicas: 3  # 初始副本数selector:matchLabels:app: order-consumertemplate:metadata:labels:app: order-consumerspec:containers:- name: order-consumerimage: your-order-consumer-imageenv:- name: SPRING_RABBITMQ_HOSTvalue: "rabbitmq-service"resources:requests:memory: "64Mi"cpu: "250m"limits:memory: "128Mi"cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:name: rabbitmq-service
spec:selector:app: rabbitmqports:- protocol: TCPport: 5672targetPort: 5672

自动扩展策略

使用 Horizontal Pod Autoscaler (HPA) 来根据队列长度自动扩展消费者实例:

kubectl autoscale deployment order-consumer --min=3 --max=10 --cpu-percent=80
2. 监控与报警

为了确保系统的稳定性,还需要对 RabbitMQ 进行监控和报警。可以使用 Prometheus 和 Grafana 来监控 RabbitMQ 的状态,并设置报警规则。

Prometheus Adapter 配置

为了监控 RabbitMQ 队列长度,需要安装 Prometheus Adapter:

# custom-metrics-config-map.yaml
apiVersion: v1
kind: ConfigMap
metadata:name: adapter-config
data:config.yaml: |rules:- seriesQuery: '{__name__="rabbitmq_queue_messages", queue="order_queue"}'seriesFilters: []resources:overrides:namespace:resource: namespacepod:resource: podname:matches: ""as: "rabbitmq_queue_length"metricsQuery: sum(rabbitmq_queue_messages{queue="order_queue"})

应用配置并更新 HPA 规则:

kubectl apply -f custom-metrics-config-map.yaml
kubectl apply -f hpa-custom-metrics.yaml

其中,hpa-custom-metrics.yaml 文件定义了如何基于自定义指标进行扩展:

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:name: order-consumer-hpa
spec:scaleTargetRef:apiVersion: apps/v1kind: Deploymentname: order-consumerminReplicas: 3maxReplicas: 10metrics:- type: Podspods:metric:name: rabbitmq_queue_lengthtarget:type: AverageValueaverageValue: 5000
3. 日志记录与分析

为了更好地排查问题和优化系统性能,建议启用日志记录和分析功能。可以使用 ELK Stack(Elasticsearch, Logstash, Kibana)来进行日志管理和分析。

OrderConsumer.java 中添加日志记录

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;@Service
public class OrderConsumer {private static final Logger logger = LoggerFactory.getLogger(OrderConsumer.class);@RabbitListener(queues = "order_queue")public void receiveOrder(String orderData) {logger.info("Received order: {}", orderData);processOrder(orderData);}private void processOrder(String orderData) {logger.info("Processing order: {}", orderData);try {// 模拟订单处理逻辑Thread.sleep(2000);  // 模拟处理时间} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

使用 ELK Stack 进行日志分析

ELK Stack(Elasticsearch, Logstash, Kibana)是一个强大的日志管理和分析工具集。你可以将日志集中存储在 Elasticsearch 中,并通过 Kibana 进行可视化分析。

八、高级特性与最佳实践

1. 消息确认机制

为了确保消息可靠传递,可以使用手动确认机制。修改 OrderConsumer.java 中的消息监听器:

@RabbitListener(queues = "order_queue", ackMode = "MANUAL")
public void receiveOrder(Channel channel, Message message, String orderData) throws IOException {logger.info("Received order: {}", orderData);try {processOrder(orderData);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}
2. 消息重试机制

当消费者处理失败时,可以设置重试机制。可以在 application.yml 中配置重试策略:

spring:rabbitmq:listener:simple:retry:enabled: trueinitial-interval: 1000max-attempts: 3max-interval: 10000multiplier: 2
3. 消息持久化

为了防止 RabbitMQ 重启导致消息丢失,可以将消息设置为持久化:

MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setHeader("x-delay", delayTime);message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;
};

总结

通过上述步骤,我们实现了基于 RabbitMQ 的流量削峰填谷系统,并利用 RabbitMQ 的延时消息插件处理延迟消息。


http://www.ppmy.cn/embedded/172530.html

相关文章

C++程序设计语言笔记——基本功能:源文件与程序

0 用头文件表达接口、强调逻辑结构。 我们以 C 语言为例&#xff0c;展示如何通过头文件组织模块化设计&#xff1a; 示例场景&#xff1a;日志模块接口设计 文件结构 include/log.h // 公共接口log_config.h // 配置参数log_internal.h // 内部实现细节&#xf…

什么是大模型微调?

在大模型&#xff08;如GPT、BERT、LLaMA等&#xff09;广泛应用的今天&#xff0c;“微调”&#xff08;Fine-Tuning&#xff09;已成为释放模型潜力的关键技术。它通过针对特定任务调整预训练模型&#xff0c;使其从“通才”变为“专才”。本文将从概念、原理到实践&#xff…

C语言 进阶指针学习笔记

文章目录 字符指针指针数组数组指针数组名数组传参 函数指针函数指针数组指向函数指针数组的指针 回调函数Qsort 的使用通过冒泡排序模拟实现 qsort 大部分的内容都写在代码注释中 指针有类型&#xff0c;指针的类型决定了指针的整数的步长&#xff0c;指针解引用操作的时候的权…

Spring Boot中实现多租户架构

Spring Boot中实现多租户架构 在当今的企业级应用开发中&#xff0c;多租户架构已经成为一项关键技术&#xff0c;尤其是对于需要服务多个客户群体的 SaaS&#xff08;软件即服务&#xff09;系统。多租户架构的核心思想是通过共享资源来降低运营成本&#xff0c;同时确保各个…

如何安全处置旧设备?

每年&#xff0c;数百万台旧设备因老化、故障或被新产品取代而被丢弃&#xff0c;这些设备上存储的数据可能带来安全风险。 如果设备没有被正确删除数据&#xff0c;这些数据往往仍可被恢复。因此&#xff0c;安全处置旧设备至关重要。 旧设备可能包含的敏感数据 旧设备中可能…

产城融合典范:树莓科技如何助力宜宾数字经济腾飞​

宜宾在推动数字经济发展的征程中&#xff0c;树莓科技扮演着至关重要的角色&#xff0c;堪称产城融合的典范。 树莓科技入驻宜宾后&#xff0c;积极与当地政府合作&#xff0c;以产业发展带动城市建设&#xff0c;以城市功能完善促进产业升级。在产业布局上&#xff0c;树莓科…

docker使用robot用户登录harbor

此前一直使用 docker login harbor.devops.baga.life -u ‘robot:$baga’ -p ‘xxxxxxpassword’ 这次登录就报错了 Error response from daemon: Get "https://harbor.devops.tantin.com/v2/": unauthorized:可能是docker版本发生变化&#xff0c;robot用户识别方…

【CentOS】搭建Radius服务器

目录 背景简介&#xff1a;Radius是什么&#xff1f;Radius服务器验证原理搭建Radius服务器环境信息yum在线安装配置FreeRADIUS相关文件clients.conf文件users文件重启服务 验证 参考链接 背景 在项目中需要用到Radius服务器作为数据库代理用户的外部验证服务器&#xff0c;做…