Golang Gin框架mqtt消费者

news/2025/2/12 5:17:58/

这里主要是展示一个非常简单的Gin框架下的mqtt消费者,在保持启动后持续轮训消费。简单实用是主旨。

viper全局调用yaml文件中的数据

redisClient 缓存客户端
doJYService 业务逻辑代码

package mainimport ("fmt""github.com/gin-gonic/gin""github.com/spf13/viper""jieyu-gin/mqtt-service/config""jieyu-gin/mqtt-service/doJYService""jieyu-gin/mqtt-service/internal/redisClient"
)func main() {router := gin.Default()gin.SetMode(gin.ReleaseMode)if err := config.InitConfig(); err != nil {fmt.Printf("Failed to initialize config: %v\n", err)return}redisClient.InitRedisClient()doJYService.BatchProcessing()go func() {router.Run(viper.GetString("app.port"))}()
}

func BatchProcessing() {} 方法中 for{}中处理数据。

package doJYServiceimport ("fmt"mqtt "github.com/eclipse/paho.mqtt.golang""jieyu-gin/mqtt-service/internal/database""jieyu-gin/mqtt-service/internal/log""jieyu-gin/mqtt-service/internal/redisClient""time"
)var broker = "127.0.0.1" //你的broker ip 服务器消费地址
var port = 1883
var userName = "jieyu_mqtt"
var passwd = "jieyu_mqtt_1688"
var topic = "jieyu_online"var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Clenit Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}func BatchProcessing() string {opts := mqtt.NewClientOptions()opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))opts.SetClientID("go_mqtt_consumer")opts.SetUsername(userName)opts.SetPassword(passwd)opts.SetKeepAlive(8 * time.Second)opts.SetDefaultPublishHandler(messageRecHandler)opts.OnConnect = connectHandleropts.OnConnectionLost = connectLostHandlerclient := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}sub(client, false)time.Sleep(30 * time.Second)for {time.Sleep(3 * time.Second)fmt.Println("目前没有数据,等待消息进场。")}return ""
}// 初始化数据库和日志
func InitMysqlAndLog(name string, id string) {writer, err := log.New(id, name)if err != nil {panic(err)}database.Init(writer)redisClient.Set("log-"+name+"-"+id, writer.Name())
}func sub(client mqtt.Client, producer bool) {token := client.Subscribe(topic, 1, nil)token.Wait()if producer {fmt.Printf("Producer subscribed to topic %s", topic)} else {fmt.Printf("Consumer subscribed to topic %s", topic)}
}var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {fmt.Println("Connected")
}var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {fmt.Printf("Connect lost: %v", err)
}

效果图如下

消息持续消费,具体处理业务的逻辑代码自己写。


http://www.ppmy.cn/news/1570720.html

相关文章

运用Deek Seeker协助数据分析

我的数据源有两张表,一个是每日销售表(字段有日期、产品名称、实际销量),一个是每月目标表(字段有年度月份、产品名称、目标销量);我的需求是,按月、按年来统计每个产品的目标完成情况请问用PowerBl进行分析,应该如何建立数据模型…

分巧克力

儿童节那天有 K位小朋友到小明家做客。小明拿出了珍藏的巧克力招待小朋友们。小明一共有 N块巧克力,其中第i块是HixWi的方格组成的长方形。为了公平起见,小明需要从这 N块巧克力中切出 K块巧克力分给小朋友们。切出的巧克力需要满足 1.形状是正方形&…

【C语言】指针详细解读3

1. 数组名的理解 我们使用指针一般访问数组内容时,我们可能会这样写: int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0]; 这⾥我们使⽤ &arr[0] 的⽅式拿到了数组第⼀个元素的地址,但是其实数组名本来就是地址,⽽…

学习笔记:机器学习中的数学原理(一)

1. 集合 集合分为有限集和无限集; 对于有限集,两集合元素数相等即为等势; 对于无限集,两集合元素存在一一映射关系即为等势; 无限集根据是否与正整数集等势分为可数集和不可数集。 2. sigmoid函数(也叫…

使用线性回归模型逼近目标模型 | PyTorch 深度学习实战

前一篇文章,计算图 Compute Graph 和自动求导 Autograd | PyTorch 深度学习实战 本系列文章 GitHub Repo: https://github.com/hailiang-wang/pytorch-get-started 使用线性回归模型逼近目标模型 什么是回归什么是线性回归使用 PyTorch 实现线性回归模型代码执行结…

[手机Linux] onepluse6T 系统重新分区

一,刷入TWRP 1. 电脑下载 Fastboot 工具(解压备用)和对应机型 TWRP(.img 后缀文件,将其放入前面解压的文件夹里) 或者直接这里下载:TWRP 2. 将手机关机,长按音量上和下键 开机键 进入 fastbo…

Ubuntu 下 nginx-1.24.0 源码分析 - ngx_get_options函数

声明 就在 main函数所在的 nginx.c 中&#xff1a; static ngx_int_t ngx_get_options(int argc, char *const *argv); 实现 static ngx_int_t ngx_get_options(int argc, char *const *argv) {u_char *p;ngx_int_t i;for (i 1; i < argc; i) {p (u_char *) argv[i]…

Android的MQTT客户端实现

在 Android 平台上实现 MQTT 客户端的完整技术方案&#xff0c;涵盖基础实现、安全连接、性能优化和最佳实践&#xff1a; 一、技术选型与依赖配置 推荐库 Eclipse Paho Android Service&#xff08;官方维护&#xff0c;支持后台运行&#xff09; gradle 复制 // build.gradl…