【大数据测试之:RabbitMQ消息列队测试-发送、接收、持久化、确认、重试、死信队列并处理消息的并发消费、负载均衡、监控等】详细教程---保姆级

devtools/2024/11/29 5:23:19/

RabbitMQ消息列队测试教程

  • 一、环境准备
    • 1. 安装 RabbitMQ
    • 2. 安装 Python 依赖
  • 二、基本消息队列中间件实现
    • 1. 消息发送模块
    • 2. 消息接收模块
  • 三、扩展功能
    • 1. 消息持久化和队列持久化
    • 2. 消息优先级
    • 3. 死信队列(DLQ)
  • 四、并发处理和负载均衡
    • 1. 使用 Python 的 `threading` 模块启动多个消费者
  • 五、消息确认与重试机制
    • 1. 消息确认
    • 2. 重试机制
  • 六、监控与日志记录
    • 1. 日志记录
  • 七、测试步骤

一、环境准备

1. 安装 RabbitMQ

你可以在本地或通过 Docker 启动 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

默认用户名和密码为 guest。你可以通过访问 http://localhost:15672 来管理 RabbitMQ。

2. 安装 Python 依赖

我们将使用 pika 库与 RabbitMQ 进行通信。安装 pika

pip install pika

二、基本消息队列中间件实现

我们首先构建一个简单的消息队列发送和接收框架,确保消息能够成功传递。

1. 消息发送模块

send_message.py 用于将消息发送到队列:

import pika# 连接到RabbitMQ服务器并发送消息
def send_to_queue(queue_name, message):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 发送消息到队列channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(delivery_mode=2,  # 持久化消息))print(f"Sent: {message}")except Exception as e:print(f"Error sending message: {e}")finally:# 关闭连接connection.close()if __name__ == "__main__":send_to_queue('task_queue', 'Hello, RabbitMQ!')

2. 消息接收模块

receive_message.py 用于从队列接收消息并进行处理:

import pika# 连接到RabbitMQ服务器并接收消息
def receive_from_queue(queue_name):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 定义回调函数来处理接收到的消息def callback(ch, method, properties, body):print(f"Received: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息已处理# 开始消费消息channel.basic_consume(queue=queue_name, on_message_callback=callback)print(f"Waiting for messages in {queue_name}...")channel.start_consuming()except Exception as e:print(f"Error receiving message: {e}")if __name__ == "__main__":receive_from_queue('task_queue')

三、扩展功能

1. 消息持久化和队列持久化

为了保证消息在 RabbitMQ 重启后不会丢失,我们需要确保队列和消息都被持久化。

  • send_message.py 中,使用 delivery_mode=2 使消息持久化:

    properties=pika.BasicProperties(delivery_mode=2  # 持久化消息
    )
    
  • receive_message.py 中,确保队列是持久化的:

    channel.queue_declare(queue=queue_name, durable=True)
    

2. 消息优先级

RabbitMQ 支持设置消息的优先级,这可以用来保证高优先级的消息先被处理。你可以在声明队列时设置最大优先级:

channel.queue_declare(queue=queue_name, durable=True, arguments={'x-max-priority': 10})

然后,在发送消息时,你可以指定消息的优先级:

channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(priority=5  # 设置消息优先级)
)

3. 死信队列(DLQ)

当消息消费失败或超时,可以将消息发送到死信队列。定义死信队列和交换机的方式如下:

channel.queue_declare(queue=queue_name,durable=True,arguments={'x-dead-letter-exchange': 'dlx_exchange','x-message-ttl': 10000  # 设置消息的超时时间}
)

然后,在死信队列中进行消息处理。

四、并发处理和负载均衡

RabbitMQ 能够支持多个消费者并行消费队列中的消息。你可以通过创建多个消费者来实现并发消费。

1. 使用 Python 的 threading 模块启动多个消费者

import threadingdef start_consumer():receive_from_queue('task_queue')# 启动5个消费者并行处理消息
for _ in range(5):threading.Thread(target=start_consumer).start()

RabbitMQ 会自动将消息分发到不同的消费者,实现负载均衡。

五、消息确认与重试机制

如果消息处理失败,可以使用 手动确认 来重试消息。

1. 消息确认

在消费者中,使用 ch.basic_ack 来确认消息已被处理:

ch.basic_ack(delivery_tag=method.delivery_tag)

如果消息处理失败,可以拒绝消息并重新排队:

ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

2. 重试机制

可以设置 死信队列 来处理失败的消息,或者使用延迟队列来定时重试消息。

channel.queue_declare(queue=retry_queue,durable=True,arguments={'x-dead-letter-exchange': 'retry_exchange','x-message-ttl': 5000  # 设置消息的超时时间}
)

六、监控与日志记录

为了有效地监控消息队列的运行情况,建议启用 RabbitMQ 的 管理插件

  • 你可以访问 RabbitMQ 管理界面:http://localhost:15672,查看队列、交换机和消费者的状态。

1. 日志记录

在生产环境中,使用 Python 的 logging 模块进行日志记录:

import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)def callback(ch, method, properties, body):try:logger.info(f"Received message: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)except Exception as e:logger.error(f"Error processing message: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

七、测试步骤

  1. 发送测试消息:使用 send_message.py 向队列发送消息。
  2. 接收测试消息:使用 receive_message.py 接收消息并确认消息是否被正确消费。
  3. 消息持久化测试:重启 RabbitMQ,确保消息不会丢失。
  4. 并发消费测试:启动多个消费者,并测试消息是否能够均匀分发。
  5. 消息重试测试:模拟消费失败,并测试死信队列和消息重试机制是否有效。

推荐阅读:《大数据测试专栏》,《Docker实践详解》


下期预告:

  • 大数据测试之:RabbitMQ消息测试-消息加密和认证+不同消息列队测试+处理复杂的消息路由及交换机配置》

http://www.ppmy.cn/devtools/137838.html

相关文章

基于华为昇腾910B,实战InternLM个人小助手认知微调

本文将带领大家基于华为云 ModelArts,使用 XTuner 单卡微调一个 InternLM 个人小助手。 开源链接:(欢迎 star) https://github.com/InternLM/InternLM https://github.com/InternLM/xtuner XTuner 简介 XTuner 是一个高效、灵…

【数字图像处理+MATLAB】通过迭代全局阈值处理算法(Iterative Global Algorithm)实现图像分割

引言 图像分割是将数字图像划分为多个区域(或像素的集合)的过程,这些区域通常对应于真实世界的物体或图像中的特定部分。图像分割的目标是简化或改变图像的表示形式,使得图像更容易理解和分析。图像分割通常用于定位图像中的物体…

比特币libsecp256k1中safegcd算法形式化验证完成

1. 引言 比特币和其他链(如 Liquid)的安全性取决于 ECDSA 和 Schnorr 签名等数字签名算法的使用。Bitcoin Core 和 Liquid 都使用名为 libsecp256k1 的 C 库来提供这些数字签名算法,该库以其所运行的椭圆曲线命名。这些算法利用一种称为modu…

DataGuard 主要参数配置详解

1. 基本概念 DB_NAME:主备各节点实例使用相同的 db_name。推荐与 service_name 一致。DB_UNIQUE_NAME:主备端数据库的唯一名称,设定后不可再更改。注意,如果主备 db_unique_name 不一样,需要与 LOG_ARCHIVE_CONFIG 配…

ffmpeg.js视频播放(转换)

chrome 临时设置SharedArrayBuffer "C:\Program Files\Google\Chrome\Application\chrome.exe" --enable-featuresSharedArrayBuffer 引用的js及相关文件 ffmpeg.min.js ffmpeg.min.js.map ffmpeg-core.js ffmpeg-core.wasm ffmpeg-core.worker.js 以上几个现…

网络安全期末复习

第1章 网络安全概括 (1)用户模式切换到系统配置模式(enable)。 (2)显示当前位置的设置信息,很方便了解系统设置(show running-config)。 (3)显…

C++ 【异步日志模块和std::cout << 一样使用习惯替代性好】 使用示例,后续加上远程日志

简单 易用 使用示例 CLogSystem::Instance().SetLogLevel( E_LOG_LEVEL::LOG_LEVEL_INFO | E_LOG_LEVEL::LOG_LEVEL_DEBUG | E_LOG_LEVEL::LOG_LEVEL_DUMP );CLogSystem::Instance().SetFileInfo(true, "./log.txt");LogDebug() << 12;LogInfo() << &qu…

一分钟食用前端测试框架Jest

安装 其实食用Jest是很简单的,我们只需要安装Jest即可 npm install --save-dev jestyarn add --dev jestpnpm add --save-dev jest ESmodule 本身来说,Jest是不支持Esmodule的,他支持CommonJS,我们需要Babel改一下 npm i --save-dev babel-jest babel/core babel/preset-env …