oracle的DDL语句如下:
CREATE TABLE TPPROD.CONFIG (
	NO VARCHAR2(50),
	CONFIGCODE VARCHAR2(400),
	CONFIGVALUE VARCHAR2(400),
	CONSTRAINT PK_GUENDORASSISTCONFIG PRIMARY KEY (NO,CONFIGCODE)
);
CREATE UNIQUE INDEX PK_GUENDORASSISTCONFIG ON TPPROD.CONFIG (NO,CONFIGCODE);
解析oracle表生成高斯内表入口:Oracle2GaussCreateMana.scala
package com.tpiods.sqoop.oracle2gauss

import java.io.{File, FileInputStream, PrintWriter}
/**
 * Converts Oracle `CREATE TABLE` statements into Gauss internal-table DDL.
 *
 * Input:  a file containing Oracle DDL.
 * Output: a file containing the corresponding Gauss DDL (schema `ods`).
 *
 * NOTE(review): the line breaks inside the generated text (statement header,
 * technical-column lists) were reconstructed from the `|` margin markers of the
 * original multi-line strings; confirm the exact whitespace against a known-good
 * output file.
 */
object Oracle2GaussCreateMana {

  private val CreateTable = "CREATE TABLE"
  private val PrimaryKey = "PRIMARY KEY "

  /** Technical columns appended after the business columns (no trailing newline). */
  private val TrailingTechCols: String =
    Seq(
      ",oper varchar",
      ",mtime numeric(38,5)",
      ",source_sys varchar",
      ",etl_time varchar",
      ",etl_date varchar"
    ).mkString("\n")

  /** Technical columns inserted before the business columns; ends with the separator comma. */
  private val LeadingTechCols: String =
    Seq(
      " tabname varchar",
      ",oper varchar",
      ",mtime numeric(38,5)",
      ",source_sys varchar",
      ",etl_time varchar",
      ",etl_date varchar",
      ","
    ).mkString("\n")

  def main(args: Array[String]): Unit = {
    // File holding the input SQL statements.
    val input = "ods_etl/src/main/resources/work0409_test/test_tab.sql"
    // File receiving the generated SQL statements.
    val output1 = "ods_etl/src/main/resources/work0409_test/test_tab_mana.sql"

    val source = scala.io.Source.fromInputStream(new FileInputStream(input))
    // The iterator is consumed inside the try so the stream can be closed right after.
    val converted =
      try addTeacCols3(replaceSql(readSql(source.getLines())))
      finally source.close()

    val writer = new PrintWriter(new File(output1))
    try writer.write(converted)
    finally writer.close()
  }

  /**
   * Applies the table-definition conversion logic, line by line.
   *
   * Recognised lines:
   *  - `CREATE TABLE ...` (optionally preceded by spaces) starts a new statement;
   *  - lines starting with a tab are column definitions (`CONSTRAINT` lines are skipped);
   *  - space-indented lines starting with `(` carry the first column after the parenthesis;
   *  - `);`, a space-indented `)` or a line ending in `MOVEMENT |` closes the statement.
   *
   * The header ends with the marker `"\t\n"`, which `addTeacCols3` / `addTeacCols4`
   * later replace with the leading technical columns.
   *
   * @param lines          lines of the Oracle DDL; `null` entries are ignored
   * @param distributeByPk when `true` and a primary key was seen for the table, the
   *                       statement is closed with a column-store `WITH` clause and
   *                       `DISTRIBUTE BY HASH` on the primary-key columns; the default
   *                       `false` closes it with a plain `);`
   * @return the converted statements, still using Oracle data types and upper case
   */
  def readSql(lines: Iterator[String], distributeByPk: Boolean = false): String = {
    val sb = new StringBuilder
    var firstColumn = false
    var pkCols = ""

    // The first column of a table gets a blank prefix, every later one a comma.
    // The flag is cleared only when a column is really emitted, so skipped lines
    // (blank or CONSTRAINT) can no longer produce a stray or missing separator.
    def appendColumn(name: String, dataType: String): Unit = {
      sb.append(if (firstColumn) " " else ",")
        .append(name).append("\t").append(dataType).append("\n")
      firstColumn = false
    }

    for (line <- lines if line != null) {
      // Only spaces are stripped: a leading tab is what marks a column line.
      val unindented = line.dropWhile(_ == ' ')
      val spaceIndented = unindented.length < line.length

      if (unindented.startsWith(CreateTable)) {
        firstColumn = true
        pkCols = "" // a primary key belongs to one table only
        val tabName = tableNameOf(unindented)
        sb.append(
          s"set search_path = ods;\n" +
            s"drop table if exists ${tabName};\n" +
            s"create table if not exists ${tabName} (\t\n")
      }

      if (line.startsWith("\t")) {
        val split = line.split("\\s+")
        // Length is checked first: a whitespace-only line splits into an empty array.
        if (split.length >= 3 && split(1) != "CONSTRAINT") appendColumn(split(1), split(2))
      }

      if (spaceIndented && unindented.startsWith("(")) {
        // The leading spaces make element 0 empty and element 1 the parenthesis.
        val split = line.split("\\s+")
        if (split.length >= 4) appendColumn(split(2), split(3))
      }

      val pkAt = line.indexOf(PrimaryKey)
      if (pkAt >= 0) {
        val rest = line.substring(pkAt + PrimaryKey.length)
        val close = rest.indexOf(')')
        // Keep only the parenthesised column list, dropping any trailing clause.
        pkCols = (if (close >= 0) rest.substring(0, close + 1) else rest).trim
      }

      val closesStatement =
        line.startsWith(");") ||
          line.endsWith("MOVEMENT |") ||
          (spaceIndented && unindented.startsWith(")"))
      if (closesStatement) {
        if (distributeByPk && pkCols.nonEmpty) {
          sb.append(
            ") WITH (orientation=column, compression=low)\n" +
              s"DISTRIBUTE BY HASH${pkCols}\n" +
              ";\n")
        } else {
          sb.append(");\n")
        }
      }
    }
    sb.toString()
  }

  /** Table name without schema prefix, spaces or opening parenthesis; tolerates a missing schema. */
  private def tableNameOf(createLine: String): String = {
    val afterKeyword = createLine.stripPrefix(CreateTable)
    val parts = afterKeyword.split("\\.")
    val unqualified = if (parts.length > 1) parts(1) else afterKeyword
    unqualified.replaceAll(" ", "").replaceAll("\\(", "")
  }

  /**
   * Replaces Oracle data types with their Gauss counterparts.
   *
   * The text is lower-cased, double quotes are removed, `number(*,n)` becomes `number`,
   * a comma directly before a newline is dropped (separators are emitted at the start
   * of the following line instead), and `nchar`, `varchar2`/`nvarchar2` and `long`
   * become `char`, `varchar` and `text`.
   *
   * @param str output of [[readSql]]
   * @return the same SQL using Gauss data types
   */
  def replaceSql(str: String): String =
    str
      // Locale-independent lowering: the default locale may map 'I' to a non-ASCII letter.
      .toLowerCase(java.util.Locale.ROOT)
      .replaceAll("\"", "")
      .replaceAll("number\\(\\*+,\\d*\\)", "number")
      .replaceAll(",\n", "\n")
      .replaceAll("\\bnchar\\b", "char")
      // Grouped so that both word boundaries apply to both alternatives.
      .replaceAll("\\b(?:varchar2|nvarchar2)\\b", "varchar")
      .replaceAll("\\blong\\b", "text")

  /**
   * Adds the technical columns after the business columns of a statement that is
   * closed with `) with ...`.
   *
   * @param lines2 SQL after data-type replacement
   * @return SQL with the technical columns appended
   */
  def addTeacCols(lines2: String): String =
    lines2.replace(") with", TrailingTechCols + "\n)\nwith")

  /**
   * Adds the technical columns after the business columns of a statement that is
   * closed with a plain `);` (no hash distribution on the primary key).
   *
   * @param lines2 SQL after data-type replacement
   * @return SQL with the technical columns appended
   */
  def addTeacCols2(lines2: String): String =
    lines2.replace(");", TrailingTechCols + "\n);\n")

  /**
   * Adds the technical columns before the business columns by replacing the
   * `"\t\n"` marker that [[readSql]] leaves after the opening parenthesis.
   * Documented by the author as the variant for hash distribution on the primary key.
   *
   * @param lines2 SQL after data-type replacement
   * @return SQL with the technical columns prepended
   */
  def addTeacCols3(lines2: String): String =
    lines2.replace("\t\n", LeadingTechCols)

  /**
   * Same transformation as [[addTeacCols3]]; documented by the author as the variant
   * to call when not distributing by hash on the primary key.
   *
   * @param lines2 SQL after data-type replacement
   * @return SQL with the technical columns prepended
   */
  def addTeacCols4(lines2: String): String = addTeacCols3(lines2)
}
运行以上代码,输出如下:
set search_path = ods;
drop table if exists config;
create table if not exists config (tabname varchar
,oper varchar
,mtime numeric(38,5)
,source_sys varchar
,etl_time varchar
,etl_date varchar
, no varchar(50)
,configcode varchar(400)
,configvalue varchar(400)
);
解析oracle表生成表名字段主键配置入口:OracleGeneTabColsPk.scala
package com.tpiods.sqoop.oracle2gauss

import java.io.{File, FileInputStream, PrintWriter}
/**
 * Extracts table name, column names and primary-key columns from Oracle DDL.
 *
 * Input:  Oracle source-system `CREATE TABLE` statements.
 * Output: one record per table in the form `table|col1,col2,...|pk1,pk2,...`.
 */
object OracleGeneTabColsPk {

  private val CreateTable = "CREATE TABLE"
  private val PrimaryKey = "PRIMARY KEY"

  def main(args: Array[String]): Unit = {
    // File holding the input SQL statements.
    val input = "ods_etl/src/main/resources/work0409_test/test_tab.sql"
    // File receiving the generated table/columns/primary-key records.
    val output = "ods_etl/src/main/resources/work0409_test/test_tab_tabcolspk.txt"

    val source = scala.io.Source.fromInputStream(new FileInputStream(input))
    // The iterator is consumed inside the try so the stream can be closed right after.
    val records =
      try readSql(source.getLines())
      finally source.close()

    val writer = new PrintWriter(new File(output))
    try writer.write(records)
    finally writer.close()
  }

  /**
   * Builds the `table|columns|primary key` records from the DDL lines.
   *
   * Each `CREATE TABLE` line starts a new record on a new line (so the result
   * begins with a newline). Lines containing `PRIMARY KEY` contribute the key
   * columns, space-indented lines starting with `(` and tab-indented lines
   * contribute a column name; `CONSTRAINT` lines are not treated as columns.
   * The assembled text is lower-cased, double quotes are removed and dangling
   * separators (`,` before a newline or `|`, `(` before `|`, a trailing `,`) are
   * cleaned up.
   *
   * @param lines lines of the Oracle DDL; `null` entries are ignored
   * @return the records, one per table
   */
  def readSql(lines: Iterator[String]): String = {
    val sb = new StringBuilder

    for (line <- lines if line != null) {
      // Only spaces are stripped: a leading tab is what marks a column line.
      val unindented = line.dropWhile(_ == ' ')
      val spaceIndented = unindented.length < line.length

      if (unindented.startsWith(CreateTable)) {
        sb.append("\n").append(tableNameOf(unindented)).append("|")
      } else if (line.contains(PrimaryKey)) {
        // Nothing is appended when the line carries no parenthesised column list.
        primaryKeyColumns(line).foreach(pk => sb.append("|").append(pk))
      } else if (spaceIndented && unindented.startsWith("(")) {
        // The leading spaces make element 0 empty and element 1 the parenthesis.
        val split = line.split("\\s+")
        if (split.length >= 3) sb.append(split(2)).append(",")
      } else if (line.startsWith("\t")) {
        val split = line.split("\\s+")
        // Length is checked first: a whitespace-only line splits into an empty array.
        if (split.length >= 3 && split(1) != "CONSTRAINT") sb.append(split(1)).append(",")
      }
    }

    val cleaned = sb.toString()
      // Locale-independent lowering: the default locale may map 'I' to a non-ASCII letter.
      .toLowerCase(java.util.Locale.ROOT)
      .replaceAll("\"", "")
      .replaceAll(",\n", "\n")
      .replaceAll(",\\|", "\\|")
      .replaceAll("\\(\\|", "\\|")
    cleaned.stripSuffix(",")
  }

  /** Table name without schema prefix or spaces; tolerates a missing schema. */
  private def tableNameOf(createLine: String): String = {
    val afterKeyword = createLine.stripPrefix(CreateTable)
    val parts = afterKeyword.split("\\.")
    val unqualified = if (parts.length > 1) parts(1) else afterKeyword
    unqualified.replaceAll(" ", "")
  }

  /**
   * Column list of the first parenthesised group after `PRIMARY KEY`, with spaces
   * removed; `None` when no opening parenthesis follows the keyword.
   * Expects `line` to contain `PRIMARY KEY`.
   */
  private def primaryKeyColumns(line: String): Option[String] = {
    val rest = line.substring(line.indexOf(PrimaryKey) + PrimaryKey.length)
    val open = rest.indexOf('(')
    if (open < 0) None
    else {
      val close = rest.indexOf(')', open + 1)
      val inner = if (close < 0) rest.substring(open + 1) else rest.substring(open + 1, close)
      Some(inner.replaceAll(" ", ""))
    }
  }
}
运行以上代码,输出如下:
config|no,configcode,configvalue|no,configcode