服务容错之限流之 Tomcat 限流 Tomcat 线程池的拒绝策略

news/2024/11/25 2:41:58/

在文章开头,先和大家抛出两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

JUC 线程池

在谈 Tomcat 的线程池前,先看一下 JUC 中线程池的执行流程,这里使用《Java 并发编程的艺术》中的一张图:
在这里插入图片描述

即执行流程为:

  1. 收到提交任务
  2. 当前线程数小于核心线程数,创建一个新的线程来执行任务
  3. 当前线程数大于等于核心线程数,
    • 如果阻塞队列未满,将任务存储到队列
    • 如果阻塞队列已满
      • 如果当前线程数小于最大线程数,则创建一个线程来执行新提交的任务
      • 如果当前线程数大于等于最大线程数,执行拒绝策略

可以看到设计思想是任务可以等待执行,但要尽量少的创造过多线程。如果队列很大,则很难扩大到最大线程数,同时会有大量的任务等待。

Tomcat 线程池分析

Tomcat 线程池是在 LifeCycle 中创建的。跳过前面繁琐的流程,直接看 org.apache.tomcat.util.net.NioEndpoint#startInternal

    /*** Start the NIO endpoint, creating acceptor, poller threads.*/@Overridepublic void startInternal() throws Exception {if (!running) {running = true;paused = false;processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getProcessorCache());eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getEventCache());nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getBufferPool());// Create worker collectionif ( getExecutor() == null ) {createExecutor();}initializeConnectionLatch();// Start poller threadspollers = new Poller[getPollerThreadCount()];for (int i=0; i<pollers.length; i++) {pollers[i] = new Poller();Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();}startAcceptorThreads();}}

再看 org.apache.tomcat.util.net.AbstractEndpoint#createExecutor

    public void createExecutor() {internalExecutor = true;TaskQueue taskqueue = new TaskQueue(); //无界队列TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);taskqueue.setParent( (ThreadPoolExecutor) executor);}

要注意这里的 ThreadPoolExecutor 不是 JUC 里面的 java.util.concurrent.ThreadPoolExecutor,而是 Tomcat 的 org.apache.tomcat.util.threads.ThreadPoolExecutor,它继承了 JUC 的 java.util.concurrent.ThreadPoolExecutor

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {...
}

查看它的构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());//提前启动核心线程prestartAllCoreThreads();}

可以发现它在构造的时候就会启动核心线程,而 java.util.concurrent.ThreadPoolExecutor 则是需要手动启动。而阻塞队列使用是 org.apache.tomcat.util.threads.TaskQueue

public class TaskQueue extends LinkedBlockingQueue<Runnable> {private static final long serialVersionUID = 1L;private volatile ThreadPoolExecutor parent = null;// No need to be volatile. This is written and read in a single thread// (when stopping a context and firing the  listeners)private Integer forcedRemainingCapacity = null;public TaskQueue() {super();}...
}

而在创建 org.apache.tomcat.util.threads.TaskQueue 的时候,并没有传递 capacity,也就是说 Tomcat 的线程池使用的是无界队列。

接下来看一下最核心的org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable)

/*** {@inheritDoc}*/@Overridepublic void execute(Runnable command) {//重载 java.util.concurrent.ThreadPoolExecutor#executeexecute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}}

本质上还是执行的 java.util.concurrent.ThreadPoolExecutor#execute 方法:

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// Tomcat 中这块逻辑不会执行,因为构造时已经初始化了核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}//强制入队public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}

这里的 workQueueorg.apache.tomcat.util.threads.TaskQueueorg.apache.tomcat.util.threads.TaskQueue#offer

    @Overridepublic boolean offer(Runnable o) {//we can't do any checksif (parent==null) return super.offer(o);//we are maxed out on threads, simply queue the object//当前线程数达到最大,任务入队if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);//we have idle threads, just add it to the queue//如果已提交未执行完的任务数小于当前线程数(来了任务先+1,再入队,执行完才-1,说明还有空闲的worker线程),任务入队if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);//if we have less threads than maximum force creation of a new thread// 如果当前线程数小于最大线程数量,则直接返回false,java.util.concurrent.ThreadPoolExecutor#execute 会创建新的线程来执行任务if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;//if we reached here, we need to add it to the queue//任务入队(当前线程数大于最大线程数)return super.offer(o);}

再看下拒绝策略,结合 java.util.concurrent.ThreadPoolExecutor#execute 方法,需要 java.util.concurrent.ThreadPoolExecutor#addWorker 返回 false 才会触发,即达到了最大线程数才会触发,而 org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable) 在触发了拒绝策略后还有一个特殊处理:

					//如果是 TaskQueueif (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {//强制入队if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else { //非 TaskQueue 直接触发拒绝策略submittedCount.decrementAndGet();throw rx;}

再看 org.apache.tomcat.util.threads.TaskQueue#force(java.lang.Runnable, long, java.util.concurrent.TimeUnit)

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}

说白了就是直接入队(无界队列):

    public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) { //capacity是Integer最大值if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}

这么看,Tomcat 的线程池基本上不会触发拒绝策略。可以写个例子试一下:

package blog.dongguabai.others.tomcat_threadpool;import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @author dongguabai* @date 2023-11-18 22:04*/
public class Demo {public static void main(String[] args) {//无界队列TaskQueue taskqueue = new TaskQueue();TaskThreadFactory tf = new TaskThreadFactory("dongguabai_blog" + "-exec-", false, 2);final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, taskqueue, tf);taskqueue.setParent(executor);observe(executor);while (true) {executor.execute(new Runnable() {public void run() {excuteForever();}});}}private static void observe(final ThreadPoolExecutor executor) {Runnable task = new Runnable() {public void run() {while (true) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new Date().toLocaleString() + "->" + executor.getQueue().size());}}};new Thread(task).start();}public static void excuteForever() {while (true) {}}
}

输出:

2023-11-18 22:18:27->6541506
2023-11-18 22:18:34->14395417
2023-11-18 22:18:37->25708908
2023-11-18 22:18:50->32014458
2023-11-18 22:19:07->47236736
2023-11-18 22:19:10->65616058
2023-11-18 22:19:32->66856933
...

可以看到,队列里的任务都有六千多万了,还没有触发拒绝策略,线程池还是可以继续接收任务。

当然我们也是可以自定义的,只需要重写 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 即可:

    public Executor getExecutor() { return executor; }

org.apache.tomcat.util.net.NioEndpoint#startInternal 会进行判断:

@Overridepublic void startInternal() throws Exception {if (!running) {...if ( getExecutor() == null ) {createExecutor(); //如果没有自定义实现,就会使用默认实现}}...}

Tomcat 默认线程池优先创建线程执行任务,达到了最大线程数,不会直接执行拒绝策略,而是尝试返回等待队列,但由于等待队列的容量是 Integer 最大值,所以几乎不会触发拒绝策略。

最后

最后再回过头看文章开头的两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?

    Tomcat 的确可以用来做限流,比如可以控制最大线程数,这样后续的任务均会在队列等待,并不会执行。org.apache.tomcat.util.net.AbstractEndpoint#setMaxConnectionsConnector 的角度设置,这块不在本文探讨范围之内。

    虽然基于 Tomcat 的限流是一种可能的方案,但在实际应用中,我们通常会选择其他的层次来实现服务限流:

    • 可扩展性:基于 Tomcat 的限流方案通常只能在单个服务实例上工作,且只能针对HTTP/HTTPS协议的请。而在微服务或者分布式系统中,我们可能需要分布式限流方案和针对多协议的 限流。
    • 灵活性:在应用层或者分布式系统层实现的限流方案通常可以提供更多的配置选项和更精细的控制。例如,请求的资源、来源或者其他属性来进行限流。
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

    Tomcat 默认无限队列,难以触发拒绝策略,所以会有内存泄漏的风险。可以基于 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 自定义线程池进行控制。

References

  • 《Java 并发编程的艺术》

欢迎关注公众号:
在这里插入图片描述


http://www.ppmy.cn/news/1227942.html

相关文章

springcloudalibaba-3

一、Nacos Config入门 1. 搭建nacos环境【使用现有的nacos环境即可】 使用之前的即可 2. 在微服务中引入nacos的依赖 <!-- nacos配置依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-…

map与set的封装

目录 红黑树的结点 与 红黑树的迭代器 红黑树的实现&#xff1a; 迭代器&#xff1a; ​编辑 红黑树的查找&#xff1a; 红黑树的插入&#xff1a; ​编辑 检查红色结点&#xff1a;​编辑红黑树的左旋 ​编辑红黑树的右旋 ​编辑红黑树的双旋 Map的封装 ​编辑set的…

SpirngBoot + Vue 前后端分离开发工具代码

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; Java从入门到精通 ✨特色专栏&#xf…

C# Array和ArrayList有什么区别

在C#中&#xff0c;Array和ArrayList是集合类型&#xff0c;用于存储一组元素&#xff0c;但它们之间有几个关键区别&#xff1a; 类型安全&#xff1a; Array是类型安全的&#xff0c;意味着它只能存储一种特定类型的元素。例如&#xff0c;一个int[]数组只能存储int类型的元素…

FPGA设计时序约束八、others类约束之Set_Case_Analysis

目录 一、序言 二、Set Case Analysis 2.1 基本概念 2.2 设置界面 2.3 命令语法 2.4 命令示例 三、工程示例 四、参考资料 一、序言 在Vivado的时序约束窗口中&#xff0c;存在一类特殊的约束&#xff0c;划分在others目录下&#xff0c;可用于设置忽略或修改默认的时序…

Gin框架源码解析

概要 目录 Gin路由详解 Gin框架路由之Radix Tree 一、路由树节点 二、请求方法树 三、路由注册以及匹配 中间件含义 Gin框架中的中间件 主要讲述Gin框架路由和中间件的详细解释。本文章将从Radix树&#xff08;基数树或者压缩前缀树&#xff09;、请求处理、路由方法树…

【ATTCK】MITRE Caldera-emu插件

CALDERA是一个由python语言编写的红蓝对抗工具&#xff08;攻击模拟工具&#xff09;。它是MITRE公司发起的一个研究项目&#xff0c;该工具的攻击流程是建立在ATT&CK攻击行为模型和知识库之上的&#xff0c;能够较真实地APT攻击行为模式。 通过CALDERA工具&#xff0c;安全…

C语言链式栈

stack.h typedef struct Node_s {int data;struct Node_s *pNext; } Node_t, *pNode_t;typedef struct Stack_s {pNode_t pHead;//栈顶指针&#xff0c;指向了链表的第一个结点int size;//栈的元素个数 } Stack_t, *pStack_t;void init(pStack_t pStack); void push(pStack_t …