聊一聊 C#线程池 的线程动态注入

ops/2024/12/26 10:40:02/

一:背景

1. 讲故事

上一篇我们用 Thread.Sleep 的方式演示了线程池饥饿场景下的动态线程注入,可以观察到大概 1s 产生 1~2 个新线程,很显然这样的增长速度扛不住上游请求对线程池的DDOS攻击,导致线程池队列越来越大,但C#团队这么优秀,能优化的地方绝对会给大家尽可能的优化,比如这篇我们聊到的 Task.Result 场景下的注入。

二:Task.Result 角度下的动态注入

1. 测试代码

为了直观的体会到优化效果,先上一段测试代码观察一下。

static void Main(string[] args){for (int i = 0; i < 10000; i++){ThreadPool.QueueUserWorkItem((idx) =>{Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")} -> {idx}: 这是耗时任务");try{var client = new HttpClient();var content = client.GetStringAsync("https://youtube.com").Result;Console.WriteLine(content.Length);}catch (Exception ex){Console.WriteLine(ex.Message);}}, i);}Console.ReadLine();}

从卦象上来看大概1s产生4个新线程,再仔细看的话大概是250ms一个,虽然250不大好听,但不管怎么说确实比 Thread.Sleep 场景下只产生 1~2 个线程要快了好几倍,以终为始,我们再反向的看下这个优化的底层逻辑在哪?

2. 底层逻辑在哪里

还是那句话,千言万语不抵一张图,流程图大概如下:

接下来解释下其中的几个元素。

1、NotifyThreadBlocked

这是主动通知 GateThread 线程赶紧醒来,通过上一篇的知识大家应该知道 GateThread 会500ms一次被动唤醒,但为了提速不可能再这么干了,需要让人强制唤醒它,修剪后的源码如下:

private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken cancellationToken){var mres = new SetOnInvokeMres();AddCompletionAction(mres, addBeforeOthers: true);bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked();var returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken);return returnValue;}public bool NotifyThreadBlocked(){GateThread.Wake(this);return true;}public static void Wake(PortableThreadPool threadPoolInstance){DelayEvent.Set();}

卦中的 DelayEvent.Set(); 正是强制唤醒 GateThread 的 event 事件。

2、HasBlockingAdjustmentDelayElapsed

GateThread 是注入线程的官方通道,那到底要不要注入线程呢?肯定少不了一些判断,其中一个判断就是当前的延迟周期是否超过了 250ms,这个250ms的阈值最终由 BlockingConfig.MaxDelayMs 变量指定,这是能否调用 CreateWorkerThread方法需要闯的一个关口,参考代码如下:

private static class BlockingConfig{MaxDelayMs =(uint) AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.Blocking.MaxDelayMs",250,false);}private static void GateThreadStart(){while (true){bool wasSignaledToWake = DelayEvent.WaitOne((int)delayHelper.GetNextDelay(currentTimeMs));currentTimeMs = Environment.TickCount;do{previousDelayElapsed = delayHelper.HasBlockingAdjustmentDelayElapsed(currentTimeMs, wasSignaledToWake);if (pendingBlockingAdjustment == PendingBlockingAdjustment.WithDelayIfNecessary && !previousDelayElapsed){break;}uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed);} while (false);}}public bool HasBlockingAdjustmentDelayElapsed(int currentTimeMs, bool wasSignaledToWake){if (!wasSignaledToWake && _adjustForBlockingAfterNextDelay){return true;}uint elapsedMsSincePreviousBlockingAdjustmentDelay = (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs);return elapsedMsSincePreviousBlockingAdjustmentDelay >= _previousBlockingAdjustmentDelayMs;}

从上面的代码可以看到一旦 previousDelayElapsed =false 就直接 break 了,不再调用PerformBlockingAdjustment 方法来闯第二个关口。

3、PerformBlockingAdjustment

一旦满足了250ms阈值之后,接下来就需要观察ThreadPool当前的负载能力,由内部的 ThreadCounts 提供支持,比如 NumProcessingWork 表示当前线程池正在处理的任务数, NumThreadsGoal 表示线程不要超过此上限值,如果超过了就进入动态注入阶段,参考代码如下:

private struct ThreadCounts{public short NumProcessingWork;public short NumExistingThreads;public short NumThreadsGoal;}

有了这个基础之后,接下来再上一段注入线程需要满足的第二个关口。

private static void GateThreadStart(){uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed);}private uint PerformBlockingAdjustment(bool previousDelayElapsed){var nextDelayMs = PerformBlockingAdjustment(previousDelayElapsed, out addWorker);if (addWorker){WorkerThread.MaybeAddWorkingWorker(this);}return nextDelayMs;}private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWorker){if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0){addWorker = true;}}

从卦中代码可以看到,一旦线程池中 处理的任务数 >= 线程上限值,这就表示当前线程池正在满负荷的跑,numRequestedWorkers>0 表示有新任务来了需要线程来处理,所以这两组条件一旦满足,就必须要创建新线程。

3. 如何眼见为实

刚才啰嗦了那么多,那如何眼见为实呢?非常简单,还是用 dnspy 的断点日志功能观察,我们下三个断点。

1、第一个条件 HasBlockingAdjustmentDelayElapsed 处增加 1. {!wasSignaledToWake} {this._adjustForBlockingAfterNextDelay}, 延迟时间:{currentTimeMs - this._previousBlockingAdjustmentDelayStartTimeMs} ,上一次延迟:{_previousBlockingAdjustmentDelayMs}

2、第二个条件 PerformBlockingAdjustment 处增加 2. 正在处理任务数:{threadCounts.NumProcessingWork} ,合适线程数:{num},是否要新增线程:{this._separated.numRequestedWorkers>0} 。

3、线程创建 WorkerThread.CreateWorkerThread 处增加 3. 已成功创建线程 。

最后把程序跑起来,观察 output窗口 的结果,非常清爽,吉卦。

三:总结

采用主动通知的方式唤醒GateThread可以让每秒线程注入数由原来的 1~2 个提升到 4 个,虽然有所优化,但面对上游洪水猛兽般的请求,很显然也是杯水车薪,最终还是酿成了线程饥饿的悲剧。

文章转载自:一线码农

原文链接:https://www.cnblogs.com/huangxincheng/p/18627222

体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构


http://www.ppmy.cn/ops/145104.html

相关文章

[OpenGL]使用TransformFeedback实现粒子效果

一、简介 本文介绍了如何使用 OpenGL 中的 Transform Feedback 实现粒子效果&#xff0c;最终可以实现下图的效果&#xff1a; 本文的粒子系统实现参考了modern-opengl-tutorial, ogldev-tutorial28 和 粒子系统–喷泉 [OpenGL-Transformfeedback]。 二、使用 TransformFeed…

浏览器http缓存问题

一、什么是浏览器缓存 浏览器将请求过的资源&#xff08;html、js、css、img&#xff09;等&#xff0c;根据缓存机制&#xff0c;拷贝一份副本存储在浏览器的内存或者磁盘上。如果下一次请求的url相同时则根据缓存机制决定是读取内存或者磁盘上的数据还是去服务器请求资源文件…

【汇编】关于函数调用过程的若干问题

1. 为什么需要bp指针&#xff1f; 因为bp是栈帧的起始地址&#xff0c;函数内的局部栈变量&#xff0c;采用相对bp的内存寻址。不能相对于sp&#xff0c;sp是一直在变的。 2. 函数调用过程&#xff1f; 函数开始&#xff0c;先压栈bp&#xff0c;保存父函数栈底指针bp&#…

[Router]路由器常用的后台判断网络ping 可靠公共 IP 地址整理

接受ICMP公共DNS地址 接受 ICMP 的公有 IPv4 和 IPv6 端点的集合&#xff0c;如果使用 ping 方法&#xff0c;则可以使用来跟踪接口的连接状态。这些是具有高可用性的&#xff0c;通常可以可靠地用作确认网络连接的终端节点。或者&#xff0c;您也可以使用 ISP 的 DNS 解析器&a…

C语言结构体位定义(位段)的实际作用深入分析

1、结构体位段格式 struct struct_name {type [member_name] : width; };一般定义结构体&#xff0c;成员都是int、char等类型&#xff0c;占用的空间大小是固定的在成员名称后用冒号来指定位宽&#xff0c;可以指定每个成员所占用空间&#xff0c;并且也不用受结构体成员起始…

Django 模型字段类型详解

在 Django 中,模型是应用程序的核心部分之一。它们是 Python 对象,用于映射数据库表。每个模型都由一系列字段组成,这些字段代表数据库表中的列。Django 提供了丰富的字段类型,用于定义模型字段,以满足各种数据存储需求。 © ivwdcwso (ID: u012172506) 1. CharField Cha…

基于小程序宿舍报修系统的设计与实现ssm+论文源码调试讲解

第2章 开发环境与技术 基于微信小程序的宿舍报修系统的编码实现需要搭建一定的环境和使用相应的技术&#xff0c;接下来的内容就是对基于微信小程序的宿舍报修系统用到的技术和工具进行介绍。 2.1 MYSQL数据库 本课题所开发的应用程序在数据操作方面是不可预知的&#xff0c;…

周期性边界条件、近邻列表和原子间作用势

文章目录 1.周期性边界条件1.什么是周期性边界条件(PBC)2.周期性边界条件基本特点3.最小镜像约定4.Python实现 2.势场的有限距离截断1.原子间相互作用力2.势场截断的理论基础3.势场截断方法 3.近邻列表构筑与更新1.近邻算法&#xff1a;VerletList法2.近邻算法&#xff1a;区间…