【大数据测试之:RabbitMQ消息列队测试-发送、接收、持久化、确认、重试、死信队列并处理消息的并发消费、负载均衡、监控等】详细教程---保姆级

server/2024/11/30 5:59:14/

RabbitMQ消息列队测试教程

  • 一、环境准备
    • 1. 安装 RabbitMQ
    • 2. 安装 Python 依赖
  • 二、基本消息队列中间件实现
    • 1. 消息发送模块
    • 2. 消息接收模块
  • 三、扩展功能
    • 1. 消息持久化和队列持久化
    • 2. 消息优先级
    • 3. 死信队列(DLQ)
  • 四、并发处理和负载均衡
    • 1. 使用 Python 的 `threading` 模块启动多个消费者
  • 五、消息确认与重试机制
    • 1. 消息确认
    • 2. 重试机制
  • 六、监控与日志记录
    • 1. 日志记录
  • 七、测试步骤

一、环境准备

1. 安装 RabbitMQ

你可以在本地或通过 Docker 启动 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

默认用户名和密码为 guest。你可以通过访问 http://localhost:15672 来管理 RabbitMQ。

2. 安装 Python 依赖

我们将使用 pika 库与 RabbitMQ 进行通信。安装 pika

pip install pika

二、基本消息队列中间件实现

我们首先构建一个简单的消息队列发送和接收框架,确保消息能够成功传递。

1. 消息发送模块

send_message.py 用于将消息发送到队列:

import pika# 连接到RabbitMQ服务器并发送消息
def send_to_queue(queue_name, message):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 发送消息到队列channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(delivery_mode=2,  # 持久化消息))print(f"Sent: {message}")except Exception as e:print(f"Error sending message: {e}")finally:# 关闭连接connection.close()if __name__ == "__main__":send_to_queue('task_queue', 'Hello, RabbitMQ!')

2. 消息接收模块

receive_message.py 用于从队列接收消息并进行处理:

import pika# 连接到RabbitMQ服务器并接收消息
def receive_from_queue(queue_name):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 定义回调函数来处理接收到的消息def callback(ch, method, properties, body):print(f"Received: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息已处理# 开始消费消息channel.basic_consume(queue=queue_name, on_message_callback=callback)print(f"Waiting for messages in {queue_name}...")channel.start_consuming()except Exception as e:print(f"Error receiving message: {e}")if __name__ == "__main__":receive_from_queue('task_queue')

三、扩展功能

1. 消息持久化和队列持久化

为了保证消息在 RabbitMQ 重启后不会丢失,我们需要确保队列和消息都被持久化。

  • send_message.py 中,使用 delivery_mode=2 使消息持久化:

    properties=pika.BasicProperties(delivery_mode=2  # 持久化消息
    )
    
  • receive_message.py 中,确保队列是持久化的:

    channel.queue_declare(queue=queue_name, durable=True)
    

2. 消息优先级

RabbitMQ 支持设置消息的优先级,这可以用来保证高优先级的消息先被处理。你可以在声明队列时设置最大优先级:

channel.queue_declare(queue=queue_name, durable=True, arguments={'x-max-priority': 10})

然后,在发送消息时,你可以指定消息的优先级:

channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(priority=5  # 设置消息优先级)
)

3. 死信队列(DLQ)

当消息消费失败或超时,可以将消息发送到死信队列。定义死信队列和交换机的方式如下:

channel.queue_declare(queue=queue_name,durable=True,arguments={'x-dead-letter-exchange': 'dlx_exchange','x-message-ttl': 10000  # 设置消息的超时时间}
)

然后,在死信队列中进行消息处理。

四、并发处理和负载均衡

RabbitMQ 能够支持多个消费者并行消费队列中的消息。你可以通过创建多个消费者来实现并发消费。

1. 使用 Python 的 threading 模块启动多个消费者

import threadingdef start_consumer():receive_from_queue('task_queue')# 启动5个消费者并行处理消息
for _ in range(5):threading.Thread(target=start_consumer).start()

RabbitMQ 会自动将消息分发到不同的消费者,实现负载均衡。

五、消息确认与重试机制

如果消息处理失败,可以使用 手动确认 来重试消息。

1. 消息确认

在消费者中,使用 ch.basic_ack 来确认消息已被处理:

ch.basic_ack(delivery_tag=method.delivery_tag)

如果消息处理失败,可以拒绝消息并重新排队:

ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

2. 重试机制

可以设置 死信队列 来处理失败的消息,或者使用延迟队列来定时重试消息。

channel.queue_declare(queue=retry_queue,durable=True,arguments={'x-dead-letter-exchange': 'retry_exchange','x-message-ttl': 5000  # 设置消息的超时时间}
)

六、监控与日志记录

为了有效地监控消息队列的运行情况,建议启用 RabbitMQ 的 管理插件

  • 你可以访问 RabbitMQ 管理界面:http://localhost:15672,查看队列、交换机和消费者的状态。

1. 日志记录

在生产环境中,使用 Python 的 logging 模块进行日志记录:

import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)def callback(ch, method, properties, body):try:logger.info(f"Received message: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)except Exception as e:logger.error(f"Error processing message: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

七、测试步骤

  1. 发送测试消息:使用 send_message.py 向队列发送消息。
  2. 接收测试消息:使用 receive_message.py 接收消息并确认消息是否被正确消费。
  3. 消息持久化测试:重启 RabbitMQ,确保消息不会丢失。
  4. 并发消费测试:启动多个消费者,并测试消息是否能够均匀分发。
  5. 消息重试测试:模拟消费失败,并测试死信队列和消息重试机制是否有效。

推荐阅读:《大数据测试专栏》,《Docker实践详解》


下期预告:

  • 大数据测试之:RabbitMQ消息测试-消息加密和认证+不同消息列队测试+处理复杂的消息路由及交换机配置》

http://www.ppmy.cn/server/146089.html

相关文章

Qt—QLineEdit 使用总结

文章参考:Qt—QLineEdit 使用总结 一、简述 QLineEdit是一个单行文本编辑控件。 使用者可以通过很多函数,输入和编辑单行文本,比如撤销、恢复、剪切、粘贴以及拖放等。 通过改变 QLineEdit 的 echoMode() ,可以设置其属性,比如以密码的形式输入。 文本的长度可以由 m…

.npmrc文件的用途

.npmrc 文件是 npm(Node.js 的包管理工具)用于配置项目或用户的设置文件。它可以存储与 npm 相关的配置信息,如注册表地址、认证信息、代理设置、安装路径等。.npmrc 文件可以出现在不同的地方,具有不同的作用范围,通常…

kafka学习-02

kafka分区的分配以及再平衡: 4个: 1、Range 以及再平衡 1)Range 分区策略原理 2)Range 分区分配策略案例 (1)修改主题 first 为 7 个分区。 bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --…

视频流媒体服务解决方案之Liveweb视频汇聚平台

一,Liveweb视频汇聚平台简介: LiveWeb是深圳市好游科技有限公司开发的一套综合视频汇聚管理平台,可提供多协议(RTSP/RTMP/GB28181/海康Ehome/大华,海康SDK等)的视频设备接入,支持GB/T28181上下级联&#xf…

Linux查看网络基础命令

文章目录 Linux网络基础命令1. ifconfig 和 ip一、ifconfig命令二、ip命令 2. ss命令一、基本用法二、常用选项三、输出信息四、使用示例 3. sar 命令一、使用sar查看网络使用情况 4. ping 命令一、基本用法二、常用选项三、输出结果四、使用示例 Linux网络基础命令 1. ifconf…

传输控制协议(TCP)

传输控制协议是Internet一个重要的传输层协议。TCP提供面向连接、可靠、有序、字节流传输服务。 1、TCP报文段结构 注:TCP默认采用累积确认机制。 2、三次握手、四次挥手 (1)当客户向服务器发送完最后一个数据段后,发送一个FIN段…

国产FPGA+DSP 双FMC 6U VPX处理板

高性能国产化信号处理平台采用6U VPX架构,双FMC接口国产V7 FPGA 国产多核 DSP 的硬件架构,可以完成一体化电子系统、有源相控阵雷达、电子侦察、MIMO 通信、声呐等领域的高速实时信号处理。 信号处理平台的组成框图如图 1 所示, DSP处理器采…

【docker】8. 镜像仓库实战

综合实战一:搭建一个 nginx 服务 Web 服务器 Web 服务器,一般是指“网站服务器”,是指驻留于互联网上某种类型计算机的程序。Web 服务器可以向 Web 浏览器等客户端提供文档,也可以放置网站文件,让全世界浏览&#xf…