hive表小文件合并

news/2024/11/24 13:25:09/

1. 背景

公司的 hive 表中的数据是通过 flink sql 程序,从 kafka 读取,然后写入 hive 的,为了数据能够被及时可读,我设置了 flink sql 程序的 checkpoint 时间为 1 分钟,因此,在 hive 表对应的 hdfs 上,会每隔 1 分钟,生成一个小文件,每天生成 1440 个小文件,时间长了之后,就会造成 hdfs 的小文件过多的问题。为了解决这个问题,我编写了一个工具类,用来合并指定目录及其子目录下所有的文件。

2. 代码

TimeUtil 工具类

package reach.store.tools.common.utils;
import cn.hutool.core.date.LocalDateTimeUtil;
import java.time.format.DateTimeFormatter;public class TimeUtil {private static final DateTimeFormatter DATE_TIME_FORMATTER_DEFAULT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");/*** 获取当前时间字符串*/public static String now() {return LocalDateTimeUtil.now().format(DATE_TIME_FORMATTER_DEFAULT);}}

MergeHdfsFiles 主类

package reach.store.tools.hdfs;import cn.hutool.core.lang.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reach.store.tools.common.utils.TimeUtil;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** 合并 hdfs 上同一个目录下的小文件*/
public class MergeHdfsFiles {private static final Logger LOGGER = LoggerFactory.getLogger(MergeHdfsFiles.class);/*** 将 main 函数入参转化为 map 表*/private static final Map<String, Object> ARGS_MAP = new HashMap<>();private static final String DIR_KEY = "dir";/*** 合并后文件名前缀对应的 key*/private static final String MERGE_FILE_NAME_PRE_KEY = "mergeFileNamePre";/*** 合并后文件名默认前缀*/private static final String MERGE_FILE_NAME_PRE_DEFAULT = "merge";/*** 合并后文件名前缀*/private static String MERGE_FILE_NAME_PRE = MERGE_FILE_NAME_PRE_DEFAULT;/*** 合并时被过滤文件最大大小对应的 key*/private static final String MAX_FILE_SIZE_KEY = "maxFileSize";/*** 合并时被过滤文件最大大小默认值*/private static final Long MAX_FILE_SIZE_DEFAULT = 50 * 1024 * 1024L;/*** 合并时被过滤文件最大大小*/private static Long MAX_FILE_SIZE = MAX_FILE_SIZE_DEFAULT;private static final Configuration CONFIGURATION = new Configuration();private static final FileSystem FILE_SYSTEM;static {try {FILE_SYSTEM = FileSystem.get(CONFIGURATION);} catch (IOException e) {throw new RuntimeException(e);}}/*** 所有需要合并小文件的目录*/private static final List<Path> DIRS_PATH = new ArrayList<>();/*** 必须先调用此函数,来处理入参,初始化 dirsPath map 表* 将输入的参数转化为 map 表,输入参数格式为 --key value ...*/private static void argsToMap(String[] args) {if (args.length == 0 || args.length % 2 != 0) {System.out.println(TimeUtil.now() + "   未输入必要的参数,或输入参数个数不是偶数,请检查输入参数");System.exit(1);} else {String key = null;for (int index = 0; index < args.length; index++) {if (index % 2 == 0) {// keykey = args[index].substring(2);} else {// valueARGS_MAP.put(key, args[index]);}}// 处理可选参数值if (ARGS_MAP.containsKey(MERGE_FILE_NAME_PRE_KEY)) {MERGE_FILE_NAME_PRE = String.valueOf(ARGS_MAP.get(MERGE_FILE_NAME_PRE_KEY));}if (ARGS_MAP.containsKey(MAX_FILE_SIZE_KEY)) {MAX_FILE_SIZE = Long.parseLong(String.valueOf(ARGS_MAP.get(MAX_FILE_SIZE_KEY))) * 1024 * 1024;}}}/*** 合并所有的文件,然后删除合并之前的文件** @param dirPath         合并后的文件目录* @param sourceFilePaths 被合并的所有文件 path*/private static void mergeFiles(Path dirPath, List<Path> sourceFilePaths) throws IOException {if (sourceFilePaths == null || sourceFilePaths.size() == 0) {return;}// 不合并以 . 开头,并且大小超过设定值的文件sourceFilePaths.removeIf(next -> {try {return next.getName().startsWith(".") || FILE_SYSTEM.getFileStatus(next).getLen() >= MAX_FILE_SIZE;} catch (IOException e) {throw new RuntimeException(e);}});if (sourceFilePaths.size() == 0) {return;}// 如果目录下需要合并的文件只有一个,则无需进行合并操作if (sourceFilePaths.size() == 1) {System.out.println(TimeUtil.now() + "   " + dirPath.toString() + "目录下只有一个文件符合合并条件,不进行合并操作。");return;}// 读写 parquet 文件MessageType schema = ParquetFileReader.readFooter(CONFIGURATION, sourceFilePaths.get(0), ParquetMetadataConverter.NO_FILTER).getFileMetaData().getSchema();GroupWriteSupport.setSchema(schema, CONFIGURATION);// 合并文件,合并后文件名,需要以 . 开头,防止被读取 hive 读取表操作读取到,最后通过重命名合并后文件名及删除所有原始文件的方式使合并后的文件可见。Path targetFilePath = new Path(dirPath + "/." + MERGE_FILE_NAME_PRE + "_" + UUID.randomUUID());ParquetWriter<Group> writer = new ParquetWriter<>(targetFilePath, new GroupWriteSupport(), CompressionCodecName.SNAPPY, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, CONFIGURATION);for (Path filePath : sourceFilePaths) {ParquetReader<Group> parquetReader = ParquetReader.builder(new GroupReadSupport(), filePath).build();SimpleGroup group;while ((group = (SimpleGroup) parquetReader.read()) != null) {writer.write(group);}}writer.close();// 重命名合并后的文件,删除原来的所有文件Path newTargetFilePath = new Path(targetFilePath.getParent() + "/" + targetFilePath.getName().substring(1));FILE_SYSTEM.rename(targetFilePath, newTargetFilePath);System.out.println(TimeUtil.now() + "   被合并小文件的目录为:" + targetFilePath.getParent().toString() +",原始符合合并条件文件数量:" + sourceFilePaths.size() +",合并后文件名:" + newTargetFilePath.getName());for (Path path : sourceFilePaths) {FILE_SYSTEM.delete(path, false);}}/*** 处理单个目录下的文件和目录<br>* 如果有目录,则将目录加入 dirsPath 列表,下次循环时继续从列表中获取,然后继续处理目录<br>* 如果有文件,则将文件信息放入一个列表,之后对所有文件合并即可** @param dirPath 需要处理的目录 path 对象*/private static void dirHandle(Path dirPath, Configuration configuration, FileSystem fileSystem) throws IOException {List<Path> filesPath = new ArrayList<>();RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listLocatedStatus(dirPath);// 查找所有的文件及目录while (locatedFileStatusRemoteIterator.hasNext()) {LocatedFileStatus locatedFileStatus = locatedFileStatusRemoteIterator.next();if (locatedFileStatus.isFile()) {filesPath.add(locatedFileStatus.getPath());} else if (locatedFileStatus.isDirectory()) {DIRS_PATH.add(locatedFileStatus.getPath());}}mergeFiles(dirPath, filesPath);}/*** 开始进行指定目录下的小文件合并*/private static void start() throws Exception {String dir;if (ARGS_MAP.containsKey(DIR_KEY)) {dir = String.valueOf(ARGS_MAP.get(DIR_KEY));if (dir.endsWith("/")) {// 去掉最后的 /dir = dir.substring(0, dir.length() - 1);}} else {throw new RuntimeException("未指定要合并小文件的目录 dir 参数");}Configuration configuration = new Configuration();DIRS_PATH.add(new Path(dir));FileSystem fileSystem = FileSystem.get(configuration);while (DIRS_PATH.size() > 0) {Path dirPath = DIRS_PATH.get(0);dirHandle(dirPath, configuration, fileSystem);DIRS_PATH.remove(dirPath);}fileSystem.close();}/*** @param args dir:必选,需要合并小文件的目录,绝对路径,直接从 / 开始写即可。<br>*             mergeFileNamePre:可选,合并后的文件名前缀,默认为 merge <br>*             maxFileSize:可选,被合并小文件的最大大小,超过该值,则不参与合并,单位:M,默认:50M*/public static void main(String[] args) throws Exception {argsToMap(args);start();}}

程序执行逻辑:

  1. 去掉以 . 开头的文件。
  2. 去掉大小超过 50M 的文件,这个 50M 可以在程序启动时作为参数设置。
  3. 如果目录下只有一个文件,则不执行合并操作。

注意:该程序只能合并 parquet 格式的文件,并且采用 snappy 压缩。

logback.xml 文件,注意将该文件放到项目的 resources 资源目录下。在该文件的配置中,将日志的级别设置为 WARN,是为了减少后续程序运行时过多的 INFO 日志输出,影响程序日志的输出。

<?xml version="1.0" encoding="UTF-8"?>
<configuration><appender name="console" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d %p [%c] - %m%n</pattern></encoder></appender><root level="WARN"><appender-ref ref="console"/></root></configuration>

core-site.xml、hdfs-site.xml,这两个文件,大家从自己的集群下载下来放到项目的 resource 资源目录下即可。

pom.xml 文件内容:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>reach.store</groupId><artifactId>bigdata-tools</artifactId><version>1.0</version><packaging>pom</packaging><modules><module>tools-common</module><module>dolphinscheduler</module><module>tools-hdfs</module></modules><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><hadoop.version>3.0.3</hadoop.version></properties><dependencies><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>5.8.2</version><scope>test</scope></dependency><!-- hadoop --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>1.9.0</version></dependency><!-- 其他 --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.21</version></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.36</version></dependency></dependencies><build><plugins><!-- 测试代码运行插件,可以在打包之前跳过test包下符合命名规范的所有类的代码 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.22.2</version><configuration><skipTests>true</skipTests></configuration></plugin><!-- 打包插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin><!-- java编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin></plugins><resources><resource><!-- 要包含的资源文件目录,写相对路径,相对于项目的根路径 --><directory>src/main/resources</directory><includes><!-- 要包含的文件,相对于上面指定的目录 --><include>**</include></includes></resource></resources></build></project>

注意修改对应的依赖版本为自己集群的版本。

最后直接通过 idea 的 maven 窗口的 package 按钮进行打包即可。

3. 执行

主类:xxx.xxx.xxx.MergeHdfsFiles

启动:调用时,直接通过命令 java -cp 主类完全限定名 xxx.jar --key1 value1 --key2 value2 ... 执行即可。

入参说明

  1. dir:必选,需要合并小文件的目录,直接从根目录 / 开始即可,开头无需指定 hdfs 等信息,最后无需添加 /
  2. mergeFileNamePre:可选,合并后的文件名称前缀,默认为:merge,合并后的文件名会在该前缀后面添加一个 UUID。
  3. maxFileSize:可选,需要被合并的最大文件大小,单位:M,默认:50M。如果小文件大小超过该参数设置,则不参与小文件合并。

http://www.ppmy.cn/news/669556.html

相关文章

elasticsearch 明明有index但是查不出来

最近用python去query elastricsearch的data&#xff0c;但是我再kibana明明看到有&#xff0c;但是就是查不出来 因为涉及公司隐私&#xff0c;就不截图直接举例子了&#xff0c;我在 discover里面看到的是某条数据的index是 xxx-sss-a-b&#xff0c;但是我写query是xxx-sss-a-…

kali 安全/运维 开源教程2022

计划制定 简单概述 1 学哪个 学什么 怎么学 首先技术不是被贬值或者说是淘汰 而是每隔一段时间都会出现新的技术 旧技术不会沦落至消失 而是变成基础 或者被打包成一个集成环境 从汇编到c 再到现如今的各种开发语言 发明新技术是为了更好的提高旧环境的工作效率 而现如今pyt…

2022最新python100个实战练手项目,【附源码】,快来学习起来吧!

Python是目前最好的编程语言之一。由于其可读性和对初学者的友好性&#xff0c;已被广泛使用。那么要想学会并掌握Python&#xff0c;可以实战的练习项目是必不可少的。 接下来&#xff0c;我将给大家介绍20个非常实用的Python项目&#xff0c;帮助大家更好的学习Python。大家…

利用MSF打包加固APK对安卓手机进行控制

介绍 由于经典的MSF生成apk安装包控制&#xff0c;版本较低&#xff0c;高版本安卓手机可能会出现版本不支持或无法使用的现象&#xff0c;而且apk无法正常使用&#xff0c;没有图形化界面&#xff0c;比较假。 相比于原始的msf远控&#xff0c;此版本app在进行远控时&#xf…

PIAO网址PIAO

一、视频类 1. 预告片世界&#xff1a;预告片世界 - 最新电影预告片免费下载 2. 33台词&#xff1a;33台词 - 电影台词搜索引擎 3.MixKit&#xff1a;Free Stock Videos, Download Royalty Free Videos | Mixkit 4.Pexel&#xff1a;https://www.pexels.com/zh-cn/video/ …

素材网址大全

一、视频类 预告片世界&#xff1a;https://www.yugaopian.cn/33台词&#xff1a;http://33.agilestudio.cn/ 3.MixKit&#xff1a;https://mixkit.co/free-stock-video/ 4.Pexel&#xff1a;https://www.pexels.com/zh-cn/video/ 5.Videezy&#xff1a;https://www.videezy.c…

踩坑的Python爬虫:新手如何在一个月内学会爬取大规模数据?

Python爬虫为什么受欢迎 如果你仔细观察&#xff0c;就不难发现&#xff0c;懂爬虫、学习爬虫的人越来越多&#xff0c;一方面&#xff0c;互联网可以获取的数据越来越多&#xff0c;另一方面&#xff0c;像 Python这样的编程语言提供越来越多的优秀工具&#xff0c;让爬虫变得…

网络图片转base64

要在 JavaScript 中将网络图片转换为 Base64 编码&#xff0c;你可以使用 fetch API 来获取网络图片&#xff0c;并将其转换为 Blob 对象&#xff0c;然后使用 FileReader 来读取 Blob 对象并进行 Base64 编码转换。下面是一个示例代码&#xff1a; function imageUrlToBase64…