上一次我们介绍了用 Java 模拟单个 MQTT 客户端的场景。但在实际业务中,往往需要同时模拟多个 MQTT 客户端,例如:对云平台的设备连接以及设备上下行消息做压测。
Java模拟多个Mqtt客户端基本流程
引入Paho MQTT客户端库
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>
定义MqttService
package com.angel.ocean.service.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class MqttService {// mqtt brokerprivate String broker = "tcp://42.194.132.44:1883";// QoS 等级public static int qos = 2;private final Map<String, MqttClient> clients = new HashMap<>();private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);@PostConstructpublic void init() {// TODO 动态创建客户端的逻辑String clientIdPrefix = "2024120122480";int i = 0;while (i < 3) {String clientId = clientIdPrefix + i;connect(clientId, "test", "123456");i = i + 1;}// 定期检查客户端连接状态(可选)executorService.scheduleAtFixedRate(() -> {clients.values().forEach(client -> {if (!client.isConnected()) {log.info("{} is not connected, attempting to reconnect...", client.getClientId());// 这里应该实现重连逻辑}});}, 0, 10, TimeUnit.SECONDS);}@PreDestroypublic void destroy() {clients.values().forEach(client -> {try {client.disconnect();} catch (MqttException e) {log.error("destroy failed, clientId:{}", client.getClientId(), e);}});executorService.shutdown();}/*** 创建 mqtt 客户端* @param clientId* @param username* @param password*/public void connect(String clientId, String username, String password) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());// 保留会话options.setCleanSession(true);options.setAutomaticReconnect(true);// 设置回调client.setCallback(new OnMessageCallback());// 建立连接log.info("Connecting to broker: {}, clientId:{}", broker, clientId);client.connect(options);clients.put(clientId, client);log.info("Connected: {}, clientId:{}", broker, clientId);} catch (MqttException me) {log.error("reason:{}, msg:{}, loc:{}, cause:{}", me.getReasonCode(), me.getMessage(), me.getLocalizedMessage(), me.getCause(), me);}}/*** 获取 mqtt 客户端* @param clientId* @return*/public MqttClient getClientByClientId(String clientId) {return clients.get(clientId);}/*** 发送消息* @param clientId* 
@param topic* @param content*/public void publish(String clientId, String topic, String content) {MqttClient client = getClientByClientId(clientId);if(client.isConnected()) {MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);try {client.publish(topic, message);log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, content);} catch (MqttException e) {log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message publish failed, client:{} not online.", client.getClientId());}/*** 订阅* @param clientId* @param topic*/public void subscribe(String clientId, String topic) {MqttClient client = getClientByClientId(clientId);if(client.isConnected()) {try {client.subscribe(topic);log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);} catch (MqttException e) {log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message subscribe failed, client:{} not online.", client.getClientId());}/*** 断开连接* @param clientId*/public void disconnect(String clientId) {MqttClient client = getClientByClientId(clientId);try {client.disconnect();clients.remove(clientId);client.close();log.info("Disconnected:{}", client.getClientId());} catch (MqttException e) {log.error("Message disconnect failed:{}", client.getClientId(), e);}}
}
自定义MqttCallback
package com.angel.ocean.service.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@Slf4j
public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开,可以做重连");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------:{}", token.isComplete());}
}
MqttController
package com.angel.ocean.controller;

import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.service.mqtt.MqttService;
import io.swagger.annotations.*;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;

/**
 * Front-end controller.
 *
 * @author Jaime.yu
 * @time 2024-12-01
 */
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {

    @Resource
    private MqttService mqttService;

    /**
     * Asks the service to create and connect an MQTT client.
     *
     * <p>NOTE(review): the password travels in the query string of a GET request; consider a
     * POST body if this is used outside a test environment.
     *
     * @param clientId id of the client to create
     * @param username user name for the broker
     * @param password password for the broker
     * @return a success result carrying {@code clientId}; the service call returns nothing, so
     *         the result does not tell whether the connection was actually established
     */
    @GetMapping("/connect")
    public ApiResult<String> connect(String clientId, String username, String password) {
        mqttService.connect(clientId, username, password);
        return ApiResult.success(clientId);
    }

    /**
     * Asks the service to subscribe a client to a topic.
     *
     * @param clientId id of the subscribing client
     * @param topic    topic to subscribe to
     * @return a success result carrying {@code topic}
     */
    @GetMapping("/subscribe")
    public ApiResult<String> subscribe(String clientId, String topic) {
        mqttService.subscribe(clientId, topic);
        return ApiResult.success(topic);
    }

    /**
     * Asks the service to publish a message from a client.
     *
     * @param clientId id of the publishing client
     * @param topic    topic to publish to
     * @param message  message text
     * @return a success result carrying {@code message}
     */
    @GetMapping("/publish")
    public ApiResult<String> publish(String clientId, String topic, String message) {
        mqttService.publish(clientId, topic, message);
        return ApiResult.success(message);
    }

    /**
     * Asks the service to disconnect a client.
     *
     * @param clientId id of the client to disconnect
     * @return an empty success result
     */
    @GetMapping("/disconnect")
    public ApiResult<Void> disconnect(String clientId) {
        mqttService.disconnect(clientId);
        return ApiResult.success();
    }
}
代码验证
启动服务
启动服务后,可以看到初始化的3个Mqtt客户端成功连接到Mqtt Broker
动态创建Mqtt客户端
如下图所示,动态创建了3个Mqtt客户端:
发送消息
从下图日志可以看出,消息发送成功: