源码分析CompletableFuture使用默认线程池ForkJoinPool的弊端

news/2025/1/18 8:46:51/

先说结论:
假如有20CompletableFuture任务并发执行时,都使用默认线程池ForkJoinPool,但cpu的核心数又小于3,那么就会新建20个线程(不使用默认线程池了),这20个线程相互竞争cpu资源和内存,很多线程都在等待,浪费了大量的性能在线程上下文切换上。

线程池大小设定:

  • 如果服务是cpu密集型的,设置为电脑的核数
  • 如果服务是io密集型的,设置为电脑的核数*2

runAsync方法点进去

    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println(1);});

可以看见使用的是asyncPool。点进asyncPool

    public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}

useCommonPool是否为true决定了使用 ForkJoinPool线程池还是新建一个线程池。点进useCommonPool。

    private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

这里判定的是ForkJoinPool common线程池中并行度级别是否大于1。点进 getCommonPoolParallelism() 方法

    private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);

返回的是commonParallelism这个字段,再往下找。

    public static int getCommonPoolParallelism() {return commonParallelism;}

发现只有一个地方对这个属性进行赋值,继续。

    static final int commonParallelism;

在这里插入图片描述

发现commonParallelism 由par决定,par来自common.config SMASK做与运算。SMASK定义为0xffff(65535),common.config由 makeCommonPool()得到。点进makeCommonPool()方法

...
static final ForkJoinPool common;
static {...common = java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<ForkJoinPool>() {public ForkJoinPool run() {return makeCommonPool(); //common的config由此方法返回}});int par = common.config & SMASK; // report 1 even if threads disabledcommonParallelism = par > 0 ? par : 1;
}

在这里插入图片描述

我简化了下面源码,parallelism 初始化为 -1,若jvm启动参数有java.util.concurrent.ForkJoinPool.common那么parallelism将会被启动参数指定。Runtime.getRuntime().availableProcessors() 是获取虚拟机可使用的处理器数量。在jvm未定义参数的前提下,处理器数量若小于等于2,那么并行度parallelism就为1,反之则为 (处理器数量 - 1)。定义了jvm参数,若参数值大于MAX_CAP(32767),则重新赋值。即parallelism 总会大于0, 继续点进ForkJoinPool的构造方法。

    private static ForkJoinPool makeCommonPool() {...int parallelism = -1;try {  // ignore exceptions in accessing/parsing propertiesString pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");if (pp != null)parallelism = Integer.parseInt(pp);} catch (Exception ignore) {}if (parallelism < 0 && // default 1 less than #cores(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP;return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, // 注意 LIFO_QUEUE等于0 "ForkJoinPool.commonPool-worker-");}

发现config也是作位运算,即config也会大于0,我们拿到这个config返回开始 par 赋值那一块。

    private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode; // SMASK等于65535,mode等于0long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);}

总结就是并行度 parallelism(大于0) 与 65535(111111…) 做两次与运算,即本身。继续回到之前

...
static final ForkJoinPool common;
static {...common = java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<ForkJoinPool>() {public ForkJoinPool run() {return makeCommonPool(); //common的config由此方法返回}});int par = common.config & SMASK; // report 1 even if threads disabledcommonParallelism = par > 0 ? par : 1;
}

这里判断,

  • 无JVM参数前提下:
    • 若服务器的核心数小于等于2,commonParallelism 则为1,即useCommonPool 为false,new 一个线程池。
    • 若服务器的核心数大于2,commonParallelism 则为 核心数 - 1,即useCommonPool 为true,使用ForkJoinPool线程池。
  • 有JVM参数,以设置参数为准。大于1小于等于32767。和上面判断一致。
    private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);public static int getCommonPoolParallelism() {return commonParallelism;}

在ThreadPerTaskExecutor 中 execute,他会为每个任务新开一个线程,而不是采用ForkJoinPool中的线程。

    static final class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) { new Thread(r).start(); }}

结论:jvm启动参数中 java.util.concurrent.ForkJoinPool.common 的值为 1或者服务器核心数小于等于2,都会导致不采用ForkJoinPool 中的线程,而是新起一个线程。

如果服务器只有两核,假如业务需要:现在起了20个completablefuture任务,若使用默认线程池,那么就会创建20个线程,20个线程并发执行在两个cpu上竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。


http://www.ppmy.cn/news/1055932.html

相关文章

有关Arm CE支持的sha1 sha224 sha256 sha384 sha512指令

快速链接: . 👉👉👉 个人博客笔记导读目录(全部) 👈👈👈 付费专栏-付费课程 【购买须知】:【精选】ARMv8/ARMv9架构入门到精通-[目录] 👈👈👈再某一款SOC(cortex-A53)上进行数字摘要计算的时候, 发现sha1 sha224 sha256的性能很高,sha384 sha512的性能…

【C++数据结构】二叉搜索树

【C数据结构】二叉搜索树 目录 【C数据结构】二叉搜索树二叉搜索树概念二叉搜索树操作二叉搜索树的查找二叉搜索树的插入二叉搜索树的删除二叉搜索树的实现二叉搜索树的应用二叉搜索树的性能分析 作者&#xff1a;爱写代码的刚子 时间&#xff1a;2023.8.22 前言&#xff1a;二…

数据库容量考虑因素

一、数据库需求分析 1.1 数据类型 1.2 数据量预测 1.3 数据增长速度 二、数据库性能需求 2.1 响应时间 2.2 吞吐量 2.3 并发处理能力 三、数据库成本考虑 3.1 硬件成本 3.2 软件成本 3.3 人力成本 四、数据库扩展性考虑 4.1 升级路径 4.2 兼容性 4.3 容灾备份方…

Linux基础命令2

目录 基础命令 ln命令 grep命令 查看文本内容的五种方式 1.cat命令 2.more命令 3.less命令 4.head命令 5.tail命令 echo命令 alias命令 基础命令 ln命令 作用&#xff1a;创建链接文件 格式&#xff1a;ln 命令选项 目标文件 链接文件名 命令选项&#xff1a;-s…

k8s节点pod驱逐、污点标记

一、设置污点&#xff0c;禁止pod被调度到节点上 kubectl cordon k8s-node-145 设置完成后&#xff0c;可以看到该节点附带了 SchedulingDisabled 的标记 二、驱逐节点上运行的pod到其他节点 kubectl drain --ignore-daemonsets --delete-emptydir-data k8s-node-145 显示被驱逐…

FxFactory 8 Pro Mac 苹果电脑版 fcpx/ae/motion视觉特效软件包

FxFactory pro for mac是应用在Mac上的fcpx/ae/pr视觉特效插件包&#xff0c;包含了成百上千的视觉效果&#xff0c;打包了很多插件&#xff0c;如调色插件&#xff0c;转场插件&#xff0c;视觉插件&#xff0c;特效插件&#xff0c;文字插件&#xff0c;音频插件&#xff0c;…

Scikit-learn降维与度量学习代码批注及相关练习

一、代码批注 代码来自&#xff1a;https://scikit-learn.org/stable/auto_examples/decomposition/plot_pca_iris.html#sphx-glr-auto-examples-decomposition-plot-pca-iris-py import numpy as np import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes…

Xmake v2.8.2 发布,官方包仓库数量突破 1k

Xmake 是一个基于 Lua 的轻量级跨平台构建工具。 它非常的轻量&#xff0c;没有任何依赖&#xff0c;因为它内置了 Lua 运行时。 它使用 xmake.lua 维护项目构建&#xff0c;相比 makefile/CMakeLists.txt&#xff0c;配置语法更加简洁直观&#xff0c;对新手非常友好&#x…