TDengine 集成 EMQX 通过规则引擎实现设备数据直接入库

news/2024/10/30 9:31:06/

背景

曾使用过 IoTDB 自带的 MQTT Broker 实现了设备数据入库,那么使用 TDengine 时,我们可以借助 EMQX (一款优秀的国产开源 MQTT Broker )的规则引擎结合 TDengine 的 RESTful API 完成设备数据的路由与入库。

  • 用到的工具
  1. TDengine RESTful API
  2. EMQX 规则引擎
  3. TDengine GUI图形化管理工具
  4. Node.js下的MQTT客户端
  5. 虚拟机CentOS操作系统
  • 版本信息
  1. TDengine: 2.2.0.0
  2. EMQX: 4.2.4
  3. Node.js: 12.16.1
  4. CentOS: 7

TDengine创建数据库表

create database if not exists ok;create stable if not exists ok.power(ts timestamp, voltage int, current float, temperature float) tags(sn int, city nchar(64), groupid int);create table if not exists ok.device1 using ok.power tags(1, "太原", 1);
create table if not exists ok.device2 using ok.power tags(2, "西安", 2);insert into ok.device1 values("2021-09-04 21:03:38.734", 1, 1.0, 1.0);
insert into ok.device2 values("2021-09-04 21:03:40.734", 2, 2.0, 2.0);

初始数据如下:

2021-09-23-InitialData.jpg

EMQX创建资源

所谓的资源就是将要连接的数据库、中间件等,这里便是 TDengine 的连接,通过其 RESTful API 建立连接,在规则引擎的动作响应中会用到这里的资源。

2021-09-23-TDengineResource.jpg

2021-09-23-ResourceView.jpg

其中关于头信息中的 Authorization 通过以下方式获得。

# 获取token
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl hadoop1:6041/rest/login/root/taosdata
{"status":"succ","code":0,"desc":"/KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04"}# 测试:附加自定义token在头信息,正常响应
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'select * from ok.power' hadoop1:6041/rest/sql
{"status":"succ","head":["ts","voltage","current","temperature","sn","city","groupid"],"column_meta":[["ts",9,8],["voltage",4,4],["current",6,4],["temperature",6,4],["sn",4,4],["city",10,64],["groupid",4,4]],"data":[["2021-09-04 21:03:38.734",1,1.00000,1.00000,1,"太原",1],["2021-09-04 21:03:40.734",2,2.00000,2.00000,2,"西安",2]],"rows":2}

EMQX创建规则

  • 创建规则:这里直接从主题device/sn中获取payload,结果命名为power

2021-09-23-RuleContent.jpg

  • 测试规则:模拟一条数据,经过测试,定义的规则成功命中。

2021-09-23-RuleTest.jpg

EMQX创建动作响应

当命中数据后,我们的目标是将其存入数据库,那么我们一开始定义的 TDengine 资源就派上用场了。

  1. Action选择Data to Web Server表示我们要将数据发送至Web服务(即 TDengine 的 RESTful API
  2. Resource选择我们创建好的资源
  3. 最后填写Payload Template,写入数据表的SQL语句,这里支持插值:insert into ok.device${power.sn} values ('${power.ts}', ${power.voltage}, ${power.currente}, ${power.temperature})

2021-09-23-Action.jpg

Node.js模拟MQTT客户端

这里通过 Node.js 模拟一个设备,向主题 device/sn 随机发布数据,完成数据上报,当然也可以借助其他客户端来实现。

2021-09-23-NodeClient.jpg

EMQX查看规则引擎Metrics

点击 Rule 菜单下的规则引擎 ID ,可查看已配置的规则详情,还可以看到多少消息被规则命中的度量信息(需刷新页面)。

2021-09-23-Metrics.jpg

TDengine客户端查看数据

数据库中确认写入两条新数据:

2021-09-23-FinalData.jpg

规则引擎扩展

开源版的 EMQX Broker 除了全面支持 MQTT5 新特性、多协议支持外,更强大的地方在于其围绕 MQTT 周边提供了一系列的 WebHook 、 HTTP API 接口以及最为核心的规则引擎。上面我们只是通过主题选择了数据进行规则匹配,其实规则引擎还可以结合一系列的内部事件,编写规则时以$开头,包括客户端连接事件、断开事件、消息确认事件、消息发布事件、订阅事件、取消订阅事件等。

2021-09-23-RuleAdvanced.jpg
EMQX Broker 一开始的定位就是物联网消息中间件,目前开源版本功能已经非常强大,而企业版本与Cloud版本更是提供了高阶功能,全托管、更稳定、更可靠,技术支持更及时。以下是我试用的Cloud版本。

2021-09-23-Cloud.jpg

可能遇到的问题

  • 端口开放问题
    因为通过宿主机访问虚拟机,所以记得关闭防火墙或者开放对应的端口,这里涉及到的端口有:
  1. 6041:TDengine的RESTful API默认端口
  2. 1883:EMQX的MQTT默认端口
  3. 18083:EMQX的Dashboard默认端口
# 关闭防火墙
[root@hadoop1 ~]# systemctl stop firewalld.service# 放行端口
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 6041 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 1883 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 18083 -j ACCEPT
  • 主题名称不匹配导致规则无法命中

作为约定俗成的实践,一般在编码时 MQTT 的主题不以 / 开头,即写作 device/sn ,而不是 /device/sn 。

刚开始我在 MQTT 客户端发送数据时主题名为 /device/sn ,而规则引擎中的主题为 device/sn ,导致无法匹配。

  • SQL模板中的字符串

这里的ts以字符串形式发送,因此需要将插值用引号括起来:‘${power.ts}’。否则 TDengine 日志报错:

[root@hadoop1 taos]# tailf ./log/taosdlog.0
09/23 08:42:11.707621 00001702 TSC ERROR 0x8e async result callback, code:Syntax error in SQL
09/23 08:42:11.707675 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, user:root, query error, code:Syntax error in SQL, sqlObj:0x7f5f74000c10
09/23 08:42:11.725687 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, code:400, error:Syntax error in SQL
  • 规则引擎的Metrics计数与实际发送数据不符

这与客户端发送数据指定的 QoS 相关,如果 QoS = 1 ,则MQTT协议的重发机制可能导致数据重复发送。


http://www.ppmy.cn/news/104151.html

相关文章

【华为OD机试】分糖果【2023 B卷|100分】

【华为OD机试】-真题 !!点这里!! 【华为OD机试】真题考点分类 !!点这里 !! 题目描述 小明从糖果盒中随意抓一把糖果,每次小明会取出一半的糖果分给同学们。 当糖果不能平均分配时,小明可以选择从糖果盒中(假设盒中糖果足够) 取出一个糖果或放回一个糖果。 小明最少需…

STM32F4_常用存储器简介

目录 1. 存储器的种类 单片机和电脑一样,其内核分别为ARM和CPU(CPU通常是不具备存储功能的),内核是用来进行数据的运算和处理的。内核处理数据的来源就是存储器。 1. 存储器的种类 RAM存储器: RAM是 “Random Access Memory”的缩写&#x…

SDUT数据库原理——第七章作业(参考答案)

第7章 作业: 简述数据库需求分析阶段的设计目标和需要调查的内容。答:需求分析阶段的设计目标是通过详细调查现实世界要处理的对象(组织、部门、企业等),充分了解原系统(手工系统或计算机系统)工作概况、业务逻辑及相关社会环境现状,明确用户的各种需求,然后在此基础上…

C++ Primer Plus 第一,二章笔记

目录 第一章笔记 1、C简介 2、C简史 3、可移植性和标准 第二章笔记 1. 进入C 1.3、预处理器和头文件 1.4、名称空间(namespace) 1.5、使用cout进行C的输出 2. C语句 3. 其他C语句 4. 函数 第一章笔记 1、C简介 C融合了3种不同的编程方式&a…

Spring Bean

Spring Bean 哇塞大嘴好帅​ Spring Bean就是容器管理的对象,他是应用程序的基本构造块(最基本的元素),可以是任何Java对象。SPring容器负责实列化,配置和管理这些Bean对象。 ​ Bean是通过配置文件(XML文…

面试题 01.03. URL化

URL化 URL化。编写一种方法,将字符串中的空格全部替换为%20。假定该字符串尾部有足够的空间存放新增字符,并且知道字符串的“真实”长度。(注:用Java实现的话,请使用字符数组实现,以便直接在数组上操作。&…

stackqueue

这篇主要讲栈(stack)和队列(queue),实际要学习的数据结构有三个:stack、queue、priority_queue 这些数据结构已经不属于容器了,而是容器适配器。 list的第二个参数是空间配置器,支持申请空间;而list和queue的第二个参…

目标检测复盘 -- 6. YOLOv4

Backbone YOLOv4的骨干是CSPDarknet53 CSP结构的作用:1. 增强CNN的学习能力 2. 移出计算瓶颈 3. 减少内存开销 CSP首先将输入的特征层分成两个部分,这里以densenet为例,part2分支经过denseblock后,进过一个transition&#xff0c…