29、Spark写数据到Hudi时,同步hive表的一些坑

devtools/2025/1/15 15:24:19/

hivecomment_0">1.hudi的同步hive表没有comment

原以为hudi同步的hive表是根据数据写入的dataframe的schema创建的。就和spark write hive时类似,查看源码后发现不是。

hive_2">1.1 hudi同步hive的模式

HMS , JDBC , HIVESQL。我这儿常用的是HMS和JDBC
在这里插入图片描述
各个同步模式对应的执行器:
在这里插入图片描述

1.2 schema生成

我们可以看到schema生成的代码块。先从提交的commit中获取元数据信息,没有的话则从数据文件中获取schema。两种方式获取到的schema都是没有comment信息的。
org.apache.hudi.common.table.TableSchemaResolver#getTableParquetSchema
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  /*** Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest* commit. We will assume that the schema has not changed within a single atomic write.** @return Parquet schema for this table* @throws Exception*/private MessageType getTableParquetSchemaFromDataFile() throws Exception {HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();try {switch (metaClient.getTableType()) {case COPY_ON_WRITE:// If this is COW, get the last commit and read the schema from a file written in the// last commitHoodieInstant lastCommit =activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit "+ lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :"+ commitMetadata));return readSchemaFromBaseFile(new Path(filePath));case MERGE_ON_READ:// If this is MOR, depending on whether the latest commit is a delta commit or// compaction commit// Get a datafile written and get the schema from that fileOption<HoodieInstant> lastCompactionCommit =metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();LOG.info("Found the last compaction commit as " + lastCompactionCommit);Option<HoodieInstant> lastDeltaCommit;if (lastCompactionCommit.isPresent()) {lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant();} else {lastDeltaCommit =metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();}LOG.info("Found the last delta commit " + lastDeltaCommit);if (lastDeltaCommit.isPresent()) {HoodieInstant lastDeltaInstant = lastDeltaCommit.get();// read from the log file wrotecommitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),HoodieCommitMetadata.class);Pair<String, HoodieFileFormat> filePathWithFormat =commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny().map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {// No Log files in Delta-Commit. Check if there are any parquet filesreturn commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension()))).findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() ->new IllegalArgumentException("Could not find any data file written for commit "+ lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()+ ", CommitMetadata :" + commitMetadata));});switch (filePathWithFormat.getRight()) {case HOODIE_LOG:return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft()));case PARQUET:return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft()));default:throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight()+ " for file " + filePathWithFormat.getLeft());}} else {return readSchemaFromLastCompaction(lastCompactionCommit);}default:LOG.error("Unknown table type " + metaClient.getTableType());throw new InvalidTableException(metaClient.getBasePath());}} catch (IOException e) {throw new HoodieException("Failed to read data schema", e);}}

1.3建表DDL

获取到schema后,我们再看建表行为。
org.apache.hudi.hive.ddl.DDLExecutor#createTable 定义了这个接口建表方法。有两个实现类,一个是
org.apache.hudi.hive.ddl.HMSDDLExecutor。另一个是 org.apache.hudi.hive.ddl.QueryBasedDDLExecutor
在这里插入图片描述在这里插入图片描述
首先,看org.apache.hudi.hive.ddl.HMSDDLExecutor#createTable方法:
ddl操作中使用的字段信息在HiveSchemaUtil.convertMapSchemaToHiveFieldSchema生成,可以直接在这个方法里看到字段的comment信息是直接写死为空字符串的。
在这里插入图片描述
在这里插入图片描述
再看,org.apache.hudi.hive.ddl.QueryBasedDDLExecutor#createTable方法。
方法里是通过HiveSchemaUtil.generateCreateDDL方法直接生成的ddl建表语句的。这个方法里generateSchemaString方法来生成字段信息的。在这个方法里,也是没有涉及comment信息的。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1.4结论

同步hive表是在 数据写入hudi目录后,根据目录里的schema来创建的hive表,所以创建的hive表没有带着dataframe的comment信息。需要手动执行修改字段comment。

2.追加comment

sparksqlcomment_113">2.1.使用spark.sql的方式修改comment

spark.sql()的方式执行 修改comment的sql语句,会调用hudi里的AlterHoodieTableChangeColumnCommand类。这个里面会比较schema,刷新sparksession里的catalog信息,会让任务hang住。(为什么hang住没去排查)大概操作就是写一个使用新的schema的空数据集到hudi来实现schema更新。
org.apache.spark.sql.hudi.command.AlterHoodieTableChangeColumnCommand。
在这里插入图片描述
在这里插入图片描述

hivesqlcomment_118">2.2使用hive-sql的方式修改comment

hive-jdbc的方式执行修改sql语句。这个方式不会更新hive表里的 TBLPROPERTIES 的 'spark.sql.sources.schema.part.0’信息。
使用dataframe的schame.tojson ,去修改 ‘spark.sql.sources.schema.part.0’ 信息

  /*** 将 dataframe 中的comment加到 hudi的hive表中** @param df      dataframe* @param dbTable hive表* @param spark   spark session*/def addCommentForSyncHive(df: DataFrame, dbTable: String, spark: SparkSession, writeOptions: mutable.Map[String, String]): Unit = {val comment: Map[String, String] = df.schema.map(sf => (sf.name, sf.getComment().getOrElse(""))).toMapinfo(s"数据集的字段名->备注为:\n${comment.mkString("\n")}")val jdbcUrlOption = writeOptions.get(DataSourceWriteOptions.HIVE_URL.key())val jdbcUserOption = writeOptions.get(DataSourceWriteOptions.HIVE_USER.key())val jdbcPassOption = writeOptions.get(DataSourceWriteOptions.HIVE_PASS.key())assert(jdbcUrlOption.isDefined, s"${DataSourceWriteOptions.HIVE_URL.key()} 必须被指定")val connection = DbUtil.createHiveConnection(jdbcUrlOption.get, jdbcUserOption.getOrElse(""), jdbcPassOption.getOrElse(""))val stmt = connection.createStatement()//需要手动更新hive表中的spark.sql.sources.schema.part.0信息stmt.execute(s"ALTER TABLE $dbTable SET TBLPROPERTIES ('spark.sql.sources.schema.part.0' = '${df.schema.json}')")// 获取表字段和类型val tableSchema = spark.sql(s"DESCRIBE $dbTable").select("col_name", "data_type").collect().map(row => (row.getString(0), row.getString(1)))tableSchema.foreach { case (column, dataType) =>if (comment.contains(column) && !Seq("ym", "ymd").contains(column)) {val newComment = comment.getOrElse(column, "")val sql = s"""ALTER TABLE $dbTable CHANGE COLUMN $column $column $dataType COMMENT '$newComment'"""info(s"添加备注执行sql:$sql")try {stmt.execute(sql)} catch {case e:Throwable =>warn("添加备注sql执行失败")}}}stmt.close()connection.close()}

修改’spark.sql.sources.schema.part.0’时,因为schema带有备注,会很长,导致超过hive表元数据mysql表字段的长度限制。去mysql中修改这个长度限制(table_params表PARAM_VALUE字段)。
在这里插入图片描述


http://www.ppmy.cn/devtools/150337.html

相关文章

深入Android架构(从线程到AIDL)_27 Messager框架与IMessager接口03

目录 3、 双向沟通的Messenger框架 基本設計原則 4、 IMessenger接口 使用AIDL 3、 双向沟通的Messenger框架 这个Messenger框架是对Binder框架加以扩充而来的。 在双向沟通上&#xff0c;也继承了Binder框架机制。Binder框架双向沟通的应用情境是&#xff1a;当myActivit…

求矩阵不靠边元素之和(PTA)C语言

求矩阵的所有不靠边元素之和&#xff0c;矩阵行的值m从键盘读入(2<m<10)&#xff0c;调用自定义函数Input实现矩阵元素从键盘输入&#xff0c;调用Sum函数实现求和。(只考虑float型&#xff0c;且不需考虑求和的结果可能超出float型能表示的范围)。 函数接口定义&#x…

R语言的数据库编程

R语言的数据库编程 引言 在当今大数据时代&#xff0c;数据分析已成为推动各行业发展的重要力量。R语言&#xff0c;作为一种专为统计分析和数据挖掘而设计的编程语言&#xff0c;逐渐成为数据科学家和分析师的首选工具。然而&#xff0c;仅仅使用R语言进行数据分析往往无法满…

将node节点加入k8s集群

1、k8s master集群安装完成之后即可以开始将node节点加入到集群 2、首先要进行基础环境的配置&#xff0c;包括关闭防火墙、关闭selinux&#xff0c;关闭swap分区&#xff0c;这都是基础操作&#xff0c;不在粘贴代码。 3、进行yum源的配置&#xff0c;这里最简单方法是把mas…

Python 扫描枪读取发票数据导入Excel

财务需要一个扫描枪扫描发票文件&#xff0c;并将主要信息录入Excel 的功能。 文件中sheet表的列名称&#xff0c;依次为&#xff1a;发票编号、发票编码、日期、金额、工号、扫描日期。 扫描的时候&#xff0c;Excel 文件需要关闭&#xff0c;否则会报错。 import openpyxl …

计算机网络之---公钥基础设施(PKI)

公钥基础设施 公钥基础设施&#xff08;PKI&#xff0c;Public Key Infrastructure&#xff09; 是一种用于管理公钥加密的系统架构&#xff0c;它通过结合硬件、软件、策略和标准来确保数字通信的安全性。PKI 提供了必要的框架&#xff0c;用于管理密钥对&#xff08;包括公钥…

基于 Selenium 实现上海大学校园网自动登录

基于 Selenium 实现上海大学校园网自动登录 一、技术方案 核心工具&#xff1a; Selenium&#xff1a;一个用于自动化测试的工具&#xff0c;能够模拟用户在浏览器上的操作。Edge WebDriver&#xff1a;用于控制 Edge 浏览器的驱动程序。 功能设计&#xff1a; 检测网络状…

MVC执行流程

&#xff08;1&#xff09;用户通过浏览器&#xff08;客户端&#xff09;向服务端&#xff08;后端&#xff09;发送请求&#xff0c;请求会被前端控制器DispatcherServlet拦截。 &#xff08;2&#xff09;DispatcherServlet拦截到请求后&#xff0c;会调用处理器映射器&…