thinkphp6+swoole使用rabbitMq队列

ops/2025/2/3 11:11:59/
  1. 安装think-swoole
  2. 安装 composer require php-amqplib/php-amqplib,以支持rabbitMq使用
  3. 安装rabbitMq延迟队列插件
    1. 安装 rabbitmq_delayed_message_exchange 插件,按照以下步骤操作:  
      下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases以下路由不一定是一样的!!!
      将插件复制到 RabbitMQ 插件目录: 将下载的插件文件复制到 RabbitMQ 插件目录。  
      sudo cp rabbitmq_delayed_message_exchange-3.8.9.ez /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/
      将 <version> 替换为您的 RabbitMQ 服务器版本。  
      启用插件: 使用 RabbitMQ 命令行工具启用插件。  
      sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
      重启 RabbitMQ: 重启 RabbitMQ 服务器以应用更改。  
      sudo systemctl restart rabbitmq-server

  4. config目录创建 rabbitmq.php 文件,内容如下
return ['host' => '服务器地址','port' => '端口','user' => '账户','password' => '密码','vhost' => '/','exchange' => 'delayed_exchange','exchange_type' => 'direct', // 交换机类型(如 direct、fanout、topic)'exchange_arguments' => ['x-delayed-type' => 'direct'], // 延迟交换机参数
];

创建 RabbitMQService 类


class RabbitMQService
{protected $connection;protected $channel;public function __construct(){$config = config('rabbitmq');$this->connection = new AMQPStreamConnection($config['host'],$config['port'],$config['user'],$config['password'],$config['vhost']);$this->channel = $this->connection->channel();$this->channel->exchange_declare($config['exchange'],'x-delayed-message', // 指定延迟交换机类型false,true,false,false,false,new AMQPTable(['x-delayed-type' => $config['exchange_type']]) // 设置延迟交换机的底层类型);}public function publish($message, $queue, $delay = 0){// 声明队列$this->channel->queue_declare($queue, false, true, false, false);// 绑定队列到交换机$this->channel->queue_bind($queue, config('rabbitmq.exchange'));// 设置延迟头部信息$headers = new AMQPTable(['x-delay' => $delay // 延迟时间,单位为毫秒]);// 创建消息$msg = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 持久化消息]);$msg->set('application_headers', $headers); // 正确设置头信息// 发布消息到交换机$this->channel->basic_publish($msg, config('rabbitmq.exchange'));}public function consume($queue, $callback){$this->channel->queue_declare($queue, false, true, false, false);$this->channel->basic_consume($queue, '', false, true, false, false, $callback);while ($this->channel->is_consuming()) {$this->channel->wait();}}public function __destruct(){$this->channel->close();$this->connection->close();}
}

创建 RabbitMqUseService 类文件


class RabbitMqUseService
{// 消费队列public static function consumption(){$rabbitMQ = new RabbitMQService();$rabbitMQ->consume('queue', function ($msg){Log::error('消费队列'.$msg->body);$con = json_decode($msg->body,true);$class = $con['class'];Log::error("class->>".$class);if(class_exists($class)){$obj = new $class;$obj->handle($con['body']);}});}/*** @param $obj* @param $data* @param $delay* @return void*/public static function push($obj,$data,$delay = 0){$rabbitMQ = new RabbitMQService();$class = get_class($obj);// 构造消息体$message = json_encode(['class' => $class,  // 类名'body' => $data     // 具体数据]);$rabbitMQ->publish($message, 'queue', $delay * 1000);var_dump('已加入');}public function test(){self::push(new TestJob(),['name'=>'test'],10);}
}

配置消费任务

新建文件类 RabbitConsumptionHandleclass RabbitConsumptionHandle
{public function handle(){RabbitMqUseService::consumption();}
}在app/event.php listen 中引入'listen'    => ['swoole.init' => [RabbitConsumptionHandle::class]]

新增队列

      RabbitMqUseService::push(new \app\job\TestJob(),['a'=>1,'b'=>2]);


http://www.ppmy.cn/ops/155288.html

相关文章

android 音视频系列引导

音视频这块的知识点自己工作中有用到&#xff0c;一直没有好好做一个总结&#xff0c;原因有客观和主观的。 客观是工作太忙&#xff0c;没有成段时间做总结。 主观自己懒。 趁着这次主动离职拿了n1的钱&#xff0c;休息一下&#xff0c;对自己的人生做一下总结&#xff0c;…

车载软件架构 --- 基于AUTOSAR软件架构的ECU开发流程小白篇

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 简单&#xff0c;单纯&#xff0c;喜欢独处&#xff0c;独来独往&#xff0c;不易合同频过着接地气的生活…

MySQL数据库环境搭建

下载MySQL 官网&#xff1a;https://downloads.mysql.com/archives/installer/ 下载社区版就行了。 安装流程 看b站大佬的视频吧&#xff1a;https://www.bilibili.com/video/BV12q4y1477i/?spm_id_from333.337.search-card.all.click&vd_source37dfd298d2133f3e1f3e3c…

操作系统和中间件的信息收集

在浏览器中收集操作系统与中间件信息时&#xff0c;主要通过客户端JavaScript&#xff08;用于操作系统/浏览器信息&#xff09;和服务器端脚本&#xff08;用于中间件信息&#xff09;实现。以下是分步指南&#xff1a; 一、客户端操作系统信息收集&#xff08;JavaScript&am…

毕业设计--具有车流量检测功能的智能交通灯设计

摘要&#xff1a; 随着21世纪机动车保有量的持续增加&#xff0c;城市交通拥堵已成为一个日益严重的问题。传统的固定绿灯时长方案导致了大量的时间浪费和交通拥堵。为解决这一问题&#xff0c;本文设计了一款智能交通灯系统&#xff0c;利用车流量检测功能和先进的算法实现了…

【人工智能学习笔记 一】 AI分层架构、基本概念分类与产品技术架构

新的一年2025要对AI以及LLM有个强化的学习&#xff0c;所以第一篇先对整体有个大概的认知&#xff0c;一直分不清LLM和AI的关系&#xff0c;在整个体系里的位置&#xff0c;以及AIGC是什么东西&#xff0c;AI AGENT类似豆包等和大语言模型的具体关系是什么&#xff0c;整个AI的…

独立开发经验谈:如何借助 AI 辅助产品 UI 设计

我在业余时间开发了一款自己的独立产品&#xff1a;升讯威在线客服与营销系统。陆陆续续开发了几年&#xff0c;从一开始的偶有用户尝试&#xff0c;到如今线上环境和私有化部署均有了越来越多的稳定用户&#xff0c;在这个过程中&#xff0c;我也积累了不少如何开发运营一款独…

AI协助探索AI新构型的自动化创新概念

训练AI自生成输出模块化代码&#xff0c;生成元代码级别的AI功能单元代码&#xff0c;然后再由AI组织为另一个AI&#xff0c;实现AI开发AI的能力&#xff1b;用AI协助探索迭代新构型AI将会出现&#xff0c;并成为一种新的技术路线潮流。 有限结点&#xff0c;无限的连接形式&a…