我们在后端执行某些耗时逻辑操作时往往会导致长时间的线程阻塞,在这种情况之下,我们往往会引一条异步线程去处理这些异步任务,如果每次都创建新的线程来处理这些任务,不仅会增加代码冗余,还可能造成线程管理混乱,影响系统性能。在我们的Spring框架中是自带异步任务处理机制的,比如我们使用@Async 注解可以处理一些简单的异步任务,但这样确实无法精确去控制线程池资源,也无法灵活去管理任务调度,由此,我们可以去自行设计一个高效的自定义异步任务管理器去统一调度处理我们的自定义任务。
1.前置配置
自定义线程池,并将其注册到IOC容器中
/*** 自定义线程池配置* @Author GuihaoLv**/ @Configuration public class ThreadPoolConfig {// 核心线程池大小private int corePoolSize = 50;// 最大可创建的线程数private int maxPoolSize = 200;// 队列最大长度private int queueCapacity = 1000;// 线程池维护线程所允许的空闲时间private int keepAliveSeconds = 300;/*** 通用任务线程池* @return*/@Bean(name = "threadPoolTaskExecutor")public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setMaxPoolSize(maxPoolSize);executor.setCorePoolSize(corePoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveSeconds);// 线程池对拒绝任务(无线程可用)的处理策略//当线程池满了,新任务无法加入时,CallerRunsPolicy 让提交任务的线程(即调用方线程)直接执行该任务,// 而不是丢弃或抛出异常,从而保证任务不会丢失。executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}/*** 执行周期性或定时任务*/@Bean(name = "scheduledExecutorService")protected ScheduledExecutorService scheduledExecutorService(){//这里没有最大线程数的概念,所有线程都属于核心线程。return new ScheduledThreadPoolExecutor(corePoolSize,new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d") //设置线程名称,方便排查日志。.daemon(true).build(), //daemon(true) 使线程池中的线程成为 守护线程,即 JVM 退出时不会阻止进程终止。new ThreadPoolExecutor.CallerRunsPolicy()) //使用 CallerRunsPolicy,避免任务丢失。{//任务执行完毕后,调用 Threads.printException(r, t),捕获并记录异常,确保线程池不会因为未捕获的异常而崩溃。@Overrideprotected void afterExecute(Runnable r, Throwable t){super.afterExecute(r, t);Threads.printException(r, t);}};} }
2.异步任务管理器配置
/*** 异步任务管理器* AsyncManager 是 整个异步任务调度的核心,它提供了 任务执行、调度和管理。* @Author GuihaoLv*/ public class AsyncManager {/*** 操作延迟10毫秒*/private final int OPERATE_DELAY_TIME = 10;/*** 异步操作任务调度线程池* executor 采用 ScheduledExecutorService 线程池,可以 定时执行异步任务,提高并发能力*/private ScheduledExecutorService executor = SpringUtils.getBean("scheduledExecutorService");/*** 单例模式* 采用 单例模式,确保全局只有一个 AsyncManager 实例,保证任务调度统一管理。创建单例对象*/private AsyncManager(){}//创建异步任务管理器的静态对象private static AsyncManager me = new AsyncManager();public static AsyncManager me(){return me;}/*** 使用调度线程池执行任务* @param task 任务*///TimerTask是Java编程语言中的一个抽象类,通常用于安排将来某个时间执行的任务,或者以固定的速率重复执行的任务。// 它是与Timer类一起使用的,Timer负责管理和调度这些任务。public void execute(TimerTask task){executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);}/*** 停止任务线程池*/public void shutdown(){//优雅关闭线程池Threads.shutdownAndAwaitTermination(executor);} }
/*** 确保应用退出时能关闭后台线程* @Author GuihaoLv*/ @Component public class ShutdownManager {private static final Logger logger = LoggerFactory.getLogger("sys-user");@PreDestroy //它用于在 Spring Bean 被销毁前执行清理逻辑。public void destroy(){shutdownAsyncManager();HttpUtils.shutdown();}/*** 停止异步执行任务*/private void shutdownAsyncManager(){try{logger.info("====关闭后台任务任务线程池====");AsyncManager.me().shutdown();}catch (Exception e){logger.error(e.getMessage(), e);}}}
3.异步工厂配置,将异步任务的启动逻辑都扔到异步工厂中处理
/*** 异步工厂(产生任务用)* AsyncFactory 主要用于创建异步任务,它相当于一个 "任务工厂",* 可以根据不同的需求创建不同的任务(如记录用户登录信息、记录操作日志)。** @Author GuihaoLv*/ public class AsyncFactory {private static final Logger sys_user_logger = LoggerFactory.getLogger("sys-user");// 定义任务:计算热点文章public static TimerTask calculateHotArticlesTask() {return new TimerTask() {@Overridepublic void run() {try {// 获取 HotArticleAsycTask 实例HotArticleAsycTask hotArticleAsycTask = SpringUtils.getBean(HotArticleAsycTask.class);// 执行热点文章计算任务hotArticleAsycTask.calculateHotArticles();} catch (Exception e) {sys_user_logger.error("计算热点文章失败", e);}}};}// 定义任务:定时热搜清理public static TimerTask cleanupOldKeywords() {return new TimerTask() {@Overridepublic void run() {try {// 获取 HotArticleAsycTask 实例HotSearchCleanupTask hotArticleAsycTask = SpringUtils.getBean(HotSearchCleanupTask.class);// 执行热点文章计算任务hotArticleAsycTask.cleanupOldKeywords();} catch (Exception e) {sys_user_logger.error("清理热搜失败", e);}}};} }
4.调用异步工厂中的逻辑
/*** 热点文章实时计算* @Author GuihaoLv*/ @Component public class HotArticleAsycTask {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String LIKES_KEY = "blog:likes";private static final String FAVORITES_KEY = "blog:favorites";private static final String HOT_ARTICLES_KEY = "blog:hot"; // 存热点文章public void calculateHotArticles() {Set<String> blogIds = redisTemplate.opsForZSet().range(LIKES_KEY, 0, -1);if (blogIds == null || blogIds.isEmpty()) return;for (String blogId : blogIds) {Double likes = redisTemplate.opsForZSet().score(LIKES_KEY, blogId);Double favorites = redisTemplate.opsForZSet().score(FAVORITES_KEY, blogId);// 计算热度double hotScore = (likes != null ? likes * 5.0 : 0) +(favorites != null ? favorites * 8.0 : 0);// 热度超过阈值 500,加入热点文章if (hotScore >= 500) {redisTemplate.opsForZSet().add(HOT_ARTICLES_KEY, blogId, hotScore);System.out.println("🔥 文章 " + blogId + " 进入热点榜,热度:" + hotScore);}}}/*** 每 5 分钟执行一次的定时任务*/@Scheduled(fixedRate = 100000) // 每 5 分钟执行一次public void scheduleHotArticlesCalculation() {// 将任务交给异步任务管理器执行AsyncManager.me().execute(AsyncFactory.calculateHotArticlesTask());} }