rabbitmq问题,消费者执行时间太长,超过心跳时间,消费者消失,任务堆积

news/2024/12/15 14:15:01/

rabbitmq_0">rabbitmq问题,消费者执行时间太长,超过心跳时间,消费者消失,任务堆积

rabbitmq_2">1.python多线程使用rabbitmq包地址

flask_rabbitmq

2.解决后的包

import json
import logging
import signal
import sys
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutorimport pikaclass FlaskRabbitMQ:def __init__(self, app=None, queue=None, heartbeat=60, max_retries=3, max_workers=500):self.app = appself.queue = queueself.config = Noneself.heartbeat = heartbeatself.max_retries = max_retries  # 设置最大重试次数self.rabbitmq_server_host = Noneself.rabbitmq_server_username = Noneself.rabbitmq_server_password = Noneself._channel = Noneself._rpc_class_list = []self.data = {}self.logger = logging.getLogger('ApiLogger')self.error_logger = logging.getLogger('ErrorLogger')self._thread_local = threading.local()  # 为每个线程存储独立通道self.executor = ThreadPoolExecutor(max_workers=max_workers)  # 创建线程池if app:self.init_app(app)def init_app(self, app=None):self.app = appif self.queue:self.queue.init_app(app)self.config = self.app.configself.valid_config()def valid_config(self):self.rabbitmq_server_host = self.config.get('RABBITMQ_HOST')self.rabbitmq_server_username = self.config.get('RABBITMQ_USERNAME')self.rabbitmq_server_password = self.config.get('RABBITMQ_PASSWORD')def _create_new_connection(self, heart_beat=None):credentials = pika.PlainCredentials(self.rabbitmq_server_username,self.rabbitmq_server_password)if heart_beat is None:heart_beat = self.heartbeatparameters = pika.ConnectionParameters(self.rabbitmq_server_host,credentials=credentials,heartbeat=heart_beat)return pika.BlockingConnection(parameters)def _get_connection(self, heart_beat=None):# 为每个线程创建独立连接if not hasattr(self._thread_local, 'connection') or self._thread_local.connection.is_closed:self._thread_local.connection = self._create_new_connection(heart_beat)print(f'创建新连接_thread_local.connection:{self._thread_local.connection}')return self._thread_local.connectiondef _get_channel(self, heart_beat=None):# 每个线程使用独立连接的通道connection = self._get_connection(heart_beat)# 查看当前心跳设置print(f"Heartbeat: {connection._impl.params.heartbeat}")return connection.channel()def temporary_queue_declare(self):return self.queue_declare(exclusive=True, auto_delete=True)def queue_declare(self, queue_name='', passive=False, durable=False, exclusive=False, auto_delete=False,arguments=None):channel = self._get_channel()try:result = channel.queue_declare(queue=queue_name,passive=passive,durable=durable,exclusive=exclusive,auto_delete=auto_delete,arguments=arguments)return result.method.queueexcept pika.exceptions.ChannelClosedByBroker as e:if e.reply_code == 406 and "inequivalent arg 'durable'" in e.reply_text:self.error_logger.error(f"队列 '{queue_name}' 的持久化参数不匹配,正在删除并重新声明。")channel.queue_delete(queue=queue_name)result = channel.queue_declare(queue=queue_name,passive=passive,durable=durable,exclusive=exclusive,auto_delete=auto_delete,arguments=arguments)return result.method.queueelse:self.error_logger.error(f"声明队列 '{queue_name}' 时出错: {e}")raisefinally:channel.close()def queue_delete(self, queue_name):channel = self._get_channel()try:self._channel.queue_delete(queue=queue_name)self.logger.info(f"队列 '{queue_name}' 已成功删除。")except Exception as e:self.error_logger.error(f"删除队列 '{queue_name}' 失败: {e}")raisefinally:channel.close()def exchange_bind_to_queue(self, type, exchange_name, routing_key, queue):channel = self._get_channel()try:channel.exchange_declare(exchange=exchange_name, exchange_type=type)channel.queue_bind(queue=queue, exchange=exchange_name, routing_key=routing_key)except Exception as e:self.error_logger.error(f"绑定队列 '{queue}' 到交换机 '{exchange_name}' 时出错: {e}")raisefinally:channel.close()def exchange_declare(self, exchange_name, exchange_type):channel = self._get_channel()try:channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)except Exception as e:self.error_logger.error(f"交换机 '{exchange_name}' 声明失败: {e}")raisefinally:channel.close()def queue_bind(self, exchange_name, routing_key, queue_name):channel = self._get_channel()try:channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key)except Exception as e:self.error_logger.error(f"队列 '{queue_name}' 绑定到交换机 '{exchange_name}' 时出错: {e}")raisefinally:channel.close()def basic_consuming(self, queue_name, callback, arguments=None, auto_ack=False):channel = self._get_channel()try:channel.basic_consume(queue=queue_name, on_message_callback=callback, arguments=arguments,auto_ack=auto_ack)except Exception as e:self.error_logger.error(f"basic_consume 中的流失错误: {e}")finally:channel.close()def send_expire(self, body, exchange, key, properties=None, max_retries=3):channel = None  # 在外部初始化为 Nonetry:# 创建新通道进行消息发布channel = self._get_channel()if properties:channel.basic_publish(exchange=exchange,routing_key=key,body=body,properties=properties)else:channel.basic_publish(exchange=exchange,routing_key=key,body=body)except Exception as e:self.error_logger.error(f'推送消息异常:{e}')finally:if channel:  # 检查 channel 是否已定义channel.close()  # 关闭通道def send(self, body, exchange, key, corr_id=None):channel = self._get_channel()try:if not corr_id:channel.basic_publish(exchange=exchange,routing_key=key,body=body)else:channel.basic_publish(exchange=exchange,routing_key=key,body=body,properties=pika.BasicProperties(correlation_id=corr_id))finally:channel.close()  # 关闭通道def send_json(self, body, exchange, key, corr_id=None):data = json.dumps(body)self.send(data, exchange=exchange, key=key, corr_id=corr_id)def send_sync(self, body, key=None, timeout=5):if not key:raise Exception("The routing key is not present.")corr_id = str(uuid.uuid4())callback_queue = self.temporary_queue_declare()self.data[corr_id] = {'isAccept': False,'result': None,'reply_queue_name': callback_queue}channel = self._get_channel()try:# 设置消费回调channel.basic_consume(queue=callback_queue, on_message_callback=self.on_response, auto_ack=True)# 发送消息channel.basic_publish(exchange='',routing_key=key,body=body,properties=pika.BasicProperties(reply_to=callback_queue,correlation_id=corr_id,))# 等待响应end = time.time() + timeoutwhile time.time() < end:if self.data[corr_id]['isAccept']:self.logger.info("已接收到 RPC 服务器的响应 => {}".format(self.data[corr_id]['result']))return self.data[corr_id]['result']else:time.sleep(0.3)continueself.error_logger.error("获取响应超时。")return Nonefinally:channel.close()  # 关闭通道def send_json_sync(self, body, key=None):if not key:raise Exception("The routing key is not present.")data = json.dumps(body)return self.send_sync(data, key=key)def accept(self, key, result):self.data[key]['isAccept'] = Trueself.data[key]['result'] = str(result)channel = self._get_channel()try:# 删除回复队列channel.queue_delete(queue=self.data[key]['reply_queue_name'])finally:channel.close()  # 关闭通道def on_response(self, ch, method, props, body):self.logger.info("接收到响应 => {}".format(body))corr_id = props.correlation_idself.accept(corr_id, body)def register_class(self, rpc_class):if not hasattr(rpc_class, 'declare'):raise AttributeError("The registered class must contains the declare method")self._rpc_class_list.append(rpc_class)def _run(self):# 注册所有声明的类for item in self._rpc_class_list:item().declare()# 遍历所有在 Queue 中注册的回调函数for (type, queue_name, exchange_name, routing_key, version, callback, auto_ack,thread_num, heart_beat) in self.queue._rpc_class_list:if type == ExchangeType.DEFAULT:if not queue_name:# 如果队列名称为空,则声明一个临时队列queue_name = self.temporary_queue_declare()elif version == 1:self.basic_consuming(queue_name, callback, auto_ack=auto_ack)else:self._channel.queue_declare(queue=queue_name, auto_delete=True)self.basic_consuming(queue_name, callback)elif type in [ExchangeType.FANOUT, ExchangeType.DIRECT, ExchangeType.TOPIC]:if not queue_name:# 如果队列名称为空,则声明一个临时队列queue_name = self.temporary_queue_declare()elif version == 1:arguments = {'x-match': type,  # 设置 exchange_type'routing_key': routing_key,  # 设置 routing_key}self.basic_consuming(queue_name, callback, arguments=arguments, auto_ack=auto_ack)else:self._channel.queue_declare(queue=queue_name)self.exchange_bind_to_queue(type, exchange_name, routing_key, queue_name)# 消费队列self.basic_consuming(queue_name, callback)# 启动指定数量的线程来处理消息for _ in range(thread_num):self.executor.submit(self._start_thread_consumer, queue_name, callback, auto_ack, heart_beat)self.logger.info(" * Flask RabbitMQ 应用正在消费中")def _start_thread_consumer(self, queue_name, callback, auto_ack, heart_beat=None) -> None:channel = self._get_channel(heart_beat)channel.basic_qos(prefetch_count=1)try:channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)channel.start_consuming()except Exception as e:self.error_logger.error(f"Error in consumer thread: {e}")finally:channel.close()  # 消费完成后关闭通道,但不关闭连接def shutdown_executor(self):# 关闭线程池if self.executor:self.logger.info("关闭线程池...")self.executor.shutdown(wait=True)  # 等待所有线程完成再关闭def close_rabbitmq_connection(self):if hasattr(self._thread_local, 'connection') and self._thread_local.connection.is_open:self._thread_local.connection.close()def signal_handler(self, sig, frame):self.logger.info('RabbitMQ开始停止...')# 关闭线程池self.shutdown_executor()# 关闭 RabbitMQ 连接self.close_rabbitmq_connection()sys.exit(0)def run(self):# 捕获终止信号以进行优雅关闭signal.signal(signal.SIGINT, self.signal_handler)signal.signal(signal.SIGTERM, self.signal_handler)self._run()class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class Queue:"""支持多线程的Queue类"""def __init__(self) -> None:self._rpc_class_list = []self.app = Nonedef __call__(self, queue=None, type=ExchangeType.DEFAULT, version=0, exchange='', routing_key='', auto_ack=False,thread_num=1, heart_beat=60):def _(func):self._rpc_class_list.append((type, queue, exchange, routing_key, version, func, auto_ack, thread_num, heart_beat))return _def init_app(self, app=None):self.app = app

3.如何使用

@queue(queue='test', type=ExchangeType.DIRECT, exchange='record_exchange',routing_key='test', version=1, thread_num=15, heart_beat=300)
def prop_code_signal_callback(ch, method, props, body):try:data = json.loads(body)loger.info(f'prop_code_signal -> data:{data}')# 打印当前线程名称current_thread = threading.current_thread().nameloger.info(f'prop_code_signal ->当前线程名称: {current_thread}')# 业务逻辑# 处理成功,确认消息ch.basic_ack(delivery_tag=method.delivery_tag)except Exception as e:loger.error(f"test出现异常: {e}")

http://www.ppmy.cn/news/1555315.html

相关文章

算法学习——双指针

今天我来分享下算法中的双指针 概念&#xff1a; 双指针是一种常见的算法技巧&#xff0c;通常用于解决数组和链表相关的问题的。 它通过使用两个指针来遍历数据结构&#xff0c;从而在一次遍历中完成某些任务&#xff0c;提高了效率。 注意这里的指针&#xff0c;可不是C语…

GIGABYTE技嘉主板电脑前端耳机接口无声音输出

一、基本情况 今年5月份&#xff0c;台式机电脑配有外放音响&#xff0c;接在主机后端耳机口。使用外放音响多&#xff0c;很少使用前置耳机接口。今天感觉外放效果不明显&#xff0c;想用耳机。拔掉外放音响后&#xff0c;耳机插入前端接口&#xff0c;发现没有声音输出。于是…

python学opencv|读取图像(九)用numpy创建黑白相间灰度图

【1】引言 前述学习过程中&#xff0c;掌握了用numpy创建矩阵数据&#xff0c;把所有像素点的BGR取值设置为0&#xff0c;然后创建纯黑灰度图的方法&#xff0c;具体链接为&#xff1a; python学opencv|读取图像&#xff08;八&#xff09;用numpy创建纯黑灰度图-CSDN博客 在…

状态管理实战:一次 Redux 到 React Query 的重构之旅

"老师,我们的后台管理系统状态管理好混乱啊&#xff01;"上周二的代码评审会上,小王一脸苦恼地说道。我打开代码仓库看了看,确实问题不小 - Redux store 里堆满了各种数据,有本地状态,有服务器数据,还有一些缓存,导致代码难以维护,性能也受到影响。 说实话,这个问题…

XML 在线格式化 - 加菲工具

XML 在线格式化 打开网站 加菲工具 选择“XML 在线格式化” 输入XML&#xff0c;点击左上角的“格式化”按钮 得到格式化后的结果

ERC论文阅读(03)--instructERC论文阅读笔记(2024-12-14)

instructERC论文阅读笔记 2024-12-14 论文题目&#xff1a;InstructERC: Reforming Emotion Recognition in Conversation with Multi-task Retrieval-Augmented Large Language Models 说明&#xff1a;以下内容纯属本人看论文及复现代码的记录&#xff0c;如想了解论文细节&…

【智体OS】官方上新发布智体机器人:使用rtrobot智体应用远程控制平衡车机器人

【智体OS】官方上新发布智体机器人&#xff1a;使用rtrobot智体应用远程控制平衡车机器人 dtns.network是一款主要由JavaScript编写的智体世界引擎&#xff08;内嵌了three.js编辑器的定制版-支持以第一视角浏览3D场馆&#xff09;&#xff0c;可以在浏览器和node.js、deno、e…

数据结构(顺序表)JAVA方法的介绍

前言 在 Java 中&#xff0c;集合类&#xff08;Collections&#xff09;是构建高效程序的核心组件之一&#xff0c;而 List 接口作为集合框架中的重要一员&#xff0c;是一个有序、可重复的元素集合。与 Set 接口不同&#xff0c;List 保证了元素的顺序性&#xff0c;并允许存…