Java实现Mqtt收发消息
文章目录
- Java实现Mqtt收发消息
- windows mqtt 平台服务搭建
- mqtt 客户端工具:mqttbox
- 整体代码结构
- mqtt基础参数配置类
- mqtt客户端连接
- mqtt接收的消息处理类
- 对应的MqttService注解和MqttTopic注解
- MqttGateway 发送消息
- 指定topic接收处理方法
本文用 Java 实现 MQTT 消息的收发。首先要理解 MQTT 的 topic(主题)概念:同一个客户端既可以向某个 topic 发布消息,也可以订阅该 topic 来接收消息,
发布者和订阅者是对等的,双方通过 broker(服务端)中转消息,彼此之间不需要直接建立连接,也不需要维护对方的状态
使用到windows mqtt 平台服务搭建(不是必须安装,仅 windows 测试需要此步骤)
mqtt 客户端工具:mqttbox
废话不多说,直接上代码,上工具,准备工作先做好,以及我的实现过程
windows mqtt 平台服务搭建
下载apache-apollo-1.7.1-windows版本,这里提供一个链接地址
http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/
提供一个现有教程:
https://blog.csdn.net/qq_42315062/article/details/125890181
搭建完成后:登录 http://127.0.0.1:61680 即可,默认账号 admin,密码 password,
注意 这里网页的端口是 61680,但是 mqtt 服务的端口是 61613
mqtt 客户端工具:mqttbox
这里提供一个下载地方,也可以自行下载
https://download.csdn.net/download/qq_39671088/85740566?utm_medium=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&depth_1-utm_source=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&spm=1003.2020.3001.6616.1
整体代码结构
mqtt基础参数配置类
/**
 * MQTT settings bound from configuration entries under the {@code mqtt} prefix.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /** User name handed to the broker connection options. */
    private String username;

    /** Password handed to the broker connection options. */
    private String password;

    /** Broker URI(s); MqttConfig splits this value on {@code ","} to obtain the server URI list. */
    private String hostUrl;

    /** Client id used by the inbound (subscribing) channel adapter. */
    private String inClientId;

    /** Client id used by the outbound (publishing) message handler. */
    private String outClientId;

    /** Not read by the configuration code shown next to this class; presumably a generic client id. */
    private String clientId;

    /**
     * Comma-separated topic list: every entry is subscribed by the inbound adapter and the first
     * entry is the default publish topic of the outbound handler.
     */
    private String defaultTopic;

    /** Not read by the visible configuration code; presumably the connection timeout - confirm unit. */
    private int timeout;

    /** Not read by the visible configuration code; presumably the keep-alive interval - confirm unit. */
    private int keepalive;

    /** Not read by the visible configuration code; presumably the clean-session flag. */
    private boolean clearSession;
}
mqtt客户端连接
import com.bsj.boyun.core.tool.utils.ExceptionUtil;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
// Spring Integration wiring for MQTT: one Paho client factory, an inbound flow that hands received
// messages to MqttMessageHandle on a thread pool, and an outbound flow that publishes whatever is
// sent to the "mqttOutboundChannel" channel.
public class MqttConfig {

    /** Name of the channel the outbound flow reads from. */
    private static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;

    /**
     * Builds the Paho client factory shared by the inbound adapter and the outbound handler.
     *
     * <p>Configuration failures are reported on standard output and the factory is returned without
     * custom connection options (best effort, as before).
     *
     * @return the client factory
     * @throws MqttException declared for interface compatibility
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() throws MqttException {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        try {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(mqttProperties.getHostUrl().split(","));
            options.setUserName(mqttProperties.getUsername());
            // A missing password must not abort the whole setup: previously the resulting
            // NullPointerException was swallowed below and the server URIs were lost with it.
            String password = mqttProperties.getPassword();
            if (password != null) {
                options.setPassword(password.toCharArray());
            }
            factory.setConnectionOptions(options);
        } catch (Exception e) {
            System.out.println("mqtt初始化连接异常:" + ExceptionUtil.getStackStr(e));
        }
        return factory;
    }

    /**
     * Creates the inbound adapter subscribed to every topic in the comma-separated default topic list.
     *
     * @param factory the shared client factory
     * @return the message-driven channel adapter
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory) {
        return new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getInClientId(),
                factory,
                mqttProperties.getDefaultTopic().split(","));
    }

    /**
     * Inbound flow: adapter -> executor channel -> {@link MqttMessageHandle}.
     *
     * @param adapter the inbound adapter; its completion timeout (5000) and QoS (1) are set here
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from(adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    /**
     * Thread pool on which inbound messages are handled.
     *
     * @return executor with 50 core / 200 max threads, a queue of 1000 and caller-runs rejection
     */
    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(200);
        executor.setCorePoolSize(50);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(300);
        // When pool and queue are full the submitting thread runs the task itself.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    /**
     * Outbound flow: messages sent to the outbound channel are published asynchronously.
     * The first entry of the default topic list is used when a message carries no topic.
     *
     * @param factory the shared client factory
     * @return the outbound integration flow
     */
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {
        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(mqttProperties.getOutClientId(), factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from(OUTBOUND_CHANNEL).handle(handler).get();
    }
}
mqtt接收的消息处理类
import com.bsj.studentcard.gateway.attendance.mqtt.annotation.MqttService;
import com.bsj.studentcard.gateway.attendance.mqtt.annotation.MqttTopic;
import com.bsj.studentcard.gateway.attendance.util.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
@Component
@Slf4j
public class MqttMessageHandle implements MessageHandler {public static Map<String, Object> mqttServices;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {getMqttTopicService(message);}public Map<String, Object> getMqttServices() {if (mqttServices == null) {mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);}return mqttServices;}public void getMqttTopicService(Message<?> message) {String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);if (receivedTopic == null || "".equals(receivedTopic)) {return;}for (Map.Entry<String, Object> entry : getMqttServices().entrySet()) {Class<?> clazz = entry.getValue().getClass();Method[] methods = clazz.getDeclaredMethods();for (Method method : methods) {if (method.isAnnotationPresent(MqttTopic.class)) {MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);if (isMatch(receivedTopic, handleTopic.value())) {try {method.invoke(SpringUtils.getBean(clazz), message);return;} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {log.error("执行 {} 方法出现错误", handleTopic.value(), e);}}}}}}public static boolean isMatch(String topic, String pattern) {if ((topic == null) || (pattern == null)) {return false;}if (topic.equals(pattern)) {return true;}if ("#".equals(pattern)) {return true;}String[] splitTopic = topic.split("/");String[] splitPattern = pattern.split("/");boolean match = true;for (int i = 0; i < splitPattern.length; i++) {if (!"#".equals(splitPattern[i])) {if (i >= splitTopic.length) {match = false;break;}if (!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])) {match = false;break;}} else {break;}}return match;}
}
对应的MqttService注解和MqttTopic注解
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;import java.lang.annotation.*;
/**
 * Marks a class as a holder of MQTT topic handler methods.
 *
 * <p>The annotation is itself meta-annotated with {@link Component}, so annotated classes are
 * picked up as Spring beans.
 */
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface MqttService {

    /**
     * Alias for the {@link Component} annotation's {@code value} attribute.
     *
     * @return the value forwarded to {@code @Component}; empty by default
     */
    @AliasFor(annotation = Component.class)
    String value() default "";
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Declares the MQTT topic filter a handler method is responsible for.
 *
 * <p>Retained at runtime so the annotation can be read reflectively.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface MqttTopic {

    /**
     * The topic filter handled by the annotated method.
     *
     * @return the topic filter; empty by default
     */
    String value() default "";
}
MqttGateway 发送消息
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@Component
// Messaging gateway for publishing to MQTT: each call becomes a message on the
// "mqttOutboundChannel" request channel, with the arguments mapped to MQTT headers and payload.
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Publishes a payload to the given topic.
     *
     * @param topic value placed in the {@code MqttHeaders.TOPIC} header
     * @param data  the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    /**
     * Publishes a payload to the given topic with an explicit quality of service.
     *
     * @param topic value placed in the {@code MqttHeaders.TOPIC} header
     * @param qos   value placed in the {@code MqttHeaders.QOS} header
     * @param data  the message payload
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) Integer qos,
            String data);
}
指定topic接收处理方法
/**
 * Example handler bean: receives messages for {@code mqtt/face/basic}, logs them and sends an
 * acknowledgement through {@link MqttGateway}.
 */
@MqttService
@Slf4j
@RequiredArgsConstructor
public class MqttTopicHandle {

    private final MqttGateway mqttGateway;

    /**
     * Handles messages whose topic matches {@code mqtt/face/basic}.
     *
     * @param message the inbound message; its payload is cast to {@code String}
     * @throws MqttException declared for interface compatibility
     */
    @MqttTopic("mqtt/face/basic")
    public void basic(Message<?> message) throws MqttException {
        String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        String payload = (String) message.getPayload();
        log.info("接收到的topic为:{},内容:{}", receivedTopic, payload);
        // The original referenced an undefined variable "topic"; reply on the topic the message
        // arrived on.
        // NOTE(review): if the inbound adapter is subscribed to this same topic, the reply will be
        // delivered back to this handler - consider a dedicated reply topic.
        mqttGateway.sendToMqtt(receivedTopic, 0, "收到消息!");
    }
}