学习Sqoop前,先装好Hadoop、HIVE和MySQL
帅气的目录
- 1、Sqoop简介
- 2、安装
- 2.1、下载
- 2.2、环境变量
- 2.3、解压和移动
- 2.4、配置文件(此步可略)
- 2.5、复制 JDBC 驱动程序
- 2.6、MySQL连接测试
- 3、基本操作
- 4、ETL入门示例
- 4.1、数据准备
- 4.2、MySQL数据导入到HDFS
- 4.3、HDFS数据导出到MySQL
- 5、ETL进阶示例
- 5.1、数据准备
- 5.2、MySQL数据导入到HIVE
- 5.3、HIVE数据导出到MySQL
- 5.4、增量更新
- 5.5、Python2脚本部署
- 6、MySQL数据导入HIVE表分区
1、Sqoop简介
- 开源的数据传输工具
- 主用在Hadoop(HDFS)与传统的数据库(MySQL、Oracle…)之间
- 官网:http://sqoop.apache.org/
2、安装
2.1、下载
- 本文版本:
sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
- 该版教学地址:http://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html
wget http://archive.apache.org/dist/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
2.2、环境变量
https://blog.csdn.net/Yellow_python/article/details/112692486
2.3、解压和移动
tar -zxf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
mv sqoop-1.4.7.bin__hadoop-2.6.0 $SQOOP_HOME
2.4、配置文件(此步可略)
cd $SQOOP_HOME/conf
mv sqoop-env-template.sh sqoop-env.sh
vi sqoop-env.sh
2.5、复制 JDBC 驱动程序
cp $HIVE_HOME/lib/mysql-connector-java-*.*.*.jar $SQOOP_HOME/lib/
ll $SQOOP_HOME/lib/ | grep mysql-connector-java-
之前安装MySQL和HIVE时,已经拷贝了一个,因此本文从HIVE那里拿
2.6、MySQL连接测试
sqoop list-databases \
--connect jdbc:mysql://hadoop100:3306/ \
--username root \
--password 123456
基础参数 | 说明 |
---|---|
--connect | 连接 |
--username | 账户 |
--password | 密码 |
3、基本操作
sqoop help
sqoop help import
sqoop help export
4、ETL入门示例
4.1、数据准备
登录MySQL建2个表,其中一个插入数据,长相如下
mysql -uroot -p123456
DROP DATABASE IF EXISTS b1;
CREATE DATABASE b1;
CREATE TABLE IF NOT EXISTS b1.t1 (f1 INT(9) PRIMARY KEY,f2 CHAR(9));
INSERT INTO b1.t1 VALUES (1,'a'),(2,'b'),(3,null),(4,'b');
SELECT * FROM b1.t1;
CREATE TABLE IF NOT EXISTS b1.t2 (f1 INT(9) PRIMARY KEY,f2 CHAR(9));
4.2、MySQL数据导入到HDFS
sqoop import \
--connect jdbc:mysql://hadoop100:3306/b1 \
--username root --password 123456 \
--target-dir /b1/t1 \
--delete-target-dir \
--query 'SELECT * FROM t1 WHERE $CONDITIONS' \
--num-mappers 2 \
--split-by f1
常用参数 | 说明 |
---|---|
--target-dir | HDFS目标目录 |
--delete-target-dir | 删除目标目录(若存在) |
--append | 追加形式写入(不和--delete-target-dir 同用) |
--query | 查询语句,其中$CONDITIONS 是固定要加上去的 |
--num-mappers <n> | Map任务并行数,默认4,建议设成1 |
--split-by <column-name> | Map并行数≠1时,要设 按什么列来横切表,然后分配给Map |
结果查看
hadoop fs -ls /b1/t1
4.3、HDFS数据导出到MySQL
sqoop export \
--connect jdbc:mysql://hadoop100:3306/b1 \
--username root --password 123456 \
--num-mappers 2 \
--export-dir /b1/t1 \
--table t2
常用描述 | 描述 |
---|---|
--export-dir <dir> | HDFS导出的源路径 |
--table <table-name> | 目的地MySQL表名 |
导出MySQL结果
5、ETL进阶示例
同步策略 | 说明 | 数据特点 | 示例 |
---|---|---|---|
全量同步 | 覆盖写入 | 量小 | 商品表、品牌表 |
增量同步 | 追加写入 | 量大、增量、不改 | 支付表、退款表 |
增量变化同步 | 追加和修改 | 量大、增量、会改 | 订单流水表、优惠券流水表 |
特殊同步 | 覆盖写入 (很久才一次) | 数据不常变 | 日期表、地区表 鞋子尺码表 |
5.1、数据准备
1、MySQL建表
mysql -uroot -p123456
DROP DATABASE IF EXISTS b2;
CREATE DATABASE b2;
CREATE TABLE IF NOT EXISTS b2.t1
(f1 INT(9) PRIMARY KEY,f2 CHAR(9),f3 DATE,f4 INT(9));
INSERT INTO b2.t1 VALUES (1,'a','2020-1-1',NULL);
INSERT INTO b2.t1 VALUES (2,'b','2020-1-1',NULL);
INSERT INTO b2.t1 VALUES (3,'a','2020-1-2',5);
INSERT INTO b2.t1 VALUES (4,'b','2020-1-2',NULL);
SELECT * FROM b2.t1;
CREATE TABLE IF NOT EXISTS b2.t2
(f1 INT(9) PRIMARY KEY,f2 CHAR(9),f3 DATE);
2、HIVE建表,设分区
DROP DATABASE IF EXISTS b2 CASCADE; --删库
CREATE DATABASE b2 LOCATION '/b2'; --建库
CREATE TABLE b2.t1 (f1 INT ,f2 STRING,f4 INT)
PARTITIONED BY (f3 DATE) --按日期分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' --行格式
LOCATION '/b2/t1';
5.2、MySQL数据导入到HIVE
参数 | 说明 |
---|---|
--null-string <null-string> | The string to be written for a null value for string columns |
--null-non-string <null-string> | The string to be written for a null value for non-string columns |
--fields-terminated-by <char> | 字段分隔符,默认, |
--lines-terminated-by <char> | 行分隔符,默认\n |
1、按日期导入HDFS
sqoop import \
--connect jdbc:mysql://hadoop100:3306/b2 \
--username root --password 123456 \
--delete-target-dir --target-dir /h1/b2/t1/2020-1-1 \
--query "SELECT f1,f2,f4 FROM t1 WHERE f3='2020-1-1' AND \$CONDITIONS" \
--fields-terminated-by '\t' --lines-terminated-by '\n' \
--null-string '\\N' --null-non-string '\\N' \
--num-mappers 1
2、HDFS导进HIVE
LOAD DATA -- 数据导入
INPATH '/h1/b2/t1/2020-1-1' --数据来源
OVERWRITE --覆盖写入
INTO TABLE b2.t1 --数据终点
PARTITION(f3='2020-1-1') --分区
;
LOAD DATA
前
LOAD DATA
后
3、查询数据
SELECT * FROM b2.t1;
5.3、HIVE数据导出到MySQL
参数 | 说明 |
---|---|
--input-null-string <null-string> | The string to be interpreted as null for string columns |
--input-null-non-string <null-string> | The string to be interpreted as null for non-string columns |
--update-mode <mode> | 更新模式,包括:updateonly (默认)和allowinsert |
--update-key <col-name> | allowinsert 模式下,作为更新依据的列名若有多列就用逗号分隔 |
--columns <col,col,col…> | 指定导出的列 |
--input-fields-terminated-by <char> | 字段分隔符,默认, |
--input-lines-terminated-by <char> | 行分隔符,默认\n |
sqoop export \
--connect jdbc:mysql://hadoop100:3306/b2 \
--username root --password 123456 \
--table t2 \
--num-mappers 1 \
--input-fields-terminated-by '\t' \
--input-null-string '\\N' --input-null-non-string '\\N' \
--update-mode allowinsert \
--update-key f1 \
--columns f1,f2 \
--export-dir '/b2/t1/f3=2020-01-01'
查看MySQL:
SELECT * FROM b2.t2;
5.4、增量更新
1、按日期导入HDFS
sqoop import \
--connect jdbc:mysql://hadoop100:3306/b2 \
--username root --password 123456 \
--delete-target-dir --target-dir /h1/b2/t1/2020-1-2 \
--query "SELECT f1,f2,f4 FROM t1 WHERE f3='2020-1-2' AND \$CONDITIONS" \
--fields-terminated-by '\t' --lines-terminated-by '\n' \
--input-null-string '\\N' --null-non-string '\\N' \
--num-mappers 1
2、HDFS导进HIVE
LOAD DATA -- 数据导入
INPATH '/h1/b2/t1/2020-1-2' --数据来源
OVERWRITE --覆盖写入
INTO TABLE b2.t1 --数据终点
PARTITION(f3='2020-1-2') --分区
;
3、查询数据
SELECT * FROM b2.t1;
4、HIVE数据导出到MySQL,注意不要缺少0
sqoop export \
--connect jdbc:mysql://hadoop100:3306/b2 \
--username root --password 123456 \
--table t2 \
--num-mappers 1 \
--input-fields-terminated-by '\t' \
--input-null-string '\\N' --input-null-non-string '\\N' \
--update-mode allowinsert \
--update-key f1 \
--columns f1,f2 \
--export-dir /b2/t1/f3=2020-01-02
5、查看MySQL:
SELECT * FROM b2.t2;
5.5、Python2脚本部署
touch etl.py
chmod 777 etl.py
vim etl.py
#!/usr/bin/python
import sys, datetime
from subprocess import check_outputa = sys.argv
# print ayesterday = datetime.date.today() - datetime.timedelta(days=1)
ymd = a[1] if len(a) == 2 else yesterday.strftime('%Y-%m-%d')
# print ymdsqoop_import = r'''
sqoop import
--connect jdbc:mysql://hadoop100:3306/b2
--username root --password 123456
--delete-target-dir --target-dir /h1/b2/t1/%s
--query "SELECT f1,f2,f4 FROM t1 WHERE f3='%s' AND \$CONDITIONS"
--fields-terminated-by '\t' --lines-terminated-by '\n'
--null-string '\\N' --null-non-string '\\N'
--num-mappers 1
''' % (ymd, ymd)
# print sqoop_importhive = r'''
hive -e "LOAD DATA INPATH '/h1/b2/t1/%s'
OVERWRITE INTO TABLE b2.t1 PARTITION(f3='%s');"
''' % (ymd, ymd)
# print hivesqoop_export = r'''
sqoop export
--connect jdbc:mysql://hadoop100:3306/b2
--username root --password 123456
--table t2
--num-mappers 1
--input-fields-terminated-by '\t' --input-lines-terminated-by '\n'
--input-null-string '\\N' --input-null-non-string '\\N'
--update-mode allowinsert
--update-key f1
--columns f1,f2
--export-dir /b2/t1/f3=%s
''' % ymd
# print sqoop_exportdef sh(cmd):return check_output(' '.join(cmd.split()), shell=True)sh(sqoop_import)
sh(hive)
sh(sqoop_export)
执行脚本,注意不要缺少0
./etl.py
./etl.py 2020-01-01
./etl.py 2020-01-02
./etl.py 2020-01-03
6、MySQL数据导入HIVE表分区
sqoop import \
--hive-import \
--hive-overwrite \
--null-string '\\N' --null-non-string '\\N' \
--num-mappers 1 \
--hive-drop-import-delims \
--connect jdbc:mysql://主机:端口号/库名 \
--username MySQL用户 \
--password MySQL密码 \
--hive-database HIVE库名 \
--hive-table HIVE表名 \
--hive-partition-key 分区名 \
--hive-partition-value 分区值 \
--query "SELECT 字段 FROM 表 WHERE 日期=分区值 AND \$CONDITIONS" \
--target-dir HDFS临时路径 \
--delete-target-dir
--compress
Argument | Description | Comment |
---|---|---|
--hive-home <dir> | 覆写$HIVE_HOME | |
--hive-import | 必要 | |
--hive-overwrite | 覆盖已存在的数据 | 常用 |
--hive-table <table-name> | 写到哪个HIVE表 | 必要 |
--hive-drop-import-delims | 丢弃字符串中的\n 、\r 、\01 | 建议加上,并且HIVE表使用默认分隔符\01 |
--hive-delims-replacement | 替代字符串中的\n 、\r 、\01 | |
--hive-partition-key | 分区字段名 | 常用 |
--hive-partition-value <v> | 分区值 | 常用 |
--compress | 启用压缩 | 常用 |
--compression-codec <c> | 使用Hadoop压缩编码解码器 | 默认gzip |
--target-dir | HDFS临时路径 | 建议/temp/sqoop/{hive_db}/{hvie_tb} |
--delete-target-dir | 删除HDFS临时路径(若存在) | 必要 |
--target-dir
过程,我截到了3个阶段:
1、创建HDFS路径并写入数据
2、数据写完,准备导入到HIVE
3、导入到HIVE后,删除HDFS路径
[yellow@hadoop102 ~]$ hadoop fs -ls HDFS临时路径
Found 1 items
drwxr-xr-x - yellow supergroup 0 2022-04-06 20:59 HDFS临时路径/_temporary[yellow@hadoop102 ~]$ hadoop fs -ls HDFS临时路径
Found 2 items
-rw-r--r-- 2 yellow supergroup 0 2022-04-06 20:59 HDFS临时路径/_SUCCESS
-rw-r--r-- 2 yellow supergroup 122 2022-04-06 20:59 HDFS临时路径/part-m-00000.gz[yellow@hadoop102 ~]$ hadoop fs -ls HDFS临时路径
ls: `HDFS临时路径': No such file or directory
若出现报错
ERROR hive.HiveConfig: Could not load org.apache.hadoop.hive.conf.HiveConf.
Make sure HIVE_CONF_DIR is set correctly.ERROR tool.ImportTool: Import failed: java.io.IOException:
java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
报错解决
cp $HIVE_HOME/lib/hive-common-3.1.2.jar $SQOOP_HOME/lib/