Hudi第四章:集成Hive

news/2024/11/19 7:49:28/

系列文章目录

Hudi第一章:编译安装
Hudi第二章:集成Spark
Hudi第二章:集成Spark(二)
Hudi第三章:集成Flink
Hudi第四章:集成Hive


文章目录

  • 系列文章目录
  • 前言
  • 一、环境准备
    • 1.拷贝jar包
  • 二、Flink集成hive
    • 1.配置模版
    • 2.案例实操
  • 三、spark集成hive
    • 1.配置模版
    • 2.案例实操
  • 四、使用catalog
    • 1.环境配置
    • 2.案例实操
  • 总结


前言

本来关于Flink还有一些内容,但剩下的我了解过之后,觉得并不是很常用,而且 比较杂,所以还是决定需要的时候再学习吧。


一、环境准备

1.拷贝jar包

cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/
cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/

启动hive

二、Flink集成hive

1.配置模版

## hms mode 配置CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with('connector'='hudi','path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1','table.type'='COPY_ON_WRITE',        -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出'hive_sync.enable'='true',           -- required,开启hive同步功能'hive_sync.table'='${hive_table}',              -- required, hive 新建的表名'hive_sync.db'='${hive_db}',             -- required, hive 新建的数据库名'hive_sync.mode' = 'hms',            -- required, 将hive sync mode设置为hms, 默认jdbc'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口
);

2.案例实操

在flinksql客户端中执行。

CREATE TABLE t10(id int,num int,ts int,primary key (id) not enforced
)
PARTITIONED BY (num)
with('connector'='hudi','path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t10','table.type'='COPY_ON_WRITE', 'hive_sync.enable'='true', 'hive_sync.table'='h10', 'hive_sync.db'='default', 'hive_sync.mode' = 'hms','hive_sync.metastore.uris' = 'thrift://hadoop102:9083'
);

在这里插入图片描述
然后随便插入一条数据。

insert into t10 values(1,1,1); 

然后我们新开一个窗口,用客户端连接hive,也可以用其它可视化连接器。
bin/beeline -u jdbc:hive2://hadoop102:10000 -n atguigu
可以看到这里已经有一张表同步过来了
在这里插入图片描述
在这里插入图片描述

三、spark集成hive

1.配置模版

  option("hoodie.datasource.hive_sync.enable","true").                         //设置数据集注册并同步到hiveoption("hoodie.datasource.hive_sync.mode","hms").                         //使用hmsoption("hoodie.datasource.hive_sync.metastore.uris", "thrift://ip:9083"). //hivemetastore地址option("hoodie.datasource.hive_sync.username","").                          //登入hiveserver2的用户option("hoodie.datasource.hive_sync.password","").                      //登入hiveserver2的密码option("hoodie.datasource.hive_sync.database", "").                   //设置hudi与hive同步的数据库option("hoodie.datasource.hive_sync.table", "").                        //设置hudi与hive同步的表名option("hoodie.datasource.hive_sync.partition_fields", "").               //hive表同步的分区列option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor"). // 分区提取器 按/ 提取分区

2.案例实操

打开shell后输入以下内容

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGeneratorval inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)).withColumn("a",split(col("partitionpath"),"\\/")(0)).withColumn("b",split(col("partitionpath"),"\\/")(1)).withColumn("c",split(col("partitionpath"),"\\/")(2))
df.write.format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option("hoodie.table.name", tableName). option("hoodie.datasource.hive_sync.enable","true").option("hoodie.datasource.hive_sync.mode","hms").option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hadoop102:9083").option("hoodie.datasource.hive_sync.database", "default").option("hoodie.datasource.hive_sync.table", "spark_hudi").option("hoodie.datasource.hive_sync.partition_fields", "a,b,c").option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").mode(Overwrite).save(basePath)

之后再次去hive查看
在这里插入图片描述

四、使用catalog

1.环境配置

需要的jar包。
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.13.6/flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar

下载之后需要修改,用压缩软件打开。
在这里插入图片描述
在这里插入图片描述
将其删除。
然后将整个jar包上传到flink的lib目录下。
在这里插入图片描述

2.案例实操

我们这里使用flink进行操作,由于flink是静态加载,所以我们需要从其session。
现在我们默认只有一个catalog
在这里插入图片描述
我们创建一个hive的catalog用于和hive连接。

CREATE CATALOG hive_catalogWITH ('type' = 'hive','default-database' = 'default','hive-conf-dir' = '/opt/module/hive/conf');use catalog hive_catalog;

在这里插入图片描述
可以看到此时我们的flink已经连接到hive了,这样就不用总是切换了。
在这里插入图片描述


总结

hudi的内容暂时就到这里,学习的过程中觉得hudi的技术还不是很成熟,很多地方的依赖需要自己修改,所以这次就不再进行太深层次的学习了。


http://www.ppmy.cn/news/1158493.html

相关文章

PyQt 问题记录

1.现成的组件不一定线程安全,(包括且不限于数据的修改竞争,和一些组件的崩溃 ) 对于PyQt 的线程使用,可能还需要更谨慎些 保存逻辑 QuestionBox("保存/Save")def Save(self):okFlagFalseerrFlagFalseWriteCmd{}for it in self.Mode…

webrtc gcc算法(1)

老的webrtc gcc算法,大概流程: 这两个拥塞控制算法分别是在发送端和接收端实现的, 接收端的拥塞控制算法所计算出的估计带宽, 会通过RTCP的remb反馈到发送端, 发送端综合两个控制算法的结果得到一个最终的发送码率,并以…

代理服务器没有响应,谷歌浏览器无法上网【搬代码】

代理服务器没有响应 •检查你的代理设置127.0.0.1:8888。 转到“工具”>“Internet 选项”>“连接”。如果你在 LAN 中,请单击“局域网设置”。 •确保你的防火墙设置没有阻止 Web 访问。 •向你的系统管理员求助。 IE浏览器-设置-internet选项 然后就可…

【FreeRTOS】【STM32】06.1 FreeRTOS的使用1(对06的补充)

前后台系统(裸机) 裸机又称前后台系统,在一个while中不停循环处理各个task。 中断服务函数作为前台程序 大循环while(1)作为后台程序 多任务系统 通过任务调度的方式,执行各个任务,优先级高的先执行,执行完了释放CPU使用权&am…

137.【SpringCloud-快速搭建】

微服务框架搭建 (一)、SpringCloud-Parent1.创建一个SpringBoot项目2.导入我们的依赖 (二)、SpringCloud-API (实体类)1.创建一个SpringBoot项目2.导入我们的依赖3.创建我们的实体类 (三)、SpringCloud-dept (业务A)1.创建一个SpringBoot项目2.导入我们的依赖3.配置我们的配置信…

linux系统编程之一

1)fcntl的使用方法 fcntl作用:可以用fcntl函数改变一个已打开的文件属性而不必重新打开文件; 堆排序是完全二叉树,但不是排序二叉树; 排序二叉树要求兄弟节点之间有大小关系,比如说左小右大; 堆排序仅要求…

GitHub验证的2FA

一、 起因: GitHub需要双重身份验证 (2FA) 是登录网站或应用时使用的额外保护层。启用 2FA 时,必须使用您的用户名和密码登录,并提供另一种只有您知道或可以访问的身份验证形式。 二、解决: 2.1 这里使用chrome的身份验证插件进…

GitHub仓库的README文件无法显示图片问题-非域名污染原因

之前上自己仓库就偶然发现图片不显示现象,当时以为是网络问题就没有留意这事。但是一直不显示就有问题了!于是网上搜了一遭,看见大家遇到此现象的原因普遍归于DNS污染1而我的问题原来是MarkDown格式! 在图片语法前不要加分区语法…