java流式处理zip+多线程

server/2025/1/15 16:15:26/

概述

流式处理一个zip,zip里有多个json文件。
流式处理可以避免解压一个大的zip。再加上多线程,处理的效率杠杠的。

代码

java">package 多线程.demo05多jsonCountDownLatch;import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;@Slf4j
public class ZipProcessor {private Path path;private static int numThreads = Runtime.getRuntime().availableProcessors();private static ExecutorService executorService = Executors.newFixedThreadPool(numThreads);private static ObjectMapper objectMapper = new ObjectMapper();@SneakyThrowspublic ZipProcessor(String filePath){path = Paths.get(filePath);if (!Files.exists(path)) {throw new FileNotFoundException("The specified ZIP file does not exist: " + filePath);}}public void streamProcess(){StopWatch stopWatch = new StopWatch();stopWatch.start();// 使用 try-with-resources 保证资源关闭try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(path))) {ZipEntry entry;while ((entry = zis.getNextEntry()) != null) {// 将当前条目的数据读取到字节数组中byte[] byteArray = getByteArray(zis);process(byteArray, entry);// 关闭当前条目的输入流zis.closeEntry();}stopWatch.stop();log.info("zip处理耗时:{}秒", stopWatch.getTotalTimeSeconds());} catch (IOException e) {stopWatch.stop();log.error("zip处理异常,耗时:{}秒", stopWatch.getTotalTimeSeconds(), e);}}public void streamParallelProcess() {StopWatch stopWatch = new StopWatch();stopWatch.start();try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(path))) {ZipEntry entry;List<Future<?>> futures = new ArrayList<>();while ((entry = zis.getNextEntry()) != null) {// 为了lambda表达式捕获局部变量final ZipEntry currentEntry = entry;// 将当前条目的数据读取到字节数组中byte[] byteArray = getByteArray(zis);Future<?> future = executorService.submit(() -> process(byteArray, currentEntry));futures.add(future);}// 等待所有任务完成for (Future<?> future : futures) {try {future.get();} catch (Exception e) {log.error("任务执行失败", e);}}} catch (IOException e) {log.error("读取ZIP文件异常", e);} finally {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();}stopWatch.stop();log.info("zip处理耗时:{}秒", stopWatch.getTotalTimeSeconds());}}private void process(byte[] entryData, ZipEntry entry) {try {// 在这里处理每个条目的数据ObjectMapper objectMapper = new ObjectMapper();OriginalObject originalObject = objectMapper.readValue(entryData, OriginalObject.class);log.info("完成处理:{},sourceFileId:{}", entry.getName(), originalObject.getSourceFileId());} catch (IOException e) {log.error("处理条目 {} 异常", entry.getName(), e);}}private byte[] getByteArray(ZipInputStream zis) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();byte[] buffer = new byte[1024];int length;while ((length = zis.read(buffer)) > 0) {baos.write(buffer, 0, length);}return baos.toByteArray();}}

http://www.ppmy.cn/server/158600.html

相关文章

Python爬虫-爬取汽车之家全部汽车品牌的brandid(品牌ID)

前言 本文是该专栏的第42篇,后面会持续分享python爬虫干货知识,记得关注。 本文以汽车之家平台为例子,获取所有汽车品牌的“全部品牌ID”,即brandid数据。如下所示: 具体的实现思路以及完整实现代码逻辑,笔者将在正文进行详细介绍。废话不多说,跟着笔者直接往下看正文详…

Mycat读写分离搭建及配置超详细!!!

目录 一、Mycat产生背景二、Mycat介绍三、Mycat安装四、Mycat搭建读写分离1、 搭建MySQL数据库主从复制2、 基于mysql主从复制搭建MyCat读写分离 五、Mycat启动常见错误处理1、Caused by: io.mycat.config.util.ConfigException: SelfCheck### schema TESTDB refered by user u…

消息中间件类型介绍

消息中间件是一种在分布式系统中用于实现消息传递的软件架构模式。它能够在不同的系统或应用之间异步地传输数据&#xff0c;实现系统的解耦、提高系统的可扩展性和可靠性。以下是几种常见的消息中间件类型及其介绍&#xff1a; 1.RabbitMQ 特点&#xff1a; • 基于AMQP&#…

【深度学习】神经网络灾难性遗忘(Catastrophic Forgetting,CF)问题

文章目录 1. 什么是灾难性遗忘&#xff1f;2. 为什么会存在灾难性遗忘&#xff1f;2.1 网络权重的更新2.2 没有有效的记忆机制2.3 任务间数据分布差异 3. 目前解决方案3.1 弹性权重保持&#xff08;Elastic Weight Consolidation, EWC&#xff09;3.2 其他方法 1. 什么是灾难性…

【微服务】面试题 5、分布式系统理论:CAP 与 BASE 详解

分布式系统理论&#xff1a;CAP 与 BASE 详解 一、CAP 定理 背景与定义&#xff1a;1998 年由加州大学科学家埃里克布鲁尔提出&#xff0c;分布式系统存在一致性&#xff08;Consistency&#xff09;、可用性&#xff08;Availability&#xff09;、分区容错性&#xff08;Part…

浅谈ArcGIS的地理处理(GP)服务之历史、现状和未来

ArcGIS的地理处理&#xff08;GP&#xff09;服务是由Esri开发的一套工具&#xff0c;旨在通过网络提供地理空间数据处理、分析和转换的能力。它的历史、现状和未来发展受到了GIS技术进步、用户需求变化以及云计算、大数据等技术变革的影响。以下将从历史、现状、未来三个方面详…

分布式组件底层逻辑是什么?

分布式组件是指在分布式系统中执行特定功能的模块&#xff0c;通常分布在多个物理节点上&#xff0c;共同协作完成任务。其底层逻辑包括多个方面&#xff0c;从通信和数据管理到一致性和容错设计&#xff0c;具体如下&#xff1a; 1.分布式组件的核心特点 分布性&#xff1a;功…

谷歌开放语音命令数据集,助力初学者踏入音频识别领域

在人工智能的浪潮中&#xff0c;语音识别技术正逐渐成为我们日常生活的一部分。从智能助手到语音控制设备&#xff0c;语音识别的应用场景越来越广泛。然而&#xff0c;对于初学者来说&#xff0c;进入这一领域往往面临诸多挑战&#xff0c;尤其是缺乏合适的开源数据集和简单的…