Spring Boot 线程池自定义拒绝策略:解决任务堆积与丢失问题

ops/2025/2/11 12:42:12/

如何通过自定义线程池提升系统稳定性

背景

在高并发系统中,线程池管理至关重要。默认线程池可能导致:

  1. 资源浪费(创建过多线程导致 OOM)
  2. 任务堆积(队列满后任务被拒绝)
  3. 任务丢失(默认拒绝策略丢弃任务
    为了防止这些问题,我们使用 Spring Boot 自定义线程池,并优化 异常处理 和 拒绝策略。

线程池方案设计

在 ExecutorConfig 类中,我们定义了两个线程池

  1. myExecutor:用于普通任务,采用CallerRunsPolicy 避免任务丢失。
  2. oneExecutor:用于信号计算任务(单线程模式),具有 自定义异常处理 和 阻塞式拒绝策略。

代码解析

线程池 myExecutor(通用任务池)

@Bean(name = "myExecutor")
public Executor myExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix("signal-executor-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;
}

设计要点:
CallerRunsPolicy:线程池满了,主线程执行任务,防止丢失但可能影响性能。

线程池 oneExecutor(单线程计算池)

@Bean(name = "oneExecutor")
public Executor oneExecutor() {ThreadFactory threadFactory = new BasicThreadFactory.Builder().uncaughtExceptionHandler(new MyThreadException()).namingPattern("one-thread-%s").build();ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setQueueCapacity(1);executor.setThreadFactory(threadFactory);executor.setThreadGroup(new ThreadGroup("1"));executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.initialize();return executor;
}

设计要点:
线程池(保证任务顺序执行),如果无须,那就按照当前的服务节点配置来设置参数
自定义异常处理(防止线程因异常崩溃)
自定义拒绝策略(任务队列满时阻塞等待)

自定义异常处理

class MyThreadException implements Thread.UncaughtExceptionHandler {@Overridepublic void uncaughtException(Thread t, Throwable e) {log.error("异常: {},线程: {}", ExceptionUtils.getStackTrace(e), t.getName());}
}

作用:防止线程因未捕获异常直接终止,提升系统稳定性。当然这个是处理线程池中子任务处理业务逻辑的时候发生业务异常的处理方式,除此之外还有其他的解决方案

异常处理
  • afterExecute() 处理异常(可扩展) :用于处理执行过程中抛出的异常
  • uncaughtExceptionHandler 处理未捕获异常(默认 JVM 打印堆栈): 用于处理线程未捕获的异常;
  • RejectedExecutionHandler 处理任务拒绝:处理任务被拒绝的情况。

处理顺序:

  1. 当任务执行时,如果任务抛出异常,它会首先被 afterExecute() 捕获,并且你可以在这里进行进一步的处理。
  2. 如果任务中的异常没有被 afterExecute() 捕获或处理,且是未捕获异常,它会交由 uncaughtExceptionHandler 进行处理。
  3. RejectedExecutionHandler 是处理线程池拒绝接受新任务的情况,这通常和任务执行过程中的异常无关,主要处理线程池饱和时的情况。
    注意:beforeExecute() 在任务开始执行前调用,通常用于准备工作;
    异常处理上,beforeExecute() 不会直接处理任务执行过程中的异常,但可以捕获并处理自己内部的异常;

相关源码分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1️⃣ 线程池当前线程数 < corePoolSize,则尝试新增核心线程执行任务
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2️⃣ 线程池已满,尝试加入工作队列
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command); // 任务队列中的任务被拒绝
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 防止线程池为空,确保有线程执行任务
}
// 3️⃣ 线程池满且队列满,尝试新增非核心线程
else if (!addWorker(command, false))
reject(command); // 线程池已满,拒绝任务
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 1️⃣ 执行任务
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // ⚠ 任务执行点
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown); // 2️⃣ 任务执行后的扩展方法
}
task = null;
w.completedTasks++;
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 3️⃣ 任务异常退出,删除该线程
}
}

自定义拒绝策略-重新放回队列
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.info("队列已满,阻塞等待...");executor.getQueue().put(r);log.info("任务已加入队列");}} catch (Exception e) {log.error("拒绝策略异常", e);}}
}

作用:
默认拒绝策略丢弃任务,而此策略会阻塞等待,确保任务不丢失。
适用于任务量较大,但不能丢失任务的场景(如消息队列处理)

自定义拒绝策略-主线程执行
    /*** 自定义线程池,防止使用默认线程池导致内存溢出** @param* @return* @author bu.junjie* @date 2021/11/10 10:00*/@Bean(name = "myExecutor")public Executor myExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix("signal-executor-");// 使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行,阻塞主线程executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}

适用场景

✅ 高并发请求(如 HTTP 任务)
✅ 后台数据处理(如日志分析、批量计算)
✅ 长时间任务(如大文件处理、消息队列消费)

总结

  • 自定义线程池 防止资源浪费,提升吞吐量。
  • 异常处理 避免线程因未捕获异常而终止。
  • 优化拒绝策略 防止任务丢失,提高系统可靠性。

线程池优化是高并发系统的关键,希望本篇博客能帮助你更好地理解和应用线程池! 🚀🚀🚀

完整代码示例

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import javax.annotation.Resource;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;/*** 线程池配置参数** @version 1.0.0* @createTime 2025-11-09 14:01*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {@Resourceprivate ThreadProperties threadProperties;/*** 自定义线程池,防止使用默认线程池导致内存溢出** @param* @return* @author bu.junjie* @date 2021/11/10 10:00*/@Bean(name = "myExecutor")public Executor myExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix("signal-executor-");// 使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行,阻塞主线程executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}/*** 信号计算时的线程池(1号线程池)** @param* @return* @author bu.junjie* @date 2022/1/5 13:01*/@Bean(name = "oneExecutor")public Executor oneExecutor() {ThreadFactory threadFactory = new BasicThreadFactory.Builder().uncaughtExceptionHandler(new MyThreadException()).namingPattern("one-thread-%s").build();ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setThreadFactory(threadFactory);executor.setQueueCapacity(1);executor.setThreadGroup(new ThreadGroup("1"));executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.initialize();return executor;}class MyThreadException implements Thread.UncaughtExceptionHandler {/*** Method invoked when the given thread terminates due to the* given uncaught exception.* <p>Any exception thrown by this method will be ignored by the* Java Virtual Machine.** @param t the thread* @param e the exception*/@Overridepublic void uncaughtException(Thread t, Throwable e) {log.error("MyThreadException is   exception=【{}】,Thread = 【{}】", ExceptionUtils.getStackTrace(e), t.getName());}}/*** 拒绝策略优化** @param* @author bu.junjie* @date 2022/1/8 14:06* @return*/public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {// 核心改造点,由blockingqueue的offer改成put阻塞方法if (!executor.isShutdown()) {long start = System.currentTimeMillis();log.info("当前阻塞队列已满开始请求存放队列束!!!");executor.getQueue().put(r);log.info("存放阻塞队列成功,阻塞时间time = 【{}】", System.currentTimeMillis() - start);}} catch (Exception e) {e.printStackTrace();}}}}

思考

为什么拒绝策略要重新抛出异常?

我们会发现默认的四种拒绝策略在处理完业务逻辑之后还会重新抛出异常,就算你是自定义的拒绝策略也需要重新抛出异常,为什么呢?不抛出会怎么样?

如果不抛出异常,调用方(业务代码)无法感知任务被拒绝,可能导致任务丢失或业务逻辑异常。

场景分析

线程池队列满了时,会触发 rejectedExecution 方法。如果我们只是记录日志,而不抛出异常:

  • 主线程会继续执行,但任务并未真正执行,业务方无法感知到这个问题。
  • 可能导致数据丢失,尤其是在关键业务(如支付、订单、消息处理)场景中。
重新抛出异常的好处

✅ 保证调用方可以感知任务拒绝,决定是否降级处理、重试或报警。
✅ 防止静默丢失任务,保证业务的可靠性。
✅ 与 Spring 线程池默认行为保持一致,防止意外吞掉异常。

代码示例

❌ 错误示例(未抛出异常,可能导致任务丢失)

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.warn("队列已满,任务阻塞等待...");executor.getQueue().put(r); // 可能抛出异常log.info("任务已放入队列");}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 仅恢复中断状态,但未通知调用方}}
}

问题:
调用方不会收到异常,以为任务已经成功执行,但其实可能丢失了。
例如,在支付系统中,如果订单更新任务丢失,可能导致订单状态未更新。

✅ 正确示例(重新抛出异常,保证调用方可感知)

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.warn("队列已满,阻塞等待...");executor.getQueue().put(r);log.info("任务成功进入队列");return; // 任务成功加入队列后不需要抛异常}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 恢复线程中断状态throw new RejectedExecutionException("任务提交被中断", e);} catch (Exception e) {log.error("任务拒绝策略发生异常", e);throw new RejectedExecutionException("自定义拒绝策略异常", e);}}
}

改进点:
任务成功放入队列时不会抛异常,避免不必要的错误。
如果 put() 失败,抛出 RejectedExecutionException,让业务方感知。
捕获 InterruptedException 并恢复中断状态,避免影响后续任务。
其实这个原因和为什么需要恢复线程中断一样的逻辑,也是为了让调用方感知到

业务方如何处理异常?

如果 rejectedExecution 抛出 RejectedExecutionException,业务代码可以捕获异常并进行降级,例如:

try {executor.execute(task);
} catch (RejectedExecutionException e) {log.error("线程池已满,任务执行失败,进行降级处理", e);// 业务降级策略,例如:saveToDatabaseForLaterProcessing(task);
}

降级方案:如果线程池拒绝任务,可以存入 数据库、MQ 或 重试队列,避免任务丢失。

结论

🚀 必须重新抛出异常,否则:

  • 任务可能悄悄丢失,业务方无法感知。
  • 可能影响数据一致性(如支付、订单、日志处理)。
  • 业务代码无法主动补救(重试、降级等)。

最佳实践:

  • 成功放入队列 → 不抛异常
  • 任务无法处理 → 抛出 RejectedExecutionException,让调用方感知
    这样可以既保证任务不丢失,又确保调用方有能力处理拒绝任务!🔥

自定义拒绝策略put()方法?

其实默认拒绝策略是offer()方法是非阻塞的,也就是只要队列中的任务只要有,那就去创建子线程,直至触发拒绝策略
✅ 正确示例

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {System.out.println("队列已满,阻塞等待...");executor.getQueue().put(r);  // 阻塞等待队列有空位System.out.println("任务重新加入队列:" + r.toString());} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RejectedExecutionException("任务提交失败,线程被中断", e);}}
}

http://www.ppmy.cn/ops/157525.html

相关文章

博客项目-day02(登录功能)

登录功能 这里使用JWT令牌技术进行登录功能的实现 JWT介绍 需要导入jjwt的依赖 先看接口 传入账号密码返回token 先导入一个JWT工具类 public class JWTUtils {private static final String jwtToken "123456Mszlu!###$$";public static String createToken(Long…

在OAS中设计简单抬头显示器

在OAS中设计简单抬头显示器 本文演示了如何使用OAS工具设计抬头显示器。 简介 车载HUD是一种将关键信息投射到驾驶员前方视野中的设备&#xff0c;使驾驶员无需低头即可获取车辆状态和导航等数据。构建车载HUD的理论模型时&#xff0c;需综合考虑光学系统、投影技术、人机交…

C 移位运算符

宏定义 #define GET_BIT(n) ((1 << (n))) 用于生成一个整数&#xff0c;该整数在第 n 位上是 1&#xff0c;其余位都是 0。这个宏通常用于位操作&#xff0c;比如设置、清除或检查某个特定位置的标志位。 1 << (n)&#xff1a;这是位移操作符。它将数字 1 左移 n …

Spring Test 的作用与优势

场景设定 假设你开了一家餐厅&#xff0c;需要测试每个环节是否正常&#xff1a; 服务员点单&#xff08;Service 层&#xff09;厨师做菜&#xff08;DAO 层&#xff0c;操作数据库&#xff09;菜品配送&#xff08;Web 层&#xff0c;API 接口&#xff09; 问题来了&#…

Http和Socks的区别?

HTTP 和 SOCKS 的区别 HTTP 和 SOCKS 都是用于网络通信的协议&#xff0c;但它们在工作原理、应用场景和实现方式上有显著的区别。以下是详细的对比和说明。 一、HTTP 协议 1. 定义 HTTP&#xff08;HyperText Transfer Protocol&#xff09;是用于传输超文本数据的应用层协…

C#常用集合优缺点对比

先上结论&#xff1a; 在C#中&#xff0c;链表、一维数组、字典、List<T>和ArrayList是常见的数据集合类型&#xff0c;它们各有优缺点&#xff0c;适用于不同的场景。以下是它们的比较&#xff1a; 1. 一维数组 (T[]) 优点&#xff1a; 性能高&#xff1a;数组在内存中…

openAI官方prompt技巧(二)

1. 赋予 ChatGPT 角色 为 ChatGPT 指定一个角色&#xff0c;让其从特定的身份或视角回答问题。这有助于生成针对特定受众或场景的定制化回答。 例如&#xff1a; 你是一名数据分析师&#xff0c;负责我们的市场营销团队。请总结上个季度的营销活动表现&#xff0c;并强调与未…

0210作业

思维导图 作业 练习 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);this->setMouseTracking(true); }Widget::~Widget() {delete ui; }void Widget::mou…