Using the @Async Annotation and How It Works
- 1. Usage
- 2. How it works
- 3. Customizing the thread pool used by @Async
- 3.1 First approach
- 3.2 Second approach
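1. Usage
1.1 Add the @EnableAsync annotation to the @SpringBootApplication startup class or to a configuration class.
1.2 Add the @Async annotation to each method that should run asynchronously.
Notes:
@Async should be placed on public methods.
The class that contains the @Async method must be managed by the Spring container.
A method annotated with @Async must be invoked through the Spring dynamic proxy. Calling it from another method of the same class with this.xxx() bypasses the proxy, so the annotation has no effect.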
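// DemoAsyncApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@EnableAsync // turns on processing of @Async annotations
@SpringBootApplication
public class DemoAsyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoAsyncApplication.class, args);
    }
}

// AsyncService.java
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class AsyncService {
    @Async // runs on a thread from the async executor, not on the caller's thread
    public void test() {
        try {
            TimeUnit.SECONDS.sleep(1);
            log.info(Thread.currentThread().getName() + "AsyncService");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}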
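The note about this.xxx() calls can be illustrated without Spring at all. The following is a minimal sketch using a plain JDK dynamic proxy, not Spring's actual proxying code; Greeter, PlainGreeter, and SelfInvocationDemo are hypothetical names introduced only for this illustration. It records which calls the proxy intercepts: a call made through the proxy is seen by the handler, while the nested this.inner() call goes straight to the target object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for a Spring-managed service.
interface Greeter {
    void outer();
    void inner();
}

// The target object; outer() calls inner() through "this".
class PlainGreeter implements Greeter {
    @Override
    public void outer() {
        this.inner(); // self-invocation: reaches the target directly, never the proxy
    }

    @Override
    public void inner() {
        // Intentionally empty; only the interception matters here.
    }
}

class SelfInvocationDemo {
    // Returns the names of the methods that the proxy intercepted, in order.
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        Greeter target = new PlainGreeter();
        InvocationHandler handler = (proxy, method, args) -> {
            // An async interceptor would hand the call to an executor at this point.
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        Greeter proxied = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);

        proxied.outer(); // intercepted once; the nested this.inner() is not
        proxied.inner(); // intercepted, because the call goes through the proxy
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // prints [outer, inner]
    }
}
```

Only two calls are recorded even though inner() ran twice, which is the same reason a self-invoked @Async method runs synchronously on the caller's thread.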
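2. How it works
In a Spring Boot application, @Async by default uses the auto-configured executor bean named "applicationTaskExecutor", which is also registered under the name "taskExecutor". Its defaults are a core pool size of 8, a maximum pool size of Integer.MAX_VALUE, a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE as the work queue, and an idle-thread keep-alive time of 60 seconds. Because the queue is effectively unbounded, the pool never grows beyond its core size, so the maximum pool size is ignored in practice.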
public static class Pool {

    /**
     * Queue capacity. An unbounded capacity does not increase the pool and therefore
     * ignores the "max-size" property.
     */
    private int queueCapacity = Integer.MAX_VALUE;

    /**
     * Core number of threads.
     */
    private int coreSize = 8;

    /**
     * Maximum allowed number of threads. If tasks are filling up the queue, the pool
     * can expand up to that size to accommodate the load. Ignored if the queue is
     * unbounded.
     */
    private int maxSize = Integer.MAX_VALUE;

    /**
     * Whether core threads are allowed to time out. This enables dynamic growing and
     * shrinking of the pool.
     */
    private boolean allowCoreThreadTimeout = true;

    /**
     * Time limit for which threads may remain idle before being terminated.
     */
    private Duration keepAlive = Duration.ofSeconds(60);

    // getters and setters omitted
}
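The Javadoc remark that an unbounded queue "ignores the max-size property" can be checked directly against the JDK's ThreadPoolExecutor. The sketch below uses only java.util.concurrent and mirrors the default values shown above; UnboundedQueueDemo and its method name are hypothetical. It submits more blocking tasks than there are core threads and reports how many threads exist and how many tasks are waiting.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {
    // Submits `tasks` blocking tasks to a pool with the given core size, a max
    // size of Integer.MAX_VALUE and an unbounded queue, then returns
    // {threads created, tasks waiting in the queue}.
    static int[] poolSizeAndQueued(int coreSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // default capacity is Integer.MAX_VALUE
        pool.allowCoreThreadTimeOut(true);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 8 core threads, 20 tasks: 8 threads run, 12 tasks wait in the queue.
        System.out.println(Arrays.toString(poolSizeAndQueued(8, 20))); // prints [8, 12]
    }
}
```

With these settings a burst of slow tasks queues up behind the 8 core threads instead of spawning more threads, which is worth keeping in mind before relying on the defaults.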
The default thread pool is configured in the TaskExecutionAutoConfiguration class.
/**
 * {@link EnableAutoConfiguration Auto-configuration} for {@link TaskExecutor}.
 *
 * @author Stephane Nicoll
 * @author Camille Vienot
 * @since 2.1.0
 */
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {

    /**
     * Bean name of the application {@link TaskExecutor}.
     */
    public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
            ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
            ObjectProvider<TaskDecorator> taskDecorator) {
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
        builder = builder.taskDecorator(taskDecorator.getIfUnique());
        return builder;
    }

    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
            AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }

}
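The intercepted call is then submitted to the thread pool for execution:
AsyncExecutionInterceptor.java (a subclass of AsyncExecutionAspectSupport)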
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
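The core idea of invoke() is to wrap the real call in a Callable, submit it to the executor, and decide from the declared return type what the caller gets back. The following is a simplified, JDK-only sketch of that idea and not Spring's code: AsyncSubmitSketch, submit, and demo are hypothetical names, error handling is left out, and only two return-type cases (Future and everything else) are modelled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AsyncSubmitSketch {
    // Wraps the real call in a task, submits it, and returns either a Future
    // (when the method declares one) or null (for fire-and-forget methods).
    static Object submit(Callable<Object> realCall, ExecutorService executor, Class<?> returnType) {
        Callable<Object> task = () -> {
            Object result = realCall.call();
            // The target method already returned a Future; unwrap it so the
            // Future handed to the caller completes with the actual value.
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
            return null;
        };
        if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task); // caller can wait for the result
        }
        executor.submit(task); // void method: nothing to hand back
        return null;
    }

    // Runs one void-style call and one Future-returning call and joins the results.
    static String demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Object fromVoid = submit(() -> null, executor, void.class);
            Object fromFuture = submit(
                    () -> CompletableFuture.completedFuture("hello"), executor, Future.class);
            Object value = ((Future<?>) fromFuture).get(5, TimeUnit.SECONDS);
            return fromVoid + "|" + value;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints null|hello
    }
}
```

This also shows why an @Async method should return void or a Future type: any other return value is computed on the worker thread and has no way to reach the caller.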
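To obtain the thread pool, determineAsyncExecutor() first checks whether the annotation's value is set. If it is, the executor is looked up by that bean name; otherwise the default executor is used.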
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}
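The lookup order above (cache first, then the qualifier, then the default) can be sketched with plain maps. This is an illustrative simplification, not Spring's implementation: ExecutorLookupSketch is a hypothetical class, a Map stands in for the bean factory, and a String key stands in for the Method object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

class ExecutorLookupSketch {
    private final Map<String, Executor> beansByName; // stand-in for the bean factory
    private final Executor defaultExecutor;
    private final Map<String, Executor> cache = new ConcurrentHashMap<>();

    ExecutorLookupSketch(Map<String, Executor> beansByName, Executor defaultExecutor) {
        this.beansByName = beansByName;
        this.defaultExecutor = defaultExecutor;
    }

    // methodKey identifies the annotated method; qualifier is the value of
    // @Async ("" when the annotation does not name an executor).
    Executor resolve(String methodKey, String qualifier) {
        return cache.computeIfAbsent(methodKey, key -> {
            if (qualifier != null && !qualifier.isEmpty()) {
                Executor named = beansByName.get(qualifier);
                if (named == null) {
                    throw new IllegalStateException("No executor bean named '" + qualifier + "'");
                }
                return named;
            }
            return defaultExecutor; // no qualifier: fall back to the default
        });
    }

    public static void main(String[] args) {
        Executor named = ForkJoinPool.commonPool();
        Executor fallback = Runnable::run; // runs tasks on the calling thread
        ExecutorLookupSketch lookup = new ExecutorLookupSketch(
                Map.of("asyncMarkValueExecutor", named), fallback);
        System.out.println(lookup.resolve("AsyncService.test", "asyncMarkValueExecutor") == named);
        System.out.println(lookup.resolve("AsyncService.other", "") == fallback);
    }
}
```

Because the result is cached per method, the qualifier is only evaluated on the first call to each method.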
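3. Customizing the thread pool used by @Async
There are two ways to customize the thread pool used by @Async. The first is to define a thread pool bean and reference it by name in the annotation, for example @Async(value = "asyncMarkValueExecutor"). The second is to implement the AsyncConfigurer interface and override its getAsyncExecutor() method.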
3.1 First approach
1. Define a thread pool and register it as a Spring bean.
@Bean(name = "asyncMarkValueExecutor")
public Executor asyncMarkValueExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(1000);
    executor.setQueueCapacity(10000);
    executor.setThreadNamePrefix("async-thread-pool-");
    executor.initialize();
    return executor;
}
2. Reference the thread pool's bean name in the annotation.
@Async(value = "asyncMarkValueExecutor")
public void test() {
    try {
        TimeUnit.SECONDS.sleep(1);
        log.info(Thread.currentThread().getName() + "AsyncService");
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
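When choosing the core size, max size, and queue capacity for a custom pool, it helps to remember how the underlying JDK ThreadPoolExecutor uses them: threads beyond the core size are created only once the queue is full. With the values above, that means an 11th thread would appear only after 10000 tasks are waiting. The sketch below demonstrates the rule on a deliberately tiny pool (core 1, max 2, queue capacity 1) using only java.util.concurrent; BoundedQueueDemo is a hypothetical name and the numbers are chosen purely for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {
    // Returns the pool size observed after each of the first three submissions,
    // followed by 1 if the fourth submission was rejected (0 otherwise).
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // hold the thread so later tasks cannot be picked up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] observed = new int[4];
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocking);
                observed[i] = pool.getPoolSize();
            }
            try {
                pool.execute(blocking); // queue full and max size reached
            } catch (RejectedExecutionException e) {
                observed[3] = 1; // ThreadPoolExecutor's default AbortPolicy rejects the task
            }
            return observed;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Task 1 starts the core thread, task 2 is queued, task 3 forces a
        // second thread, task 4 is rejected.
        System.out.println(Arrays.toString(run())); // prints [1, 1, 2, 1]
    }
}
```

A large queue capacity therefore trades extra threads for longer waiting times, so the three settings are best tuned together.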
3.2 Second approach
Implement the AsyncConfigurer interface and override its getAsyncExecutor() method.
@Configuration
public class MyConfig implements AsyncConfigurer {

    @Bean(name = "asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(2000);
        executor.setQueueCapacity(10000);
        executor.setThreadNamePrefix("my-thread-pool-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }
}