springboot整合mqtt实现android推送功能

ops/2024/10/19 2:17:29/

1、mqtt服务器使用emqx

EMQX: The World's #1 Open Source Distributed MQTT Broker

2、下载安装

下载地址:

Download EMQX

选择系统,版本,安装方法

3、springboot连接mqtt服务方法:

引包:

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>

自定义配置信息:

spring:#MQTT配置信息mqtt:enable: true#MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开url: tcp://localhost:1883#用户名username: admin#密码password: public#客户端id(不能重复)provider-id: server-provider#MQTT默认的消息推送主题,实际可在调用接口是指定default-topic: topic

 配置参数对应的自定义配置类:

package com.gnetek.monitor.api.bean;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @Description mqtt客户端配置信息* @Author Darren Huang* @Date 2024-04-28 13:16*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class GneMqtt {/*** 启用*/private boolean enable;/*** url*/private String url;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 提供者id*/private String providerId;/*** 消费者id*/private String consumerId;/*** 默认主题*/private String defaultTopic;
}

启动连接mqtt服务器配置类 

注意,发送消息retained=true表示保留消息,发送后,才有客户端订阅也能收到,如果收到retained的消息后要删除消息,需要再发一个空的消息(payload= new byte[0])到此主题上

package com.gnetek.monitor.api.config;import cn.hutool.json.JSONUtil;
import com.gnetek.monitor.api.bean.GneMqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;/*** @Description 消息生产者客户端配置* @Author Darren Huang* @Date 2024-04-28 13:19*/
@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.mqtt.enable", havingValue = "true")
public class MqttProviderClient {@Autowiredprivate GneMqtt gneMqtt;private MqttClient mqttClient;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try{//创建MQTT客户端对象mqttClient = new MqttClient(gneMqtt.getUrl(), gneMqtt.getProviderId(), new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接服务器都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(gneMqtt.getUsername());//设置连接密码options.setPassword(gneMqtt.getPassword().toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔 1.5*30秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(30);// 自动重连  setCallback需要实现 MqttCallbackExtendedoptions.setAutomaticReconnect(true);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
//            options.setWill("willTopic", (mqttClient + "与服务器断开连接").getBytes(),0,false);//设置回调mqttClient.setCallback(new MqttProviderCallBack());mqttClient.connect(options);} catch(MqttException e){e.printStackTrace();}}/*** 发布** @param topic   主题* @param message 消息*/public void publish(String topic, Object message) {publish(topic, message, true, 0);}/*** 发布** @param topic    主题* @param message  消息* @param retained 保留* @param qos      qos 最多一次 (QoS0) 至少一次 (QoS1) 有且仅有一次 (QoS2)*/public void publish(String topic, Object message, boolean retained, int qos){MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);byte[] payload = JSONUtil.toJsonStr(message).getBytes();mqttMessage.setPayload(payload);//主题的目的地,用于发布/订阅信息MqttTopic mqttTopic = mqttClient.getTopic(topic);//提供一种机制来跟踪消息的传递进度//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度MqttDeliveryToken token;try {//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。token = mqttTopic.publish(mqttMessage);token.waitForCompletion();} catch (MqttException e) {e.printStackTrace();}}@PreDestroypublic void destroy() {try {if (mqttClient.isConnected()) {mqttClient.disconnect();}} catch (MqttException e) {e.printStackTrace();}}
}

 mqtt客户端监听:

注意如果你需要与服务器断开连接后重新连接,需要实现MqttCallbackExtended方法,

// 自动重连  setCallback需要实现 MqttCallbackExtended
 options.setAutomaticReconnect(true);

package com.gnetek.monitor.api.config;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;/*** @Description 生产者客户端消息回调* @Author Darren Huang* @Date 2024-04-28 13:28*/
@Slf4j
public class MqttProviderCallBack implements MqttCallbackExtended {/*** 连接丢失** @param throwable throwable*/@Overridepublic void connectionLost(Throwable throwable) {log.debug("与服务器断开连接,可重连");throwable.printStackTrace();}/*** 消息到达** @param topic       主题* @param mqttMessage mqtt消息* @throws Exception*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {log.debug("接收消息的主题 : {}", topic);log.debug("接收消息的内容 : {}", new String(mqttMessage.getPayload()));log.debug("接收消息的Qos : {}", mqttMessage.getQos());log.debug("接收消息的retained : {}", mqttMessage.isRetained());}/*** 交付完成** @param iMqttDeliveryToken MQTT交付令牌*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.debug("发送消息成功");}/*** 连接完成** @param reconnect 重新连接* @param serverURI 服务器uri*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {// 可以做订阅主题 if(reconnect) 如果是重连 订阅主题log.debug("与服务器断连接完成 是否是重连={}, serverURI={}", reconnect, serverURI);}
}

4、android连接mqtt服务

参考:Android 使用 Kotlin 连接 MQTT | EMQ (emqx.com)

里面用到了

implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' 

高版本的android需要修改一些代码,它的源码地址是:eclipse/paho.mqtt.android: MQTT Android (github.com)


http://www.ppmy.cn/ops/26745.html

相关文章

【JavaEE网络】 TCP的可靠传输机制总结

目录 可靠传输实现机制确认应答超时重传连接管理滑动窗口流量控制拥塞控制延迟应答捎带应答 可靠传输实现机制 确认应答 这是保证可靠性的最核心机制 TCP将每个字节的数据都进行了编号。即为序列号。 这是为了防止连续发多条数据的时候&#xff0c;可能出现“后发先至”的情…

分类分析|KNN分类模型及其Python实现

KNN分类模型及其Python实现 1. KNN算法思想2. KNN算法步骤2.1 KNN主要优点2.2 KNN主要缺点 3. Python实现KNN分类算法3.1 自定义方法实现KNN分类3.2 调用scikit-learn模块实现KNN分类 4. K值的确定 在之前文章 分类分析|贝叶斯分类器及其Python实现中&#xff0c;我们对分类分…

Java 基础重点知识-(Java 语言特性、数据类型、常见类、异常)

文章目录 Java 语言特性形参和实参的区别是什么?值传递和引用传递的区别?Java 是值传递还是引用传递?final 的作用是什么?final finally finalize 有什么不同?static 的作用是什么?static 和 final 的区别是什么? Java 数据类型Java基本数据类型有几种? 各占多少位?基…

form1弹出子窗体form2,拖动子窗体判断是否离开父窗体区域,含源码(学习笔记)

一、效果&#xff08;进入和离开&#xff09; 子窗体到达父窗体边缘时变色。 二、代码分析 判断父窗体的目的&#xff0c;可以控制子窗体要随父窗体走。上面代码需要加以处理。 如&#xff1a;this.Location new Point(parentPoint.X distanceFromEdge, this.Location.Ydis…

知识产权 | 守护科技创新之光,共筑知识产权长城

2024年4月26日&#xff0c;迎来了一年一度的世界知识产权日&#xff0c;今年的主题是&#xff1a;“立足创新创造&#xff0c;构建共同未来。” 易我科技是一家专注于数据安全产品研发、生产、销售、服务一体化的高新技术软件企业。易我科技自成立以来&#xff0c;始终秉持尊重…

基于STC12C5A60S2系列1T 8051单片机的Proteus中的单片机发送一帧或一串数据给串口调试助手软件接收区显示出来的串口通信应用

基于STC12C5A60S2系列1T 8051单片机的Proteus中的单片机发送一帧或一串数据给串口调试助手软件接收区显示出来的串口通信应用 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机串口通信介绍STC12C5A60S2系列1T 8051单片机串口通信的结构基于STC12C5A60S2系列…

C++ 动态链接库DLL创建及使用

一、动态链接库DLL创建 使用VS2022 创建 1、创建新解决方案 创建即可 2、创建动态链接库新项目 右键解决方案 语言选择C&#xff0c;选择动态链接库 填入项目名称&#xff0c;勾选&#xff1a;将解决方案和项目放在同一目录中 点击创建 3、创建后&#xff0c;显示dllmai…

Android13 源码环境编译app源码报错AndroidManifest.xml.fixed分析解决总结

Android13 源码环境编译app源码报错AndroidManifest.xml.fixed分析解决 文章目录 Android13 源码环境编译app源码报错AndroidManifest.xml.fixed分析解决一、前言二、Android13源码下简单demo应用代码编译报错和分析解决1、AndroidManifest.xml文件代码&#xff1a;2、AndroidM…