记一次hyperf框架封装swoole自定义进程

news/2024/11/28 22:53:11/
背景

公司准备引入swoole和rabbitmq来处理公司业务。因此,我引入hyperf框架,想用swoole的多进程来实现。

自定义启动服务封装
<?php
/*** 进程启动服务【manager】*/
declare(strict_types=1);namespace App\Command;use Swoole;
use Swoole\Process;
use Swoole\Process\Pool;
use App\Process\BaseProcess;
use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;/*** @Command*/
#[Command]
class TaskProcessCommand extends HyperfCommand
{const MANAGER_PROCESS_PID_PATH = BASE_PATH . '/runtime/taskProcess.pid';/*** @var ContainerInterface*/protected $container;protected $coroutine = false;public function __construct(ContainerInterface $container){$this->container = $container;parent::__construct('task');}public function configure(){parent::configure();$this->setDescription('自定义进程任务');$this->addOption('daemonize', 'd', InputOption::VALUE_NONE, '守护进程化');$this->addArgument('action', InputArgument::REQUIRED, 'start/stop/restart 启动/关闭/重启');}public function handle(){$action = $this->input->getArgument('action');if ($action === 'start') {$this->start();} elseif ($action === 'stop') {$this->stop();} elseif ($action === 'restart') {$this->restart();} else {echo "不支持的action, 请输入 -h 参数查看" . PHP_EOL;}}/*** 重启:php bin/hyperf.php task restart*/protected function restart(){$this->stop();$this->start();}/*** 停止:php bin/hyperf.php task stop*/protected function stop(){if (file_exists(self::MANAGER_PROCESS_PID_PATH)) {//后期可以写入数据表,根据状态进行重启$managerPid = file_get_contents(self::MANAGER_PROCESS_PID_PATH);echo "stopping...\n";echo "kill pid $managerPid \n";$managerPid = intval($managerPid);$startTime = time();$timeout = config('server.settings.max_wait_time', 10);@Process::kill($managerPid);//等待主进程结束while (@Process::kill($managerPid, 0)) {//waiting process stopecho "waiting...\r";usleep(100000);echo "              \r";echo "waiting.\r";usleep(100000);echo "              \r";//超时 强杀所有子进程if ($managerPid > 0 && time() - $startTime >= $timeout) {echo "wait timeout, kill -9 child process, pid: $managerPid \n";echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/'") . PHP_EOL;echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/ {print $2}'|xargs kill -9") . PHP_EOL;}}unlink(self::MANAGER_PROCESS_PID_PATH);echo "stopped. \n";} else {echo "找不到manager pid, path: " . self::MANAGER_PROCESS_PID_PATH;}}/*** 启动:php bin/hyperf.php task start* 守护进程启动:php bin/hyperf.php task start -d*/protected function start(){$processConfig = config('processes');if ($processConfig) {echo "start now.\n";$daemonize = $this->input->getOption('daemonize');if ($daemonize) {//重定向标准输出到指定日志文件fclose(STDOUT);fclose(STDERR);$STDOUT = fopen(BASE_PATH . '/runtime/logs/taskProcess_output.log', 'ab');$STDERR = fopen(BASE_PATH . '/runtime/logs/taskProcess_error.log', 'ab');Process::daemon(true, true);}//save pidfile_put_contents(self::MANAGER_PROCESS_PID_PATH, getmypid());//TODO 后期可以根据需要写入配置或者数据表,开启多个主进程、挂载多个子进程BaseProcess::setProcessName('manager');//主进程$startFuncMap = [];foreach ($processConfig as $processClass) {$processObj = new $processClass;if ($processObj->isEnable && ($processObj instanceof BaseProcess) && isset($processObj->nums) && $processObj->nums > 0) {for ($i = 0; $i < $processObj->nums; $i++) {$startFuncMap[] = [[$processObj, 'handle'],$processObj->enableCoroutine ?? false,$i,];}}}$pool = new Pool(count($startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, false);$pool->on('workerStart', function (Pool $pool, int $workerId) use ($startFuncMap) {[$func, $enableCoroutine, $idx] = $startFuncMap[$workerId];if ($enableCoroutine) {run(function () use ($func, $pool, $workerId, $idx) {$pm = $func[0];//process下的类$idx += 1;BaseProcess::setProcessName($pm->name . "[{$idx}/{$pm->nums}]");//多个子进程call_user_func($func, $pool, $workerId);});} else {$func($pool, $workerId);//baseProcess下的handle}});$pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {echo 'process Message,data=' .json_encode($data). PHP_EOL;});//进程关闭$pool->on("WorkerStop", function (Swoole\Process\Pool $pool, int $workerId) {echo "process WorkerId={$workerId} is stopped". PHP_EOL;});$pool->start();} else {printf("没有可启动的自定义进程, 请在配置task_process中声明,且继承%s\n", BaseProcess::class);}}/*** 查看运行状态:php bin/hyperf.php task status*/protected function status(){//TODO 查看任务执行状态}public function getProcess($pid = -1){if ($pid === -1) {$pid = getmypid();}return static::$process[$pid] ?? null;}public function getAllProcess(){return static::$process;}
}
基础process封装

此处可以用hyperf框架自带的,也可以自己封装

<?phpdeclare (strict_types = 1);namespace App\Process;use Swoole;
use Swoole\Process\Pool;abstract class BaseProcess {/*** 进程数* @var integer*/public $nums = 0;/*** 进程名称* @var string*/public $name = '';/*** 是否启用协程* @var bool*/public $enableCoroutine = true;/*** 是否随进程启动服务* @var bool*/public $isEnable = true;protected $isRunning = true;protected $process;static $signal = 0;function __construct() {//进程自动命名if (empty($this->name)) {$this->name = trim(str_replace('\\', '.', str_replace(__NAMESPACE__, '', get_called_class())), '.');}}final public function handle(Pool $pool, int $workerId): void {try {$this->processInit($pool->getProcess());$this->beforeRun();while (true) {//进程结束信号if (BaseProcess::$signal === SIGTERM) {$this->onProcessExit();break;}$this->run();}} catch (\Throwable $e) {throw $e;}}protected function onProcessExit() {$this->isRunning = false;}protected function processInit($process) {$this->process = $process;echo "process {$this->name} start, pid: " . getmypid().PHP_EOL;//注册信号处理器,实现优雅重启(等待任务执行完后或者等待超时)pcntl_signal(SIGTERM, function () {BaseProcess::$signal = SIGTERM;$maxWaitTime = config('server.settings.max_wait_time', 5);$sTime = time();//检查进程任务状态Swoole\Timer::tick(500, function () use ($sTime, $maxWaitTime) {$coStat = \Swoole\Coroutine::stats();//如果主循环结束,且其它协程任务执行完,清理定时器以退出进程if (!$this->isRunning && $coStat['coroutine_num'] <= 1) {Swoole\Timer::clearAll();$this->process->exit();}//等待超时,强制结束进程elseif (time() - $sTime >= $maxWaitTime) {Swoole\Timer::clearAll();if ($this->isRunning) {$this->onProcessExit();}$this->process->exit();}});});}public static function setProcessName(string $name) {swoole_set_process_name(env('APP_NAME', 'app') . '.taskProcess.' . $name);}/*** 事件循环前调用* @return [type] [description]*/abstract function beforeRun();/*** 事件循环,注意这里不能使用死循环* @return [type] [description]*/abstract function run();}
使用demo

demo1

<?phpdeclare (strict_types = 1);namespace App\Process;/*** test*/
class TestProcess extends BaseProcess {/*** 进程数* @var integer*/public $nums = 5;public $enableCoroutine = true;/*** 不随服务启动进程* @var bool */public $isEnable = false;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体echo date('Y-m-d H:i:s').PHP_EOL;usleep(1000);}}

demo2

<?phpnamespace App\Process;use App\Amqp\Producer\JbtyProducer;
use App\Amqp\Producer\UpdateZeroStockProducer;
use App\Library\Jbchip\JbchipRequest;
use App\Model\HqchipGoodsModel;
use App\Model\IcbaseGoodsModel;
use App\Model\JbtyGoodsModel;
use App\Model\LcscGoodsModel;
use App\Model\OneyacGoodsModel;
use Hyperf\Amqp\Producer;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;class UpdateZeroStock extends BaseProcess
{const ZERO_STOCK_KEY = 'platform_zero_stock_cache_key';/*** 进程数* @var integer*/public $nums = 1;public $enableCoroutine = true;/*** 随服务启动进程* @var bool */public $isEnable=true;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体$this->updateZeroStock();echo date('Y-m-d H:i:s').PHP_EOL;sleep(300);}public function updateZeroStock(){// 1.全量更新$list_hq = HqchipGoodsModel::select(['id','spu','stock','manufacturer'])->where('excute_time','<',8)->limit(1000)->get();$container = ApplicationContext::getContainer();$redis = $container->get(Redis::class);$producer = ApplicationContext::getContainer()->get(Producer::class);$today = date('Y-m-d');if($list_hq){foreach ($list_hq as $item){$spu = trim($item['spu']);$zeroStockKey =  $this->getZeroStockKey($today,'hqchip',$item['manufacturer']);if($redis->exists($zeroStockKey) && !$redis->hGet($zeroStockKey,$spu)){$sendData = $item;$sendData['appKey'] = $this->appSecretKey();$sendData['platform'] = 'hqchip';$message = new UpdateZeroStockProducer($sendData);$res = $producer->produce($message);echo date('Y-m-d H:i:s') . 'rabbitmq hqchip sendMq: ' .$res . PHP_EOL;}}}}/*** 零库存缓存KEY* @param $brand* @param $sku* @return string*/private function getZeroStockKey($day,$platfrom,$brand){return self::ZERO_STOCK_KEY .":". $platfrom .":" . $day . ":" . $brand;}/*** 密钥生产* @return string*/private function appSecretKey(){$a = 'chipmall-spider&V2&' . date('Y-m-d');$appKey = base64_encode(md5($a)   .'||'. base64_encode(time() . '|' . $a));return $appKey;}
}
在配置中进程需要执行的服务

在这里插入图片描述

以守护进程方式启动服务
php bin/hyperf.php task start -d

在这里插入图片描述
查看进程命令

ps -ef|grep taskProcess

在这里插入图片描述

疑惑

这次封装还存在两个点需要完善!!!
1.重复执行:

php bin/hyperf.php task start -d

会启动多个manager进程

2、没有封装查看进程状态的status方法


http://www.ppmy.cn/news/1121368.html

相关文章

靶场练习——SDcms文件上传漏洞靶场

文章目录 前言一、寻找网站后台页面1、点击请登录&#xff0c;查看URL2、修改URL参数&#xff0c;找到后台登录页面 二、登录后台管理系统1、不能使用爆破2、使用弱口令登录 三、寻找文件上传点四、上传文件操作1、上传普通的图片文件&#xff0c;查看数据包2、尝试上传PHP文件…

基于Spring Boot的网上租贸系统

目录 前言 一、技术栈 二、系统功能介绍 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 本课题是根据用户的需要以及网络的优势建立的一个基于Spring Boot的网上租贸系统&#xff0c;来满足用户网络商品租赁的需求。 本网上租贸系统应用Java技术&#xff0…

使用香橙派学习Linux udev的rules 并实现U盘的自动挂载

在之前编程首先语音刷抖音的博文里提到过udev&#xff0c;现在回顾一下&#xff1a; 什么是udev&#xff1f; udev是一个设备管理工具&#xff0c;udev以守护进程的形式运行&#xff0c;通过侦听内核发出来的uevent来管理/dev目录下的设备文件。udev在用户空间运行&#xff0c;…

研究生选控制嵌入式还是机器视觉好?

研究生选控制嵌入式还是机器视觉好&#xff1f; 我是嵌入式/硬件方向转的算法&#xff0c;现在是公司的算法负责人&#xff0c;如果再让我选一次&#xff0c;我是不会再选嵌入式方 向&#xff0c;嵌入式如果只做技术是没前途的。 你要是有一定自学能力&#xff0c;能自己在学校…

CentOS 7 制作openssl 1.1.1w 版本rpm包 —— 筑梦之路

源码下载地址&#xff1a; https://www.openssl.org/source/openssl-1.1.1w.tar.gz 参考之前的文章&#xff1a; openssl 1.1.1L /1.1.1o/1.1.1t rpm包制作——筑梦之路_openssl的rpm包_筑梦之路的博客-CSDN博客 直接上spec文件&#xff1a; Name: openssl Version: 1.1…

递归常用的三种枚举方式

一.递归实现指数型枚举 #include<bits/stdc.h> using namespace std; #define IOS ios::sync_with_stdio(0),cin.tie(0),cout.tie(0) #define endl \n typedef long long LL; typedef pair<int,int> PII; bool st[20]; int n;void dfs(int u) {if(u > n1){for(…

电路的基本定律——基尔霍夫定律

基尔霍夫定律 &#x1f391;预备知识&#x1f391;基尔霍夫电流定律(KCL)&#x1f383;基尔霍夫电流定律的本质&#xff1a;节点上电荷具有连续性(不会突变)&#x1f383;基尔霍夫电流定律的推广&#xff1a; &#x1f391;基尔霍夫的电压定律(KVL)&#x1f383;基尔霍夫电压定…

Java 项目-基于 SpringBoot+Vue的疫情网课管理系统

文章目录 第一章 简介第二章 技术栈第三章 系统分析3.4.2学生用例 第四章 系统设计第五章 系统实现5.1学生功能模块5.2管理员功能模块5.3教师功能模块 六 源码咨询 第一章 简介 疫情网课也都将通过计算机进行整体智能化操作&#xff0c;实现的功能如下。 例如 管理员&#x…