java多线程场景2-多线程处理一个列表

news/2025/1/13 8:40:49/

概述

这是一个多线程处理一个文件列表的例子。通过这个例子模拟实际遇到的多线程处理列表的场景。process可以场景中处理每个元素的方法。
有6个函数。
fun1是最简单的遍历处理,需要55s。
fun2是用CompletionService+线程池的方式处理,2s
fun3是用CountDownLatch+线程池的方式处理,2s
fun4是用CompletableFuture异步的方式处理,即主线程直接返回,副线程遍历处理,55s
fun5是用CompletableFuture异步+线程池的方式处理,2s
fun6是用CompletableFuture异步+线程池+CountDownLatch的方式处理,2s。

代码

java">package 多线程.demo05多jsonCountDownLatch;import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;@Slf4j
public class Context {private static ObjectMapper objectMapper = new ObjectMapper();private static String BASE_DIR = "D:\\development\\data\\500QA-json";private static int numThreads = Runtime.getRuntime().availableProcessors();private static ExecutorService executorService = Executors.newFixedThreadPool(numThreads);@SneakyThrowspublic static void main(String[] args){testProcessZip();}@SneakyThrowspublic static void test(){String fileDir = BASE_DIR + "\\" + "1ifrnp2e1weewrphw2mpv96k36lh4h71b7w0-QA-1.json";String jsonString = new String(Files.readAllBytes(Paths.get(fileDir)));OriginalObject originalObject = objectMapper.readValue(jsonString, OriginalObject.class);System.out.println("1");}public static void testProcessZip(){ZipProcessor zipProcessor = new ZipProcessor("D:\\development\\data\\500QA-json.zip");
//        zipProcessor.streamProcess();zipProcessor.streamParallelProcess();}public static List<String> getFileNames(String dirPath) throws IOException {Path dir = Paths.get(dirPath); // 替换为你的文件夹路径// 使用Files.walk遍历目录树,并通过filter筛选出文件而不是目录,最后map映射为文件名return Files.walk(dir).filter(Files::isRegularFile) // 只保留普通文件,排除目录.map(Path::getFileName) // 获取文件名.map(Path::toString) // 将Path对象转换为字符串.collect(Collectors.toList()); // 收集结果到List}// 单线程遍历处理 55s@SneakyThrowspublic static void fun1(){StopWatch stopWatch = new StopWatch();stopWatch.start();List<String> fileNames = getFileNames(BASE_DIR);for (String fileName : fileNames) {process(fileName);}stopWatch.stop();log.info("处理耗时:{}秒", stopWatch.getTotalTimeSeconds());}// 多线程处理 2s@SneakyThrowspublic static void fun2(){StopWatch stopWatch = new StopWatch();stopWatch.start();// 使用CompletionService来管理任务完成情况CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);// 提交所有任务List<String> fileNames = getFileNames(BASE_DIR);for (String fileName : fileNames) {executorService.submit(() -> {process(fileName);return null;});}// 等待所有任务完成int totalTasks = fileNames.size();for (int i = 0; i < totalTasks; i++) {try {// 阻塞直到获取到一个完成的任务Future<Void> future = completionService.take();// 如果有异常会在这里抛出future.get();} catch (InterruptedException | ExecutionException e) {log.error("异常", e);}}executorService.shutdown(); // 关闭线程池stopWatch.stop();log.info("处理耗时:{}秒", stopWatch.getTotalTimeSeconds());}// 多线程处理 2s@SneakyThrowspublic static void fun3(){StopWatch stopWatch = new StopWatch();stopWatch.start();// 提交所有任务List<String> fileNames = getFileNames(BASE_DIR);// 初始化CountDownLatchCountDownLatch latch = new CountDownLatch(fileNames.size());for (String fileName : fileNames) {executorService.submit(() -> {try{process(fileName);}catch (Exception e){log.error("异常", e);}finally {// 计数-1latch.countDown();}});}// 等待所有任务完成try {latch.await(); // 阻塞当前线程,直到计数变为零} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("主线程被中断: " + e.getMessage());}executorService.shutdown(); // 关闭线程池stopWatch.stop();log.info("处理耗时:{}秒", stopWatch.getTotalTimeSeconds());}// 异步处理, 55s@SneakyThrowspublic static void fun4(){StopWatch stopWatch = new StopWatch();stopWatch.start();log.info("接口开始处理...");// 异步执行处理JSON的逻辑CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {processFiles(BASE_DIR);} catch (IOException e) {log.error("处理文件时发生异常:{}", e.getMessage(), e);}}, executorService);// 注册回调函数,在异步任务完成后执行future.thenRun(() -> {stopWatch.stop();log.info("处理耗时:" + stopWatch.getTotalTimeSeconds() + "秒");log.info("所有文件处理完成,通知回调函数...");shutdown();});// 接口立即返回log.info("接口已返回,异步处理中...");}// 异步多线程处理, 3s@SneakyThrowspublic static void fun5(){StopWatch stopWatch = new StopWatch();stopWatch.start();log.info("接口开始处理...");CompletableFuture<Void> allFilesProcessedFuture = processFilesAsync(BASE_DIR);// 注册回调函数,在所有异步任务完成后执行allFilesProcessedFuture.thenRun(() -> {stopWatch.stop();log.info("处理耗时:" + stopWatch.getTotalTimeSeconds() + "秒");log.info("所有文件处理完成,通知回调函数...");shutdown();}).exceptionally(ex -> {stopWatch.stop();log.error("处理过程中发生异常:{}", ex.getMessage(), ex);return null;});// 接口立即返回log.info("接口已返回,异步多线程处理中...");}@SneakyThrowspublic static void fun6() {StopWatch stopWatch = new StopWatch();stopWatch.start();log.info("接口开始处理...");// 获取文件名列表并初始化CountDownLatchList<String> fileNames = getFileNames(BASE_DIR);CountDownLatch latch = new CountDownLatch(fileNames.size());// 提交所有任务for (String fileName : fileNames) {executorService.submit(() -> {try {process(fileName);} catch (Exception e) {log.error("处理文件时发生异常:{}", e.getMessage(), e);} finally {latch.countDown(); // 任务完成后计数减1}});}log.info("所有的任务提交完成");// 注册回调函数,在所有任务完成后执行CompletableFuture.runAsync(() -> {try {latch.await(); // 阻塞当前线程,直到所有任务完成stopWatch.stop();log.info("处理耗时:" + stopWatch.getTotalTimeSeconds() + "秒");log.info("所有文件处理完成,通知回调函数...");shutdown(); // 关闭线程池} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("主线程被中断:{}", e.getMessage(), e);}}, executorService);// 接口立即返回log.info("接口已返回,fun6异步多线程处理中...");}private static CompletableFuture<Void> processFilesAsync(String baseDir) {try {List<String> fileNames = getFileNames(baseDir);List<CompletableFuture<Void>> futures = fileNames.stream().map(fileName -> CompletableFuture.runAsync(() -> process(fileName), executorService)).collect(Collectors.toList());// 等待所有任务完成return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));} catch (IOException e) {return CompletableFuture.failedFuture(e);}}private static void processFiles(String baseDir) throws IOException {List<String> fileNames = getFileNames(baseDir);for (String fileName : fileNames) {process(fileName);}}@SneakyThrowspublic static void process(String fileName){String jsonDir = BASE_DIR + "\\" + fileName;String jsonString = new String(Files.readAllBytes(Paths.get(jsonDir)));OriginalObject originalObject = objectMapper.readValue(jsonString, OriginalObject.class);Thread.sleep(100);System.out.println(originalObject.getFdId());}public static void shutdown() {executorService.shutdown();try {if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();}}
}

总结一下

CompletableFuture是真好用,可以异步执行,可以接入线程池,还可以设置回调,可以很方便的管理异步逻辑。


http://www.ppmy.cn/news/1562765.html

相关文章

HarMonyOS 鸿蒙系统使用 Grid构建网格

网格布局是由“行”和“列”分割的单元格所组成&#xff0c;通过指定“项目”所在的单元格做出各种各样的布局。网格布局具有较强的页面均分能力&#xff0c;子组件占比控制能力&#xff0c;是一种重要自适应布局&#xff0c;其使用场景有九宫格图片展示、日历、计算器等。 Ar…

人工智能之数学基础:函数间隔和几何间隔

本文重点 在机器学习领域,尤其是支持向量机(SVM)算法中,函数间隔(Functional Margin)和几何间隔(Geometric Margin)是两个至关重要的概念。它们不仅用于描述数据点到超平面的距离,还直接影响到分类器的性能与泛化能力。本文将详细介绍这两个概念,并探讨它们之间的区…

3D机器视觉的类型、应用和未来趋势

3D相机正在推动机器视觉市场的增长。很多制造企业开始转向自动化3D料箱拣选&#xff0c;专注于使用3D视觉和人工智能等先进技术来简化操作并减少开支。 预计3D相机将在未来五年内推动全球机器视觉市场&#xff0c;这得益于移动机器人和机器人拣选的强劲增长。到 2028 年&#…

Cursor IDE是用什么语言开发出来的

极限&#xff1a;当x无限趋近于0时y的值无限趋近于1 极限&#xff1a;多边形无限趋近圆 Cursor IDE 是一款现代化的代码编辑器&#xff0c;基于 Visual Studio Code 的开源核心开发&#xff0c;因此它的主要开发语言和技术栈与 VS Code 非常相似。以下是 Cursor IDE 的核心开发…

道品科技智慧农业与云平台:未来农业的变革之路

随着全球人口的不断增长&#xff0c;农业面临着前所未有的挑战。如何在有限的土地和资源上提高农业生产效率&#xff0c;成为了各国政府和农业从业者亟待解决的问题。智慧农业的兴起&#xff0c;结合云平台的应用&#xff0c;为农业的可持续发展提供了新的解决方案。 ## 一、智…

电脑提示directx错误导致玩不了游戏怎么办?dx出错的解决方法

想必大家都有过这样的崩溃瞬间&#xff1a;满心欢喜打开心仪的游戏&#xff0c;准备在虚拟世界里大杀四方或者畅游冒险&#xff0c;结果屏幕上突然弹出个 DirectX 错误的提示框&#xff0c;紧接着游戏闪退&#xff0c;一切美好戛然而止。DirectX 作为 Windows 系统下游戏运行的…

全网首发:嵌入式交叉编译libssh,正确编译脚本

下载 Index of /files 编译zlib 嵌入式交叉编译&#xff1a;zlib_zlib嵌入式-CSDN博客 编译openssl 注意ssh需要指定的版本。比如我编译libssh-0.11.1&#xff0c;需要的版本是openssl-1.1.1。 交叉编译嵌入式openssl&#xff1a;关键是在config中指定编译器前缀_嵌入式编…

解决无法远程管理Windows Server服务器核心安装

问题 有时&#xff0c;人们会为了节省运算资源&#xff0c;例如运行Hyper-V虚拟机&#xff0c;而选择Windows Server核心安装&#xff0c;即无图形化界面。这时&#xff0c;我们就只能通过Powershell命令对其进行操控&#xff0c;或为了获得图形化界面而使用远程服务器管理工具…