.NET 中实现生产者-消费者模型,BlockingCollection<T> 和 Channel<T>使用示例

devtools/2025/2/9 13:58:47/

一、方案对比:不同线程安全集合的适用场景

在这里插入图片描述

二、推荐方案及示例代码

方案 1:使用 BlockingCollection(同步模型)

public class QueueDemo
{private readonly BlockingCollection<int> _blockingCollection = new BlockingCollection<int>();private readonly CancellationTokenSource _cts = new CancellationTokenSource();public QueueDemo(){}// 生产者方法public void ProduceData(){Task.Run(() =>{var rnd = new Random();while (!_cts.IsCancellationRequested){var item = rnd.Next(1, 100);_blockingCollection.Add(item);      // 触发消费者唤醒Console.WriteLine($"Produced1: {item}");Thread.Sleep(500); // 模拟生产间隔//if(DateTime.Now > Convert.ToDateTime("2025-02-05 16:28:00")) break;}_blockingCollection.CompleteAdding(); // 结束消费});}// 消费者方法public void ConsumeData(){// 方式1:阻塞消费(推荐)Task.Run(() =>{try{Thread.Sleep(1000);// 使用阻塞方式消费(自动处理空队列等待)foreach (var item in _blockingCollection.GetConsumingEnumerable(_cts.Token)){// 自动等待新数据Console.WriteLine($"Consumed from BlockingCollection: {item}, 当前个数:{_blockingCollection.Count}");}}catch (OperationCanceledException){Console.WriteLine("Consumption canceled");}});}// 停止所有操作public void Stop(){_cts.Cancel();}
}//使用示例
var demo = new QueueDemo();
demo.ProduceData();
demo.ConsumeData();Console.WriteLine("Press any key to stop...");
Console.ReadKey();demo.Stop();

方案 2:使用 Channel(异步模型 - 推荐)

public class ChannelDemo
{private readonly Channel<int> _channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = false, SingleReader = false });private readonly CancellationTokenSource _cts = new CancellationTokenSource();// 生产者(异步写入)public async Task ProduceAsync(){while (true){var item = GenerateItem();await _channel.Writer.WriteAsync(item); // 非阻塞写入Console.WriteLine($"Produce: {item}");await Task.Delay(20);}}// 消费者(异步读取)public async Task ConsumeAsync(){while (await _channel.Reader.WaitToReadAsync()){if (_channel.Reader.TryRead(out var item)){await ProcessItemAsync(item);}}}private int GenerateItem() => new Random().Next(1, 100);private async Task ProcessItemAsync(int item){await Task.Delay(100); // 模拟异步处理Console.WriteLine($"Processed: {item}");}// 停止所有操作public void Stop(){_cts.Cancel();}
}

三、选型建议

在这里插入图片描述


http://www.ppmy.cn/devtools/157366.html

相关文章

ASP.NET Core托管服务

目录 托管服务的异常问题 托管服务中使用DI 托管服务案例&#xff1a;数据的定时导出 场景&#xff0c;代码运行在后台。比如服务器启动的时候在后台预先加载数据到缓存&#xff0c;每天凌晨3点把数据导出到备份数据库&#xff0c;每隔5秒钟在两张表之间同步一次数据。托管服…

智能化时代下教学任务管理的革新之路

一、引言&#xff1a;教学任务管理效率&#xff0c;教学的关键密码 身为教师&#xff0c;相信不少人都有过这样的困扰&#xff1a;精心准备的课程&#xff0c;在课堂上却进展缓慢&#xff0c;总感觉时间不够用&#xff0c;课程进度拖沓&#xff1b;又或者在课堂上&#xff0c;无…

【redis】缓存设计规范

本文是 Redis 键值设计的 14 个核心规范与最佳实践&#xff0c;按重要程度分层说明&#xff1a; 一、通用数据类型选择 这里我们先给出常规的选择路径图。 以下是对每个步骤的分析&#xff1a; 是否需要排序&#xff1f;&#xff1a; zset&#xff08;有序集合&#xff09;用…

DeepSeek LLM 论文解读:相信长期主义开源理念可扩展大语言模型(DeepSeek 吹响通用人工智能的号角)

论文链接&#xff1a;DeepSeek LLM: Scaling Open-Source Language Models with Longtermism&#xff08;相信长期主义开源理念可扩展大语言模型&#xff09; 目录 摘要一、数据处理&#xff08;一&#xff09;数据清洗与丰富&#xff08;二&#xff09;分词器与词汇设置 二、模…

Maven概述与安装

目录 Maven 概述 1. 什么是 Maven 2. Maven 的主要功能 3. Maven 的优势 Maven 安装 1. 系统要求 2. 下载 Maven 3. 解压 Maven 4. 配置环境变量 Windows 系统 1.配置环境变量&#xff1a; 2.验证安装&#xff1a; Linux 系统 1. 打开终端窗口 2. 打开 .bashrc 文…

undetected-chromedriver 使用教程,指定浏览器驱动和浏览器版本

前言 浏览器自动化测试&#xff0c;测试过一些网站检测 目录 前言1. 安装 undetected-chromedriver2. 基本使用示例代码&#xff1a;代码解析&#xff1a; 3. 反自动化检测绕过自定义浏览器设置示例&#xff1a;使用自定义 User-Agent示例&#xff1a;启用无头模式&#xff08…

c#对接deepseek 聊天AI接口

注意&#xff1a;不是免费 对接文档&#xff1a;对话补全 | DeepSeek API Docs 注册地址&#xff1a;DeepSeek 申请key 在线请求示例 apifox deepseek - deepseek

【字节青训营-5】:初探存储系统与数据库及技术原理,解析关系型、非关系型数据库

本文为笔者参加字节青训营时听字节青训营的课时所做课堂笔记。 本文目录 一、一条数据的生命周期二、存储系统2.1 存储系统的特点2.2 存储系统的存储器层级结构2.3 数据怎么从应用到存储介质2.4 RAID技术 三、数据库和存储系统四、单机存储系统五、分布式存储六、单机数据库七、…