下面代码测试可知:超时设置需要在map之后才有效,换句话说就是,超时只对超时设置之前的代码有用
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;public class TimeoutTest {public static void main(String[] args) throws InterruptedException {TimeoutTest test = new TimeoutTest();System.out.println("=== 测试超时任务 ===");test.testTimeout();System.out.println("\n=== 测试正常任务 ===");test.testNormal();System.out.println("\n=== 测试长任务 ===");test.testLongTask();// 确保主线程不退出Thread.sleep(5000);}// 测试超时任务public void testTimeout() {Observable.create(emitter -> {System.out.println("超时任务模拟:执行开始...");
// Thread.sleep(2000); // 模拟超长时间任务(超过1秒)emitter.onNext("任务完成");emitter.onComplete();}).map(s->{Thread.sleep(4000); // 模拟超长时间任务(超过1秒)System.out.println("处理s0!");return s + "x";}).timeout(3, TimeUnit.SECONDS) // 设置超时时间为1秒.onErrorResumeNext(throwable -> {if (throwable instanceof java.util.concurrent.TimeoutException) {System.out.println("任务处理超时:跳过当前任务!");} else {System.err.println("任务发生其他异常:" + throwable.getMessage());}return Observable.empty(); // 返回空的Observable,继续处理其他任务}).subscribe(result -> System.out.println("结果: " + result),throwable -> System.err.println("订阅时异常: " + throwable.getMessage()),() -> System.out.println("任务已完成"));}// 测试正常任务public void testNormal() {Observable.create(emitter -> {System.out.println("正常任务模拟:执行开始...");Thread.sleep(500); // 模拟快速任务(小于1秒)emitter.onNext("任务完成");emitter.onComplete();}).timeout(1, TimeUnit.SECONDS) // 设置超时时间为1秒.onErrorResumeNext(throwable -> {System.err.println("任务超时或其他异常:" + throwable.getMessage());return Observable.empty();}).subscribe(result -> System.out.println("结果: " + result),throwable -> System.err.println("订阅时异常: " + throwable.getMessage()),() -> System.out.println("任务已完成"));}// 测试长时间任务public void testLongTask() {Observable.create(emitter -> {System.out.println("长时间任务模拟:执行开始...");for (int i = 0; i < 5; i++) {System.out.println("任务进行中: Step " + (i + 1));Thread.sleep(600); // 模拟分段任务,每次处理600msemitter.onNext("Step " + (i + 1));}emitter.onComplete();}).timeout(1, TimeUnit.SECONDS) // 设置超时时间为1秒.onErrorResumeNext(throwable -> {if (throwable instanceof java.util.concurrent.TimeoutException) {System.out.println("任务处理超时:跳过当前任务!");} else {System.err.println("任务发生其他异常:" + throwable.getMessage());}return Observable.empty(); // 返回空的Observable,继续处理其他任务}).subscribe(result -> System.out.println("结果: " + result),throwable -> System.err.println("订阅时异常: " + throwable.getMessage()),() -> System.out.println("任务已完成"));}
}