EMQX构建简易的云服务

devtools/2024/12/25 2:29:27/

基本思路:

  1. 使用EMQX作为Mqtt broker
  2. mqtt-receive-server服务,用于接收设备上报的数据
  3. mqtt-sender-service服务,用于下发数据给设备
  4. KafKa实现数据解耦,mqtt-receive-server服务接收的数据简单处理下直接扔到Kafka中
  5. 云服务各业务系统从KafKa中消费数据,各业务需要下发数据的话,调用mqtt-sender-service接口下发数据给设备

基本流程

在这里插入图片描述

DashBoard 定义认证用户

在这里插入图片描述

定义Mqtt协议主题

// 设备激活
public final static String ACTIVATE = "mqtt/0/1";
// 设备重置
public final static String RESET = "mqtt/0/0";
// 上线
public final static String ONLINE = "mqtt/1/1";
// 下线
public final static String OFFLINE = "mqtt/1/0";
// 上行-设备上报数据到平台
public final static String REPORT = "mqtt/2/1";
// 下行-平台下发数据给设备
public final static String ISSUED = "%s/2/0";

设备认证流程

首先在云平台创建产品,生成PK/PS,用于Mqtt Broker的连接认证
将PK/PS烧录到设备中
设备开机启动,首次连接平台携带PK/PS/DK,mqtt连接成功后,云服务端会下发DS给到设备,并标识设备已激活
设备再次连接云服务,mqtt连接成功后,会校验DK/DS是否合法,不合法将设备踢下线。
设备订阅${clientId}/2/0主题

@PostConstructpublic void init() throws MqttException {client.setCallback(new MqttCallbackHandler());client.subscribe(String.format(MqttTopicConstant.ISSUED, client.getClientId()));}

mqtt-receive-server服务

使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_receive_server
订阅ACTIVATE 、RESET 、ONLINE 、OFFLINE 、REPORT 等主题
将接收的数据简单处理,转发到KafKa

mqtt:broker-url: tcp://42.194.132.44:1883client-id: mqtt_receive_serverusername: mqtt_serverpassword: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
@PostConstruct
public void init() throws MqttException {client.setCallback(new MqttCallbackHandler(kafkaService));subscribe(MqttTopicConstant.ACTIVATE);subscribe(MqttTopicConstant.RESET);subscribe(MqttTopicConstant.ONLINE);subscribe(MqttTopicConstant.OFFLINE);subscribe(MqttTopicConstant.REPORT);
}
java">@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {String data = new String(message.getPayload());log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);UpData upData = JSONObject.parseObject(data, UpData.class);UpKafKaData upKafKaData = new UpKafKaData(topic, data);log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
}

mqtt-sender-service服务

使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_sender_server
不订阅主题,只下发数据,下发数据主题为${clientId}/2/0
提供API给给业务子系统使用,用于下发数据给设备

mqtt:broker-url: tcp://42.194.132.44:1883client-id: mqtt_sender_serverusername: mqtt_serverpassword: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
java">package com.angel.ocean.listener;import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.domain.client.ActivateData;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;@Slf4j
@Component
public class UpDataConsumerListener {@Resourceprivate MqttService mqttService;/*** 批量消费*/@KafkaListener(topics = UP_DATA_TOPIC, containerFactory = "batchFactory")public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {try {log.info("UpDataConsumerListener.batchListen(), records.size: {}", records.size());for (ConsumerRecord<String, String> record : records) {UpKafKaData data = JSONObject.parseObject(record.value(), UpKafKaData.class);log.info("{}", record.value());handler(data.getTopic(), data.getData());}} catch (Exception e) {log.error("UpDataConsumerListener.batchListen() Exception:{}", e.getMessage(), e);} finally {// 手动确认ack.acknowledge();}}private void handler(String topic, String data) {switch (topic) {case MqttTopicConstant.ACTIVATE:activateHandler(data);break;case MqttTopicConstant.RESET:otherHandler(data);break;case MqttTopicConstant.OFFLINE:otherHandler(data);break;case MqttTopicConstant.ONLINE:otherHandler(data);break;case MqttTopicConstant.REPORT:otherHandler(data);break;default:otherHandler(data);}}private void activateHandler(String data) {ActivateData activateData = JSONObject.parseObject(data, ActivateData.class);String clientId = activateData.getClientId();mqttService.publish(String.format(MqttTopicConstant.ISSUED, clientId), "200");}private void otherHandler(String data) {log.info("{}", data);}}
java">package com.angel.ocean.controller;import com.angel.ocean.common.ApiResult;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;@Slf4j
@RestController
@RequestMapping("/mqtt/server")
public class MqttController {@Resourceprivate MqttClient server;@Resourceprivate MqttService mqttService;/*** 数据下发接口* @param clientId* @param data* @return*/@RequestMapping("/sender")public ApiResult<?> publish(String clientId, String data) {String topic = String.format(MqttTopicConstant.ISSUED, clientId);mqttService.publish(topic, data);if(server.isConnected()) {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(0);try {server.publish(topic, message);log.info("Message published, topic:{}, data:{}", topic, data);} catch (MqttException e) {log.error("Message publish failed, topic:{}", topic, e);return ApiResult.error();}return ApiResult.success();}log.info("Message publish failed, not online.");return ApiResult.error();}
}

代码验证

场景:设备上报消息,云服务端回复消息给设备; 云服务主动下发数据给设备。

模拟设备上报消息, 接收云平台回复

发了两次:
在这里插入图片描述mqtt-client 本地客户端日志:

在这里插入图片描述
mqtt-receive-server云服务日志:

在这里插入图片描述
mqtt-sender-server云服务日志:

在这里插入图片描述

模拟云平台主动下发数据

在这里插入图片描述mqtt-sender-server云服务主动下发的日志:

在这里插入图片描述mqtt-client数据接收日志:

在这里插入图片描述


http://www.ppmy.cn/devtools/145134.html

相关文章

JavaEE进阶--mybatis使用测试日志参数传递浏览器访问

文章目录 1.项目创建2.mybatis的使用2.1创建初始页面2.2补充yml文件2.3navicate表2.4用户类的编写2.5查询接口2.6运行测试 3.细节说明3.1java开发规范3.2关于包3.3持久层代码 4.测试文件4.1如何生成4.2生成位置4.3补充方法 5.配置mybatis日志6.参数传递6.1单个参数6.2多个参数 …

青少年编程与数学 02-004 Go语言Web编程 09课题、访问数据库

青少年编程与数学 02-004 Go语言Web编程 09课题、访问数据库 一、数据库访问1. 安装数据库驱动2. 导入所需包3. 创建数据库连接4. 初始化数据库连接5. 使用GORM进行数据库操作 二、GORM三、GORM框架与Gin框架结合使用1. 初始化项目和安装依赖2. 配置数据库连接3. 定义数据模型4…

python学opencv|读取图像(十七)认识alpha通道

【1】引言 前序学习进程中&#xff0c;我们已经掌握了RGB和HSV图像的通道拆分和合并&#xff0c;获得了很多意想不到的效果&#xff0c;相关链接包括且不限于&#xff1a; python学opencv|读取图像&#xff08;十二&#xff09;BGR图像转HSV图像-CSDN博客 python学opencv|读…

Django 模型管理器中自定义方法和添加导出功能

在 Django 中,模型管理器提供了一种扩展模型行为的方式。您可以重写或添加自定义方法,以满足特定的业务需求。在本文中,我们将探讨如何在模型管理器中自定义方法,并提供一些常见的用例。此外,我们还将介绍如何在管理员界面中添加导出数据为 CSV 文件的功能。 什么是模型管理器…

k8s-metrics-server

一&#xff1a;拉取镜像 直接从阿里云的镜像仓库拉取&#xff0c;国外的镜像仓库比较慢。。。。 docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/metrics-server:v0.7.2 打包镜像&#xff0c;之后传到k8s的服务器上面 docker save -o metrics-server.ta…

机试题——疯长的草

题目描述 将种不同的草随机种在一块广漠无垠的二维平面上&#xff08;直角坐标系内&#xff09;&#xff0c;给定二维数组 points 表示第 0 天所有草的初始位置&#xff0c;第 i 项 points [i][xi, yi] 表第 0 天草 i 在点 [xi, yi]。每天&#xff0c;被草覆盖的点会向外蔓延到…

Day13 用Excel表体验梯度下降法

Day13 用Excel表体验梯度下降法 用所学公式创建Excel表 用Excel表体验梯度下降法 详见本Day文章顶部附带资源里的Excel表《梯度下降法》&#xff0c;可以对照表里的单元格公式进行理解&#xff0c;还可以多尝试几次不同的学习率 η \eta η来感受&#xff0c;只需要更改学习率…

【openwrt】openwrt NAT64 NAT46实现简介

NAT64 & NAT46 在 OpenWrt 上实现 NAT46 和 NAT64 可以通过安装和配置相应的软件包来完成。 NAT64 实现步骤 安装必要的软件包 你需要安装 tayga 和 bind 或 dnsmasq 以支持 NAT64 和 DNS64。 使用以下命令安装: opkg update opkg install tayga opkg install bind-s…