EMQX构建简易的云服务

server/2024/12/27 22:17:28/

基本思路:

  1. 使用EMQX作为Mqtt broker
  2. mqtt-receive-server服务,用于接收设备上报的数据
  3. mqtt-sender-service服务,用于下发数据给设备
  4. KafKa实现数据解耦,mqtt-receive-server服务接收的数据简单处理下直接扔到Kafka中
  5. 云服务各业务系统从KafKa中消费数据,各业务需要下发数据的话,调用mqtt-sender-service接口下发数据给设备

基本流程

在这里插入图片描述

DashBoard 定义认证用户

在这里插入图片描述

定义Mqtt协议主题

// 设备激活
public final static String ACTIVATE = "mqtt/0/1";
// 设备重置
public final static String RESET = "mqtt/0/0";
// 上线
public final static String ONLINE = "mqtt/1/1";
// 下线
public final static String OFFLINE = "mqtt/1/0";
// 上行-设备上报数据到平台
public final static String REPORT = "mqtt/2/1";
// 下行-平台下发数据给设备
public final static String ISSUED = "%s/2/0";

设备认证流程

首先在云平台创建产品,生成PK/PS,用于Mqtt Broker的连接认证
将PK/PS烧录到设备中
设备开机启动,首次连接平台携带PK/PS/DK,mqtt连接成功后,云服务端会下发DS给到设备,并标识设备已激活
设备再次连接云服务,mqtt连接成功后,会校验DK/DS是否合法,不合法将设备踢下线。
设备订阅${clientId}/2/0主题

@PostConstructpublic void init() throws MqttException {client.setCallback(new MqttCallbackHandler());client.subscribe(String.format(MqttTopicConstant.ISSUED, client.getClientId()));}

mqtt-receive-server服务

使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_receive_server
订阅ACTIVATE 、RESET 、ONLINE 、OFFLINE 、REPORT 等主题
将接收的数据简单处理,转发到KafKa

mqtt:broker-url: tcp://42.194.132.44:1883client-id: mqtt_receive_serverusername: mqtt_serverpassword: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
@PostConstruct
public void init() throws MqttException {client.setCallback(new MqttCallbackHandler(kafkaService));subscribe(MqttTopicConstant.ACTIVATE);subscribe(MqttTopicConstant.RESET);subscribe(MqttTopicConstant.ONLINE);subscribe(MqttTopicConstant.OFFLINE);subscribe(MqttTopicConstant.REPORT);
}
java">@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {String data = new String(message.getPayload());log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);UpData upData = JSONObject.parseObject(data, UpData.class);UpKafKaData upKafKaData = new UpKafKaData(topic, data);log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
}

mqtt-sender-service服务

使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_sender_server
不订阅主题,只下发数据,下发数据主题为${clientId}/2/0
提供API给给业务子系统使用,用于下发数据给设备

mqtt:broker-url: tcp://42.194.132.44:1883client-id: mqtt_sender_serverusername: mqtt_serverpassword: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
java">package com.angel.ocean.listener;import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.domain.client.ActivateData;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;@Slf4j
@Component
public class UpDataConsumerListener {@Resourceprivate MqttService mqttService;/*** 批量消费*/@KafkaListener(topics = UP_DATA_TOPIC, containerFactory = "batchFactory")public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {try {log.info("UpDataConsumerListener.batchListen(), records.size: {}", records.size());for (ConsumerRecord<String, String> record : records) {UpKafKaData data = JSONObject.parseObject(record.value(), UpKafKaData.class);log.info("{}", record.value());handler(data.getTopic(), data.getData());}} catch (Exception e) {log.error("UpDataConsumerListener.batchListen() Exception:{}", e.getMessage(), e);} finally {// 手动确认ack.acknowledge();}}private void handler(String topic, String data) {switch (topic) {case MqttTopicConstant.ACTIVATE:activateHandler(data);break;case MqttTopicConstant.RESET:otherHandler(data);break;case MqttTopicConstant.OFFLINE:otherHandler(data);break;case MqttTopicConstant.ONLINE:otherHandler(data);break;case MqttTopicConstant.REPORT:otherHandler(data);break;default:otherHandler(data);}}private void activateHandler(String data) {ActivateData activateData = JSONObject.parseObject(data, ActivateData.class);String clientId = activateData.getClientId();mqttService.publish(String.format(MqttTopicConstant.ISSUED, clientId), "200");}private void otherHandler(String data) {log.info("{}", data);}}
java">package com.angel.ocean.controller;import com.angel.ocean.common.ApiResult;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;@Slf4j
@RestController
@RequestMapping("/mqtt/server")
public class MqttController {@Resourceprivate MqttClient server;@Resourceprivate MqttService mqttService;/*** 数据下发接口* @param clientId* @param data* @return*/@RequestMapping("/sender")public ApiResult<?> publish(String clientId, String data) {String topic = String.format(MqttTopicConstant.ISSUED, clientId);mqttService.publish(topic, data);if(server.isConnected()) {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(0);try {server.publish(topic, message);log.info("Message published, topic:{}, data:{}", topic, data);} catch (MqttException e) {log.error("Message publish failed, topic:{}", topic, e);return ApiResult.error();}return ApiResult.success();}log.info("Message publish failed, not online.");return ApiResult.error();}
}

代码验证

场景:设备上报消息,云服务端回复消息给设备; 云服务主动下发数据给设备。

模拟设备上报消息, 接收云平台回复

发了两次:
在这里插入图片描述mqtt-client 本地客户端日志:

在这里插入图片描述
mqtt-receive-server云服务日志:

在这里插入图片描述
mqtt-sender-server云服务日志:

在这里插入图片描述

模拟云平台主动下发数据

在这里插入图片描述mqtt-sender-server云服务主动下发的日志:

在这里插入图片描述mqtt-client数据接收日志:

在这里插入图片描述


http://www.ppmy.cn/server/152968.html

相关文章

【Apache Doris】周FAQ集锦:第 26 期

SQL问题 Q1 doris 3.0存算分离模式下&#xff0c;建表的时是否需要指定表的副本数 不需要&#xff0c;指定了也会忽略&#xff1b;存算分离模式下&#xff0c;数据副本由远端存储去管控。 Q2 doris 通过dbeaver查询时报错&#xff1a;[SXXXX]… doris的错误码通常都是EXXXX&…

Nacos的下载和启动(如何快速稳定下载在github中)

目录 Nacos的下载 下载加速器 在githup中找到Nacos 启动Nacos 访问Nacos Nacos的下载 下载加速器 首先&#xff0c;我们需要进入githup中&#xff0c;我们直接访问&#xff0c;肯定是访问不到的。 这里我们经常玩游戏的同学肯定知道steam&#xff0c;这个加速器。直接进入…

Ubuntu下通过Docker部署Caddy服务器

Docker和Caddy简介 Docker是一个强大的容器化平台&#xff0c;而Caddy是一个现代化的Web服务器&#xff0c;支持自动HTTPS和简单配置。这两款软件在现代IT领域扮演着重要的角色。 步骤一&#xff1a;安装Docker 首先&#xff0c;安装Docker。执行以下命令&#xff1a; sudo…

物联网水文观测设备

物联网水文观测设备的功能涵盖了水文监测的多个方面&#xff0c;以下是其主要功能&#xff1a; 实时数据收集&#xff1a; 物联网水文观测设备能够实时收集水位、流量、水质等参数&#xff0c;实现对水环境的持续监测。这种连续的数据收集方法有助于及时发现和预警水资源问题&a…

LLM客户端开源工具cherry studio

Cherry Studio AI 是一款强大的多模型AI 助手&#xff0c;支持iOS、macOS 和Windows 平台。快速切换多个先进的LLM 模型&#xff0c;提升工作学习效率。通过自定义LLM的接口&#xff0c;可以方便使用最新的大预言模型&#xff0c;包括多模态模型。比如google的最新的 gemini的G…

day14-补充静态网卡配置

修改网络模式&#xff0c;修改静态ip&#xff0c;动态ip获取方式 查看当前的上网信息 1.确保你的机器&#xff0c;是连接的网络的&#xff0c;是插上了网线的。&#xff08;模拟了物理服务器的软件是什么&#xff1f;看你的虚拟的机器&#xff08;vmware&#xff09;&#xf…

【jvm】内存泄漏的8种情况

目录 1. 说明2. 静态集合类持有对象引用3. 单例模式4. 内部类持有外部类5. 未关闭的连接6. 变量不合理的作用域7. 改变对象的哈希值8. 缓存Cache泄漏9. 监听器和回调 1. 说明 1.内存泄漏&#xff08;Memory Leak&#xff09;指的是程序中动态分配的内存由于某种原因没有被释放…

基于Springboot的数字科技风险报告管理系统

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了多年的设计程序开发&#xff0c;开发过上千套设计程序&#xff0c;没有什么华丽的语言&#xff0c;只有实…