RabbitMQ消息列队测试教程
- 一、环境准备
- 1. 安装 RabbitMQ
- 2. 安装 Python 依赖
- 二、基本消息队列中间件实现
- 1. 消息发送模块
- 2. 消息接收模块
- 三、扩展功能
- 1. 消息持久化和队列持久化
- 2. 消息优先级
- 3. 死信队列(DLQ)
- 四、并发处理和负载均衡
- 1. 使用 Python 的 `threading` 模块启动多个消费者
- 五、消息确认与重试机制
- 1. 消息确认
- 2. 重试机制
- 六、监控与日志记录
- 1. 日志记录
- 七、测试步骤
一、环境准备
1. 安装 RabbitMQ
你可以在本地或通过 Docker 启动 RabbitMQ:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
默认用户名和密码为 guest
。你可以通过访问 http://localhost:15672 来管理 RabbitMQ。
2. 安装 Python 依赖
我们将使用 pika
库与 RabbitMQ 进行通信。安装 pika
:
pip install pika
二、基本消息队列中间件实现
我们首先构建一个简单的消息队列发送和接收框架,确保消息能够成功传递。
1. 消息发送模块
send_message.py
用于将消息发送到队列:
import pika# 连接到RabbitMQ服务器并发送消息
def send_to_queue(queue_name, message):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 发送消息到队列channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(delivery_mode=2, # 持久化消息))print(f"Sent: {message}")except Exception as e:print(f"Error sending message: {e}")finally:# 关闭连接connection.close()if __name__ == "__main__":send_to_queue('task_queue', 'Hello, RabbitMQ!')
2. 消息接收模块
receive_message.py
用于从队列接收消息并进行处理:
import pika# 连接到RabbitMQ服务器并接收消息
def receive_from_queue(queue_name):try:# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 确保队列存在channel.queue_declare(queue=queue_name, durable=True)# 定义回调函数来处理接收到的消息def callback(ch, method, properties, body):print(f"Received: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息已处理# 开始消费消息channel.basic_consume(queue=queue_name, on_message_callback=callback)print(f"Waiting for messages in {queue_name}...")channel.start_consuming()except Exception as e:print(f"Error receiving message: {e}")if __name__ == "__main__":receive_from_queue('task_queue')
三、扩展功能
1. 消息持久化和队列持久化
为了保证消息在 RabbitMQ 重启后不会丢失,我们需要确保队列和消息都被持久化。
-
在
send_message.py
中,使用delivery_mode=2
使消息持久化:properties=pika.BasicProperties(delivery_mode=2 # 持久化消息 )
-
在
receive_message.py
中,确保队列是持久化的:channel.queue_declare(queue=queue_name, durable=True)
2. 消息优先级
RabbitMQ 支持设置消息的优先级,这可以用来保证高优先级的消息先被处理。你可以在声明队列时设置最大优先级:
channel.queue_declare(queue=queue_name, durable=True, arguments={'x-max-priority': 10})
然后,在发送消息时,你可以指定消息的优先级:
channel.basic_publish(exchange='',routing_key=queue_name,body=message,properties=pika.BasicProperties(priority=5 # 设置消息优先级)
)
3. 死信队列(DLQ)
当消息消费失败或超时,可以将消息发送到死信队列。定义死信队列和交换机的方式如下:
channel.queue_declare(queue=queue_name,durable=True,arguments={'x-dead-letter-exchange': 'dlx_exchange','x-message-ttl': 10000 # 设置消息的超时时间}
)
然后,在死信队列中进行消息处理。
四、并发处理和负载均衡
RabbitMQ 能够支持多个消费者并行消费队列中的消息。你可以通过创建多个消费者来实现并发消费。
1. 使用 Python 的 threading
模块启动多个消费者
import threadingdef start_consumer():receive_from_queue('task_queue')# 启动5个消费者并行处理消息
for _ in range(5):threading.Thread(target=start_consumer).start()
RabbitMQ 会自动将消息分发到不同的消费者,实现负载均衡。
五、消息确认与重试机制
如果消息处理失败,可以使用 手动确认 来重试消息。
1. 消息确认
在消费者中,使用 ch.basic_ack
来确认消息已被处理:
ch.basic_ack(delivery_tag=method.delivery_tag)
如果消息处理失败,可以拒绝消息并重新排队:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
2. 重试机制
可以设置 死信队列 来处理失败的消息,或者使用延迟队列来定时重试消息。
channel.queue_declare(queue=retry_queue,durable=True,arguments={'x-dead-letter-exchange': 'retry_exchange','x-message-ttl': 5000 # 设置消息的超时时间}
)
六、监控与日志记录
为了有效地监控消息队列的运行情况,建议启用 RabbitMQ 的 管理插件。
- 你可以访问 RabbitMQ 管理界面:http://localhost:15672,查看队列、交换机和消费者的状态。
1. 日志记录
在生产环境中,使用 Python 的 logging
模块进行日志记录:
import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)def callback(ch, method, properties, body):try:logger.info(f"Received message: {body.decode()}")ch.basic_ack(delivery_tag=method.delivery_tag)except Exception as e:logger.error(f"Error processing message: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
七、测试步骤
- 发送测试消息:使用
send_message.py
向队列发送消息。 - 接收测试消息:使用
receive_message.py
接收消息并确认消息是否被正确消费。 - 消息持久化测试:重启 RabbitMQ,确保消息不会丢失。
- 并发消费测试:启动多个消费者,并测试消息是否能够均匀分发。
- 消息重试测试:模拟消费失败,并测试死信队列和消息重试机制是否有效。
推荐阅读:《大数据测试专栏》,《Docker实践详解》
下期预告:
- 《大数据测试之:RabbitMQ消息测试-消息加密和认证+不同消息列队测试+处理复杂的消息路由及交换机配置》