一、方案对比:不同线程安全集合的适用场景
二、推荐方案及示例代码
方案 1:使用 BlockingCollection(同步模型)
/// <summary>
/// Synchronous producer/consumer sample built on <see cref="BlockingCollection{T}"/>.
/// A background producer adds random integers, a background consumer drains them,
/// and <see cref="Stop"/> cancels both through a shared <see cref="CancellationTokenSource"/>.
/// </summary>
public class QueueDemo
{
    // Created without a capacity argument, so Add is not limited by a bound.
    private readonly BlockingCollection<int> _blockingCollection = new BlockingCollection<int>();

    // Single cancellation source observed by both background loops.
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    /// <summary>Creates the demo; the background work starts only when the Produce/Consume methods are called.</summary>
    public QueueDemo()
    {
    }

    /// <summary>
    /// Starts a background task that adds a random value to the collection
    /// every 500 ms until cancellation is requested.
    /// </summary>
    public void ProduceData()
    {
        Task.Run(() => RunProducerLoop());
    }

    /// <summary>
    /// Starts a background task that waits one second and then prints every
    /// item taken from the collection until it is completed or canceled.
    /// </summary>
    public void ConsumeData()
    {
        Task.Run(() => RunConsumerLoop());
    }

    /// <summary>Requests cancellation of the producer and consumer loops.</summary>
    public void Stop()
    {
        _cts.Cancel();
    }

    // Producer body: generate, enqueue, report, pause; marks the collection complete on exit.
    private void RunProducerLoop()
    {
        var generator = new Random();

        for (; ; )
        {
            if (_cts.IsCancellationRequested)
            {
                break;
            }

            int produced = generator.Next(1, 100);
            _blockingCollection.Add(produced);
            Console.WriteLine($"Produced1: {produced}");
            Thread.Sleep(500);
        }

        // Tell consumers that no further items will arrive.
        _blockingCollection.CompleteAdding();
    }

    // Consumer body: the initial one-second pause lets the producer get ahead before draining starts.
    private void RunConsumerLoop()
    {
        try
        {
            Thread.Sleep(1000);

            CancellationToken stopToken = _cts.Token;
            IEnumerable<int> pending = _blockingCollection.GetConsumingEnumerable(stopToken);

            foreach (int consumed in pending)
            {
                Console.WriteLine($"Consumed from BlockingCollection: {consumed}, 当前个数:{_blockingCollection.Count}");
            }
        }
        catch (OperationCanceledException)
        {
            // Raised by the consuming enumerable when Stop() cancels the token.
            Console.WriteLine("Consumption canceled");
        }
    }
}
// Drive the BlockingCollection sample: start the background producer and consumer.
var queueDemo = new QueueDemo();
queueDemo.ProduceData();
queueDemo.ConsumeData();

// Keep the process alive until the user presses a key.
Console.WriteLine("Press any key to stop...");
Console.ReadKey();

// Request cancellation of both background loops.
queueDemo.Stop();
方案 2:使用 Channel(异步模型 - 推荐)
/// <summary>
/// Asynchronous producer/consumer sample built on an unbounded <see cref="Channel{T}"/>.
/// <see cref="Stop"/> cancels a shared token that both <see cref="ProduceAsync"/> and
/// <see cref="ConsumeAsync"/> observe, so both loops terminate after a stop request.
/// </summary>
public class ChannelDemo
{
    // Unbounded channel configured to allow several concurrent writers and readers.
    private readonly Channel<int> _channel = Channel.CreateUnbounded<int>(
        new UnboundedChannelOptions { SingleWriter = false, SingleReader = false });

    // Cancellation source shared by the producer and consumer loops.
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    // One generator for the whole instance instead of a new Random per item.
    // Random is not thread-safe and the channel permits multiple writers, hence the lock.
    private readonly Random _random = new Random();
    private readonly object _randomGate = new object();

    /// <summary>
    /// Writes a random value to the channel roughly every 20 ms until <see cref="Stop"/> is called.
    /// </summary>
    /// <returns>
    /// A task that completes once cancellation has been observed and the channel writer
    /// has been marked complete. Cancellation is not surfaced as an exception.
    /// </returns>
    public async Task ProduceAsync()
    {
        CancellationToken token = _cts.Token;

        try
        {
            while (!token.IsCancellationRequested)
            {
                var item = GenerateItem();
                await _channel.Writer.WriteAsync(item, token);
                Console.WriteLine($"Produce: {item}");
                await Task.Delay(20, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Stop() was requested while writing or pausing; leaving the loop is the expected outcome.
        }
        finally
        {
            // Mark the writer complete so readers are not left waiting forever.
            // TryComplete (rather than Complete) tolerates another producer having done this already.
            _channel.Writer.TryComplete();
        }
    }

    /// <summary>
    /// Reads and processes items until the channel is completed and empty, or until
    /// <see cref="Stop"/> is called. An item that is mid-processing when the stop
    /// request arrives is abandoned.
    /// </summary>
    /// <returns>A task that completes when consumption has ended; cancellation is reported on the console, not thrown.</returns>
    public async Task ConsumeAsync()
    {
        CancellationToken token = _cts.Token;

        try
        {
            while (await _channel.Reader.WaitToReadAsync(token))
            {
                // Drain everything currently buffered before waiting again.
                while (_channel.Reader.TryRead(out var item))
                {
                    await ProcessItemAsync(item, token);
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Consumption canceled");
        }
    }

    // Returns a random value from Next(1, 100), i.e. the upper bound 100 is exclusive.
    private int GenerateItem()
    {
        lock (_randomGate)
        {
            return _random.Next(1, 100);
        }
    }

    // Simulates 100 ms of work per item; the delay is cancellable so Stop() takes effect promptly.
    private async Task ProcessItemAsync(int item, CancellationToken token)
    {
        await Task.Delay(100, token);
        Console.WriteLine($"Processed: {item}");
    }

    /// <summary>Requests cancellation of the producer and consumer loops.</summary>
    public void Stop()
    {
        _cts.Cancel();
    }
}
三、选型建议