Python RabbitMQ 消息队列监听

news/2024/10/25 8:28:28/

Python RabbitMQ 消息队列监听

# coding: utf-8
# 测试消息消费import datetime
import logging as log
import os
from pathlib import Path
from typing import Listimport pika# 设置日志格式
Path("./logs").mkdir(parents=True, exist_ok=True)
os.chdir("./logs/")
log_file_name = datetime.date.today().strftime("%Y-%m-%d")
log_format = ("%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s"
)
log.basicConfig(level=log.INFO,filename="python-check-" + log_file_name + ".log",datefmt="%Y-%m-%d %H:%M:%S",format=log_format,encoding="utf-8",
)class RabbitMQConsumer:def __init__(self, host="localhost", queue_name="test-queue", batch_size=5):self.host = hostself.queue_name = queue_nameself.batch_size = batch_sizeself.connection = Noneself.channel = Noneself.message_count = 0self.messages: List[pika.spec.Basic.Deliver] = []self.delivery_tags: List[int] = []def conn(self):log.info("测试消费者 连接RabbitMQ开始!")self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host))self.channel = self.connection.channel()# 声明队列self.channel.queue_declare(queue=self.queue_name, durable=True)# 设置QoS,限制未确认的消息数量self.channel.basic_qos(prefetch_count=self.batch_size)log.info("测试消费者 连接RabbitMQ成功!")def close(self):log.info("测试消费者 关闭RabbitMQ连接!")if self.connection and not self.connection.is_closed:self.connection.close()def start_consuming(self):log.info("测试消费者 监听开始!")try:# 确保已连接if not self.connection or self.connection.is_closed:self.conn()log.info(f"测试消费者 监听队列:{self.queue_name}")# 设置消费回调self.channel.basic_consume(queue=self.queue_name,on_message_callback=self.customer,)# 开始消费self.channel.start_consuming()except KeyboardInterrupt:log.error("测试消费者 停止消费!")self.close()except Exception as e:log.error(f"测试消费者 发生错误 停止消费:{str(e)}")self.close()def customer(self, ch, method, properties, body):log.error(f"测试消费者 接受到消息:{body.decode()}")try:# 打印消息内容print(f"测试消费者 接受到消息:{body.decode()}")# 设置计数器和列表 消息达到batch_size才消费self.messages.append(body)self.delivery_tags.append(method.delivery_tag)self.message_count += 1# 当达到批处理大小时,进行批量确认if self.message_count >= self.batch_size:print(f"\n批量处理完成 {self.batch_size} 条消息")print("消息内容:", [msg.decode() for msg in self.messages])# 确认所有消息ch.basic_ack(delivery_tag=self.delivery_tags[-1], multiple=True)# 重置计数器和列表self.message_count = 0self.messages = []self.delivery_tags = []except Exception as e:log.error(f"测试消费者 处理消息异常:{str(e)}")# 发生错误时,拒绝消息并重新入队ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)if __name__ == "__main__":# 创建消费者实例并开始消费consumer = RabbitMQConsumer(host="localhost", queue_name="test-queue", batch_size=5)consumer.start_consuming()

http://www.ppmy.cn/news/1541792.html

相关文章

74页PPT智能工厂整体规划方案

▲关注智慧方案文库,学习9000多份最新解决方案,其中 PPT、WORD超过7000多份 ,覆盖智慧城市多数领域的深度知识社区,稳定更新4年,日积月累,更懂行业需求。 智能工厂的定义 根据《智能工厂通用技术要求》的…

AIGC智能提示词项目实践(1):深入MySQL高级语法,提升开发效率

AIGC智能提示词项目实践-1:深入MySQL高级语法,提升开发效率 1.读取数据表中的字段进行脱敏(*加密)2.自动获取对应的数据表和字段3.表单有数据才进行更新的条件语句(构成数组)4.动态更新字段且进行条件判断5.动态更新数据表和字段6.字段自身1的操作7.多关…

【纯血鸿蒙】专项测试工具 DevEco Testing

DevEco Testing 为生态合作伙伴接入 HarmonyOS 生态提供专业的测试服务,共筑高品质的智能硬件产品。 云端服务平台面向开发者提供724 小时的远程多终端真机实验室,提供华为专业的应用安全隐私检测,提供基于华为真机的应用自动化测试。 访问地址:https://devecostudio.huawe…

阅读Go源码的顿悟时刻

Mattermost 的 Jess Espino 向 Natalie 讲述了他在阅读 Go 源代码时遇到的 10 个“顿悟时刻”(前六个)。第二部分(其余的顿悟时刻)即将推出! 本篇内容是根据2021年5月份#323 Aha moments reading Go’s source: Part …

rootless模式下istio ambient鉴权策略

环境说明 rootless模式下测试istio Ambient功能 四层鉴权策略 这里四层指的是网络通信模型的第四层,主要的传输协议为TCP和UDP。 用于限制服务间的通信,比如下面的策略应用于带有 app: productpage 标签的 Pod, 并且仅允许来自服务帐户 clus…

数组中的算法

目录 1.什么是数组 2.数组上的算法 2.1二分查找算法 什么是二分查找算法? 算法步骤 算法时间复杂度 一个问题 例题 题目分析 解题代码 2.2双指针法 什么是双指针法? 例题 题目分析 解题代码 1.什么是数组 数组是在一块连续的内存空间…

C/C++每日一练:实现冒泡排序

题目要求 编写一个程序,实现冒泡排序算法。给定一个由 n 个整数组成的数组,要求通过冒泡排序对数组从小到大进行排序。 输入:一个整数数组,长度为 n,数组中的元素可能是正数或负数。 输出:按照升序排序后的…

PHP企业门店订货通进销存系统小程序源码

订货通进销存系统,企业运营好帮手! 📦 开篇:告别繁琐,企业运营新选择 嘿,各位企业主和创业者们!今天我要给大家介绍一款超实用的企业运营神器——“订货通进销存系统”。在这个数字化时代&…