Android 消息队列之MQTT的使用:物联网通讯,HTTP太重了,使用MQTT;断网重连、注册、订阅、发送数据和接受数据,实现双向通讯。

server/2024/12/27 3:43:13/
http://www.w3.org/2000/svg" style="display: none;">

目录:

  1. 问题
  2. MQTT是什么以及为什么使用
  3. 如何使用:第一阶段、基础功能
  4. 如何使用:第二阶段、增加断网重连
  5. 如何使用:第三阶段、封装

https://i-blog.csdnimg.cn/direct/7bbf34b9bc99486fbfdadeb9d4f685a1.png" alt="在这里插入图片描述" />


一、问题

在开发的时候,我们一般都使用Http和后台进行通讯,比如我们是开发物联网的,设备会有很多数据需要频繁发给后台,使用Http来做这件事情,就感觉很重,比如会遇到如下这些问题:

  1. 开发成本:需要后台创建接口,前台去请求。
  2. 连接数过多:在HTTP协议中,每次请求都需要建立一个新的连接,这可能导致连接数过多,特别是在高并发场景下。对于自动售卖机来说,如果同时有大量的用户进行交互,可能会导致服务器资源紧张,影响性能。
  3. 开销较大:HTTP协议的消息头部相对复杂,包含了大量的元数据,这增加了网络传输的开销。
  4. 实时性较差:HTTP协议是基于请求-响应模式的,需要客户端主动发起请求才能获取数据。这导致在实时性要求较高的场景下,HTTP可能无法满足需求。也就是服务器不能主动发数据给客户端。

基于这样的背景,本来想使用Rabbit MQ,但是不能双向通讯,我们选择切换成MQTT。


二、MQTT是什么以及为什么使用

MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅消息协议,它构建于TCP/IP协议之上,为小型设备提供了稳定的网络通讯。MQTT协议设计简单,易于实现,非常适合在物联网(IoT)和移动应用中使用。

你会发现传递的数据量是根据你的内容来决定。

能干吗:

1、实时通讯:MQTT支持异步通讯模式,客户端可以通过订阅主题来接收感兴趣的消息,而不需要主动请求。这使得MQTT非常适合实时通讯和事件驱动的应用场景。
2、低开销:MQTT协议的数据包开销非常小,消息头部仅需2字节,非常适合网络带宽受限或设备资源受限的环境。
3、高可靠性:MQTT支持三种不同的服务质量(QoS)级别,可以根据实际需求选择合适的级别来确保消息的可靠传输。同时,MQTT还具有自动重连机制,能够在网络断开时自动恢复连接。
4、减少连接数:与HTTP相比,MQTT协议只需要客户端与服务器(Broker)建立一次连接就可以进行多次消息发布和订阅,大大减少了网络连接次数和数据传输量。


三、如何使用:第一阶段、基础功能

  1. 如何连接:init方法
  2. 连接后如何订阅:subscribe方法
  3. 如何发送数据,如何接受数据:subscribe方法
/*** 测试环境的设备管理系统*/
class ManageMqtt {private var TAG = "MQTT"private var client: MqttAndroidClient? = null //mqtt客户端private lateinit var options: MqttConnectOptions  //mqtt 的链接信息设置@Volatilevar isMqConnected: Boolean = false//初始化,fun init(context: Context?) {try {log("1")if (client != null) {return}log("1")//MQTT的连接设置options = MqttConnectOptions()//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接options.isCleanSession = true//重连尝试options.isAutomaticReconnect = true// 设置超时时间 单位为秒options.connectionTimeout = 10// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.keepAliveInterval = 90client = MqttAndroidClient(context, "tcp://xxx:xxxx", "名称")//名称//设置连接的用户名options.userName = "xxx"//设置连接的密码options.password = "xxx".toCharArray()//设置回调client?.setCallback(object : MqttCallbackExtended {override fun connectComplete(reconnect: Boolean, serverURI: String) {log("已连接mq")isMqConnected = true//连接成功,我们要进行订阅subscribe("xxxx")}override fun connectionLost(cause: Throwable) {log("已断开mq")isMqConnected = false}override fun deliveryComplete(token: IMqttDeliveryToken) {//publish后会执行到这里  发布try {log("发送成功:" + token.message.toString())} catch (e: Exception) {e.printStackTrace()}}override fun messageArrived(topicName: String, message: MqttMessage) {//subscribe后得到的消息会执行到这里面  订阅//topicName 为主题try {//todo 收到消息,要进行一些处理的。log("收到消息:$topicName     $message")} catch (e: Exception) {log("异常:$e")}}})connect()} catch (e: Exception) {e.printStackTrace()}}//进行链接private fun connect() {Thread(connect).start()
//        Schedulers.io().scheduleDirect(connect)}private val connect = Runnable {if (client != null && client!!.isConnected) {return@Runnable}try {log("连接Mq............")client?.connect(options, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken) {log("Connection success")//todo 是否连接成功?要重连的。}override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {log("Connection failure")//todo 是否连接成功?要重连的。}})} catch (e: Exception) {e.printStackTrace()}}//订阅信息fun subscribe(topic: String, qos: Int = 1) {try {client?.subscribe(topic, qos, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "Subscribed to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to subscribe $topic")}})} catch (e: MqttException) {e.printStackTrace()}}//发送消息fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {try {val message = MqttMessage()message.payload = msg.toByteArray()message.qos = qosmessage.isRetained = retainedclient?.publish(topic, message, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "$msg published to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to publish $msg to $topic")}})} catch (e: MqttException) {e.printStackTrace()}}//释放资源fun closeMqtt() {try {if (client != null) {client!!.disconnect()client = null}} catch (e: java.lang.Exception) {e.printStackTrace()}}//打印logprivate fun log(msg: String) {Log.d(TAG, msg)}}

四、如何使用:第二阶段、断网重连

  1. 即使短暂断网,后面自己也还是可以重连恢复。
  2. 如果第一次没有连接上,增加第一次的断网重连
/*** 测试环境的设备管理系统*/
class ManageMqtt {private var context: Context? = nullprivate var TAG = "MQTT"private var client: MqttAndroidClient? = null //mqtt客户端private lateinit var options: MqttConnectOptions  //mqtt 的链接信息设置@Volatilevar isMqConnected: Boolean = false//初始化,fun init(context: Context?) {this.context = contexttry {log("1")if (client != null) {return}log("1")//MQTT的连接设置options = MqttConnectOptions()//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接options.isCleanSession = true//重连尝试options.isAutomaticReconnect = true// 设置超时时间 单位为秒options.connectionTimeout = 10// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.keepAliveInterval = 90client = MqttAndroidClient(context, "tcp://xxxx:xxxx", "")//名称//设置连接的用户名options.userName = "xxx"//设置连接的密码options.password = "xxx".toCharArray()//设置回调client?.setCallback(object : MqttCallbackExtended {override fun connectComplete(reconnect: Boolean, serverURI: String) {log("已连接mq")isMqConnected = true//连接成功,我们要进行订阅subscribe("xxxx")}override fun connectionLost(cause: Throwable) {log("已断开mq")isMqConnected = false}override fun deliveryComplete(token: IMqttDeliveryToken) {//publish后会执行到这里  发布try {log("发送成功:" + token.message.toString())} catch (e: Exception) {e.printStackTrace()}}override fun messageArrived(topicName: String, message: MqttMessage) {//subscribe后得到的消息会执行到这里面  订阅//topicName 为主题try {//todo 收到消息,要进行一些处理的。 Eventbuslog("收到消息:$topicName     $message")} catch (e: Exception) {log("异常:$e")}}})connect()} catch (e: Exception) {e.printStackTrace()}val intentFilter = IntentFilter()intentFilter.addAction(ConnectivityManager.CONNECTIVITY_ACTION)intentFilter.addAction(WifiManager.NETWORK_STATE_CHANGED_ACTION)intentFilter.addAction(WifiManager.WIFI_STATE_CHANGED_ACTION)intentFilter.addAction(WifiManager.RSSI_CHANGED_ACTION)context?.registerReceiver(netWorkBroadCastReciver,intentFilter)}//进行链接private fun connect() {Thread(connect).start()
//        Schedulers.io().scheduleDirect(connect)}private val connect = Runnable {if (client != null && client!!.isConnected) {return@Runnable}try {log("连接Mq............")client?.connect(options, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken) {log("Connection success")//todo 是否连接成功?要重连的。}override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {log("Connection failure")//todo 是否连接成功?要重连的。}})} catch (e: Exception) {e.printStackTrace()}}//订阅信息fun subscribe(topic: String, qos: Int = 1) {try {client?.subscribe(topic, qos, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "Subscribed to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to subscribe $topic")}})} catch (e: MqttException) {e.printStackTrace()}}//发送消息/*** @param topic 主题 给这个主题发送消息*  @param qos 0最多一次不管是否收到,1最少一次可能会收到多次,2保证收到,且仅一次*  @param retained 发布后是否保留,即重新链接时会存在*  @param msg 消息*/fun publish(topic: String, msg: String, qos: Int = 0, retained: Boolean = false) {try {val message = MqttMessage()message.payload = msg.toByteArray()message.qos = qosmessage.isRetained = retained //发布后是否保留,即重新链接时会存在client?.publish(topic, message, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "$msg published to $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.d(TAG, "Failed to publish $msg to $topic")}})} catch (e: MqttException) {e.printStackTrace()}}//释放资源fun closeMqtt() {try {if (client != null) {client!!.disconnect()client = null}} catch (e: java.lang.Exception) {e.printStackTrace()}context?.unregisterReceiver(netWorkBroadCastReciver)}//打印logprivate fun log(msg: String) {Log.d(TAG, msg)}private var networkState = 100//断网重连查询fun isNetConnected(context: Context): Boolean {Log.d(TAG, "isNetConnected: ")val connectivity =context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManagerif (connectivity != null) {val info = connectivity.activeNetworkInfoif (info != null) {if (info.type == networkState) {return false}networkState = info.typeif (info.type == (ConnectivityManager.TYPE_WIFI)) {if (!isMqConnected) {connect()}return true} else if (info.type == (ConnectivityManager.TYPE_MOBILE)) {if (!isMqConnected) {connect()}return true}}}return false}var netWorkBroadCastReciver = object :BroadcastReceiver() {override fun onReceive(context: Context, intent: Intent?) {isNetConnected(context)log( "NetWorkBroadCastReciver: ")}}}

五、如何使用:第三阶段、封装

  1. 尝试封装,其实就是提供比如,注册,取消注册,订阅,发送数据,或者读取数据的方法。后面如何更换MQTT为其他协议,也很方便
/*** 对Mqtt操作的进一步封装*/
@Singleton
class MqttHelper @Inject constructor() {@Injectlateinit var mqtt: ManageMqtt/*** 注册*/fun register(context: Context?){mqtt.init(context)}/*** 发送数据*/fun sendData(data :String){Heartbeat.deviceId?.let { mqtt.publish(it,Gson().toJson(data)) }}/*** 接收数据*/fun data(kind:String,data:String){//待定,一般都是通过eventbus来解决。}
}

好了,这篇文章就介绍到这里~,我是前期后期,如果你也有相关的问题,也可以在评论区讨论哦,我们下一篇文章再见。


http://www.ppmy.cn/server/147605.html

相关文章

无线网络技术的发展与技术

无线网络技术在过去几十年中取得了巨大的进步,从最初的2G到如今的5G,无线通信已经深刻改变了我们的生活和工作方式。 本文将详细介绍无线网络技术的演进历程和相关的技术细节,包括无线传输原理、频谱利用、多址技术、调制与解调技术等&#…

react 路由鉴权

权限路由一般两种 1中接口中返回菜单 2 接口中返回权限,前端做匹配 一般都是那种结合,react中没有vue那种钩子函数如何做? 在项目中写一个高阶函数,在高阶函数中判断权限、是否登录等操作app.tsx或者man.tsx中使用 《AuthRouter》…

十五、linux之搭建JavaEE环境

1 概述 如果需要在 Linux 下进行 JavaEE 的开发,我们需要安装如下软件 2 安装 JDK 安装步骤 mkdir /opt/jdk mkdir /opt/jdk通过 xftp6 上传到 /opt/jdk 下 cd /opt/jdk 解压 tar -zxvf jdk-8u261-linux-x64.tar.gz tar -zxvf jdk-8u261-linux-x64.tar.…

加密算法学习笔记

可逆与不可逆 可逆 对称加密与非对称加密 不可逆 解释:一旦加密就不能反向解密得到密码原文 种类: Hash散列算法, 散列算法, 摘要算法等(哈希算法又称散列算法) 用途:一般用于效验下载文件正确性,一般在网站上下载文件都能见到…

开发手札:Win+Mac下工程多开联调

最近完成一个Windows/Android/IOS三端多人网络协同项目V1.0版本,进入测试流程了。为了方便自测,需要用unity将一个工程打开多次,分别是Win/IOS/Android版本,进行多角色联调。 在Win开发机上,以Windows版本为主版…

华为NPU服务器昇腾Ascend 910B2部署通义千问Qwen2.5——基于mindie镜像一路试错版(三)

文章目录 前言纯模型推理启动服务后面干什么?这可咋整啊?愁死了!总结前言 这是咱这个系列的第三个文章了。 毕竟,这是我好几天摸索出的经验,能帮助各位在几个小时内领会,我觉得也算是我的功劳一件了。 所以,一是希望大家耐心看下去,耐心操作下去;而是恳请各位多多关…

深度学习中的图片分类:VGG16 模型详解及代码实现

深度学习中的图片分类:VGG16 模型详解及代码实现 在深度学习的发展中,VGG16 是一个非常经典且重要的卷积神经网络(CNN)架构。尽管它已经不如一些更现代的网络(如 ResNet 和 EfficientNet)那么流行&#xf…

微服务通讯系统(2)

软件设计及核心代码展示 数据库表设计,ES搜索表设计,Redis键值对设计 数据库表设计 (1)用户表设计 这里的ID是指的是在系统中用户是第几个注册的(从1开始) user_id是指用户的唯一ID是通过uuid()函数生成…