Unity MQTT
最近接到一个物联网相关的项目,那边要求使用MQTT来进行通讯,第一次接触这个东西,所以写篇文档简单介绍下。
简介
MQTT(消息队列遥测传输) 是一种轻量级的消息传输协议,它可以用于连接 IoT 设备和应用程序。MQTT 由 IBM 在 1999 年首次开发,现在已成为一个开放的 OASIS 标准。MQTT 的设计目标是提供一个简单、可靠、低功耗、易于实现的消息传输协议,适用于各种网络环境和设备。
工作原理
MQTT 协议使用发布-订阅模式,其中消息发布者将消息发布到特定主题(Topic),而订阅者则通过订阅相应的主题来接收消息。MQTT 服务器(Broker)负责接收所有的消息,并将它们路由到对应的订阅者。MQTT 支持多个订阅者和发布者,因此可以构建复杂的消息传输网络。如果写过unity事件系统,就会发现是比较类似的。
主流的 MQTT 是基于 TCP 连接进行数据推送的,但是同样也有基于 UDP 的版本,叫做 MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。
协议
-
客户端(Client):MQTT 客户端可以是任何设备或应用程序,包括传感器、嵌入式系统、智能手机或桌面应用程序。客户端通过连接到 MQTT 服务器(Broker)来发送或接收消息。
-
主题(Topic):MQTT 消息可以被发布到一个或多个主题上。主题是消息的标识符,订阅者可以通过订阅主题来接收对应的消息。
-
QoS(Quality of Service):MQTT 支持三种不同的 QoS 等级:
- QoS 0:最多一次传输。消息发布者只发送一次消息,不保证消息是否被接收。
- QoS 1:至少一次传输。消息发布者发送一条消息,确保至少被接收一次。
- QoS 2:恰好一次传输。消息发布者发送一条消息,并确保只被接收一次。
-
保留消息(Retained Message):保留消息是一种特殊类型的消息,它会被保存在 MQTT 服务器上,并在新的订阅者连接到主题时自动发送。这对于提供实时数据更新非常有用。
-
保留消息(Retained Message):保留消息是一种特殊类型的消息,它会被保存在 MQTT 服务器上,并在新的订阅者连接到主题时自动发送。这对于提供实时数据更新非常有用。
特性
-
轻量级:MQTT 协议非常轻量,可以在资源受限的设备上运行。
-
可靠性:MQTT 支持 QoS 等级,可以确保消息的可靠传输。
-
灵活性:MQTT 支持多种主题过滤方式,可以根据需要定制订阅规则。
-
易于实现:MQTT 协议的实现非常简单,可以在各种编程语言和操作系统上使用。
-
开放标准:MQTT 是一个开放标准,有许多开源实现和工具可供使用。
unity中的使用
MQTT版本
目前 MQTT 主流版本有两个,分别是 MQTT3.1.1 和 MQTT5。MQTT3.1.1 是在 2014 年 10 月发布的,而 MQTT5 是在 2019 年 3 月发布的。虽然 MQTT3.1.1 与 MQTT5 在时间相差了将近五年,但是 MQTT3.1.1作为一个经典的版本,目前仍然是主流版本,能够满足大部分实际需求。
MQTT5 是在 MQTT3.1.1 的基础上进行了升级,因此 MQTT5 是完全兼容 MQTT3.1.1 的。而 MQTT5 是 在 MQTT3.1.1 的基础上添加了更多的功能、补充完善 MQTT 协议。
在这里我们使用MQTT5。
MQTT X
MQTT X 是一款开源的 MQTT 5.0 桌面测试客户端,它支持在 macOS,Linux,Windows 操作系统上运行。
工具下载地址:MQTT X: Cross-platform MQTT 5.0 Desktop Client
MQTTnet.dll
要在unity中使用,有一些现成的插件可以使用,这里我们选择了MQTTnet.dll。
github工程:GitHub - dotnet/MQTTnet
也可以在vs中通过NuGet来直接MQTTnet.dll。
获取到dll文件后,导入到unity工程中的Plugins文件夹下就可以。
客户端
客户端主要需要实现的功能就是连接服务器,断线重连,消息订阅,消息监听等。
这是我自己写的一个例子DEMO,希望能帮助到大家。
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using MQTTnet.Client;
using MQTTnet;
using System.Threading.Tasks;
using MQTTnet.Packets;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;/// <summary>
/// MQTT管理器
/// </summary>
public class MQTTManager : MonoSingleton<MQTTManager>
{private static readonly string MQTTURI = "broker.emqx.io";private static readonly int MQTTPort = 1883;private static readonly string MQTTUser = "";private static readonly string MQTTPassword = "";private string _ClientID;/// <summary>/// 重连间隔时间(s)/// </summary>private static readonly float ReconnectGapTime = 10;/// <summary>/// 客户端/// </summary>private IMqttClient _Client;/// <summary>/// 失败次数/// </summary>private int _FailCount;/// <summary>/// 当前状态/// </summary>private MQTTStatus _Status;/// <summary>/// 等待时间/// </summary>private float _WaitTime;// Start is called before the first frame updatevoid Start(){_ClientID = System.Guid.NewGuid().ToString();Task.Run(InitMQTT);}// Update is called once per framevoid Update(){if (_Status == MQTTStatus.Failed){_WaitTime += Time.deltaTime;if (_WaitTime > ReconnectGapTime){_WaitTime = 0;Reconnect();}}}/// <summary>/// 初始化MQTT信息/// </summary>private void InitMQTT(){MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder().WithTcpServer(MQTTURI, MQTTPort) // 要访问的mqtt服务端的 ip 和 端口号.WithCredentials(MQTTUser, MQTTPassword) // 要访问的mqtt服务端的用户名和密码.WithClientId(_ClientID) // 设置客户端id.WithCleanSession().WithTls(new MqttClientOptionsBuilderTlsParameters{UseTls = false // 是否使用 tls加密});MqttClientOptions clientOptions = builder.Build();_Client = new MqttFactory().CreateMqttClient();_Client.ConnectedAsync += Client_ConnectedAsync; // 客户端连接成功事件_Client.DisconnectedAsync += Client_DisconnectedAsync; // 客户端连接关闭事件_Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync; ; // 收到消息事件_Status = MQTTStatus.Connecting;_Client.ConnectAsync(clientOptions);}/// <summary>/// 重新连接/// </summary>private void Reconnect(){LogManager.LogInfo("重新连接");Task.Run(delegate (){_Status = MQTTStatus.Connecting;_Client.ReconnectAsync();});}/// <summary>/// 新消息事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private System.Threading.Tasks.Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg){string str = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array);Loom.QueueOnMainThread(() =>{LogManager.LogInfo("收到消息:" + arg.ApplicationMessage.Topic + "=====" + str);});return Task.CompletedTask;}/// <summary>/// 连接断开事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private System.Threading.Tasks.Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg){Loom.QueueOnMainThread(() =>{LogManager.LogInfo("MQTT连接断开:" + arg.Reason);});_Status = MQTTStatus.Failed;_FailCount++;return Task.CompletedTask;}/// <summary>/// 连接成功事件/// </summary>/// <param name="arg"></param>/// <returns></returns>private System.Threading.Tasks.Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg){_Status = MQTTStatus.Connected;Loom.QueueOnMainThread(() =>{LogManager.LogInfo("MQTT连接成功");_FailCount = 0;});return Task.CompletedTask;}/// <summary>/// 发布消息/// </summary>public static void PublishAsync(MqttApplicationMessage message){if (_Instance){_Instance._Client.PublishAsync(message);}}/// <summary>/// 订阅消息/// </summary>public static void SubscribeAsync(MqttClientSubscribeOptions options){if (_Instance){_Instance._Client.SubscribeAsync(options);}}/// <summary>/// 取消订阅消息/// </summary>public static void UnsubscribeAsync(MqttClientUnsubscribeOptions options){if (_Instance){_Instance._Client.UnsubscribeAsync(options);}}/// <summary>/// 状态/// </summary>public enum MQTTStatus{Empty = 0,/// <summary>/// 连接中/// </summary>Connecting = 1,/// <summary>/// 连接成功/// </summary>Connected = 2,/// <summary>/// 连接失败/// </summary>Failed = 3,}
}
尾语
参考文档:一文带你搞懂 MQTT - 知乎 (zhihu.com)
如果有写的不好的地方,欢迎各位大佬留言批评指正。