Sqoop源码修改:增加落地HDFS文件数与MapTask数量一致性检查

embedded/2025/2/5 10:48:45/

个人博客地址:Sqoop源码修改:增加落地HDFS文件数与MapTask数量一致性检查 | 一张假钞的真实世界

本篇是对记录一次Sqoop从MySQL导入数据到Hive问题的排查经过的补充。

Sqoop 命令通过 bin 下面的脚本调用,调用如下:

exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"

org.apache.sqoop.Sqoop 是 Sqoop 的入口类,在此主要是解析参数及初始化工具类,然后通过 org.apache.hadoop.util.ToolRunner 类调用对应的工具完成操作。Sqoop 的 Import 操作对应的是 org.apache.sqoop.tool.ImportTool 类。

在 ImportTool 类的 return 代码前增加以下代码:

int numMappers = options.getNumMappers();String hDbName = options.getHCatDatabaseName();
String hTableName = options.getHCatTableName();
String hPartKeys = options.getHCatalogPartitionKeys();
String hPartVals = options.getHCatalogPartitionValues();if(isStringNotEmpty(hDbName) && isStringNotEmpty(hTableName) && isStringNotEmpty(hPartKeys) &&     isStringNotEmpty(hPartVals)) {String[] partKeys = hPartKeys.split(",");String[] partVals = hPartVals.split(",");String partPathStr = "";if(partKeys.length > 0 && partVals.length == partKeys.length) {for(int i = 0; i < partKeys.length; i++) {partPathStr += partKeys[i] + "=" + partVals[i] + "/";}}String targetDir = "/user/hive/warehouse/" + hDbName + ".db/" + hTableName + "/" + partPathStr;targetDir = targetDir.toLowerCase();LOG.info("---------targetDir=" + targetDir);try {FileSystem fs = FileSystem.get(options.getConf());RemoteIterator<LocatedFileStatus> rIter = fs.listFiles(new Path(targetDir), false);int fileCount = 0;while(rIter.hasNext()) {fileCount++;rIter.next();}LOG.info("---------------fileCount=" + fileCount);if(numMappers != fileCount) {LOG.error("files number in hdfs not equals mapper task number !");return 2;}} catch (IOException e) {LOG.error("count files number from hdfs error !");e.printStackTrace();return 3;}
}

改动只针对 Sqoop 集成 HCatalog 方式导入 ORC 格式的情况。因为我们的数据仓库中都采用的是这种方式。

优化:当 MySQL 中记录数特别少时,如少于 4 条记录,则默认 Sqoop 的 MapTask 数量为 4 但其实际执行时因为原始记录数不够则实际执行的 MapTask 数量会跟实际的记录数一致,此时 split 数量跟落地 HDFS 的文件数量一致。所以,可以根据 Sqoop 对应 MR 的实际 split 数量进行判断文件数量。


http://www.ppmy.cn/embedded/159724.html

相关文章

PostgreSQL技术内幕24:定时任务调度插件pg_cron

文章目录 0.简介1.基础知识2.pg_cron安装使用方式2.1 安装pg_cron2.2 使用方式 3.实现原理3.1 启动过程3.2 任务添加和管理3.3 调度过程3.4 执行原理 0.简介 pg_cron是PostgreSQL中的一个简单的基于cron的任务调度插件&#xff0c;本文将从其基础知识&#xff08;Linux中Cron的…

数据库课程设计使用Java+JDBC+MySQL+Swing实现的会议预约管理系统源代码+数据库

编码&#xff1a; GBK 开发环境 jdk12MySQL8.0 效果图 用户端 管理员端 完整代码下载地址&#xff1a;会议预约管理系统源代码数据库

蓝桥杯例题七

&#x1f31f; "没有人能定义你的天际线&#xff0c;除了你自己。 别把光芒寄托在别人的灯盏里&#xff0c;你的火种就藏在胸膛深处——哪怕此刻只是微弱的星点&#xff0c;也足够燎尽荒原。所谓奇迹&#xff0c;不过是凡人用不肯低头的倔强&#xff0c;在命运岩层上凿出的…

Qt跨屏窗口的一个Bug及解决方案

如果我们希望一个窗口覆盖用户的整个桌面&#xff0c;此时就要考虑用户有多个屏幕的场景&#xff08;此窗口要横跨多个屏幕&#xff09;&#xff0c;由于每个屏幕的分辨率和缩放比例可能是不同的&#xff0c;Qt底层在为此窗口设置缩放比例&#xff08;DevicePixelRatio&#xf…

在Ubuntu上使用Docker部署DeepSeek

在Ubuntu上使用Docker部署DeepSeek&#xff0c;并确保其可以访问公网网址进行对话&#xff0c;可以按照以下步骤进行&#xff1a; 一、安装Docker 更新Ubuntu的软件包索引&#xff1a; sudo apt-get update安装必要的软件包&#xff0c;这些软件包允许apt通过HTTPS使用存储库…

笔记day6

文章目录 1 复习&#xff1a;2 动态开发面包屑中的分类名3 动态开发面包屑中的关键字4 排序操作4.1 问题&#xff1a;order属性的属性值最多有多少种写法4.2 问题&#xff1a;谁应该有类名4.3 问题&#xff1a;谁应该有箭头4.4 问题&#xff1a;箭头用什么制作 1 复习&#xff…

《苍穹外卖》项目学习记录-Day7缓存套餐

如果使用EHCache作为缓存实现&#xff0c;只需要导入 EHCache的maven坐标。 入门案例 如果使用Spring cache缓存数据&#xff0c;key的生成&#xff1a;userCache::xxx。#user.id中的.叫做对象导航。 因为id是自增的所以只输入age和name就可以了&#xff0c;在插入完之后它会把…

详解Kafka并行计算架构

引言 在高流量的复杂场景下&#xff0c;Kafka 凭借卓越的性能表现脱颖而出&#xff0c;始终维持着极高的吞吐率和高效的消息消费能力&#xff0c;在众多消息队列产品中独树一帜。其稳定且强大的性能&#xff0c;不仅保障了海量数据的快速处理&#xff0c;还为各类业务的高效运行…