目录:
- 问题
- MQTT是什么以及为什么使用
- 如何使用:第一阶段、基础功能
- 如何使用:第二阶段、增加断网重连
- 如何使用:第三阶段、封装
https://i-blog.csdnimg.cn/direct/7bbf34b9bc99486fbfdadeb9d4f685a1.png" alt="在这里插入图片描述" />
一、问题
在开发的时候,我们一般都使用Http和后台进行通讯,比如我们是开发物联网的,设备会有很多数据需要频繁发给后台,使用Http来做这件事情,就感觉很重,比如会遇到如下这些问题:
- 开发成本:需要后台创建接口,前台去请求。
- 连接数过多:在HTTP协议中,每次请求都需要建立一个新的连接,这可能导致连接数过多,特别是在高并发场景下。对于自动售卖机来说,如果同时有大量的用户进行交互,可能会导致服务器资源紧张,影响性能。
- 开销较大:HTTP协议的消息头部相对复杂,包含了大量的元数据,这增加了网络传输的开销。
- 实时性较差:HTTP协议是基于请求-响应模式的,需要客户端主动发起请求才能获取数据。这导致在实时性要求较高的场景下,HTTP可能无法满足需求。也就是服务器不能主动发数据给客户端。
基于这样的背景,本来想使用Rabbit MQ,但是不能双向通讯,我们选择切换成MQTT。
二、MQTT是什么以及为什么使用
MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅消息协议,它构建于TCP/IP协议之上,为小型设备提供了稳定的网络通讯。MQTT协议设计简单,易于实现,非常适合在物联网(IoT)和移动应用中使用。
你会发现传递的数据量是根据你的内容来决定。
能干吗:
1、实时通讯:MQTT支持异步通讯模式,客户端可以通过订阅主题来接收感兴趣的消息,而不需要主动请求。这使得MQTT非常适合实时通讯和事件驱动的应用场景。
2、低开销:MQTT协议的数据包开销非常小,消息头部仅需2字节,非常适合网络带宽受限或设备资源受限的环境。
3、高可靠性:MQTT支持三种不同的服务质量(QoS)级别,可以根据实际需求选择合适的级别来确保消息的可靠传输。同时,MQTT还具有自动重连机制,能够在网络断开时自动恢复连接。
4、减少连接数:与HTTP相比,MQTT协议只需要客户端与服务器(Broker)建立一次连接就可以进行多次消息发布和订阅,大大减少了网络连接次数和数据传输量。
三、如何使用:第一阶段、基础功能
- 如何连接:init方法
- 连接后如何订阅:subscribe方法
- 如何发送数据,如何接受数据:subscribe方法
/*** 测试环境的设备管理系统*/
class ManageMqtt {private var TAG = "MQTT"private var client: MqttAndroidClient? = null //mqtt客户端private lateinit var options: MqttConnectOptions //mqtt 的链接信息设置@Volatilevar isMqConnected: Boolean = false//初始化,fun init(context: Context?) {try {log("1")if (client != null) {return}log("1")//MQTT的连接设置options = MqttConnectOptions()//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接options.isCleanSession = true//重连尝试options.isAutomaticReconnect = true// 设置超时时间 单位为秒options.connectionTimeout = 10// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.keepAliveInterval = 90client = MqttAndroidClient(context, "tcp://xxx:xxxx", "名称")//名称//设置连接的用户名options.userName = "xxx"//设置连接的密码options.password = "xxx".toCharArray()//设置回调client?.setCallback(object : MqttCallbackExtended {override fun connectComplete(reconnect: Boolean, serverURI: String) {log("已连接mq")isMqConnected = true//连接成功,我们要进行订阅subscribe("xxxx")}override fun connectionLost(cause: Throwable) {log("已断开mq")isMqConnected = false}override fun deliveryComplete(token: IMqttDeliveryToken) {//publish后会执行到这里 发布try {log("发送成功:" + token.message.toString())} catch (e: Exception) {e.printStackTrace()}}override fun messageArrived(topicName: String, message: MqttMessage) {//subscribe后得到的消息会执行到这里面 订阅//topicName 为主题try {//todo 收到消息,要进行一些处理的。log("收到消息:$topicName $message")} catch (e: Exception) {log("异常:$e")}}})connect()} catch (e: Exception) {e.printStackTrace()}}//进行链接private fun connect() {Thread(connect).start()
// Schedulers.io().scheduleDirect(connect)}private val connect = Runnable {if (client != null && client!!.isConnected) {return@Runnable}try {log("连接Mq............")client?.connect(options, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken) {log("Connection success")//todo 是否连接成功?要重连的。}override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {log("Connection failure")//todo 是否连接成功?要重连的。}})} catch (e: Exception) {e.printStackTrace()}}//订阅信息fun subscribe(topic: String, qos: Int = 1) {try {client?.subscribe(topic, qos, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "Subscribed to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to subscribe $topic")}})} catch (e: MqttException) {e.printStackTrace()}}//发送消息fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {try {val message = MqttMessage()message.payload = msg.toByteArray()message.qos = qosmessage.isRetained = retainedclient?.publish(topic, message, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "$msg published to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to publish $msg to $topic")}})} catch (e: MqttException) {e.printStackTrace()}}//释放资源fun closeMqtt() {try {if (client != null) {client!!.disconnect()client = null}} catch (e: java.lang.Exception) {e.printStackTrace()}}//打印logprivate fun log(msg: String) {Log.d(TAG, msg)}}
四、如何使用:第二阶段、断网重连
- 即使短暂断网,后面自己也还是可以重连恢复。
- 如果第一次没有连接上,增加第一次的断网重连
/*** 测试环境的设备管理系统*/
class ManageMqtt {private var context: Context? = nullprivate var TAG = "MQTT"private var client: MqttAndroidClient? = null //mqtt客户端private lateinit var options: MqttConnectOptions //mqtt 的链接信息设置@Volatilevar isMqConnected: Boolean = false//初始化,fun init(context: Context?) {this.context = contexttry {log("1")if (client != null) {return}log("1")//MQTT的连接设置options = MqttConnectOptions()//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接options.isCleanSession = true//重连尝试options.isAutomaticReconnect = true// 设置超时时间 单位为秒options.connectionTimeout = 10// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.keepAliveInterval = 90client = MqttAndroidClient(context, "tcp://xxxx:xxxx", "")//名称//设置连接的用户名options.userName = "xxx"//设置连接的密码options.password = "xxx".toCharArray()//设置回调client?.setCallback(object : MqttCallbackExtended {override fun connectComplete(reconnect: Boolean, serverURI: String) {log("已连接mq")isMqConnected = true//连接成功,我们要进行订阅subscribe("xxxx")}override fun connectionLost(cause: Throwable) {log("已断开mq")isMqConnected = false}override fun deliveryComplete(token: IMqttDeliveryToken) {//publish后会执行到这里 发布try {log("发送成功:" + token.message.toString())} catch (e: Exception) {e.printStackTrace()}}override fun messageArrived(topicName: String, message: MqttMessage) {//subscribe后得到的消息会执行到这里面 订阅//topicName 为主题try {//todo 收到消息,要进行一些处理的。 Eventbuslog("收到消息:$topicName $message")} catch (e: Exception) {log("异常:$e")}}})connect()} catch (e: Exception) {e.printStackTrace()}val intentFilter = IntentFilter()intentFilter.addAction(ConnectivityManager.CONNECTIVITY_ACTION)intentFilter.addAction(WifiManager.NETWORK_STATE_CHANGED_ACTION)intentFilter.addAction(WifiManager.WIFI_STATE_CHANGED_ACTION)intentFilter.addAction(WifiManager.RSSI_CHANGED_ACTION)context?.registerReceiver(netWorkBroadCastReciver,intentFilter)}//进行链接private fun connect() {Thread(connect).start()
// Schedulers.io().scheduleDirect(connect)}private val connect = Runnable {if (client != null && client!!.isConnected) {return@Runnable}try {log("连接Mq............")client?.connect(options, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken) {log("Connection success")//todo 是否连接成功?要重连的。}override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {log("Connection failure")//todo 是否连接成功?要重连的。}})} catch (e: Exception) {e.printStackTrace()}}//订阅信息fun subscribe(topic: String, qos: Int = 1) {try {client?.subscribe(topic, qos, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "Subscribed to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to subscribe $topic")}})} catch (e: MqttException) {e.printStackTrace()}}//发送消息/*** @param topic 主题 给这个主题发送消息* @param qos 0最多一次不管是否收到,1最少一次可能会收到多次,2保证收到,且仅一次* @param retained 发布后是否保留,即重新链接时会存在* @param msg 消息*/fun publish(topic: String, msg: String, qos: Int = 0, retained: Boolean = false) {try {val message = MqttMessage()message.payload = msg.toByteArray()message.qos = qosmessage.isRetained = retained //发布后是否保留,即重新链接时会存在client?.publish(topic, message, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "$msg published to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to publish $msg to $topic")}})} catch (e: MqttException) {e.printStackTrace()}}//释放资源fun closeMqtt() {try {if (client != null) {client!!.disconnect()client = null}} catch (e: java.lang.Exception) {e.printStackTrace()}context?.unregisterReceiver(netWorkBroadCastReciver)}//打印logprivate fun log(msg: String) {Log.d(TAG, msg)}private var networkState = 100//断网重连查询fun isNetConnected(context: Context): Boolean {Log.d(TAG, "isNetConnected: ")val connectivity =context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManagerif (connectivity != null) {val info = connectivity.activeNetworkInfoif (info != null) {if (info.type == networkState) {return false}networkState = info.typeif (info.type == (ConnectivityManager.TYPE_WIFI)) {if (!isMqConnected) {connect()}return true} else if (info.type == (ConnectivityManager.TYPE_MOBILE)) {if (!isMqConnected) {connect()}return true}}}return false}var netWorkBroadCastReciver = object :BroadcastReceiver() {override fun onReceive(context: Context, intent: Intent?) {isNetConnected(context)log( "NetWorkBroadCastReciver: ")}}}
五、如何使用:第三阶段、封装
- 尝试封装,其实就是提供比如,注册,取消注册,订阅,发送数据,或者读取数据的方法。后面如何更换MQTT为其他协议,也很方便
/*** 对Mqtt操作的进一步封装*/
@Singleton
class MqttHelper @Inject constructor() {@Injectlateinit var mqtt: ManageMqtt/*** 注册*/fun register(context: Context?){mqtt.init(context)}/*** 发送数据*/fun sendData(data :String){Heartbeat.deviceId?.let { mqtt.publish(it,Gson().toJson(data)) }}/*** 接收数据*/fun data(kind:String,data:String){//待定,一般都是通过eventbus来解决。}
}
好了,这篇文章就介绍到这里~,我是前期后期,如果你也有相关的问题,也可以在评论区讨论哦,我们下一篇文章再见。