C# 关于实现保存数据以及数据溯源推送

ops/2024/12/13 0:39:47/

前言

实现了一个数据接收、存储和推送的功能
首先定义我们数据存储的格式(可根据自己的需求定义格式):
数据切割符号:**$是区分数据其他数据的划分
数据内容切割号:
|**是区分时间戳内容数据的划分
以下是我存储的文本格式Data.log或者Data.txt

$2024-12-07 16:26:53.799|数据1
$2024-12-07 16:26:54.920|数据2
$2024-12-07 16:26:55.640|数据3
...
...

采集与推送:

以下是具体的代码内容

using DataAcquisitionModule.Helper;
using HslCommunication.MQTT;
using log4net;
using log4net.Core;
using Sunny.UI;
using Sunny.UI.Win32;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace DataAcquisitionModule.Server
{public class DataReceiver{private static readonly ILog logger = LogManager.GetLogger(typeof(DataReceiver));private static readonly object lockObject = new object();private readonly string _filePath;private readonly ConcurrentQueue<string> _dataQueue;private readonly CancellationTokenSource _cancellationTokenSource;private bool _isRunning;public DataReceiver(string filePath){_filePath = filePath;_dataQueue = new ConcurrentQueue<string>();_cancellationTokenSource = new CancellationTokenSource();_message = new MessageLog();// 启动数据存储任务_ = StartSavingDataAsync(_cancellationTokenSource.Token);}//数据入列,多肽1public void HandleMsg(byte[] msg){var data = Encoding.UTF8.GetString(msg);_dataQueue.Enqueue(data);}//数据入列,多肽2public void HandleMsg(string topic, string msg){string strMsg = $"{topic}#{msg}";_dataQueue.Enqueue(strMsg);}private async Task StartSavingDataAsync(CancellationToken cancellationToken){while (!cancellationToken.IsCancellationRequested){if (_dataQueue.TryDequeue(out var data)){await SaveDataToFileAsync(data);}else{// 如果队列为空,短暂等待await Task.Delay(100, cancellationToken);}}}private async Task SaveDataToFileAsync(string data){try{logger.Info($"保存数据: {data}");await File.AppendAllTextAsync(_filePath, $"${DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}|{data}\n");}catch (Exception ex){logger.Error("保存数据到文件时发生错误", ex);}}public async Task LoadAndProcessDataAsync(Action<string> callAction){_isRunning = true;while (!_cancellationTokenSource.Token.IsCancellationRequested && _isRunning){List<string> lines = new List<string>();lock (lockObject){if (File.Exists(_filePath)){// 一次性读取整个文件内容string fileContent = await File.ReadAllTextAsync(_filePath);lines.AddRange(fileContent.Split('$', StringSplitOptions.RemoveEmptyEntries).Select(part => part.Trim()));//RemoveEmptyEntries:返回数组元素移除空字符串元素(不包含空字符串元素);}}if (lines.Count > 0){DateTime startTime = DateTime.Parse(lines.First().Split('|')[0]);DateTime timestamp = new DateTime();TimeSpan delay = new TimeSpan(0);string[] strData;foreach (var line in lines){if (!_isRunning){break;}strData = line.Split('|');if (strData.Length < 2 || string.IsNullOrEmpty(strData[1])){continue;}timestamp = DateTime.Parse(strData[0]);delay = timestamp - startTime;if (delay.TotalMilliseconds > 0){await Task.Delay(delay);}startTime = timestamp;callAction?.Invoke(strData[1]);logger.Info($"数据推送: {line}");}callAction?.Invoke("推送完毕!!!");break;}await Task.Delay(10000); // 每10秒检查一次}}public void StopPushData(){_isRunning = false;}public void StopReceiving(){_cancellationTokenSource.Cancel();}}
}

注意:

  1. 优化文件读取和处理
    • 减少文件读取次数:避免频繁读取文件,特别是在文件较大时,可以考虑使用内存缓存。
    • 优化文件写入:使用 File.AppendAllText 方法代替 StreamWriter,减少文件打开和关闭的开销。
  2. 异步处理
    • 异步文件操作:使用 File.ReadAllLinesAsync 和 File.WriteAllLinesAsync 方法进行异步文件操作,减少阻塞。
  3. 错误处理
    • 增加异常处理:在文件操作和网络通信中增加异常处理,确保程序的稳定性。
  4. 日志记录
    • 日志记录:使用 log4net 或其他日志框架记录关键操作的日志,方便调试和维护。
  5. 代码结构优化
    • 分离关注点:将数据接收、存储和推送逻辑分离到不同的类或方法中,提高代码的可读性和可维护性。
    示例:数据采集和调用
    假设我们有一个简单的控制台应用程序,用于启动 DataReceiver 并模拟数据的接收和处理。

1. 创建 DataReceiver 实例

首先,我们需要创建一个 DataReceiver 实例,并指定数据存储的文件路径。

2. 模拟数据接收

我们可以模拟从外部源(如 MQTT 消息队列)接收到的数据,并调用 HandleMsg 方法将其添加到队列中。

3. 处理和推送数据

启动一个任务来处理和推送数据,调用 LoadAndProcessDataAsync 方法。

示例代码

using DataAcquisitionModule.Helper;
using HslCommunication.MQTT;
using log4net;
using log4net.Config;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;namespace DataAcquisitionModule
{class Program{private static readonly ILog logger = LogManager.GetLogger(typeof(Program));static async Task Main(string[] args){// 配置 log4netXmlConfigurator.Configure(new FileInfo("log4net.config"));// 指定数据存储文件路径string filePath = "data.log";// 创建 DataReceiver 实例DataReceiver dataReceiver = new DataReceiver(filePath);// 模拟数据接收SimulateDataReception(dataReceiver);// 启动数据处理和推送任务await dataReceiver.LoadAndProcessDataAsync(data => string[] msg = data.Split('#');if (msg.Length == 2){mqttServer.PublishAllClientTopicPayload(msg[0], Encoding.UTF8.GetBytes(msg[1]));Console.WriteLine($"推送的topic: {msg[0]}");Console.WriteLine($"处理具体的数据: {Encoding.UTF8.GetBytes(msg[1])}");});// 模拟停止数据接收和推送Console.WriteLine("按任意键停止数据接收和推送...");Console.ReadKey();dataReceiver.StopPushData();dataReceiver.StopReceiving();}private static void SimulateDataReception(DataReceiver dataReceiver){// 模拟从外部源接收到的数据Task.Run(async () =>{for (int i = 0; i < 10; i++){string topic = "Topic" + i;string message = "Message" + i;dataReceiver.HandleMsg(topic, message);await Task.Delay(1000); // 模拟每秒接收一条数据}});}}
}

解释

  1. 配置 log4net:
    • 使用 XmlConfigurator.Configure 方法加载 log4net 配置文件,确保日志记录正常工作。
  2. 创建 DataReceiver 实例:
    • 指定数据存储文件路径 data.log,并创建 DataReceiver 实例。
  3. 模拟数据接收:
    • 使用 SimulateDataReception 方法模拟从外部源接收到的数据。这里使用一个 Task 来模拟每秒接收一条数据,并调用 HandleMsg 方法将数据添加到队列中。
  4. 启动数据处理和推送任务:
    • 调用 LoadAndProcessDataAsync 方法启动数据处理和推送任务。这里使用 Console.WriteLine 方法来模拟数据处理操作。
  5. 停止数据接收和推送:
    • 按任意键停止数据接收和推送任务,调用 StopPushDataStopReceiving 方法。

运行结果

运行上述代码后,程序会模拟接收数据并将其存储到 data.log 文件中。然后,程序会读取文件中的数据并按时间顺序推送,每条数据的推送时间间隔与实际接收时间间隔一致。
通过这种方式,验证 DataReceiver 类的功能,其实大家可以根据实际需求进行调整和扩展。我这边只是简单演示


http://www.ppmy.cn/ops/141383.html

相关文章

MySQL | 尚硅谷 | 第16章_变量、流程控制与游标

MySQL笔记&#xff1a;第16章_变量、流程控制与游标 文章目录 MySQL笔记&#xff1a;第16章_变量、流程控制与游标第16章_变量、流程控制与游标 1. 变量1.1 系统变量1.1.1 系统变量分类1.1.2 查看系统变量 1.2 用户变量1.2.1 用户变量分类1.2.2 会话用户变量 1.2.3 局部变量1.2…

基于turtle库的圣诞树的绘制

2024年的圣诞节快要到来了&#xff0c;内心无比的happ呀&#xff0c;以下是使用 Python 的 turtle 模块绘制一个美丽的圣诞节贺卡的代码示例。这个贺卡包含一棵装饰精美的圣诞树、一颗闪亮的星星以及“圣诞快乐”的祝福语。你可以根据需要调整颜色和尺寸以达到理想的效果。 完…

什么是封装性?C++中如何实现封装?封装性的好处是什么?

封装性是面向对象编程&#xff08;OOP&#xff09;中的一个重要概念&#xff0c;它指的是将对象的状态&#xff08;属性&#xff09;和行为&#xff08;方法&#xff09;隐藏在对象内部&#xff0c;只通过公共接口与外部进行交互。这种隐藏机制有助于保护对象的内部状态不被外部…

CSS系列(7)-- 背景与边框详解

前端技术探索系列&#xff1a;CSS 背景与边框详解 &#x1f3a8; 致读者&#xff1a;探索视觉设计的艺术 &#x1f44b; 前端开发者们&#xff0c; 今天我们将深入探讨 CSS 背景与边框&#xff0c;学习如何创建丰富多彩的视觉效果。 背景效果详解 &#x1f680; 基础背景属…

使用lvgl9 的 Chart (lv_chart) 控件指南

文章目录 前言主体介绍1. **图表概述**2. **样式部分**样式定义 3. **图表功能**图表类型数据系列数据修改更新模式数据点个数轴范围分隔线光标 4. **事件处理**5. **完整示例代码** 总结 前言 图表是数据可视化的重要工具&#xff0c;lv_chart 是 LittlevGL 提供的一个灵活的…

uniapp uni-table最简单固定表头

需求&#xff1a;固定表头数据&#xff0c;在网上找了半天&#xff0c;啥都有&#xff0c;就是一直实现不了&#xff0c;最后更改代码实现 1.效果 2.主要代码讲解完整代码 表格的父级一定要设置高度&#xff0c;不然会错位&#xff0c;我看网上说设置position&#xff1a;fixed…

解决同一IP访问网站请求过多限制的方法

一.网站限制IP的原因和影响 1.网站限制IP原因&#xff1a;保护服务器资源&#xff0c;防止恶意攻击 2.防止过度爬取数据&#xff0c;保护其服务器资源免受恶意攻击 二.对用户访问影响&#xff1a;无法正常访问网站&#xff0c;业务受阻 1.解决方法&#xff1a;降低IP访问速率…

数仓技术hive与oracle对比(五)

附录说明 附录是对测试过程中涉及到的一些操作进行记录和解析。 oracle清除缓存 alter system flush shared_pool; 将使library cache和data dictionary cache以前保存的sql执行计划全部清空&#xff0c;但不会清空共享sql区或者共享pl/sql区里面缓存的最近被执行的条目。刷…