What is Sqoop
Sqoop (pronounced "skup") is an open-source tool used mainly to move data between Hadoop (Hive) and traditional databases (MySQL, PostgreSQL, ...). It can import data from a relational database (e.g. MySQL, Oracle, Postgres) into HDFS, and it can also export data from HDFS back into a relational database.
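A minimal sketch of the two directions (the database name, credentials, and paths below are placeholders for illustration, not values from a real environment):
sqoop import --connect jdbc:mysql://localhost:3306/mydb --username root --password root --table mytable --target-dir /data/mytable
sqoop export --connect jdbc:mysql://localhost:3306/mydb --username root --password root --table mytable --export-dir /data/mytable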
Installing and Configuring Sqoop
- Sqoop installation package (extraction code: m18x)
- Installation steps
- Download and extract
1. Upload the installation package sqoop-1.4.6-cdh5.14.2.tar.gz to the virtual machine.
2. Extract the Sqoop package into the target directory: tar -zxf sqoop-1.4.6-cdh5.14.2.tar.gz -C /opt/install/
3. Rename the extracted directory.
4. Configure the environment variables in /etc/profile.
Add the following:
export SQOOP_HOME=/opt/install/sqoop
export PATH=$SQOOP_HOME/bin:$PATH
Apply the changes: source /etc/profile
- Edit the configuration file
Like most big-data frameworks, Sqoop keeps its configuration files in the conf directory under the Sqoop root directory.
1. Rename the configuration template: mv sqoop-env-template.sh sqoop-env.sh
2. Edit sqoop-env.sh and set:
export HADOOP_COMMON_HOME=/opt/install/hadoop
export HADOOP_MAPRED_HOME=/opt/install/hadoop
export HIVE_HOME=/opt/install/hive
export ZOOKEEPER_HOME=/opt/install/zookeeper
export ZOOCFGDIR=/opt/install/zookeeper
export HBASE_HOME=/opt/install/hbase
- Copy the JDBC driver and related jars
mysql-connector-java-5.1.27-bin.jar
hive-exec-1.1.0-cdh5.14.2.jar
java-json.jar
hive-common-1.1.0-cdh5.14.2.jar
Copy the JDBC driver into Sqoop's lib directory; the driver and the jars listed above are provided separately (jar packages, extraction code: loqp).
cp mysql-connector-java-5.1.27-bin.jar /opt/install/sqoop/lib/
- Verify Sqoop
We can verify that Sqoop is configured correctly by running one of its commands: sqoop help. Some warnings appear (omitted here), followed by the help output:
Available commands:
  codegen            Generate code to interact with database records
  create-hive-table  Import a table definition into Hive
  eval               Evaluate a SQL statement and display the results
  export             Export an HDFS directory to a database table
  help               List available commands
  import             Import a table from a database to HDFS
  import-all-tables  Import tables from a database to HDFS
  import-mainframe   Import datasets from a mainframe server to HDFS
  job                Work with saved jobs
  list-databases     List available databases on a server
  list-tables        List available tables in a database
  merge              Merge results of incremental imports
  metastore          Run a standalone Sqoop metastore
  version            Display version information
See 'sqoop help COMMAND' for information on a specific command.
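Beyond sqoop help, the database connection itself can be checked end to end with list-databases (the connection string and credentials below are assumptions matching the environment used later in this guide):
sqoop list-databases \
--connect jdbc:mysql://localhost:3306 \
--username root \
--password root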
Using Sqoop
- Preparation
1. Create the database and tables in MySQL:
mysql> create database retail_db;
mysql> use retail_db;
mysql> source /root/data/sqoop/retail_db.sql
mysql> show tables;
+---------------------+
| Tables_in_retail_db |
+---------------------+
| categories          |
| customers           |
| departments         |
| order_items         |
| orders              |
| products            |
+---------------------+
6 rows in set (0.00 sec)
2. Use Sqoop to import the customers table into HDFS:
sqoop import \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--table customers \
--username root \
--password root \
--target-dir /data/retail_db/customers \
--m 3
3. Filter with where
sqoop import \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--table orders \
--where "order_id<100" \
--username root \
--password root \
--delete-target-dir \
--target-dir /data/retail_db/orders \
--m 3
4. Filter with columns
sqoop import \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--table customers \
--columns "customer_id,customer_fname,customer_lname" \
--username root \
--password root \
--delete-target-dir \
--target-dir /data/retail_db/customers \
--m 3
5. Filter with a query statement
sqoop import \
--connect jdbc:mysql://localhost:3306/sqoop \
--driver com.mysql.jdbc.Driver \
--query "select * from orders where order_status!='CLOSED' and \$CONDITIONS" \
--username root \
--password ok \
--split-by order_id \
--delete-target-dir \
--target-dir /data1/retail_db/orders \
--m 3
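When --query is used, Sqoop substitutes the literal \$CONDITIONS placeholder with the split condition for each mapper, which is why it must appear in the WHERE clause. As an optional sanity check (paths taken from the command above), the output directory should contain one part file per mapper:
hdfs dfs -ls /data1/retail_db/orders
hdfs dfs -cat /data1/retail_db/orders/part-m-00000 | head -5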
6. Incremental import
sqoop import \
--connect jdbc:mysql://localhost:3306/sqoop \
--driver com.mysql.jdbc.Driver \
--table orders \
--username root \
--password ok \
--incremental append \
--check-column order_date \
--last-value '0' \
--target-dir /data1/sqoop1/orders \
--m 3
From the import log:
SELECT MIN(order_id), MAX(order_id) FROM orders WHERE ( order_date > '2013-07-24 00:00:00' AND order_date <= '2014-07-24 00:00:00.0' )
Insert a new row into MySQL and run the incremental import again:
insert into orders values(99999,'2015-05-30 00:00:00',11599,'CLOSED');
sqoop import \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--table orders \
--username root \
--password root \
--incremental append \
--check-column order_date \
--last-value '0' \
--target-dir /data/retail_db/orders \
--m 3
From the import log:
SELECT MIN(order_id), MAX(order_id) FROM orders WHERE ( order_date > '2014-07-24 00:00:00.0' AND order_date <= '2015-05-30 00:00:00.0' )
- Create a job
sqoop job --create mysql2hdfs \
-- import \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--table orders \
--username root \
--password root \
--incremental append \
--check-column order_date \
--last-value '0' \
--target-dir /data/retail_db/orders \
--m 3
- List jobs
sqoop job --list
- Execute the job
sqoop job --exec mysql2hdfs
Insert another row into MySQL and execute the job again:
insert into orders values(999999,'2016-05-30 00:00:00',11599,'CLOSED');
sqoop job --exec mysql2hdfs
From the import log:
SELECT MIN(order_id), MAX(order_id) FROM orders WHERE ( order_date > '2015-05-30 00:00:00.0' AND order_date <= '2016-05-30 00:00:00.0' )
Each time the job completes successfully, Sqoop updates --last-value with the maximum value seen in that run.
The initial '0' has no real meaning; it only guarantees that the first import starts from the smallest possible value. The job can then be scheduled to run every day:
crontab -e
0 2 */1 * * sqoop job --exec mysql2hdfs
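To confirm that the stored last value has advanced after each run, the saved job definition can be inspected (job name from above; the exact property name in the output may vary by Sqoop version):
sqoop job --show mysql2hdfs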
- Import data into Hive
First create the target database in Hive, then run the import:
hive -e "create database if not exists retail_db;"
sqoop import \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--table orders \
--username root \
--password root \
--hive-import \
--create-hive-table \
--hive-database retail_db \
--hive-table orders \
--m 3
This fails because the output path already exists:
20/07/15 22:29:04 ERROR tool.ImportTool: Import failed: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop1:9000/user/root/orders already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:270)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:143)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1304)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1325)
    at org.apache.sqoop.mapreduce.ImportJobBase.doSubmitJob(ImportJobBase.java:203)
    at org.apache.sqoop.mapreduce.ImportJobBase.runJob(ImportJobBase.java:176)
    at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:273)
    at org.apache.sqoop.manager.SqlManager.importTable(SqlManager.java:692)
    at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:513)
    at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:621)
    at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
    at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
    at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
    at org.apache.sqoop.Sqoop.main(Sqoop.java:252)
Delete the existing directory:
[root@hadoop1 lib]# hdfs dfs -rmr hdfs://hadoop1:9000/user/root/orders
Running the import again fails with a different error:
20/07/15 22:26:56 ERROR tool.ImportTool: Import failed: java.io.IOException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
    at org.apache.sqoop.hive.HiveConfig.getHiveConf(HiveConfig.java:50)
    at org.apache.sqoop.hive.HiveImport.getHiveArgs(HiveImport.java:392)
    at org.apache.sqoop.hive.HiveImport.executeExternalHiveScript(HiveImport.java:379)
    at org.apache.sqoop.hive.HiveImport.executeScript(HiveImport.java:337)
    at org.apache.sqoop.hive.HiveImport.importTable(HiveImport.java:241)
    at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:530)
    at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:621)
    at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
    at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
    at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
    at org.apache.sqoop.Sqoop.main(Sqoop.java:252)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
hive-common-1.1.0-cdh5.14.2.jar is missing, so copy it from Hive's lib directory:
cp /opt/install/hive-1.1.0-cdh5.14.2/lib/hive-common-1.1.0-cdh5.14.2.jar /opt/install/sqoop-1.4.6-cdh5.14.2/lib/
Running the import again produces yet another error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/shims/ShimLoader
    at org.apache.hadoop.hive.conf.HiveConf$ConfVars.<clinit>(HiveConf.java:370)
    at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:108)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.sqoop.hive.HiveConfig.getHiveConf(HiveConfig.java:44)
    at org.apache.sqoop.hive.HiveImport.getHiveArgs(HiveImport.java:392)
    at org.apache.sqoop.hive.HiveImport.executeExternalHiveScript(HiveImport.java:379)
    at org.apache.sqoop.hive.HiveImport.executeScript(HiveImport.java:337)
    at org.apache.sqoop.hive.HiveImport.importTable(HiveImport.java:241)
    at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:530)
    at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:621)
    at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
    at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
    at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
    at org.apache.sqoop.Sqoop.main(Sqoop.java:252)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.shims.ShimLoader
The hive-shims jars are also missing and need to be copied from Hive:
cp /opt/install/hive-1.1.0-cdh5.14.2/lib/hive-shims* /opt/install/sqoop-1.4.6-cdh5.14.2/lib/
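With the missing jars in place, the import completes. As an optional check (database and table names from the command above), the row count in Hive can be compared with the orders table in MySQL:
hive -e "select count(*) from retail_db.orders;"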
- Import data into a Hive partition
Drop the Hive table first:
drop table if exists orders;
Then import:
sqoop import \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--query "select order_id,order_status from orders where order_date>='2013-11-03' and order_date <'2013-11-04' and \$CONDITIONS" \
--username root \
--password root \
--delete-target-dir \
--target-dir /data/retail_db/orders \
--split-by order_id \
--hive-import \
--hive-database retail_db \
--hive-table orders \
--hive-partition-key "order_date" \
--hive-partition-value "2013-11-03" \
--m 3
Note: the partition column cannot also be imported as a regular column, which is why order_date is left out of the select list above.
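To verify that the partition was created (database, table, and partition value taken from the command above):
hive -e "show partitions retail_db.orders;"
hive -e "select * from retail_db.orders where order_date='2013-11-03' limit 5;"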
- Import data into HBase
1. Create the table in HBase:
create 'products','data','category'
2. Import with Sqoop:
sqoop import \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--username root \
--password root \
--table products \
--hbase-table products \
--column-family data \
--m 3
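A quick way to check the result is to scan a few rows from the HBase shell (the LIMIT value is arbitrary):
scan 'products', {LIMIT => 2}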
- Export data from HDFS to MySQL
1. Create the table in MySQL:
create table customers_demo as select * from customers where 1=2;
2. Upload the data to HDFS:
hdfs dfs -mkdir /customerinput
hdfs dfs -put customers.csv /customerinput
3. Export the data:
sqoop export \
--connect jdbc:mysql://localhost:3306/retail_db \
--driver com.mysql.jdbc.Driver \
--username root \
--password root \
--table customers_demo \
--export-dir /customerinput \
--m 1
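As an optional check, the exported row count should match the number of records in customers.csv:
mysql> use retail_db;
mysql> select count(*) from customers_demo;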
- Sqoop scripts
1. Write the option file job_RDBMS2HDFS.opt:
--------------------------------
import
--connect
jdbc:mysql://localhost:3306/retail_db
--driver
com.mysql.jdbc.Driver
--table
customers
--username
root
--password
root
--target-dir
/data/retail_db/customers
--delete-target-dir
--m
3
--------------------------------
2. Run the script:
sqoop --options-file job_RDBMS2HDFS.opt