FutureTask实现了RunnableFuture接口(该接口同时继承了Runnable和Future接口),其中Future接口有如下定义:
/**
 * Cancels the task. If the task is already running and
 * {@code mayInterruptIfRunning} is {@code true}, an interrupt is sent to the
 * executing thread; otherwise the running task is left to run to completion.
 * The return value tells whether the cancellation succeeded.
 */
boolean cancel(boolean mayInterruptIfRunning);

// Whether the task has been cancelled.
boolean isCancelled();

// Whether the task has finished executing.
boolean isDone();

/**
 * Waits for and retrieves the result. The following exceptions may be thrown:
 * 1. CancellationException - the task was cancelled
 * 2. ExecutionException - the task threw an exception while executing
 * 3. InterruptedException - the current thread was interrupted while waiting
 */
V get() throws InterruptedException, ExecutionException;

/**
 * Waits if necessary for at most the given time for the computation
 * to complete, and then retrieves its result, if available.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an
 * exception
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 * @throws TimeoutException if the wait timed out
 */
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
1. isDone示例
package org.example.concurrent;import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.Test;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;import static org.assertj.core.api.Assertions.assertThat;@Slf4j
public class FutureTaskTest {private final ExecutorService executorService = Executors.newSingleThreadExecutor();@Testpublic void test() {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug("futureTask running");return "ok";});// 提交线程池之前,futureTask的状态为未执行assertThat(futureTask).isNotDone();executorService.execute(futureTask);// 等待100ms后,执行成功assertThat(futureTask).succeedsWithin(100L, TimeUnit.MILLISECONDS, InstanceOfAssertFactories.STRING).contains("ok");}
}
2. cancel和 isCancelled
2.1 任务执行前cancel
示例:
package org.example.concurrent;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;@Slf4j
public class FutureTaskTest {private final ExecutorService executorService = Executors.newSingleThreadExecutor();@Testpublic void test() {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug("futureTask running");return "ok";});// 任务执行前cancelfutureTask.cancel(false);assertThat(futureTask).isCancelled().isNotDone();executorService.execute(futureTask);// 取消的任务get会报取消异常assertThatExceptionOfType(CancellationException.class).isThrownBy(futureTask::get);// get后futureTask变为isDone;assertThat(futureTask).isCancelled().isDone();}
}
2.2 任务执行中取消任务
示例1:取消,不通知任务线程中断
package org.example.concurrent;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;import java.util.concurrent.*;import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;@Slf4j
public class FutureTaskTest {private final ExecutorService executorService = Executors.newSingleThreadExecutor();@SneakyThrows@Testpublic void test() {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug("futureTask start");sleep(300L);log.debug("futureTask end");return "ok";});executorService.execute(futureTask);// 等待100毫秒,此时futureTask正在运行, 然后取消任务sleep(100L);futureTask.cancel(false);// 取消的任务get会报取消异常assertThatExceptionOfType(CancellationException.class).isThrownBy(futureTask::get);// get后futureTask变为isDone;assertThat(futureTask).isCancelled().isDone();// 继续等待1秒,会发现任务会执行完毕sleep(1000L);}@SneakyThrowsprivate void sleep(long delay) {TimeUnit.MILLISECONDS.sleep(delay);}
}
执行日志:
11:53:58.340 [pool-1-thread-1] DEBUG o.example.concurrent.FutureTaskTest - futureTask start
11:53:58.649 [pool-1-thread-1] DEBUG o.example.concurrent.FutureTaskTest - futureTask end
示例2:取消并通知任务线程中断(如果线程处于阻塞中,会感知到自身中断)
package org.example.concurrent;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;import java.util.concurrent.*;import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;@Slf4j
public class FutureTaskTest {private final ExecutorService executorService = Executors.newSingleThreadExecutor();@SneakyThrows@Testpublic void test() {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug("futureTask start");try {sleep(300L);} catch (Exception e) {// 捕获打印异常名称并重新抛出log.error("catch Exception:{}", e.getClass().getSimpleName());throw e;}log.debug("futureTask end");return "ok";});executorService.execute(futureTask);// 等待100毫秒,此时futureTask正在运行, 然后取消任务sleep(100L);futureTask.cancel(true);// 取消的任务get会报取消异常assertThatExceptionOfType(CancellationException.class).isThrownBy(futureTask::get);// get后futureTask变为isDone;assertThat(futureTask).isCancelled().isDone();// 继续等待1秒,会发现任务会执行完毕sleep(1000L);}@SneakyThrowsprivate void sleep(long delay) {TimeUnit.MILLISECONDS.sleep(delay);}
}
执行过程中,futureTask执行线程处于休眠中,能够感知到线程中断,执行结果如下(注意:按上面的示例代码,catch块重新抛出了异常,最后一行"futureTask end"实际不会打印;只有去掉throw e后才会出现该行):
11:59:10.410 [pool-1-thread-1] DEBUG o.example.concurrent.FutureTaskTest - futureTask start
11:59:10.513 [pool-1-thread-1] ERROR o.example.concurrent.FutureTaskTest - catch Exception:InterruptedException
11:59:10.515 [pool-1-thread-1] DEBUG o.example.concurrent.FutureTaskTest - futureTask end
示例3:重复取消任务,会取消失败
package org.example.concurrent;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;import java.util.concurrent.*;import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;@Slf4j
public class FutureTaskTest {private final ExecutorService executorService = Executors.newSingleThreadExecutor();@SneakyThrows@Testpublic void test() {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug("futureTask start");sleep(300L);log.debug("futureTask end");return "ok";});executorService.execute(futureTask);// 等待100毫秒,sleep(100L);// 此时futureTask正在运行, 然后取消任务, 如果重复取消,会返回失败assertThat(futureTask.cancel(false)).isTrue();assertThat(futureTask.cancel(false)).isFalse();// 取消的任务get会报取消异常assertThatExceptionOfType(CancellationException.class).isThrownBy(futureTask::get);}@SneakyThrowsprivate void sleep(long delay) {TimeUnit.MILLISECONDS.sleep(delay);}
}
2.3 任务执行完成后cancel
任务执行完毕后再取消任务,取消动作会失败(cancel返回false)。参考下面的代码:
package org.example.concurrent;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.Test;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;import static org.assertj.core.api.Assertions.assertThat;@Slf4j
public class FutureTaskTest {private final ExecutorService executorService = Executors.newSingleThreadExecutor();@SneakyThrows@Testpublic void test() {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug("futureTask start");log.debug("futureTask end");return "ok";});executorService.execute(futureTask);// 等待100毫秒,此时futureTask已经执行完毕, 取消任务执行会失败sleep(100L);assertThat(futureTask.cancel(true)).isFalse();// 判断执行成功assertThat(futureTask).succeedsWithin(0L, TimeUnit.MILLISECONDS, InstanceOfAssertFactories.STRING).contains("ok");}@SneakyThrowsprivate void sleep(long delay) {TimeUnit.MILLISECONDS.sleep(delay);}
}
3. get等待指定时间
get指定最大等待时间,如果超出此时间,则会抛出TimeoutException异常:
package org.example.concurrent;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;import java.util.concurrent.*;import static org.assertj.core.api.Assertions.assertThatExceptionOfType;@Slf4j
public class FutureTaskTest {private final ExecutorService executorService = Executors.newSingleThreadExecutor();@SneakyThrows@Testpublic void test() {FutureTask<String> futureTask = new FutureTask<>(() -> {log.debug("futureTask start");sleep(1000L);log.debug("futureTask end");return "ok";});executorService.execute(futureTask);// get等待10毫秒,由于futureTask会执行1秒钟,get会报超时异常assertThatExceptionOfType(TimeoutException.class).isThrownBy(() -> futureTask.get(10L, TimeUnit.MILLISECONDS));}@SneakyThrowsprivate void sleep(long delay) {TimeUnit.MILLISECONDS.sleep(delay);}
}