EMQX安装
EMQX服务器安装
安装文档,见链接不另外写
https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
启动 EMQX
启动为一个 systemd 服务:
sudo systemctl start emqx
在windows安装客户端
在线 MQTT WebSocket
客户端工具,MQTTX Web
是开源的 MQTT 5.0 浏览器客户端,但是经我测试没有成功,好像有bug.
建议使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX
,旨在帮助开发者在不需要使用图形化界面的基础上,也能更快的开发和调试 MQTT 服务与应用。
由于是后期被写的博文,图是借官方的。请自行区分一下。
平台安装后的地址
1,平台的地址
- http://127.0.0.1:18083
后台登录 用户名:test 密码:test
MQTT_22">Laravel中处理MQTT订阅
MQTT_23">1,安装MQTT客户端库
在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:
composer require php-mqtt/client
2, 新建command文件
文件路径:app/Console/Commands/MqttClientCommand.php
这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。
MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT连接。
MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。
<?phpnamespace App\Console\Commands;use App\Http\Controllers\Wxapi\DeviceReportController;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Client;
use Simps\MQTT\Config\ClientConfig;
use Simps\MQTT\Hex\ReasonCode;use Swoole\Coroutine;
use Illuminate\Support\Facades\Redis;class MQTTUserConfig
{ const SIMPS_MQTT_REMOTE_HOST = '*';const SIMPS_MQTT_PORT = 1883;const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;const SIMPS_MQTT_USER = 'test*';const SIMPS_MQTT_PASSWORD = 'test*';
}class MqttClientCommand extends Command
{protected $signature = 'mqtt:handle {param1}';protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';protected $mqtt ;const SWOOLE_MQTT_CONFIG = ['open_mqtt_protocol' => true,'package_max_length' => 2 * 1024 * 1024,'connect_timeout' => 5.0,'write_timeout' => 5.0,'read_timeout' => 5.0,];//模拟设备const CLiENT_IDs = ['mqttx_devA','mqttx_devB','mqttx_devC','mqttx_devD'];public function __construct(){parent::__construct();}public function handle(){$param1 =$this->argument('param1');
// $param2 =$this->argument('param2');if ($param1=='subscribe') {$this->info('启动订阅...');$this->subscribeMqtt();} elseif ($param1=='public') {$this->info('启动发布...');$this->publishMQTT();}echo '\r\n\r\n分配工作执行完成!!!';}protected function getTestMQTT5ConnectConfig(){$config = new ClientConfig();$UserConfig = new MQTTUserConfig();return $config->setUserName($UserConfig::SIMPS_MQTT_USER)->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)->setClientId(Client::genClientID())->setKeepAlive(10)->setDelay(3000) // 3s->setMaxAttempts(5)->setProperties(['session_expiry_interval' => 60,'receive_maximum' => 65535,'topic_alias_maximum' => 65535,])->setProtocolLevel(5)->setSwooleConfig( ['open_mqtt_protocol' => true,'package_max_length' => 2 * 1024 * 1024,'connect_timeout' => 5.0,'write_timeout' => 5.0,'read_timeout' => 5.0,]);}private function heartbeat($message) {if ($message) {parse_str($message,$array);$device = $array['imei'];$hash = ':mqtt:heartbeat:online'.":{$device}";Redis::expire($hash,30); ##30s有效Redis::sAdd($hash,1);}}/** 订阅* private function subscribeMqtt(){Coroutine\run(function () {$client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());....*/private function subscribeMqtt(){Coroutine\run(function () {$UserConfig = new MQTTUserConfig();$client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,$this->getTestMQTT5ConnectConfig());$will = ['topic' => 'simps-mqtt/dinweiyi/delete','qos' => 1,'retain' => 0,'message' => 'byebye','properties' => ['will_delay_interval' => 60,'message_expiry_interval' => 60,'content_type' => 'test','payload_format_indicator' => true, // false 0 1],];$client->connect(true, $will);$topics['simps-mqtt/dinweiyi/subscribe_message'] = ['qos' => 2,'no_local' => true,'retain_as_published' => true,'retain_handling' => 2,];$res = $client->subscribe($topics);$timeSincePing = time();var_dump($res);echo '\r\n\r\n connect success !!!';while (true) {try {$buffer = $client->recv();$message = null;if ($buffer && $buffer !== true) {$message = $buffer["message"];// QoS1 PUBACKif ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {$client->send(['type' => Types::PUBACK,'message_id' => $buffer['message_id'],],false);}if ($buffer['type'] === Types::DISCONNECT) {echo sprintf("Broker is disconnected, The reason is %s [%d]\n",ReasonCode::getReasonPhrase($buffer['code']),$buffer['code']);$client->close($buffer['code']);break;}$reportObj = new DeviceReportController();$ret = $reportObj->store($message);var_dump("182>>>",$ret);unset($reportObj);}if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {$buffer = $client->ping();if ($buffer) {echo 'send ping success ...' ;$this->heartbeat($message);$timeSincePing = time();}}} catch (\Throwable $e) {throw $e;}}});}protected function getMessage() {$client_ids = ['mqttx_devA',
// 'mqttx_devB','mqttx_devC','mqttx_devD'];$message = [];$message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];$message['time'] = time();$message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];return json_encode($message);}/** 发布*/public function publishMQTT() {Coroutine\run(function () {$UserConfig = new MQTTUserConfig();$client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,$this->getTestMQTT5ConnectConfig());$client->connect();while (true) {$message = $this->getMessage();$response = $client->publish('simps-mqtt/user/subscribe_message',$message,1,0,0,['topic_alias' => 1,'message_expiry_interval' => 12,]);var_dump( 'publishMQTT>>>',$message);Coroutine::sleep(1);}});}}
3, 代码流程图
使用Mermaid语法描述的上述PHP代码的流程图:
流程说明:
- 开始:程序启动。
- 构造函数 __construct:初始化命令行工具。
- handle 方法:处理命令行输入。
- param1 参数:根据输入的参数决定是订阅还是发布。
- 调用 subscribeMqtt:如果参数是
subscribe
,则调用此方法。 - 调用 publishMQTT:如果参数是
public
,则调用此方法。 - Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
- 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
- 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
- 订阅主题:订阅特定的MQTT主题。
- 接收消息:持续监听并接收消息。
- 处理消息:对接收到的消息进行处理。
- 心跳函数 heartbeat:检查设备心跳。
- 存储消息:将消息存储到数据库或其他存储系统。
- 是否断开连接:检查客户端是否断开连接。
- 关闭连接:如果断开,则关闭连接。
- Coroutine 运行 publishMQTT:在协程中运行发布方法。
- 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
- 循环发布消息:循环发布消息。
- 获取测试消息:生成要发布的测试消息。
- 发布消息:将消息发布到MQTT服务器。
- 结束:程序结束。
后台常驻运行
1,php artisan命令在后台运行
- 打开您的终端或SSH到您的服务器。
- 使用nohup命令运行您的Artisan命令进行测试,如下所示
php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe
3.命令行的php的版本与web php的版本号要一致
2,使用宝塔的守护进程开启进程
也可以添加守护进程。
以上2种最好是只选一个
测试
打开emqx web ,在浏览器输入http://127.0.0.0.1:18083/#/websocket
主题:
主题跟php代码内的主题是一致的。
Payload:
是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。
已发送
会出现发布的主题和内容
检查发送的结果
打开数据库,检查device_report表是否成功。成功应下图所示: