Paho-Mqtt库的使用

ops/2024/9/23 4:21:20/

文章目录

      • 1. **安装 MQTT 库 **
      • 2. **MQTT 基本概念**
      • 3. **基本用法**
        • 3.1 创建 MQTT 客户端
        • 3.2 发布消息
        • 3.3 订阅主题
        • 3.4 取消订阅
      • 4. **高级用法**
        • 4.1 持久化会话
        • 4.2 重连机制
        • 4.3 SSL/TLS 安全连接
      • 5. **QoS(服务质量等级)详解**
      • 6. **错误处理与回调**
      • 7. **错误处理**
      • 8. **完整示例:发布和订阅**

1. **安装 MQTT 库 **

首先,你需要安装 paho.mqtt.golang>golang

go get github.com/eclipse/paho.mqtt.golang>golang

2. MQTT 基本概念

  • Broker(代理):MQTT 服务器,负责接收客户端发布的消息并分发给订阅这些主题的客户端。
  • Client(客户端):连接到代理的设备或应用程序,负责发布消息或订阅特定主题。
  • Topic(主题):MQTT 使用主题来对消息进行分类和路由,发布者将消息发送到特定的主题,订阅者从主题中接收消息。
  • QoS(服务质量):MQTT 提供 3 种服务质量级别(QoS 0、QoS 1、QoS 2),用来控制消息的传输可靠性。

3. 基本用法

3.1 创建 MQTT 客户端

首先,创建一个 MQTT 客户端并与代理连接:

go">package mainimport ("fmt""log""time"mqtt "github.com/eclipse/paho.mqtt.golang>golang"
)func main() {// 定义 MQTT 客户端选项opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883"). // 代理地址SetClientID("go_mqtt_client").     // 客户端 IDSetUsername("user").               // 用户名(可选)SetPassword("password")            // 密码(可选)// 创建客户端client := mqtt.NewClient(opts)// 连接到 MQTT 代理if token := client.Connect(); token.Wait() && token.Error() != nil {log.Fatalf("MQTT Connection Error: %v", token.Error())}fmt.Println("Connected to MQTT broker")// 延迟关闭客户端defer client.Disconnect(250)
}
3.2 发布消息

使用 Publish 方法可以向特定的主题发布消息:

go">package mainimport ("fmt""log""time"mqtt "github.com/eclipse/paho.mqtt.golang>golang"
)func main() {// 创建客户端选项和连接opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("publisher")client := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {log.Fatalf("MQTT Connection Error: %v", token.Error())}defer client.Disconnect(250)// 发布消息topic := "example/topic"msg := "Hello MQTT"token := client.Publish(topic, 0, false, msg)token.Wait()fmt.Printf("Message published to topic %s: %s\n", topic, msg)
}
  • Publish 方法的参数解释:
    • topic:消息的主题。
    • QoS:服务质量等级,0 表示尽力传输,1 表示至少传输一次,2 表示仅传输一次。
    • retained:是否保留消息,false 表示不保留。
    • payload:消息的内容。
3.3 订阅主题

使用 Subscribe 方法可以订阅一个或多个主题,并接收这些主题的消息:

go">package mainimport ("fmt""log"mqtt "github.com/eclipse/paho.mqtt.golang>golang"
)func main() {// 创建客户端选项和连接opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("subscriber")client := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {log.Fatalf("MQTT Connection Error: %v", token.Error())}defer client.Disconnect(250)// 定义消息处理回调函数messageHandler := func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())}// 订阅主题topic := "example/topic"token := client.Subscribe(topic, 0, messageHandler)token.Wait()fmt.Printf("Subscribed to topic %s\n", topic)// 保持运行,以便接收消息select {}
}
  • Subscribe 方法的参数解释:
    • topic:要订阅的主题。
    • QoS:消息传输的服务质量等级。
    • callback:当有消息发布到该主题时,调用的回调函数。
3.4 取消订阅

可以使用 Unsubscribe 方法取消对某个主题的订阅:

go">client.Unsubscribe("example/topic").Wait()
fmt.Println("Unsubscribed from topic example/topic")

4. 高级用法

4.1 持久化会话

MQTT 支持持久化会话,即当客户端断开连接后,代理可以保留它的订阅和 QoS 1 或 2 消息。在重新连接时,客户端可以接收在离线期间发布的消息。

go">opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("persistent_client").SetCleanSession(false)  // 启用持久化会话
4.2 重连机制

可以通过 SetAutoReconnect 来设置自动重连,确保在网络故障后客户端自动尝试重新连接。

go">opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("reconnect_client").SetAutoReconnect(true)  // 启用自动重连
4.3 SSL/TLS 安全连接

如果 MQTT 代理启用了 SSL/TLS,你可以配置客户端使用加密连接:

go">import ("crypto/tls""crypto/x509""io/ioutil"mqtt "github.com/eclipse/paho.mqtt.golang>golang"
)func main() {// 加载 CA 证书certpool := x509.NewCertPool()ca, err := ioutil.ReadFile("ca.crt")if err != nil {log.Fatalf("Failed to load CA certificate: %v", err)}certpool.AppendCertsFromPEM(ca)// 配置 TLS 连接tlsConfig := &tls.Config{RootCAs: certpool,}opts := mqtt.NewClientOptions().AddBroker("ssl://localhost:8883").SetClientID("ssl_client").SetTLSConfig(tlsConfig)  // 启用 TLS 加密client := mqtt.NewClient(opts)// 连接和发布消息...
}

5. QoS(服务质量等级)详解

MQTT 提供三种服务质量等级,用来决定消息的可靠性:

  • QoS 0:消息传输 “尽力而为”,可能会丢失消息。
  • QoS 1:保证消息至少传递一次,可能会有重复。
  • QoS 2:保证消息仅传递一次,确保消息无丢失且无重复。

PublishSubscribe 方法中,你可以指定不同的 QoS 级别。

6. 错误处理与回调

可以通过为 ClientOptions 设置回调来处理 MQTT 客户端的不同事件:

go">opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("callback_client").SetConnectionLostHandler(func(client mqtt.Client, err error) {fmt.Printf("Connection lost: %v\n", err)}).SetOnConnectHandler(func(client mqtt.Client) {fmt.Println("Connected to MQTT broker")})
  • SetConnectionLostHandler:当连接丢失时触发的回调。
  • SetOnConnectHandler:当连接成功时触发的回调。

7. 错误处理

当执行 ConnectPublishSubscribe 等操作时,可以通过 token.Wait() 来等待操作完成,并检查是否有错误发生:

go">token := client.Publish("example/topic", 0, false, "Hello MQTT")
if token.Wait() && token.Error() != nil {log.Fatalf("Error publishing message: %v", token.Error())
}

8. 完整示例:发布和订阅

一个完整的示例,演示如何发布和订阅消息:

go">package mainimport ("fmt""log"mqtt "github.com/eclipse/paho.mqtt.golang>golang"
)func main() {// 创建并连接 MQTT 客户端opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("mqtt_example")client := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil

http://www.ppmy.cn/ops/114578.html

相关文章

JavaEE: 深入探索TCP网络编程的奇妙世界(四)

文章目录 TCP核心机制TCP核心机制四: 滑动窗口为啥要使用滑动窗口?滑动窗口介绍滑动窗口出现丢包咋办? TCP核心机制五: 流量控制 TCP核心机制 上一篇文章 JavaEE: 深入探索TCP网络编程的奇妙世界(三) 书接上文~ TCP核心机制四: 滑动窗口 为啥要使用滑动窗口? 之前我们讨…

09年408考研真题解析-计算机网络

[题34]在无噪声情况下,若某通信链路的带宽为3kHz,采用4个相位,每个相位具有4种振幅的QAM调制技术,则该通信链路的最大数据传输速率是(B) A.12 kbps B.24 kbps C.48 kbps D.96 kbps 解析&#xff…

如何在 Ubuntu 16.04 服务器上安装 Python 3 并设置编程环境

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 介绍 本教程将帮助您在 Ubuntu 16.04 或 Debian 8 服务器上设置 Python 3 编程环境。在服务器上进行编程有许多优势,并且使…

DepthCrafter:为开放世界视频生成一致的长深度序列

通过利用视频扩散模型,我们创新了一种新颖的视频深度估算方法–DepthCrafter。 它可以为开放世界视频生成具有细粒度细节的时间一致性长深度序列,而无需摄像机姿势或光流等附加信息。 简介 动机。 尽管在静态图像的单目深度估算方面取得了重大进展&…

【前端学习】作用域实际问题学习记录

在复习apply,call,bind的时候遇到了一个作用域问题。 let name noName let age 18function getMyname() {console.log(my name is this.name, and I am this.age years old); }getMyname() }在全局使用let定义变量name和age之后,运行g…

认识结构体

目录 一.结构体类型的声明 1.结构的声明 2.定义结构体变量 3.结构体变量初始化 4.结构体的特殊声明 二.结构体对齐(重点难点) 1.结构体对齐规则 2.结构体对齐练习 (一)简单结构体对齐 (二)嵌套结构体对齐 3.为什么存在内存对齐 4.修改默认对齐数 三.结构体传参 1…

Flink 实现无界流

Flink 实现无界流 package org.example.test;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.opera…

【数据结构-差分】力扣1589. 所有排列中的最大和

有一个整数数组 nums ,和一个查询数组 requests ,其中 requests[i] [starti, endi] 。第 i 个查询求 nums[starti] nums[starti 1] … nums[endi - 1] nums[endi] 的结果 ,starti 和 endi 数组索引都是 从 0 开始 的。 你可以任意排列…