这里主要展示一个 Gin 框架下非常简单的 MQTT 消费者,启动后保持运行并持续轮询消费。简单实用是主旨。
viper:全局读取 yaml 配置文件中的数据
redisClient:缓存客户端
doJYService:业务逻辑代码
package mainimport ("fmt""github.com/gin-gonic/gin""github.com/spf13/viper""jieyu-gin/mqtt-service/config""jieyu-gin/mqtt-service/doJYService""jieyu-gin/mqtt-service/internal/redisClient"
)func main() {router := gin.Default()gin.SetMode(gin.ReleaseMode)if err := config.InitConfig(); err != nil {fmt.Printf("Failed to initialize config: %v\n", err)return}redisClient.InitRedisClient()doJYService.BatchProcessing()go func() {router.Run(viper.GetString("app.port"))}()
}
数据在 func BatchProcessing() string {} 方法的 for {} 循环中处理。
package doJYServiceimport ("fmt"mqtt "github.com/eclipse/paho.mqtt.golang""jieyu-gin/mqtt-service/internal/database""jieyu-gin/mqtt-service/internal/log""jieyu-gin/mqtt-service/internal/redisClient""time"
)var broker = "127.0.0.1" //你的broker ip 服务器消费地址
var port = 1883
var userName = "jieyu_mqtt"
var passwd = "jieyu_mqtt_1688"
var topic = "jieyu_online"var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Clenit Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}func BatchProcessing() string {opts := mqtt.NewClientOptions()opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))opts.SetClientID("go_mqtt_consumer")opts.SetUsername(userName)opts.SetPassword(passwd)opts.SetKeepAlive(8 * time.Second)opts.SetDefaultPublishHandler(messageRecHandler)opts.OnConnect = connectHandleropts.OnConnectionLost = connectLostHandlerclient := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}sub(client, false)time.Sleep(30 * time.Second)for {time.Sleep(3 * time.Second)fmt.Println("目前没有数据,等待消息进场。")}return ""
}// 初始化数据库和日志
func InitMysqlAndLog(name string, id string) {writer, err := log.New(id, name)if err != nil {panic(err)}database.Init(writer)redisClient.Set("log-"+name+"-"+id, writer.Name())
}func sub(client mqtt.Client, producer bool) {token := client.Subscribe(topic, 1, nil)token.Wait()if producer {fmt.Printf("Producer subscribed to topic %s", topic)} else {fmt.Printf("Consumer subscribed to topic %s", topic)}
}var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {fmt.Println("Connected")
}var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {fmt.Printf("Connect lost: %v", err)
}
效果图如下
消息持续消费,具体处理业务的逻辑代码自己写。