如何兼顾性能+实时性处理缓冲数据?

news/2024/11/8 17:00:44/

我们经常会遇到这样的数据处理应用场景:我们利用一个组件实时收集外部交付给它的数据,并由它转发给一个外部处理程序进行处理。考虑到性能,它会将数据存储在本地缓冲区,等累积到指定的数量后打包发送;考虑到实时性,数据不能在缓冲区存太长的时间,必须设置一个延时时间,一旦超过这个时间,缓冲的数据必须立即发出去。看似简单的需求,如果需要综合考虑性能、线程安全、内存分配,要实现起来还真有点麻烦。这个问题有不同的解法,本文提供一种实现方案。

一、实例演示

我们先来看看最终达成的效果。在如下这段代码中,我们使用一个Batcher<string>对象来接收应用分发给它的数据,该对象最终会在适当的时机处理它们。 调用Batcher<string>构造函数的三个参数分别表示:

  • processor:批量处理数据的委托对象,它指向的Process方法会将当前时间和处理的数据量输出到控制台上;
  • batchSize:单次处理的数据量,当缓冲的数据累积到这个阈值时会触发数据的自动处理。我们将这个阈值设置为10
  • interval:两次处理处理的最长间隔,我们设置为5秒
var batcher = new Batcher<string>(processor:Process,batchSize:10,interval: TimeSpan.FromSeconds(5));
var random = new Random();
while (true)
{var count = random.Next(1, 4);for (var i = 0; i < count; i++){batcher.Add(Guid.NewGuid().ToString());}await Task.Delay(1000);
}static void Process(Batch<string> batch)
{using (batch){Console.WriteLine($"[{DateTimeOffset.Now}]{batch.Count} items are delivered.");}
}

如上面的代码片段所示,在一个循环中,我们每隔1秒钟随机添加1-3个数据项。从下图中可以看出,Process方法的调用具有两种触发条件,一是累积的数据量达到设置的阈值10,另一个则是当前时间与上一次处理时间间隔超过5秒。

二、待处理的批量数据:Batch<T>

除了上面实例涉及的Batcher<T>,该解决方案还涉及两个额外的类型,如下这个Batch<T>类型表示最终发送的批量数据。为了避免缓冲数据带来的内存分配,我们使用了一个单独的ArrayPool<T>对象来创建池化的数组,这个功能体现在静态方法CreatePooledArray方法上。由于构建Batch<T>对象提供的数组来源于对象池,在处理完毕后必须回归对象池,所以我们让这个类型实现了IDisposable接口,并将这一操作实现在Dispose方法种。在调用ArrayPool<T>对象的Return方法时,我们特意将数组清空。由于提供的数组来源于对象池,所以并不能保证每个数据元素都承载了有效的数据,实现的迭代器和返回数量的Count属性对此作了相应的处理。

public sealed class Batch<T> : IEnumerable<T>, IDisposable where T : class
{private bool _isDisposed;private int? _count;private readonly T[] _data;private static readonly ArrayPool<T> _pool = ArrayPool<T>.Create();public int Count{get{if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));if(_count.HasValue) return _count.Value;var count = 0;for (int index = 0; index < _data.Length; index++){if (_data[index] is  null){break;}count++;}return (_count = count).Value;}}public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data));public void Dispose(){_pool.Return(_data, clearArray: true);_isDisposed = true;}public IEnumerator<T> GetEnumerator() => new Enumerator(this);IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize);private void EnsureNotDisposed(){if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));}private sealed class Enumerator : IEnumerator<T>{private readonly Batch<T> _batch;private readonly T[] _data;private int _index = -1;public Enumerator(Batch<T> batch){_batch = batch;_data = batch._data;}public T Current{get { _batch.EnsureNotDisposed(); return _data[_index]; }}object IEnumerator.Current => Current;public void Dispose() { }public bool MoveNext(){_batch.EnsureNotDisposed();return ++_index < _data.Length && _data[_index] is not null;}public void Reset(){_batch.EnsureNotDisposed();_index = -1;}}
}

三、感知数据处理的时机:BatchChangeToken

Batcher具有两个触发数据处理的设置:缓冲的数据量和两次数据处理之间的最长间隔。当累积的数据量或者当前时间与上一次处理的间隔达到阈值,缓冲的数据将自动被处理。.NET Core经常利用一个IChangeToken作为通知的令牌,为此我们定义了如下这个实现了该接口的BatchChangeToken类型。如下面的代码片段所示,上述两个触发条件体现在两个CancellationToken对象上,我们利用它们创建了对应的CancellationChangeToken对象,最后利用这两个CancellationChangeToken创建了一个CompositeChangeToken对象。这个CompositeChangeToken对象最终被用来实现了IChangeToken接口的三个成员。

internal sealed class BatchChangeToken : IChangeToken
{private readonly IChangeToken _innerToken;private readonly int _countThreshold;private readonly CancellationTokenSource _expirationTokenSource;private readonly CancellationTokenSource _countTokenSource;private int _counter;public BatchChangeToken(int countThreshold, TimeSpan timeThreshold){_countThreshold = countThreshold;_countTokenSource = new CancellationTokenSource();_expirationTokenSource = new CancellationTokenSource(timeThreshold);var countToken = new CancellationChangeToken(_countTokenSource.Token);var expirationToken = new CancellationChangeToken(_expirationTokenSource.Token);_innerToken = new CompositeChangeToken(new IChangeToken[] { countToken, expirationToken });}public bool HasChanged => _innerToken.HasChanged;public bool ActiveChangeCallbacks => _innerToken.ActiveChangeCallbacks;public IDisposable RegisterChangeCallback(Action<object?> callback, object? state) => _innerToken.RegisterChangeCallback(s =>{callback(s);_countTokenSource.Dispose();_expirationTokenSource.Dispose();}, state);public void Increase(){Interlocked.Increment(ref _counter);if (_counter >= _countThreshold){_countTokenSource.Cancel();}}
}

上述两个CancellationToken来源于对应的CancellationTokenSource,对应的字段为_countTokenSource和_expirationTokenSource。_expirationTokenSource根据设置的数据处理时间间隔创建而成。为了确定缓冲的数据量,我们提供了一个计数器,并利用Increase方法进行计数。在超过设置的数据量时,该方法会调用_expirationTokenSource的Cancel方法。在实现的ActiveChangeCallbacks方法种,我们将针对这两个CancellationTokenSource的释放放在注册的回调中。

四、接收、缓冲、打包和处理数据:Batcher<T>

最终用于打包的Batcher类型定义如下。在构造函数中,我们除了提供上述两个阈值外,还提供了一个Action<Batch<T>>委托完成针对打包数据的处理。通过Add方法接收的数据存储在_data字段返回的数组上,它时通过Batch<T>的静态方法CreatePooledArray提供的。我们使用字段_index表示添加数据在_data数组中存储的位置,并使用InterLocked.Increase方法解决并发问题。

public sealed class Batcher<T> : IDisposable where T : class
{private readonly Action<Batch<T>> _processor;private T[] _data;private BatchChangeToken _changeToken = default!;private readonly int _batchSize;private int _index = -1;private readonly IDisposable _scheduler;public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval){_processor = processor ?? throw new ArgumentNullException(nameof(processor));_batchSize = batchSize;_data = Batch<T>.CreatePooledArray(batchSize);_scheduler = ChangeToken.OnChange(() => _changeToken = new BatchChangeToken(_batchSize, interval), OnChange);void OnChange(){var data = Interlocked.Exchange(ref _data, Batch<T>.CreatePooledArray(batchSize));if (data[0] is not null){Interlocked.Exchange(ref _index, -1);_ = Task.Run(() => _processor.Invoke(new Batch<T>(data)));}}}public void Add(T item){if (item is null) throw new ArgumentNullException(nameof(item));var index = Interlocked.Increment(ref _index);if (index >= _batchSize){SpinWait.SpinUntil(() => _index < _batchSize - 1);Add(item);}_data[index] = item;_changeToken.Increase();}public void Dispose() => _scheduler.Dispose();
}

在构造函数中,我们调用了ChangeToken的静态方法OnChange将数据处理操作绑定到创建的BatchChangeToken对象上,并确保每次发送“数据处理”后将重新创建的BatchChangeToken对象赋值到_changeToken字段上,因为Add放到需要调用它的Increase增加计数。当接收到数据处理通知后,我们会调用Batch<T>的静态方法CreatePooledArray构建一个数组将字段 ­_data引用的数组替换下来,并将其封装成Batch<T>对象进行处理(如果数据存在)。于此同时,表示添加数据存储索引的_index恢复成-1。Add方法在对_index做自增操作后,如果发现累积的数据量达到阈值,需要等待数据处理完毕。由于数据处理以异步的方式处理,这里的耗时时很低的,所以我们这里选择了自旋的方式等待它完成。

 


http://www.ppmy.cn/news/339564.html

相关文章

计算两个向量的外积numpy.outer()

【小白从小学Python、C、Java】 【等级考试500强双证书考研】 【Python-数据分析】 计算两个向量的外积 numpy.outer() 以下说法正确的是&#xff1a; import numpy as np a np.array([1,2]) print("【显示】a ",a) b np.array([3,4,5]) print("【显示】b &q…

TWS蓝牙耳机怎么挑选?值得买的蓝牙耳机推荐

现如今&#xff0c;TWS耳机市场品类丰富&#xff0c;参差不齐。在挑选蓝牙耳机时应该从需求出发&#xff0c;结合预算&#xff0c;找到真正合适的TWS蓝牙耳机。接下来&#xff0c;我为大家推荐几款值得入手的TWS蓝牙耳机&#xff0c;一起来看看吧。 一、南卡Lite Pro 2蓝牙耳机…

对耳朵伤害最小的耳机类型有哪些、不入耳蓝牙耳机推荐

如果平时喜欢戴着耳机听歌、看剧&#xff0c;并且一戴就是戴几个小时的朋友&#xff01;在此小编想说的是&#xff0c;如果控制不了自己使用耳机的时间&#xff0c;那么就尽量选择不伤耳的耳机&#xff01;骨传导耳机相对于其他耳机&#xff0c;可以更加好的保护听力&#xff0…

为什么建议不要买入耳式的耳机、骨传导耳机和入耳式耳机哪个好?

耳机可以说是我们生活中经常接触和使用的产品了&#xff0c;但是耳机对我们耳朵的影响&#xff0c;很多人并不清楚。经常戴耳机的人肯定深有体会&#xff0c;耳机滑落是常事&#xff0c;戴久了耳朵还胀痛&#xff0c;戴耳机听音乐可能会造成听力损伤和耳膜损伤、损坏&#xff0…

魔法打败魔法?OpenAI用GPT-4 来解释 GPT-2 的行为

大语言模型&#xff08;LLM&#xff09;像大脑一样&#xff0c;它们是由 “神经元” 组成的&#xff0c;它们观察文本中的一些特定模式&#xff0c;以影响整个模型接下来 “说” 什么。但由于 LLM 中的参数数量多到已经无法由人类解释的程度&#xff0c;因此&#xff0c;LLM 给…

我们熟悉的106短信的水好深啊

短信通道在短信平台中的作用起非常重要的作用&#xff0c;短信通道的好坏往往和短信发送的效果有着非常大的关系&#xff0c;下面就学习下有关短信通道的一些知识吧 国内短信通道主要是是由中国移动、中国联通和中国电信提供。目前最大份额的是中国移动&#xff0c;市场份额达到…

170虚拟sim服务器,虚拟大容量SIM卡服务器端管理软件的设计与现实

摘要&#xff1a; 随着移动通信技术在高速的发展,越来越多依赖移动通信网络的应用也相继开展,如移动QQ,移动终端银行,手机电子商务,以及SIM卡上的电子钱包,这对 SIM卡的存储容量带来了挑战.为提高SIM卡的容量下一代的大容量SIM纷纷上市,容量提升了大约1000倍,然而仅依靠物理角度…

罗永浩回应子弹短信、TNT、无限屏的一切质疑

作者 唐小引 来源 CSDN 原创作品&#xff0c;如有转载&#xff0c;请联系公众号授权。 出口即成段子的老罗一直没有学会如何克制自己。 即使反复念叨自己“大嘴巴”&#xff0c;“现在&#xff08;锤子的&#xff09;市场部公关部都不让我讲”&#xff0c;并且吐槽着锤子的一众…