深入详解 C# Task.Run异步任务

server/2024/10/20 6:18:07/

目录

Task.Run

Task.Run 的底层原理

默认并发数量

控制并发

使用 SemaphoreSlim

代码解析

使用 Parallel.ForEach

代码解析

注意事项

自定义任务调度器

代码解析

使用自定义任务调度器:

总结


Task.Run

Task.Run 是 .NET 中创建和启动异步任务的一种便捷方法。它通过将一个委托排队到 .NET 线程池来创建并运行任务。理解 Task.Run 的底层原理、默认并发数量以及并发控制方法对于优化并发程序至关重要。

Task.Run 的底层原理

  1. 任务创建与启动

    • Task.Run 本质上是调用了 Task.Factory.StartNew 方法,并设置了默认的任务调度选项和任务创建选项。
    • 具体来说,Task.Run 等价于 Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default)
  2. 线程池

    • Task.Run 使用 .NET 线程池 (ThreadPool) 来执行任务。线程池是一个全局的、共享的资源,管理和复用一组线程。
    • 线程池通过一种自适应算法动态调整线程的数量,以应对当前的工作负载。
  3. 任务调度器

    • Task.Run 使用默认的任务调度器 (TaskScheduler.Default),它依赖于线程池来调度任务。
    • 默认任务调度器会将任务排队到线程池,然后线程池中的线程会从队列中获取任务并执行。

默认并发数量

  1. 线程池的并发策略

    • 线程池的初始线程数取决于系统的处理器核心数(CPU 核数)。对于每个逻辑处理器,线程池会保留一个或多个线程以处理任务。
    • 线程池有一个最小线程数和一个最大线程数。最小线程数可以通过 ThreadPool.SetMinThreads 方法设置,默认情况下等于逻辑处理器的数量。
    • 当线程池中的线程处于忙碌状态时,线程池会判断是否需要创建新线程。创建新线程的决策基于一个自适应的算法,该算法考虑了任务队列的长度、任务的执行时间等因素。
  2. 最大并发数

    虽然线程池可以动态调整线程数量,但默认情况下,线程池的最大线程数是相对较高的。可以通过 ThreadPool.GetMaxThreads 获取这个值。
    int workerThreads, completionPortThreads;
    ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
    Console.WriteLine($"Max Worker Threads: {workerThreads}, Max Completion Port Threads: {completionPortThreads}");
    
     

控制并发

使用 SemaphoreSlim

SemaphoreSlim 是一种轻量级的同步原语,可以用来控制同时执行的任务数量。通过在任务开始时等待信号量,在任务结束时释放信号量来实现并发控制。

// 异步方法,用于处理文档列表,通过信号量限制并发任务的数量
private async Task ProcessDocumentListAsync(List<string> documents)
{// 存储任务的列表var tasks = new List<Task>();// 创建一个信号量,限制并发任务数量为5var semaphore = new SemaphoreSlim(5);// 遍历文档列表foreach (var doc in documents){// 等待信号量,确保并发任务不超过5个await semaphore.WaitAsync();// 将处理文档的任务添加到任务列表tasks.Add(Task.Run(async () =>{try{// 异步处理文档await ProcessDocumentAsync(doc);}finally{// 任务完成后释放信号量semaphore.Release();}}));}// 等待所有任务完成await Task.WhenAll(tasks);
}// 异步方法,用于处理单个文档
private async Task ProcessDocumentAsync(string document)
{// 模拟文档处理,通过延时模拟耗时操作await Task.Delay(1000);// 打印处理完成的信息Console.WriteLine($"Processed document: {document}");
}

 

代码解析
  1. ProcessDocumentListAsync 方法

    • 参数:接受一个文档列表 (List<string> documents)。
    • 任务列表:创建一个空的任务列表 tasks 用于存储所有的处理任务。
    • 信号量:创建一个信号量 semaphore,初始计数为5,这意味着最多允许5个并发任务。
    • 遍历文档列表:使用 foreach 循环遍历所有文档。
      • 等待信号量:调用 await semaphore.WaitAsync(),当信号量计数大于0时,允许任务继续;否则,等待。
      • 添加任务:使用 Task.Run 启动一个新任务来处理文档,并将其添加到任务列表 tasks 中。
      • 任务内容:在任务中调用 ProcessDocumentAsync 方法异步处理文档。
        • try-finally 块:确保任务执行完成后,无论是否发生异常,都会释放信号量。
    • 等待所有任务完成await Task.WhenAll(tasks) 确保所有添加到任务列表中的任务全部完成。
  2. ProcessDocumentAsync 方法

    • 参数:接受一个文档 (string document)。
    • 模拟处理:使用 await Task.Delay(1000) 模拟耗时的文档处理操作,延时1秒。
    • 打印信息:处理完成后,打印处理文档的消息。

        该代码通过使用信号量 (SemaphoreSlim) 控制并发任务的数量,确保同一时刻最多只有5个文档处理任务在运行。每个文档处理任务通过 Task.Run 启动,并调用 ProcessDocumentAsync 方法来模拟实际的文档处理。任务完成后,无论是否发生异常,都会释放信号量,从而允许新的任务启动。最后,ProcessDocumentListAsync 方法等待所有任务完成后才返回。

通过这种方式,可以有效地控制并发任务数量,避免因过多并发任务导致的资源耗尽或性能下降。

使用 Parallel.ForEach

Parallel.ForEach 提供了并行化处理集合的方法,可以通过 ParallelOptions 来控制并行程度。

// 设置并行选项, 限制并行任务的最大数量为5个
var parallelOptions = new ParallelOptions
{MaxDegreeOfParallelism = 5
};// 使用 Parallel.ForEach 并行处理文档列表
Parallel.ForEach(documents, parallelOptions, (doc) =>
{// 调用异步方法处理文档,在此处使用 .Wait() 同步等待异步方法完成ProcessDocumentAsync(doc).Wait();
});
代码解析
  1. 设置并行选项:创建 ParallelOptions 对象,并设置 MaxDegreeOfParallelism 属性为 5,从而限制并行任务的最大数量。

  2. 并行处理文档:使用 Parallel.ForEach 方法遍历文档列表:

    • 传入 documents 作为文档列表。
    • 传入 parallelOptions 以控制并行度。
    • 对每个文档执行 lambda 表达式中的操作,调用 ProcessDocumentAsync(doc).Wait() 同步等待异步文档处理完成。
注意事项
  • 同步等待异步方法:使用 .Wait() 方法将异步方法 ProcessDocumentAsync 转为同步,这种方式可能会引发一些潜在的问题,如死锁。如果 ProcessDocumentAsync 方法内部有 await,应当注意避免在上下文中使用 .Wait().Result,因为它们会阻塞线程。

  • 上下文捕获:如果 ProcessDocumentAsync 方法有上下文捕获(如 UI 线程上下文),则使用 .Wait() 可能导致死锁或性能问题。可以使用 ConfigureAwait(false) 来避免这些问题。

 

自定义任务调度器

可以通过实现自定义的任务调度器 (TaskScheduler) 来更精细地控制任务的调度和并发。

// 自定义任务调度器,用于限制并发任务的最大数量
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{// 最大并发任务数量private readonly int maxDegreeOfParallelism;// 用于存储待执行任务的队列private readonly LinkedList<Task> tasks = new LinkedList<Task>();// 当前正在运行的任务数量private int runningTasks;// 构造函数,初始化最大并发任务数量public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism){this.maxDegreeOfParallelism = maxDegreeOfParallelism;}// 返回当前计划的任务。这对于调试和监控工具非常有用。protected override IEnumerable<Task> GetScheduledTasks(){lock (tasks){// 返回任务队列中的任务数组return tasks.ToArray();}}// 将任务排队到任务队列中。如果运行的任务小于最大并发任务数,则立即执行任务。protected override void QueueTask(Task task){lock (tasks){// 将任务添加到队列尾部tasks.AddLast(task);// 如果当前运行的任务数量小于最大并发任务数量,则启动一个新任务if (runningTasks < maxDegreeOfParallelism){runningTasks++;// 通知线程池有待处理工作NotifyThreadPoolOfPendingWork();}}}// 通知线程池有待处理的工作private void NotifyThreadPoolOfPendingWork(){// 使用线程池执行任务ThreadPool.UnsafeQueueUserWorkItem(_ =>{// 从任务队列中取出第一个任务Task item;lock (tasks){item = tasks.First.Value;tasks.RemoveFirst();}// 尝试执行任务base.TryExecuteTask(item);lock (tasks){// 任务执行完成,减少运行中的任务计数runningTasks--;// 如果还有待执行的任务,则递归通知线程池if (tasks.Count > 0){runningTasks++;NotifyThreadPoolOfPendingWork();}}}, null);}// 尝试在线程中的同步上下文内执行任务protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){// 如果任务已排队,则不在线程内执行if (taskWasPreviouslyQueued)return false;// 直接执行任务return base.TryExecuteTask(task);}
}
代码解析
  1. LimitedConcurrencyLevelTaskScheduler:这是一个继承自 TaskScheduler 的自定义任务调度器,主要用于限制并发任务的最大数量。

  2. 字段

    • maxDegreeOfParallelism:最大并发任务数量。
    • tasks:用于存储排队等待执行的任务队列。
    • runningTasks:当前正在运行的任务数量。
  3. 构造函数:初始化最大并发任务数量。

  4. GetScheduledTasks 方法:返回当前排队的任务数组,这对于调试和监控非常有用。

  5. QueueTask 方法:将任务添加到任务队列中,并在当前运行的任务数量少于最大并发数量时启动新任务。

  6. NotifyThreadPoolOfPendingWork 方法:通知线程池有待处理的任务,从任务队列中取出任务并执行,递归地处理后续任务。

  7. TryExecuteTaskInline 方法:尝试在线程中的同步上下文内直接执行任务,如果任务已经排队则返回 false,否则直接执行任务。

使用自定义任务调度器:

var scheduler = new LimitedConcurrencyLevelTaskScheduler(5);
var tasks = new List<Task>();foreach (var doc in documents)
{tasks.Add(Task.Factory.StartNew(() =>ProcessDocumentAsync(doc).Wait(),CancellationToken.None,TaskCreationOptions.None,scheduler));
}Task.WaitAll(tasks.ToArray());

总结

  • Task.Run 将任务排队到线程池中执行,线程池会根据工作负载动态调整线程数量。
  • 默认并发数量 由线程池管理,系统会根据当前的负载自动调整。
  • 控制并发 可以通过使用 SemaphoreSlimParallel.ForEach、自定义任务调度器等方式来实现。

通过理解和应用这些机制,可以更好地控制并发任务的执行,实现高效的并发编程。


http://www.ppmy.cn/server/113551.html

相关文章

探寻 IP 代理地址繁多之因

在当今的网络天地里&#xff0c;IP 代理服务随处可见&#xff0c;且令人称奇的是&#xff0c;它们常常手握海量的 IP 地址可供挑选。那么&#xff0c;究竟是什么原因使得 IP 代理拥有如此众多的地址呢&#xff1f;现在&#xff0c;就让我们一同深入探究这个神秘现象背后的缘由。…

[项目][CMP][Page Cache]详细讲解

目录 1.申请内存2.释放内存3.框架 1.申请内存 当Central Cache向Page Cache申请内存时&#xff0c;Page Cache先检查对应位置有没有span&#xff0c;如果没有则向更大页寻找一个span&#xff0c;如果找到则分裂成两个 比如&#xff1a;申请的是4页page&#xff0c;4页page后面…

lvs DR模式调试

DS配置&#xff1a; # cat /etc/keepalived_docker/keepalived.conf ! Configuration File for keepalived global_defs {router_id LVS_70 # 设置lvs的id&#xff0c;在一个网络内应该是唯一的 }vrrp_instance VI_70 {state MASTER # 两个 DS&#xff0…

使用 macos 制作启动盘给主机安装 Windows 系统

1. 下载系统文件 微软官方地址&#xff0c;根据自己需要的系统版本下载对应的镜像文件即可&#xff0c;下载完成之后双击文件将镜像文件挂载到你的电脑&#xff0c;这个时候打开访达应该可以看到多了一个卷 2. 制作启动盘&#xff0c;并且把你的镜像复制到 U 盘 查看你的U盘…

去中心化身份(DID)与你:SOEX安全的交易未来

在快速发展的数字时代&#xff0c;管理和保护数字身份比以往任何时候都更加重要。去中心化身份 &#xff08;&#xff24;&#xff29;&#xff24;&#xff09; 已成为传统数字身份管理方法的有力替代方案&#xff0c;而传统方法往往存在不足。 &#xff24;&#xff29;&a…

ArcGIS Pro SDK (十二)布局 10 布局导出

ArcGIS Pro SDK (十二)布局 10 布局导出 文章目录 ArcGIS Pro SDK (十二)布局 10 布局导出1 布局导出1.1 将布局导出为 PDF1.2 将地图框导出为 JPG1.3 将与地图框关联的地图视图导出到 BMP1.4 将地图系列导出为单个 PDF1.5 将地图系列导出到单个 TIFF 文件2 布局选项2.1 获…

golang学习笔记07——使用gzip压缩字符减少redis等存储占用的实现

推荐学习文档 基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总golang学习笔记01——基本数据类型golang学习笔记02——gin框架及基本原理golang学习笔记03——gin框架的核心数据结构golang学习笔记04——如何真正写好Golang代码&…

手写 Vue Router 中的 Hash 模式和 History 模式

Vue Router 是 Vue.js 的官方路由库&#xff0c;负责管理 Vue应用中的页面导航。它与 Vue.js 核心深度集成&#xff0c;让用 Vue.js 构建单页应用变得轻而易举。Vue Router 提供两种常见的模式&#xff1a;Hash 模式和 History 模式。这两种模式的主要区别在于它们如何管理 URL…