supervisor 简单理解

embedded/2024/10/19 23:24:13/

1. 找到配置文件,/etc/supervisor/supervisor.conf

添加
[include]
files = /www/server/panel/plugin/supervisor/profile/*.ini

test.ini文件内容

[program:rabbitmq-consume]
process_name=%(program_name)s_%(process_num)02d
directory=/www/wwwroot
command=/www/server/php/73/bin/php /www/wwwroot/project/yii swoole-consume/run
autostart=true
autorestart=true
user=www
numprocs=1
redirect_stderr=true
stdout_logfile=/www/wwwlogs/project_rabbitmq_consume.log[program:swoole-crontab-server]
process_name=%(program_name)s_%(process_num)02d
directory=/www/wwwroot
command=/www/server/php/73/bin/php /www/wwwroot/project/yii swoole-crontab/server-run
autostart=true
autorestart=true
user=www
numprocs=1
redirect_stderr=true
stdout_logfile=/www/wwwlogs/project_swoole_crontab.log[program:swoole-crontab-client]
process_name=%(program_name)s_%(process_num)02d
directory=/www/wwwroot
command=/www/server/php/73/bin/php /www/wwwroot/project/yii swoole-crontab/client-run
autostart=true
autorestart=true
user=www
numprocs=2
redirect_stderr=true
stdout_logfile=/www/wwwlogs/project_swoole_crontab.log

swoole-consume/run.php

<?php/*** 进程池多消费者,多队列消费rabbitMQ队列*/
namespace console\controllers;use common\enums\RabbitMqEnum;
use console\controllers\swoole_server\BaseSwooleServer;
use Exception;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use ReflectionMethod;
use Swoole\Process\Pool;
use Yii;
/*** rabbitMQ队列消费者进程池*/
class SwooleConsumeController extends BaseSwooleServer
{public $worker_num;public $consume_setting = [];// 进程与消费者的对应关系public $worker_queue_relate = [];public function init(){$worker_num_arr = array_column($this->consume_setting,'worker_num');$this->worker_num = array_sum($worker_num_arr);$worker_id = 0;foreach($this->consume_setting as $key=>$queue_info){$wait_assign_worker_num = $queue_info['worker_num'];while($wait_assign_worker_num > 0){$this->worker_queue_relate[$worker_id] = $key;$worker_id ++;$wait_assign_worker_num --;}}}/*** 启动消费者* @return void*/public function actionRun(){// 计数器,当进程反复退出重启时,可能代码有致命错误,需要处理// $atomic = new Atomic();$pool = new Pool($this->worker_num);//绑定一个事件$pool->on("WorkerStart", function ($pool, $worker_id) {echo 'worker_id:'.$worker_id.PHP_EOL;$queue_key = $this->worker_queue_relate[$worker_id];$queue_info = $this->consume_setting[$queue_key];//设置进程名称swoole_set_process_name('swoole : rabbitmq_process '.$queue_info['queue_name'].'_'.$worker_id);$process = $pool->getProcess($worker_id);try {Yii::$app->services->rabbitMq->listen($queue_info['exchange'], $queue_info['queue_name'], function ($message) {$res = $this->handleMessage($message);Yii::getLogger()->flush(true);return $res;});} catch (Exception $e) {Yii::getLogger()->flush(true);echo 'exception:' . $e->getMessage().$e->getFile().':'.$e->getLine().PHP_EOL;}});//子进程关闭$pool->on("WorkerStop", function ($pool, $workerId) {echo "Worker#{$workerId} is stopped\n";});$pool->start();}/*** 消息处理* @param AMQPMessage $message* @throws* @return bool*/private function handleMessage($message){$date = date('Y-m-d H:i:s');$routing_key = $message->getRoutingKey();$flush = '['.$date.']'.'队列(' . $routing_key . ')接收到消息,其内容为:' . $message->getBody();$message_body = json_decode($message->getBody(), true);$class = $message_body['handler_class'];$method = $message_body['method'];$data = $message_body['data'];$data['message_id'] = $message->get('message_id');if (!method_exists($class, $method)) {//方法不存在,忽略Yii::error('actionTaskConsume: class' . $class . ' ,method:' . $method . ' ,not exist');return false;}$reflect_method = new ReflectionMethod($class, $method);Yii::$app->db->close();Yii::$app->redis->close();$time1 = time();if ($reflect_method->isStatic()) {$res = $class::$method($data);if ($res === false) Yii::error($class::$static_error ?? '未定义错误');//if($res === false) throw new Exception($class::$static_error ?? '未定义错误');} else {$obj = new $class();$res = $obj->$method($data);if ($res === false) Yii::error($obj->error ?? '未定义错误');//if($res === false) throw new Exception($obj->error ?? '未定义错误');}$time2 = time();$total_time = $time2 - $time1;if ($total_time > 1) $flush .= ',队列耗时:' .$total_time . '秒';echo $flush . PHP_EOL . PHP_EOL;return $res;}/*** 重启消费者* @return void*/public function actionRestartConsume(){}
}

http://www.ppmy.cn/embedded/28513.html

相关文章

Spring Clound介绍

Spring Cloud 是一系列框架的集合&#xff0c;它利用 Spring Boot 的开发便利性简化了分布式系统&#xff08;例如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话和集群状态&#xff09;的开发。Spring Cloud 旨在为开发者…

Linux专栏07:Linux基本指令之文件搜索指令

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Linux专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Linux基本指令之文件搜索指令 编号&#xff1a;07 文章目录 Linux基…

CRC计算-Verilog实现

一、前言 循环冗余校验&#xff08;Cyclic Redundancy Check&#xff0c; CRC&#xff09;是一种根据网络数据包和计算机文件等数据产生简短固定位数校验码的一种信道编码技术&#xff0c;主要用来检测或校验数据传输或者保存后可能出现的错误。&#xff08;只能检验错误&…

保存钉钉群直播回放下载:直播回放下载步骤详解

今天&#xff0c;我们就来拨开云雾&#xff0c;揭开保存钉钉群直播回放的神秘面纱。教会你们如何下载钉钉群直播回放 首先用到的工具我全部打包好了&#xff0c;有需要的自己下载一下 钉钉群直播回放工具下载&#xff1a;https://pan.baidu.com/s/1WVMNGoKcTwR_NDpvFP2O2A?p…

Python爬取豆瓣电影Top250数据

任务 爬取豆瓣电影top250中的影片名称、影片海报、年份、地区、类型、评分、评价人数、总体评价&#xff0c;并输出到douban_top250.xlsx文件中 环境 Python 3.8 requests bs4 openpyxl 源码 # 创建一个新的Excel工作簿 workbook openpyxl.Workbook() # 获取默认的工作表…

网络安全之弱口令与命令爆破(中篇)(技术进阶)

目录 一&#xff0c;什么是弱口令&#xff1f; 二&#xff0c;为什么会产生弱口令呢&#xff1f; 三&#xff0c;字典的生成 四&#xff0c;使用Burpsuite工具验证码爆破 总结 笔记改错 一&#xff0c;什么是弱口令&#xff1f; 弱口令就是容易被人们所能猜到的密码呗&a…

解析transformer中的各模块结构

transformer是一种编解码&#xff08;encoder-decoer&#xff09;结构&#xff0c;用于自然语言处理、计算机视觉等领域&#xff0c;编解码结构是当前大模型必包含的部分。 文章目录 1. 词嵌入模块 2.位置编码模块 3. 多头注意力机制模块 3.1 自注意力机制模块 3.2 多头注…

04_jvm性能调优_并行收集器介绍

并行收集器&#xff08;此处也称为吞吐量收集器&#xff09;是类似于串行收集器的分代收集器。串行和并行收集器之间的主要区别在于并行收集器具有多个线程&#xff0c;用于加速垃圾回收过程。 通过命令行选项-XX:UseParallelGC 可启用并行收集器。默认情况下&#xff0c;使用…