使用MASA Stack+.Net 从零开始搭建IoT平台 第五章 使用时序库存储上行数据

news/2024/11/16 7:28:50/

目录

  • 前言
  • 分析
  • 实施步骤
    • 时序库的安装
    • 解决playload没有时间戳问题
    • 代码编写
  • 总结


前言

我们可以将设备上行数据存储到关系型数据库中,我们需要两张带有时间戳的表(最新数据表历史数据表),历史数据表存储所有设备上报的数据,最新数据表需要存储设备最新一条上报数据,这条最新数据相当于设备的当前状态。然后展示的时候只展示最新一条数据的状态,报表查询可以按照设备id和时间从历史数据表查询汇总。
这样是可以的,但是我们的最新数据表需要被频繁的更新,数据量少的时候没问题。但数据量大,并发高的时候就会出现问题。
1、存储成本:数据不会被压缩,导致占用存储资源。
2、维护成本:单表数据量太大时,需要人工分库分表。
3、写入性能:单机写入吞吐量难以满足大量上行数据的写入需求,数据库存在性能瓶颈。
4、查询性能:数据量太大导致查询性能受到影响。

分析

我们可以采用时序库来解决上述问题,首先来了解一下什么是时序数据。时序数据是按照时间维度进行索引的数据,它记录了某个被测量实体在一定时间范围内,每个时间点上的一组测试值。传感器上传的室内PM2.5和甲醛数据、净水器传感器当前的TDS值、计算机系统的监控数据等,都属于时序数据,时序数据有如下特点:
1、数据量较大,写入操作是持续且平稳的,而且写多读少。
2、只有写入操作,几乎没有更新操作,比如去修改传感器的历史数据,是没有意义的。
3、没有随机删除,即使删除也是按照时间范围进行删除。删除某一个时间点的数据没有意义,但是删除2年前的数据是有意义的。
4、数据实时性和时效性强,数据随着时间的推移不断追加,旧数据很快失去意义。
5、大部分以时间和实体为维度进行查询,很少以测试值为维度查询,比如用户会查询某个时间段的温度数据,但是很少会去查询温度高于多少度的数据记录。
显然IoT的业务是符合使用时序库的场景的。
序数据库就是用来存储时序数据的数据库,时序数据库相较于传统的关系型数据和非关系型数据库而言,专门优化了对时序数据的存储,开源的时序数据库有InfluxDB OpenTSDB、TimeScaleDB 等。本文以InfluxDB数据库进行演示。
时序数据库有如下几个概念。
1.Metric:度量,相当于关系型数据库中的表(table)。
2.Data Point:数据点,相当于关系型数据库的中的行(row)。
3.Timestamp:时间戳,数据点生成时的时间戳。
4.Field:测量值,比如温度和湿度、PM2.5等。
5.Tag:标签,用于标识数据点,通常用来标识数据点的来源,比如温度和湿度数据来自哪个房间,哪个设备,可以当作关系型数据库表的主键。

如下图,度量为 Wind,每一个数据点都具有一个 timestamp,两个 field:direction 和 speed,两个 tag:sensor、city。它的第一行和第三行,存放的都是 sensor 号码为 95D8-7913 的设备,属性城市是上海。随着时间的变化,风向和风速都发生了改变,风向从 23.4 变成 23.2;而风速从 3.4 变成了 3.3。

图片来自网络

实施步骤

时序库的安装

安装参考官方文档,为了方便,我这里采用docker安装

docker run --name influxdb -p 8086:8086 influxdb:2.7.0

https://docs.influxdata.com/influxdb/v2.7/install/

我们打开 服务器ip:8086 可以看到它自带的管理界面,我们首先创建用户名密码,组织、以及Bucket的名称。
这里的bucket “IoTDemos” 相当于数据库的名称

我们记录一下这个Token,一会连接influxdb需要,相当于账号密码

解决playload没有时间戳问题

对于时序库来讲,时间戳是非常重要的,但是我们拿到的playload并没有时间戳(MQTTNet包我没有找到拿时间戳的方法)。
所以我们需要在mqtt上想办法,让设备上报数据的时候,mqtt自动添加时间戳到playload中。
1、我们在数据集成->规则中新建一条规则名称为"Add_Ts"。SQL编写如下

SELECT*,now_timestamp('millisecond') as payload.Ts
FROM"topic/#"

topic/# 代表消息发布到"topic/#"主题的事件
now_timestamp函数返回当前时间的 Unix 时间戳,我们将时间戳写入到payload的Ts属性中,关于更多内置SQL函数,请参考官方文档

https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-builtin-functions.html

2、我们打开下面的调试,模拟设备上报一条数据,可以看到这条规则帮我们加入了时间戳。

3、然后我们还需要处理添加了时间戳的处理结果,我们在右侧添加一个动作,选择消息重发布,将刚刚添加了时间戳的消息重发到一个新的Topic上,我们使用topic/dp,并在playload中添加${payload},这样我们就修改了playload中的信息,添加了我们需要的时间戳,当然,我们Hub订阅的消息也需要对应修改,添加/dp后缀。

4、首先我们先修改MASA.IoT.Hub的配置文件,Topic添加"/dp"后缀

  "MqttSetting": {
..."Topic": "$share/IotHub/topic/+/dp"},

5、CallbackAsync中,因为我们设备名称是从Topic截取的,也要对应修改一下。

    private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e){var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);Console.WriteLine(deviceDataPointStr);var pubSubOptions = new PubSubOptions{//修改一下获取设备名称的方式DeviceName = e.ApplicationMessage.Topic[6..^3],Msg = deviceDataPointStr,PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),TrackId = Guid.NewGuid()};                            
...}

代码编写

解决完时间戳的问题,我们就可以编写代码向InfluxDB中写入数据了,我们首先在Infrastructure文件夹下创建ITimeSeriesDbClient接口和TimeSeriesDbClient类,使用接口也方便我们日后更换其他的时序库。
这里使用了InfluxDB.Client包。
ITimeSeriesDbClient.cs

namespace MASA.IoT.Core.Infrastructure
{public interface ITimeSeriesDbClient{bool WriteMeasurement<T>(T measurement);}
}

TimeSeriesDbClient.cs

using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using MASA.IoT.WebApi;
using Microsoft.Extensions.Options;namespace MASA.IoT.Core.Infrastructure
{public class TimeSeriesDbClient : ITimeSeriesDbClient{private readonly InfluxDBClient _client;private readonly string _bucket;private readonly string _org;private readonly AppSettings _appSettings;public TimeSeriesDbClient(IOptions<AppSettings> settings){_appSettings = settings.Value;_org = _appSettings.InfluxDBSetting.Org;_bucket = _appSettings.InfluxDBSetting.Bucket;_client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token);}public bool WriteMeasurement<T>(T measurement){try{using var writeApi = _client.GetWriteApi();writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org);return true;}catch (Exception ex){Console.WriteLine(ex.Message);return false;}}}
}

这里使用new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)来构造InfluxDBClient。
Token就是我们创建Bucket过程中保存的Token
Url是我们InfluxDB的访问地址:http://127.0.0.1:8086
写入的方法WriteMeasurement中我们通过**_client.GetWriteApi创建一个写入的api然后直接将我们要写入的泛型实体写入,第二个可选参数代表写入精度,这里我们使用WritePrecision.Ms**
我们在DeviceHandler.cs中注入ITimeSeriesDbClient 并添加一个WriteMeasurementAsync方法,在方法中我们先根据设备名称获取产品,如果识别产品ID为10001(空净产品),
那么我们就写入数据到Measurement:AirPurifierDataPoint
Measurement相当于数据库的表。
MeasurementColumn特性都是InfluxDB.Client.Core提供的,可以用来标识TagTimestamp

using InfluxDB.Client.Core;
using Newtonsoft.Json;namespace MASA.IoT.Core.Contract
{[Measurement("AirPurifierDataPoint")]public class AirPurifierDataPoint{/// <summary>/// 设备名称/// </summary>[Column("DeviceName", IsTag = true)] public string DeviceName { get; set; }/// <summary>/// 产品ID/// </summary>[Column("ProductId", IsTag = true)] public Guid ProductId { get; set; }/// <summary>/// Pm2.5/// </summary>[Column("PM_25")] public double? Pm_25 { get; set; }/// <summary>/// 温度/// </summary>[Column("Temperature")] public double? Temperature { get; set; }/// <summary>/// 湿度/// </summary>[Column("Humidity")] public double? Humidity { get; set; }/// <summary>/// 时间戳/// </summary>[JsonProperty(propertyName: "Ts")][Column(IsTimestamp = true)] public long Timestamp { get; set; }}
}
    public class DeviceHandler : IDeviceHandler{private readonly MASAIoTContext _ioTDbContext;private readonly IMqttHandler _mqttHandler;private readonly ITimeSeriesDbClient _timeSeriesDbClient;public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient){_ioTDbContext = ioTDbContext;_mqttHandler = mqttHandler;_timeSeriesDbClient = timeSeriesDbClient;}/// <summary>/// 写入数据/// </summary>/// <typeparam name="T"></typeparam>/// <param name="pubSubOptions"></param>/// <returns></returns>public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions){var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking().FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName);if (device != null && device.ProductInfo.ProductCode == "10001")  //空气净化器产品{var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg);airPurifierDataPoint.ProductId = device.ProductInfoId;return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint);}return false;}

除了WriteMeasurement方法之外,还提供了很多其他方法,如WritePoint,和批量写入的方法,可自行测试。
#测试
我们启动项目,通过MQTTX向**“topic/284202304230001”**上报一条数据

{"DeviceName":"284202304230001","Pm_25":100,"Temperature":25,"Humidity":50
}

我们在influxDB的管理工具中使用Data Explorer,使用如下的flux query查询语句,即可查出5分钟之内的数据,注意,这里的时间是UTC时间

如果想显示北京时区方便调试,可以在后面添加**|> timeShift(duration: 8h)**

from(bucket: "IoTDemos") 
|> range(start:-5m)


关于flux查询语法

https://docs.influxdata.com/flux/v0.x/

总结

本节我们简单介绍了开源时序数据库influxDB的安装。
我们借助InfluxDB.Client库完成设备从上报到时序库数据存储的全过程,下一节我们介绍从时序库查询数据。

完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos


http://www.ppmy.cn/news/583637.html

相关文章

【burpsuite安全练兵场-服务端8】文件上传漏洞-7个实验(全)

前言&#xff1a; 介绍&#xff1a; 博主&#xff1a;网络安全领域狂热爱好者&#xff08;承诺在CSDN永久无偿分享文章&#xff09;。 殊荣&#xff1a;CSDN网络安全领域优质创作者&#xff0c;2022年双十一业务安全保卫战-某厂第一名&#xff0c;某厂特邀数字业务安全研究员&…

kali实施文件上传漏洞攻击:

3.1 问题 1&#xff09;DVWA搭建在Win2008虚拟机&#xff08;192.168.111.142&#xff09;2&#xff09;在宿主机访问DVWA&#xff0c;DVWA级别分别设置Low、Medium3&#xff09;利用文件上传漏洞&#xff0c;使用kali&#xff08;192.168.111.142&#xff09;攻击 3.2 步骤 …

永恒之蓝漏洞复现(ms17-010)

目录 前言 基本命令 ms17-010漏洞复现 漏洞介绍 影响版本 漏洞复现 前言 Metasploit Framework(MSF)是一款开源安全漏洞检测工具&#xff0c;附带数千个已知的软件漏洞&#xff0c;并保持持续更新。Metasploit可以用来信息收集、漏洞探测、漏洞利用等渗透测试的全流程&a…

2021-08-29 网安实验-Linux系统渗透测试之Metasploit攻击linux实例

一:扫描端口 nmap -sT -v 10.1.1.136 扫描漏洞主机开放端口信息,扫描结果如下: 其中6667号端口开放IRC服务,可以利用UnrealIRCD IRC软件的漏洞来攻击漏洞主机。 二:UnrealIRCd后门漏洞 1.输入msfconsole命令 2.运行“search unrealircd”命令 3.运行“use ex

使用kali系统中的nmap工具扫描漏洞

如何使用kali系统中的nmap工具扫描网站漏洞 1.我们打开kali在终端输入nmap可以看到nmap工具的所有参数如下图2.现在我们来使用nmap工具中的这个nmap www.xxx.com命令来扫描网站开了那些端口如下图如图我们知道了网站开了那些端口现在我们运行这个nmap -sv -p端口 --scriptvuln…

七、Kali Linux 2 渗透攻击

渗透攻击 如果把目标系统上的漏洞比作阿克琉斯的脚踵&#xff0c;那么漏洞渗透工具就是帕里斯手中的利箭。 在第6章中&#xff0c;我们介绍了如何使用远程控制工具&#xff0c;而在这一章中我们将会介绍如何将远程控制软件的被控端发送到目标主机上。而这个过程最为神奇的方法…

Kali常见攻击手段

Kali常见攻击手段 注意:仅用于教程和科普&#xff0c;切勿做违法之事&#xff0c;否则后果自负 1 网络攻击手段 请正确使用DDos和CC攻击&#xff0c;不要用来做违反当地法律法规的事情&#xff0c;否则后果自负 使用之前kali需要能够上网 参考:kali安装 1.1 DDos攻击 打开命令…

【愚公系列】2023年04月 文件上传渗透测试之绕过黑名单检查(::$DATA、点+空格+点)

文章目录 前言一、::$DATA绕过二、点+空格+点前言 文件上传漏洞是一种安全漏洞,指攻击者通过网站上传功能上传恶意文件来攻击网站的漏洞。攻击者可以通过利用这种漏洞上传恶意文件来执行代码和命令,进而控制网站、窃取敏感信息等。常见的文件上传漏洞主要存在于没有健全的文…