1、创建配置文件:config/queue.php
<?php
// Queue configuration: the default connection name, the settings of each
// available connection, and the failed-job settings.
return [
    // Must match one of the keys under 'connections'.
    'default' => 'redis',

    'connections' => [
        'sync' => [
            'type' => 'sync',
        ],

        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],

        'redis' => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            // NOTE(review): presumably the Redis database index — confirm against the driver.
            'select'     => 1,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],

    'failed' => [
        'type'  => 'none',
        'table' => 'failed',
    ],
];
2、创建abstract类:app/job/Queue.php
<?php
namespace app\job;

use think\App;
use think\facade\Log;
use think\queue\Job;

/**
 * Base class for queue job handlers.
 *
 * Subclasses implement execute(); fire() wraps it with logging, job deletion
 * on success, and a delayed retry on failure.
 */
abstract class Queue
{
    /**
     * Process one queue job.
     *
     * NOTE(review): presumably invoked by the think-queue worker as the job's
     * default handler method — confirm against the think-queue version in use.
     *
     * Outcomes:
     *  - empty payload: logged as an error and the job is deleted without
     *    calling execute();
     *  - execute() returns true: the job is deleted;
     *  - execute() returns false and attempts() > 3: the job is deleted;
     *  - execute() returns false otherwise: the job is released back to the
     *    queue with a 3 second delay.
     *
     * @param Job   $job  The queue job being processed.
     * @param mixed $data The payload that was pushed with the job.
     */
    public function fire(Job $job, $data)
    {
        if (!$data) {
            Log::error(sprintf(' ==> [%s][%s] 队列无消息', __CLASS__, __FUNCTION__));
            // An empty payload cannot be processed; drop the job instead of
            // falling through and running the handler with no data.
            $job->delete();

            return;
        }

        if ($this->execute($data)) {
            Log::record(sprintf(' ==> [%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete();

            return;
        }

        if ($job->attempts() > 3) {
            // Give up once the attempt counter exceeds 3.
            Log::error(sprintf(' ==> [%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
            $job->delete();

            return;
        }

        Log::error(sprintf(' ==> [%s][%s] 队列执行失败,延迟3秒后重试', __CLASS__, __FUNCTION__));
        $job->release(3);
    }

    /**
     * Run the business logic for one job.
     *
     * @param mixed $data The payload passed to fire().
     *
     * @return bool True on success; false makes fire() retry or drop the job.
     */
    abstract protected function execute($data): bool;
}
3、创建业务逻辑处理类:app/job/Task.php
<?php
namespace app\job;

use app\job\Queue;
use think\App;
use think\facade\Log;

/**
 * Example job handler built on the Queue base class.
 */
class Task extends Queue
{
    /**
     * Run the business logic for one job.
     *
     * This sample implementation only writes a log record and reports success.
     *
     * @param mixed $data The job payload (unused here).
     *
     * @return bool Always true.
     */
    protected function execute($data): bool
    {
        $message = sprintf(' ==> [%s][%s] 执行成功', __CLASS__, __FUNCTION__);
        Log::record($message);

        return true;
    }
}
4、添加消息队列
// Target queue and the handler class that will process the job.
$queue_name = 'task';
$handle_name = 'app\job\Task';

// Job payload; it is JSON-encoded before being pushed below.
$params = [
    'type'      => 'test',
    'record_id' => 123,
];

// NOTE(review): `Queue` here is presumably think\facade\Queue, not
// app\job\Queue — confirm the import at the call site.

// Push the job for immediate execution.
$pushed = Queue::push(
    $handle_name,
    json_encode($params, JSON_UNESCAPED_UNICODE),
    $queue_name
);

// Alternative: delay execution by 5 seconds.
// $pushed = Queue::later(5, $handle_name, $params, $queue_name);
5、命令行执行队列
# Start the queue:listen command for the "task" queue (run from the directory that contains the `think` script).
php think queue:listen --queue task
6、supervisor配置守护进程:/supervisor/example.conf
; Supervisor program definition that keeps a queue worker for the "task" queue running.
[program:example]
; Command to run; `think` is a relative path, resolved against `directory` below.
command=php think queue:work --queue task
; Working directory supervisor switches to before starting the command.
directory=/www/example
; Process name built from the program name and a two-digit process number.
process_name=%(program_name)s_%(process_num)02d
; Account the process runs as.
user=www
; Start the program when supervisord starts.
autostart=true
; Restart the program whenever it exits.
autorestart=true
; Seconds the process must stay up to count as successfully started.
startsecs=1
; Number of failed start attempts allowed before supervisor gives up.
startretries=20
; Number of process instances to run.
numprocs=1
; Relative start/stop ordering among supervised programs.
priority=100
; Maximum size of each log file before it is rotated.
stdout_logfile_maxbytes=2MB
stderr_logfile_maxbytes=2MB
; Destinations for the process's stdout and stderr.
stdout_logfile=/etc/supervisor/logs/example.out.log
stderr_logfile=/etc/supervisor/logs/example.err.log