概述
这是一个多线程处理一个文件列表的例子。通过这个例子模拟实际遇到的多线程处理列表的场景。process可以场景中处理每个元素的方法。
有6个函数。
fun1是最简单的遍历处理,需要55s。
fun2是用CompletionService+线程池的方式处理,2s
fun3是用CountDownLatch+线程池的方式处理,2s
fun4是用CompletableFuture异步的方式处理,即主线程直接返回,副线程遍历处理,55s
fun5是用CompletableFuture异步+线程池的方式处理,2s
fun6是用CompletableFuture异步+线程池+CountDownLatch的方式处理,2s。
代码
java">package 多线程.demo05多jsonCountDownLatch;import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;@Slf4j
public class Context {private static ObjectMapper objectMapper = new ObjectMapper();private static String BASE_DIR = "D:\\development\\data\\500QA-json";private static int numThreads = Runtime.getRuntime().availableProcessors();private static ExecutorService executorService = Executors.newFixedThreadPool(numThreads);@SneakyThrowspublic static void main(String[] args){testProcessZip();}@SneakyThrowspublic static void test(){String fileDir = BASE_DIR + "\\" + "1ifrnp2e1weewrphw2mpv96k36lh4h71b7w0-QA-1.json";String jsonString = new String(Files.readAllBytes(Paths.get(fileDir)));OriginalObject originalObject = objectMapper.readValue(jsonString, OriginalObject.class);System.out.println("1");}public static void testProcessZip(){ZipProcessor zipProcessor = new ZipProcessor("D:\\development\\data\\500QA-json.zip");
// zipProcessor.streamProcess();zipProcessor.streamParallelProcess();}public static List<String> getFileNames(String dirPath) throws IOException {Path dir = Paths.get(dirPath); // 替换为你的文件夹路径// 使用Files.walk遍历目录树,并通过filter筛选出文件而不是目录,最后map映射为文件名return Files.walk(dir).filter(Files::isRegularFile) // 只保留普通文件,排除目录.map(Path::getFileName) // 获取文件名.map(Path::toString) // 将Path对象转换为字符串.collect(Collectors.toList()); // 收集结果到List}// 单线程遍历处理 55s@SneakyThrowspublic static void fun1(){StopWatch stopWatch = new StopWatch();stopWatch.start();List<String> fileNames = getFileNames(BASE_DIR);for (String fileName : fileNames) {process(fileName);}stopWatch.stop();log.info("处理耗时:{}秒", stopWatch.getTotalTimeSeconds());}// 多线程处理 2s@SneakyThrowspublic static void fun2(){StopWatch stopWatch = new StopWatch();stopWatch.start();// 使用CompletionService来管理任务完成情况CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);// 提交所有任务List<String> fileNames = getFileNames(BASE_DIR);for (String fileName : fileNames) {executorService.submit(() -> {process(fileName);return null;});}// 等待所有任务完成int totalTasks = fileNames.size();for (int i = 0; i < totalTasks; i++) {try {// 阻塞直到获取到一个完成的任务Future<Void> future = completionService.take();// 如果有异常会在这里抛出future.get();} catch (InterruptedException | ExecutionException e) {log.error("异常", e);}}executorService.shutdown(); // 关闭线程池stopWatch.stop();log.info("处理耗时:{}秒", stopWatch.getTotalTimeSeconds());}// 多线程处理 2s@SneakyThrowspublic static void fun3(){StopWatch stopWatch = new StopWatch();stopWatch.start();// 提交所有任务List<String> fileNames = getFileNames(BASE_DIR);// 初始化CountDownLatchCountDownLatch latch = new CountDownLatch(fileNames.size());for (String fileName : fileNames) {executorService.submit(() -> {try{process(fileName);}catch (Exception e){log.error("异常", e);}finally {// 计数-1latch.countDown();}});}// 等待所有任务完成try {latch.await(); // 阻塞当前线程,直到计数变为零} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("主线程被中断: " + e.getMessage());}executorService.shutdown(); // 关闭线程池stopWatch.stop();log.info("处理耗时:{}秒", stopWatch.getTotalTimeSeconds());}// 异步处理, 55s@SneakyThrowspublic static void fun4(){StopWatch stopWatch = new StopWatch();stopWatch.start();log.info("接口开始处理...");// 异步执行处理JSON的逻辑CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {processFiles(BASE_DIR);} catch (IOException e) {log.error("处理文件时发生异常:{}", e.getMessage(), e);}}, executorService);// 注册回调函数,在异步任务完成后执行future.thenRun(() -> {stopWatch.stop();log.info("处理耗时:" + stopWatch.getTotalTimeSeconds() + "秒");log.info("所有文件处理完成,通知回调函数...");shutdown();});// 接口立即返回log.info("接口已返回,异步处理中...");}// 异步多线程处理, 3s@SneakyThrowspublic static void fun5(){StopWatch stopWatch = new StopWatch();stopWatch.start();log.info("接口开始处理...");CompletableFuture<Void> allFilesProcessedFuture = processFilesAsync(BASE_DIR);// 注册回调函数,在所有异步任务完成后执行allFilesProcessedFuture.thenRun(() -> {stopWatch.stop();log.info("处理耗时:" + stopWatch.getTotalTimeSeconds() + "秒");log.info("所有文件处理完成,通知回调函数...");shutdown();}).exceptionally(ex -> {stopWatch.stop();log.error("处理过程中发生异常:{}", ex.getMessage(), ex);return null;});// 接口立即返回log.info("接口已返回,异步多线程处理中...");}@SneakyThrowspublic static void fun6() {StopWatch stopWatch = new StopWatch();stopWatch.start();log.info("接口开始处理...");// 获取文件名列表并初始化CountDownLatchList<String> fileNames = getFileNames(BASE_DIR);CountDownLatch latch = new CountDownLatch(fileNames.size());// 提交所有任务for (String fileName : fileNames) {executorService.submit(() -> {try {process(fileName);} catch (Exception e) {log.error("处理文件时发生异常:{}", e.getMessage(), e);} finally {latch.countDown(); // 任务完成后计数减1}});}log.info("所有的任务提交完成");// 注册回调函数,在所有任务完成后执行CompletableFuture.runAsync(() -> {try {latch.await(); // 阻塞当前线程,直到所有任务完成stopWatch.stop();log.info("处理耗时:" + stopWatch.getTotalTimeSeconds() + "秒");log.info("所有文件处理完成,通知回调函数...");shutdown(); // 关闭线程池} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("主线程被中断:{}", e.getMessage(), e);}}, executorService);// 接口立即返回log.info("接口已返回,fun6异步多线程处理中...");}private static CompletableFuture<Void> processFilesAsync(String baseDir) {try {List<String> fileNames = getFileNames(baseDir);List<CompletableFuture<Void>> futures = fileNames.stream().map(fileName -> CompletableFuture.runAsync(() -> process(fileName), executorService)).collect(Collectors.toList());// 等待所有任务完成return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));} catch (IOException e) {return CompletableFuture.failedFuture(e);}}private static void processFiles(String baseDir) throws IOException {List<String> fileNames = getFileNames(baseDir);for (String fileName : fileNames) {process(fileName);}}@SneakyThrowspublic static void process(String fileName){String jsonDir = BASE_DIR + "\\" + fileName;String jsonString = new String(Files.readAllBytes(Paths.get(jsonDir)));OriginalObject originalObject = objectMapper.readValue(jsonString, OriginalObject.class);Thread.sleep(100);System.out.println(originalObject.getFdId());}public static void shutdown() {executorService.shutdown();try {if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();}}
}
总结一下
CompletableFuture是真好用,可以异步执行,可以接入线程池,还可以设置回调,可以很方便的管理异步逻辑。