Java模拟多个Mqtt客户端连接Mqtt Broker

ops/2024/12/22 12:35:58/

上一次我们介绍了Java模拟单个Mqtt客户端的场景,但是在实际的业务场景中,可能需要我们模拟多个Mqtt客户端,比如:我们要对云平台的连接和设备上下行做压测。

Java模拟多个Mqtt客户端基本流程

引入Paho MQTT客户端库

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

定义MqttService

java">package com.angel.ocean.service.mqtt;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class MqttService {// mqtt brokerprivate String broker  = "tcp://42.194.132.44:1883";// QoS 等级public static int qos = 2;private final Map<String, MqttClient> clients = new HashMap<>();private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);@PostConstructpublic void init() {// TODO 动态创建客户端的逻辑String clientIdPrefix = "2024120122480";int i = 0;while (i < 3) {String clientId = clientIdPrefix + i;connect(clientId, "test", "123456");i = i + 1;}// 定期检查客户端连接状态(可选)executorService.scheduleAtFixedRate(() -> {clients.values().forEach(client -> {if (!client.isConnected()) {log.info("{} is not connected, attempting to reconnect...", client.getClientId());// 这里应该实现重连逻辑}});}, 0, 10, TimeUnit.SECONDS);}@PreDestroypublic void destroy() {clients.values().forEach(client -> {try {client.disconnect();} catch (MqttException e) {log.error("destroy failed, clientId:{}", client.getClientId(), e);}});executorService.shutdown();}/*** 创建 mqtt 客户端* @param clientId* @param username* @param password*/public void connect(String clientId, String username, String password) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());// 保留会话options.setCleanSession(true);options.setAutomaticReconnect(true);// 设置回调client.setCallback(new OnMessageCallback());// 建立连接log.info("Connecting to broker: {}, clientId:{}", broker, clientId);client.connect(options);clients.put(clientId, client);log.info("Connected: {}, clientId:{}", broker, clientId);} catch (MqttException me) {log.error("reason:{}, msg:{}, loc:{}, cause:{}", me.getReasonCode(), me.getMessage(), me.getLocalizedMessage(), me.getCause(), me);}}/*** 获取 mqtt 客户端* @param clientId* @return*/public MqttClient getClientByClientId(String clientId) {return clients.get(clientId);}/*** 发送消息* @param clientId* @param topic* @param content*/public void publish(String clientId, String topic, String content) {MqttClient client = getClientByClientId(clientId);if(client.isConnected()) {MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);try {client.publish(topic, message);log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, content);} catch (MqttException e) {log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message publish failed, client:{} not online.", client.getClientId());}/*** 订阅* @param clientId* @param topic*/public void subscribe(String clientId, String topic) {MqttClient client = getClientByClientId(clientId);if(client.isConnected()) {try {client.subscribe(topic);log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);} catch (MqttException e) {log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message subscribe failed, client:{} not online.", client.getClientId());}/*** 断开连接* @param clientId*/public void disconnect(String clientId) {MqttClient client = getClientByClientId(clientId);try {client.disconnect();clients.remove(clientId);client.close();log.info("Disconnected:{}", client.getClientId());} catch (MqttException e) {log.error("Message disconnect failed:{}", client.getClientId(), e);}}
}

自定义MqttCallback

java">package com.angel.ocean.service.mqtt;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;@Slf4j
public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开,可以做重连");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------:{}", token.isComplete());}
}

MqttController

java">package com.angel.ocean.controller;import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.service.mqtt.MqttService;
import io.swagger.annotations.*;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;/***  前端控制器** @author Jaime.yu* @time 2024-12-01*/
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {@Resourceprivate MqttService mqttService;@GetMapping("/connect")public ApiResult<String> connect(String clientId, String username, String password) {mqttService.connect(clientId, username, password);return ApiResult.success(clientId);}@GetMapping("/subscribe")public ApiResult<String> subscribe(String clientId, String topic) {mqttService.subscribe(clientId, topic);return ApiResult.success(topic);}@GetMapping("/publish")public ApiResult<String> publish(String clientId, String topic, String message) {mqttService.publish(clientId, topic, message);return ApiResult.success(message);}@GetMapping("/disconnect")public ApiResult<Void> disconnect(String clientId) {mqttService.disconnect(clientId);return ApiResult.success();}
}

代码验证

启动服务

启动服务后,可以看到初始化的3个Mqtt客户端成功连接到Mqtt Broker
在这里插入图片描述

动态创建Mqtt客户端

在这里插入图片描述如下图所示,动态创建了3个Mqtt客户端:
在这里插入图片描述

发送消息

在这里插入图片描述从下图日志可以看出,消息发送成功:
在这里插入图片描述


http://www.ppmy.cn/ops/144017.html

相关文章

git管理

Git 项目管理&#xff1a;从本地开发到远程仓库提交 在这篇博客中&#xff0c;我将向你展示如何将本地的一个 Python 项目与远程 Git 仓库进行关联&#xff0c;并完成代码的提交和管理&#xff0c;同时处理一些常见的 Git 操作问题。以下是详细的步骤&#xff1a; 一、初始化…

srping2.0+升级到spring3.0+遇到的问题,es部分记录一下

最近公司项目,在做版本升级,首先srping2.0升级到spring3.0,目前无漏洞版本是springBoot3.2.12springClould2023.0.3,升级完spring,紧接着升级es,原来我们使用的es客户端版本是7.9.3,升级到无漏洞版本是7.17.23 <elasticsearch.version>7.17.23</elasticsearch.version…

题海拾贝:力扣 86.分隔链表

Hello大家好&#xff01;很高兴我们又见面啦&#xff01;给生活添点passion&#xff0c;开始今天的编程之路&#xff01; 我的博客&#xff1a;<但凡. 我的专栏&#xff1a;《编程之路》、《数据结构与算法之美》、《题海拾贝》 欢迎点赞&#xff0c;关注&#xff01; 1、题…

医疗挂号的数字化转型:SSM 联合 Vue 的系统设计与实现探索

3系统分析 3.1可行性分析 通过对本医院预约挂号系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本医院预约挂号系统采用SSM框架&#xff0c;JAVA作为开发语…

Pytorch | 从零构建ResNet对CIFAR10进行分类

Pytorch | 从零构建ResNet对CIFAR10进行分类 CIFAR10数据集ResNet核心思想网络结构创新点优点应用 ResNet结构代码详解结构代码代码详解BasicBlock 类ResNet 类ResNet18、ResNet34、ResNet50、ResNet101、ResNet152函数 训练过程和测试结果代码汇总resnet.pytrain.pytest.py 前…

ios的safari下载文件 文件名乱码

当使用nginx代理文件并下载文件时&#xff0c;返回的协议头Content-Disposition中filename%E9%9B%AA%E5%B1%B1.jpg中文内容会是URL编码的形式&#xff0c;当客户端在safari浏览器下载下载文件时&#xff0c;文件名不会转换&#xff08;URL解码&#xff09;为正常的中文。 应该…

wepack的各个版本差异?

‌Webpack的版本问题主要体现在不同版本之间的配置差异和兼容性问题。‌ 不同版本之间的配置差异 ‌Webpack 3.x与4.x的区别‌&#xff1a;在Webpack 3.0之后&#xff0c;配置entry和output路径不再支持相对路径&#xff0c;只能使用__dirname拼接成的绝对路径‌。‌Webpack …

UE5 Lyra项目源码分析-角色配置说明

在上一篇里&#xff0c;我们研究了关卡的配置如何在GameMode实现加载的&#xff0c;并稍微理解了Lyra是如何实现的模块化&#xff0c;由于模块化太彻底&#xff0c;所以理解起来有些难&#xff0c;在这一篇里&#xff0c;我们看一下配置里面比较想了解的一块。就是角色是如何配…