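JDBC Connector Development
- Overview
- Required reading
- SeaTunnel basic development workflow
- Cloning the project
- Building
- Running the example project
- Packaging and release
- JDBC connector development
- Package layout
- Usage tips
- Classes in catalog
- Classes in dialect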
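Overview
This document is intended to help developers quickly get familiar with the SeaTunnel 2.3.8 framework and start developing JDBC connectors.
Required reading
Before you start development, read the following framework documents carefully. They live in the docs directory of the SeaTunnel source tree and will help you get started quickly and follow the project's conventions. Skip this section if you have already read them.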
- docs/zh/concept/config.md
- docs/zh/concept/connector-v2-features.md
- docs/zh/concept/JobEnvConfig.md
- docs/zh/connector-v2/sink/Jdbc.md
- docs/zh/connector-v2/sink-common-options.md
- docs/zh/connector-v2/source-common-options.md
- Connector development (must read)
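SeaTunnel basic development workflow
Cloning the project
git clone [SeaTunnel project URL]
Building
- Select the profiles
- Disable the code style check by setting this Maven property:
```xml
<skip.spotless>true</skip.spotless>
```
- Build with mvn and install the artifacts into the local repository
mvn install -Dmaven.test.skip=true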
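Running the example project
seatunnel-examples is the module for running SeaTunnel in a local environment. Run SeaTunnelEngineLocalExample with the sample configuration file fake_to_console.conf to check that your SeaTunnel environment works.
Packaging and release
After selecting the profiles, you can build the release package directly from the Dist module.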
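JDBC connector development
Package layout
Usage tips
When developing a JDBC connector, you usually only need to care about two parts: the catalog package and the dialect package under internal. These two parts already capture the differences between databases. Most of the remaining code is shared, and changing it casually is not recommended because it may affect every class that depends on it.
Classes in catalog
In MySqlCatalogFactory, factoryIdentifier() identifies the database type, optionRule() defines the validation rules for the connector options, and createCatalog() is the factory method that creates the catalog instance.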
```java
@AutoService(Factory.class)
public class MySqlCatalogFactory implements CatalogFactory {

    @Override
    public String factoryIdentifier() {
        return DatabaseIdentifier.MYSQL;
    }

    @Override
    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
        String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
        Preconditions.checkArgument(
                StringUtils.isNoneBlank(urlWithDatabase),
                "Miss config <base-url>! Please check your config.");
        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
        return new MySqlCatalog(
                catalogName,
                options.get(JdbcCatalogOptions.USERNAME),
                options.get(JdbcCatalogOptions.PASSWORD),
                urlInfo);
    }

    @Override
    public OptionRule optionRule() {
        return JdbcCatalogOptions.BASE_RULE.build();
    }
}
```
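To make the role of the three factory methods more concrete, here is a minimal, self-contained sketch of the lookup pattern. It is not SeaTunnel code: `DemoCatalog`, `DemoCatalogFactory`, `DemoMySqlCatalogFactory`, and `DemoFactoryRegistry` are hypothetical stand-ins, and options are modeled as a plain `Map`. In the real project the `@AutoService` annotation generates the service entry that lets the factory be discovered through Java's ServiceLoader mechanism; the sketch uses an explicit registry instead to stay runnable on its own.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical, simplified stand-ins for SeaTunnel's Catalog and CatalogFactory.
interface DemoCatalog {
    String name();
}

interface DemoCatalogFactory {
    // Identifies the database type this factory serves.
    String factoryIdentifier();

    // Creates the catalog instance from user-supplied options.
    DemoCatalog createCatalog(String catalogName, Map<String, String> options);
}

final class DemoMySqlCatalogFactory implements DemoCatalogFactory {
    @Override
    public String factoryIdentifier() {
        return "MySQL";
    }

    @Override
    public DemoCatalog createCatalog(String catalogName, Map<String, String> options) {
        String baseUrl = options.get("base-url");
        // Same kind of check the real factory performs with Preconditions.checkArgument.
        if (baseUrl == null || baseUrl.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Miss config <base-url>! Please check your config.");
        }
        return () -> catalogName + "@" + baseUrl;
    }
}

// Explicit registry used here in place of ServiceLoader discovery (assumption of this sketch).
final class DemoFactoryRegistry {
    private final Map<String, DemoCatalogFactory> factories = new HashMap<>();

    void register(DemoCatalogFactory factory) {
        factories.put(factory.factoryIdentifier().toLowerCase(Locale.ROOT), factory);
    }

    DemoCatalog create(String identifier, String catalogName, Map<String, String> options) {
        DemoCatalogFactory factory = factories.get(identifier.toLowerCase(Locale.ROOT));
        if (factory == null) {
            throw new IllegalArgumentException("No catalog factory for: " + identifier);
        }
        return factory.createCatalog(catalogName, options);
    }
}
```

The identifier is the only thing the caller needs to know; everything database-specific stays behind the factory, which is why a new connector mostly means adding a new factory and catalog pair.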
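MySqlCatalog contains the queries for database metadata, such as SELECT_DATABASE_EXISTS (checks whether a database exists) and SELECT_TABLE_EXISTS (checks whether a table exists). It also defines the statements that create and drop databases and tables, such as the table DDL returned by getCreateTableSql(), and its getTable() method can build a CatalogTable directly from a SQL query against the external database.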
```java
@Slf4j
public class MySqlCatalog extends AbstractJdbcCatalog {

    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
            "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";

    private static final String SELECT_DATABASE_EXISTS =
            "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'";

    private static final String SELECT_TABLE_EXISTS =
            "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

    private MySqlVersion version;
    private MySqlTypeConverter typeConverter;

    public MySqlCatalog(
            String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
        super(catalogName, username, pwd, urlInfo, null);
        this.version = resolveVersion();
        this.typeConverter = new MySqlTypeConverter(version);
    }

    @Override
    protected String getDatabaseWithConditionSql(String databaseName) {
        return String.format(SELECT_DATABASE_EXISTS, databaseName);
    }

    @Override
    protected String getTableWithConditionSql(TablePath tablePath) {
        return String.format(
                SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName());
    }

    @Override
    protected String getListDatabaseSql() {
        return "SHOW DATABASES;";
    }

    @Override
    protected String getListTableSql(String databaseName) {
        return "SHOW TABLES;";
    }

    @Override
    protected String getTableName(ResultSet rs) throws SQLException {
        return rs.getString(1);
    }

    @Override
    protected String getTableName(TablePath tablePath) {
        return tablePath.getTableName();
    }

    @Override
    protected String getSelectColumnsSql(TablePath tablePath) {
        return String.format(
                SELECT_COLUMNS_SQL_TEMPLATE,
                tablePath.getDatabaseName(),
                tablePath.getTableName());
    }

    @Override
    protected TableIdentifier getTableIdentifier(TablePath tablePath) {
        return TableIdentifier.of(
                catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
    }

    @Override
    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath)
            throws SQLException {
        List<ConstraintKey> indexList =
                super.getConstraintKeys(
                        metaData,
                        tablePath.getDatabaseName(),
                        tablePath.getSchemaName(),
                        tablePath.getTableName());
        for (Iterator<ConstraintKey> it = indexList.iterator(); it.hasNext(); ) {
            ConstraintKey index = it.next();
            if (ConstraintKey.ConstraintType.UNIQUE_KEY.equals(index.getConstraintType())
                    && "PRIMARY".equals(index.getConstraintName())) {
                it.remove();
            }
        }
        return indexList;
    }

    @Override
    protected Column buildColumn(ResultSet resultSet) throws SQLException {
        String columnName = resultSet.getString("COLUMN_NAME");
        // e.g. tinyint(1) unsigned
        String columnType = resultSet.getString("COLUMN_TYPE");
        // e.g. tinyint
        String dataType = resultSet.getString("DATA_TYPE").toUpperCase();
        String comment = resultSet.getString("COLUMN_COMMENT");
        Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
        String isNullableStr = resultSet.getString("IS_NULLABLE");
        boolean isNullable = isNullableStr.equals("YES");
        // e.g. `decimal(10, 2)` is 10
        long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
        // e.g. `decimal(10, 2)` is 2
        int numberScale = resultSet.getInt("NUMERIC_SCALE");
        // e.g. `varchar(10)` is 40
        long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
        // e.g. `timestamp(3)` is 3
        int timePrecision =
                MySqlVersion.V_5_5.equals(version) ? 0 : resultSet.getInt("DATETIME_PRECISION");

        Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0));
        Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));

        MysqlType mysqlType = MysqlType.getByName(columnType);
        boolean unsigned = columnType.toLowerCase(Locale.ROOT).contains("unsigned");

        BasicTypeDefine<MysqlType> typeDefine =
                BasicTypeDefine.<MysqlType>builder()
                        .name(columnName)
                        .columnType(columnType)
                        .dataType(dataType)
                        .nativeType(mysqlType)
                        .unsigned(unsigned)
                        .length(Math.max(charOctetLength, numberPrecision))
                        .precision(numberPrecision)
                        .scale(Math.max(numberScale, timePrecision))
                        .nullable(isNullable)
                        .defaultValue(defaultValue)
                        .comment(comment)
                        .build();
        return typeConverter.convert(typeDefine);
    }

    @Override
    protected String getCreateTableSql(
            TablePath tablePath, CatalogTable table, boolean createIndex) {
        return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter, createIndex)
                .build(table.getCatalogName());
    }

    @Override
    protected String getDropTableSql(TablePath tablePath) {
        return String.format(
                "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getTableName());
    }

    @Override
    protected String getCreateDatabaseSql(String databaseName) {
        return String.format("CREATE DATABASE `%s`;", databaseName);
    }

    @Override
    protected String getDropDatabaseSql(String databaseName) {
        return String.format("DROP DATABASE `%s`;", databaseName);
    }

    @Override
    public CatalogTable getTable(String sqlQuery) throws SQLException {
        Connection defaultConnection = getConnection(defaultUrl);
        return CatalogUtils.getCatalogTable(
                defaultConnection, sqlQuery, new MySqlTypeMapper(typeConverter));
    }

    @Override
    protected String getTruncateTableSql(TablePath tablePath) throws CatalogException {
        return String.format(
                "TRUNCATE TABLE `%s`.`%s`;",
                tablePath.getDatabaseName(), tablePath.getTableName());
    }

    public String getExistDataSql(TablePath tablePath) {
        return String.format(
                "SELECT * FROM `%s`.`%s` LIMIT 1;",
                tablePath.getDatabaseName(), tablePath.getTableName());
    }

    private MySqlVersion resolveVersion() {
        try (Statement statement = getConnection(defaultUrl).createStatement();
                ResultSet resultSet = statement.executeQuery("SELECT VERSION()")) {
            resultSet.next();
            return MySqlVersion.parse(resultSet.getString(1));
        } catch (Exception e) {
            log.info(
                    "Failed to get mysql version, fallback to default version: {}",
                    MySqlVersion.V_5_7,
                    e);
            return MySqlVersion.V_5_7;
        }
    }
}
```
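Most of the catalog's database-specific work comes down to filling `%s` placeholders in SQL templates with the database and table names from a TablePath. The following standalone sketch shows how two of the templates above expand. `DemoMetadataSql` and its method names are made up for illustration, and the names are passed in as plain strings instead of a TablePath.

```java
// Hypothetical helper that mirrors two of the SQL templates used by MySqlCatalog.
final class DemoMetadataSql {
    private static final String SELECT_TABLE_EXISTS =
            "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables"
                    + " WHERE table_schema = '%s' AND table_name = '%s'";

    // Expands the "does this table exist" metadata query.
    static String tableExistsSql(String database, String table) {
        return String.format(SELECT_TABLE_EXISTS, database, table);
    }

    // Expands the DROP TABLE statement, quoting both names with backticks.
    static String dropTableSql(String database, String table) {
        return String.format("DROP TABLE `%s`.`%s`;", database, table);
    }

    public static void main(String[] args) {
        System.out.println(tableExistsSql("test", "users"));
        System.out.println(dropTableSql("test", "users"));
    }
}
```

When writing a catalog for another database, these templates and the quoting characters are typically the parts that change, while the surrounding flow in AbstractJdbcCatalog stays the same.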
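MysqlCreateTableSqlBuilder and MysqlDataTypeConvertor build the table DDL statement and convert data types, respectively. They are fairly simple, so they are not covered in detail here.
Classes in dialect
MySqlDialectFactory creates MysqlDialect instances using the factory pattern. Its acceptsURL() method inspects the JDBC URL to identify the database type. The overloaded create() method mainly serves cases where the database side is compatible with more than one dialect.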
```java
@AutoService(JdbcDialectFactory.class)
public class MySqlDialectFactory implements JdbcDialectFactory {

    @Override
    public boolean acceptsURL(String url) {
        return url.startsWith("jdbc:mysql:");
    }

    @Override
    public JdbcDialect create() {
        return new MysqlDialect();
    }

    @Override
    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
        if (DatabaseIdentifier.STARROCKS.equalsIgnoreCase(compatibleMode)) {
            return new StarRocksDialect(fieldIde);
        }
        return new MysqlDialect(fieldIde);
    }
}
```
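The URL-based selection can be illustrated with a small standalone sketch. The interface and classes below are hypothetical simplifications (dialects are represented by their names as strings, and the PostgreSQL factory is included only to show that more than one candidate is tested); they are not the SeaTunnel loader.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified version of a dialect factory: dialects are just names here.
interface DemoDialectFactory {
    boolean acceptsURL(String url);

    String create();

    // By default the compatible mode is ignored.
    default String create(String compatibleMode) {
        return create();
    }
}

final class DemoMySqlDialectFactory implements DemoDialectFactory {
    @Override
    public boolean acceptsURL(String url) {
        return url.startsWith("jdbc:mysql:");
    }

    @Override
    public String create() {
        return "MySQL";
    }

    @Override
    public String create(String compatibleMode) {
        // One JDBC URL scheme, two possible dialects.
        if ("starrocks".equalsIgnoreCase(compatibleMode)) {
            return "StarRocks";
        }
        return create();
    }
}

final class DemoPostgresDialectFactory implements DemoDialectFactory {
    @Override
    public boolean acceptsURL(String url) {
        return url.startsWith("jdbc:postgresql:");
    }

    @Override
    public String create() {
        return "PostgreSQL";
    }
}

final class DemoDialectLoader {
    private static final List<DemoDialectFactory> FACTORIES =
            Arrays.asList(new DemoMySqlDialectFactory(), new DemoPostgresDialectFactory());

    // Picks the first factory that accepts the URL, then applies the compatible mode if given.
    static String load(String url, String compatibleMode) {
        for (DemoDialectFactory factory : FACTORIES) {
            if (factory.acceptsURL(url)) {
                return compatibleMode == null ? factory.create() : factory.create(compatibleMode);
            }
        }
        throw new IllegalStateException("No dialect factory accepts URL: " + url);
    }
}
```

The compatible mode exists because a URL prefix alone cannot distinguish databases that speak the same wire protocol, which is the StarRocks case in MySqlDialectFactory.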
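MysqlDialect's getRowConverter() and getTypeConverter() return the row converter and the type converter. The row converter converts between JDBC data objects and SeaTunnelRow objects, and the type converter converts between JDBC data types and SeaTunnel data types. Together they move data in both directions between the engine's internal objects and JDBC result sets. MysqlDialect also defines database-specific behavior, such as the upsert implementation and the identifier quote character. In addition, its defaultParameter() method supplies default parameters for the JDBC URL.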
```java
@Slf4j
public class MysqlDialect implements JdbcDialect {

    private static final List NOT_SUPPORTED_DEFAULT_VALUES =
            Arrays.asList(MysqlType.BLOB, MysqlType.TEXT, MysqlType.JSON, MysqlType.GEOMETRY);

    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

    public MysqlDialect() {}

    public MysqlDialect(String fieldIde) {
        this.fieldIde = fieldIde;
    }

    @Override
    public String dialectName() {
        return DatabaseIdentifier.MYSQL;
    }

    @Override
    public JdbcRowConverter getRowConverter() {
        return new MysqlJdbcRowConverter();
    }

    @Override
    public TypeConverter<BasicTypeDefine> getTypeConverter() {
        TypeConverter typeConverter = MySqlTypeConverter.DEFAULT_INSTANCE;
        return typeConverter;
    }

    @Override
    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
        return new MySqlTypeMapper();
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "`" + getFieldIde(identifier, fieldIde) + "`";
    }

    @Override
    public String quoteDatabaseIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    @Override
    public String tableIdentifier(TablePath tablePath) {
        return tableIdentifier(tablePath.getDatabaseName(), tablePath.getTableName());
    }

    @Override
    public Optional<String> getUpsertStatement(
            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(
                                fieldName ->
                                        quoteIdentifier(fieldName)
                                                + "=VALUES("
                                                + quoteIdentifier(fieldName)
                                                + ")")
                        .collect(Collectors.joining(", "));
        String upsertSQL =
                getInsertIntoStatement(database, tableName, fieldNames)
                        + " ON DUPLICATE KEY UPDATE "
                        + updateClause;
        return Optional.of(upsertSQL);
    }

    @Override
    public PreparedStatement creatPreparedStatement(
            Connection connection, String queryTemplate, int fetchSize) throws SQLException {
        PreparedStatement statement =
                connection.prepareStatement(
                        queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        statement.setFetchSize(Integer.MIN_VALUE);
        return statement;
    }

    @Override
    public String extractTableName(TablePath tablePath) {
        return tablePath.getTableName();
    }

    @Override
    public Map<String, String> defaultParameter() {
        HashMap<String, String> map = new HashMap<>();
        map.put("rewriteBatchedStatements", "true");
        return map;
    }

    @Override
    public TablePath parse(String tablePath) {
        return TablePath.of(tablePath, false);
    }

    @Override
    public Object[] sampleDataFromColumn(
            Connection connection,
            JdbcSourceTable table,
            String columnName,
            int samplingRate,
            int fetchSize)
            throws Exception {
        String sampleQuery;
        if (StringUtils.isNotBlank(table.getQuery())) {
            sampleQuery =
                    String.format(
                            "SELECT %s FROM (%s) AS T",
                            quoteIdentifier(columnName), table.getQuery());
        } else {
            sampleQuery =
                    String.format(
                            "SELECT %s FROM %s",
                            quoteIdentifier(columnName), tableIdentifier(table.getTablePath()));
        }

        try (Statement stmt =
                connection.createStatement(
                        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            stmt.setFetchSize(Integer.MIN_VALUE);
            try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
                int count = 0;
                List<Object> results = new ArrayList<>();

                while (rs.next()) {
                    count++;
                    if (count % samplingRate == 0) {
                        results.add(rs.getObject(1));
                    }
                    if (Thread.currentThread().isInterrupted()) {
                        throw new InterruptedException("Thread interrupted");
                    }
                }
                Object[] resultsArray = results.toArray();
                Arrays.sort(resultsArray);
                return resultsArray;
            }
        }
    }

    @Override
    public Long approximateRowCntStatement(Connection connection, JdbcSourceTable table)
            throws SQLException {
        // 1. If no query is configured, use TABLE STATUS.
        // 2. If a query is configured but does not contain a WHERE clause and tablePath is
        // configured , use TABLE STATUS.
        // 3. If a query is configured with a WHERE clause, or a query statement is configured but
        // tablePath is TablePath.DEFAULT, use COUNT(*).
        boolean useTableStats =
                StringUtils.isBlank(table.getQuery())
                        || (!table.getQuery().toLowerCase().contains("where")
                                && table.getTablePath() != null
                                && !TablePath.DEFAULT
                                        .getFullName()
                                        .equals(table.getTablePath().getFullName()));

        if (useTableStats) {
            // The statement used to get approximate row count which is less
            // accurate than COUNT(*), but is more efficient for large table.
            TablePath tablePath = table.getTablePath();
            String useDatabaseStatement =
                    String.format(
                            "USE %s;", quoteDatabaseIdentifier(tablePath.getDatabaseName()));
            String rowCountQuery =
                    String.format("SHOW TABLE STATUS LIKE '%s';", tablePath.getTableName());

            try (Statement stmt = connection.createStatement()) {
                log.info("Split Chunk, approximateRowCntStatement: {}", useDatabaseStatement);
                stmt.execute(useDatabaseStatement);
                log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery);
                try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                    if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
                        throw new SQLException(
                                String.format(
                                        "No result returned after running query [%s]",
                                        rowCountQuery));
                    }
                    return rs.getLong(5);
                }
            }
        }

        return SQLUtils.countForSubquery(connection, table.getQuery());
    }

    @Override
    public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
        MysqlType nativeType = (MysqlType) typeBasicTypeDefine.getNativeType();
        return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType));
    }

    @Override
    public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
        MysqlType mysqlType = MysqlType.getByName(columnDefine.getColumnType());
        switch (mysqlType) {
            case CHAR:
            case VARCHAR:
            case TEXT:
            case TINYTEXT:
            case MEDIUMTEXT:
            case LONGTEXT:
            case ENUM:
            case SET:
            case BLOB:
            case TINYBLOB:
            case MEDIUMBLOB:
            case LONGBLOB:
            case DATE:
            case DATETIME:
            case TIMESTAMP:
            case TIME:
            case YEAR:
                return true;
            default:
                return false;
        }
    }
}
```
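To see what getUpsertStatement() produces, the following standalone sketch rebuilds the same `ON DUPLICATE KEY UPDATE` clause outside of SeaTunnel. `DemoUpsertSql` is a hypothetical class, and its `insertInto` helper is a simplified assumption that uses positional `?` placeholders; the real getInsertIntoStatement() in JdbcDialect may format the INSERT part differently.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical standalone version of the MySQL upsert statement builder.
final class DemoUpsertSql {

    // MySQL quotes identifiers with backticks.
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    // Simplified assumption: a plain INSERT with positional "?" placeholders.
    static String insertInto(String database, String table, String[] fields) {
        String columns =
                Arrays.stream(fields).map(DemoUpsertSql::quote).collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fields).map(field -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(database) + "." + quote(table)
                + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    // Appends "col=VALUES(col)" for every field, as MysqlDialect does.
    static String upsert(String database, String table, String[] fields) {
        String updateClause =
                Arrays.stream(fields)
                        .map(field -> quote(field) + "=VALUES(" + quote(field) + ")")
                        .collect(Collectors.joining(", "));
        return insertInto(database, table, fields) + " ON DUPLICATE KEY UPDATE " + updateClause;
    }

    public static void main(String[] args) {
        System.out.println(upsert("test", "users", new String[] {"id", "name"}));
    }
}
```

Note that the MySQL form does not need the unique key columns in the statement text, because the server decides which row conflicts. Dialects for databases with a different upsert syntax generally have to use the uniqueKeyFields argument as well.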
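MySqlTypeConverter has only two conversion methods, convert and reconvert, which convert a JDBC type to a SeaTunnel type and a SeaTunnel type to a JDBC type, respectively. Note that if a SeaTunnel job encounters a type for which this class defines no conversion rule, the runtime exception UNSUPPORTED_DATA_TYPE is thrown. So if you want to support more data types, this class is the place to start.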
```java
@Slf4j
public class MySqlTypeConverter implements TypeConverter<BasicTypeDefine<MysqlType>> {

    // ============================data types=====================
    static final String MYSQL_NULL = "NULL";
    static final String MYSQL_BIT = "BIT";
    static final String MYSQL_BIT_UNSIGNED = "BIT UNSIGNED";

    // -------------------------number----------------------------
    static final String MYSQL_TINYINT = "TINYINT";
    static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
    static final String MYSQL_SMALLINT = "SMALLINT";
    static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
    static final String MYSQL_MEDIUMINT = "MEDIUMINT";
    static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
    static final String MYSQL_INT = "INT";
    static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
    static final String MYSQL_INTEGER = "INTEGER";
    static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
    static final String MYSQL_BIGINT = "BIGINT";
    static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
    static final String MYSQL_DECIMAL = "DECIMAL";
    static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
    static final String MYSQL_FLOAT = "FLOAT";
    static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
    static final String MYSQL_DOUBLE = "DOUBLE";
    static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";

    // -------------------------string----------------------------
    public static final String MYSQL_CHAR = "CHAR";
    public static final String MYSQL_VARCHAR = "VARCHAR";
    static final String MYSQL_TINYTEXT = "TINYTEXT";
    static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
    static final String MYSQL_TEXT = "TEXT";
    static final String MYSQL_LONGTEXT = "LONGTEXT";
    static final String MYSQL_JSON = "JSON";
    static final String MYSQL_ENUM = "ENUM";

    // ------------------------------time-------------------------
    static final String MYSQL_DATE = "DATE";
    public static final String MYSQL_DATETIME = "DATETIME";
    public static final String MYSQL_TIME = "TIME";
    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
    static final String MYSQL_YEAR = "YEAR";
    static final String MYSQL_YEAR_UNSIGNED = "YEAR UNSIGNED";

    // ------------------------------blob-------------------------
    static final String MYSQL_TINYBLOB = "TINYBLOB";
    static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
    static final String MYSQL_BLOB = "BLOB";
    static final String MYSQL_LONGBLOB = "LONGBLOB";
    static final String MYSQL_BINARY = "BINARY";
    static final String MYSQL_VARBINARY = "VARBINARY";
    static final String MYSQL_GEOMETRY = "GEOMETRY";

    public static final int DEFAULT_PRECISION = 38;
    public static final int MAX_PRECISION = 65;
    public static final int DEFAULT_SCALE = 18;
    public static final int MAX_SCALE = 30;
    public static final int MAX_TIME_SCALE = 6;
    public static final int MAX_TIMESTAMP_SCALE = 6;
    public static final long POWER_2_8 = (long) Math.pow(2, 8);
    public static final long POWER_2_16 = (long) Math.pow(2, 16);
    public static final long POWER_2_24 = (long) Math.pow(2, 24);
    public static final long POWER_2_32 = (long) Math.pow(2, 32);
    public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
    public static final MySqlTypeConverter DEFAULT_INSTANCE =
            new MySqlTypeConverter(MySqlVersion.V_5_7);

    private final MySqlVersion version;

    public MySqlTypeConverter(MySqlVersion version) {
        this.version = version;
    }

    public MySqlTypeConverter() {
        this(MySqlVersion.V_5_7);
    }

    @Override
    public String identifier() {
        return DatabaseIdentifier.MYSQL;
    }

    @Override
    public Column convert(BasicTypeDefine typeDefine) {
        PhysicalColumn.PhysicalColumnBuilder builder =
                PhysicalColumn.builder()
                        .name(typeDefine.getName())
                        .sourceType(typeDefine.getColumnType())
                        .nullable(typeDefine.isNullable())
                        .defaultValue(typeDefine.getDefaultValue())
                        .comment(typeDefine.getComment());

        String mysqlDataType = typeDefine.getDataType().toUpperCase();
        if (mysqlDataType.endsWith("ZEROFILL")) {
            mysqlDataType =
                    mysqlDataType.substring(0, mysqlDataType.length() - "ZEROFILL".length()).trim();
        }
        if (typeDefine.isUnsigned() && !(mysqlDataType.endsWith(" UNSIGNED"))) {
            mysqlDataType = mysqlDataType + " UNSIGNED";
        }
        switch (mysqlDataType) {
            case MYSQL_NULL:
                builder.dataType(BasicType.VOID_TYPE);
                break;
            case MYSQL_BIT:
            case MYSQL_BIT_UNSIGNED:
                if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                    builder.dataType(BasicType.BOOLEAN_TYPE);
                } else if (typeDefine.getLength() == 1) {
                    builder.dataType(BasicType.BOOLEAN_TYPE);
                } else {
                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
                    // BIT(M) -> BYTE(M/8)
                    long byteLength = typeDefine.getLength() / 8;
                    byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0;
                    builder.columnLength(byteLength);
                }
                break;
            case MYSQL_TINYINT:
                if (typeDefine.getColumnType().equalsIgnoreCase("tinyint(1)")) {
                    builder.dataType(BasicType.BOOLEAN_TYPE);
                } else {
                    builder.dataType(BasicType.BYTE_TYPE);
                }
                break;
            case MYSQL_TINYINT_UNSIGNED:
            case MYSQL_SMALLINT:
                builder.dataType(BasicType.SHORT_TYPE);
                break;
            case MYSQL_SMALLINT_UNSIGNED:
            case MYSQL_MEDIUMINT:
            case MYSQL_MEDIUMINT_UNSIGNED:
            case MYSQL_INT:
            case MYSQL_INTEGER:
            case MYSQL_YEAR:
            case MYSQL_YEAR_UNSIGNED:
                builder.dataType(BasicType.INT_TYPE);
                break;
            case MYSQL_INT_UNSIGNED:
            case MYSQL_INTEGER_UNSIGNED:
            case MYSQL_BIGINT:
                builder.dataType(BasicType.LONG_TYPE);
                break;
            case MYSQL_BIGINT_UNSIGNED:
                DecimalType intDecimalType = new DecimalType(20, 0);
                builder.dataType(intDecimalType);
                builder.columnLength(Long.valueOf(intDecimalType.getPrecision()));
                builder.scale(intDecimalType.getScale());
                break;
            case MYSQL_FLOAT:
                builder.dataType(BasicType.FLOAT_TYPE);
                break;
            case MYSQL_FLOAT_UNSIGNED:
                log.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
                builder.dataType(BasicType.FLOAT_TYPE);
                break;
            case MYSQL_DOUBLE:
                builder.dataType(BasicType.DOUBLE_TYPE);
                break;
            case MYSQL_DOUBLE_UNSIGNED:
                log.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
                builder.dataType(BasicType.DOUBLE_TYPE);
                break;
            case MYSQL_DECIMAL:
                Preconditions.checkArgument(typeDefine.getPrecision() > 0);

                DecimalType decimalType;
                if (typeDefine.getPrecision() > DEFAULT_PRECISION) {
                    log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL);
                    decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
                } else {
                    decimalType =
                            new DecimalType(
                                    typeDefine.getPrecision().intValue(),
                                    typeDefine.getScale() == null
                                            ? 0
                                            : typeDefine.getScale().intValue());
                }
                builder.dataType(decimalType);
                builder.columnLength(Long.valueOf(decimalType.getPrecision()));
                builder.scale(decimalType.getScale());
                break;
            case MYSQL_DECIMAL_UNSIGNED:
                Preconditions.checkArgument(typeDefine.getPrecision() > 0);

                log.warn("{} will probably cause value overflow.", MYSQL_DECIMAL_UNSIGNED);
                DecimalType decimalUnsignedType =
                        new DecimalType(
                                typeDefine.getPrecision().intValue() + 1,
                                typeDefine.getScale() == null
                                        ? 0
                                        : typeDefine.getScale().intValue());
                builder.dataType(decimalUnsignedType);
                builder.columnLength(Long.valueOf(decimalUnsignedType.getPrecision()));
                builder.scale(decimalUnsignedType.getScale());
                break;
            case MYSQL_ENUM:
                builder.dataType(BasicType.STRING_TYPE);
                if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                    builder.columnLength(100L);
                } else {
                    builder.columnLength(typeDefine.getLength());
                }
                break;
            case MYSQL_CHAR:
            case MYSQL_VARCHAR:
                if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                    builder.columnLength(TypeDefineUtils.charTo4ByteLength(1L));
                } else {
                    builder.columnLength(typeDefine.getLength());
                }
                builder.dataType(BasicType.STRING_TYPE);
                break;
            case MYSQL_TINYTEXT:
                builder.dataType(BasicType.STRING_TYPE);
                builder.columnLength(POWER_2_8 - 1);
                break;
            case MYSQL_TEXT:
                builder.dataType(BasicType.STRING_TYPE);
                builder.columnLength(POWER_2_16 - 1);
                break;
            case MYSQL_MEDIUMTEXT:
                builder.dataType(BasicType.STRING_TYPE);
                builder.columnLength(POWER_2_24 - 1);
                break;
            case MYSQL_LONGTEXT:
                builder.dataType(BasicType.STRING_TYPE);
                builder.columnLength(POWER_2_32 - 1);
                break;
            case MYSQL_JSON:
                builder.dataType(BasicType.STRING_TYPE);
                break;
            case MYSQL_BINARY:
            case MYSQL_VARBINARY:
                if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                    builder.columnLength(1L);
                } else {
                    builder.columnLength(typeDefine.getLength());
                }
                builder.dataType(PrimitiveByteArrayType.INSTANCE);
                break;
            case MYSQL_TINYBLOB:
                builder.dataType(PrimitiveByteArrayType.INSTANCE);
                builder.columnLength(POWER_2_8 - 1);
                break;
            case MYSQL_BLOB:
                builder.dataType(PrimitiveByteArrayType.INSTANCE);
                builder.columnLength(POWER_2_16 - 1);
                break;
            case MYSQL_MEDIUMBLOB:
                builder.dataType(PrimitiveByteArrayType.INSTANCE);
                builder.columnLength(POWER_2_24 - 1);
                break;
            case MYSQL_LONGBLOB:
                builder.dataType(PrimitiveByteArrayType.INSTANCE);
                builder.columnLength(POWER_2_32 - 1);
                break;
            case MYSQL_GEOMETRY:
                builder.dataType(PrimitiveByteArrayType.INSTANCE);
                break;
            case MYSQL_DATE:
                builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
                break;
            case MYSQL_TIME:
                builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
                builder.scale(typeDefine.getScale());
                break;
            case MYSQL_DATETIME:
            case MYSQL_TIMESTAMP:
                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
                builder.scale(typeDefine.getScale());
                break;
            default:
                throw CommonError.convertToSeaTunnelTypeError(
                        DatabaseIdentifier.MYSQL, mysqlDataType, typeDefine.getName());
        }
        return builder.build();
    }

    @Override
    public BasicTypeDefine<MysqlType> reconvert(Column column) {
        BasicTypeDefine.BasicTypeDefineBuilder builder =
                BasicTypeDefine.<MysqlType>builder()
                        .name(column.getName())
                        .nullable(column.isNullable())
                        .comment(column.getComment())
                        .defaultValue(column.getDefaultValue());
        switch (column.getDataType().getSqlType()) {
            case NULL:
                builder.nativeType(MysqlType.NULL);
                builder.columnType(MYSQL_NULL);
                builder.dataType(MYSQL_NULL);
                break;
            case BOOLEAN:
                builder.nativeType(MysqlType.BOOLEAN);
                builder.columnType(String.format("%s(%s)", MYSQL_TINYINT, 1));
                builder.dataType(MYSQL_TINYINT);
                builder.length(1L);
                break;
            case TINYINT:
                builder.nativeType(MysqlType.TINYINT);
                builder.columnType(MYSQL_TINYINT);
                builder.dataType(MYSQL_TINYINT);
                break;
            case SMALLINT:
                builder.nativeType(MysqlType.SMALLINT);
                builder.columnType(MYSQL_SMALLINT);
                builder.dataType(MYSQL_SMALLINT);
                break;
            case INT:
                builder.nativeType(MysqlType.INT);
                builder.columnType(MYSQL_INT);
                builder.dataType(MYSQL_INT);
                break;
            case BIGINT:
                builder.nativeType(MysqlType.BIGINT);
                builder.columnType(MYSQL_BIGINT);
                builder.dataType(MYSQL_BIGINT);
                break;
            case FLOAT:
                builder.nativeType(MysqlType.FLOAT);
                builder.columnType(MYSQL_FLOAT);
                builder.dataType(MYSQL_FLOAT);
                break;
            case DOUBLE:
                builder.nativeType(MysqlType.DOUBLE);
                builder.columnType(MYSQL_DOUBLE);
                builder.dataType(MYSQL_DOUBLE);
                break;
            case DECIMAL:
                DecimalType decimalType = (DecimalType) column.getDataType();
                long precision = decimalType.getPrecision();
                int scale = decimalType.getScale();
                if (precision <= 0) {
                    precision = DEFAULT_PRECISION;
                    scale = DEFAULT_SCALE;
                    log.warn(
                            "The decimal column {} type decimal({},{}) is out of range, "
                                    + "which is precision less than 0, "
                                    + "it will be converted to decimal({},{})",
                            column.getName(),
                            decimalType.getPrecision(),
                            decimalType.getScale(),
                            precision,
                            scale);
                } else if (precision > MAX_PRECISION) {
                    scale = (int) Math.max(0, scale - (precision - MAX_PRECISION));
                    precision = MAX_PRECISION;
                    log.warn(
                            "The decimal column {} type decimal({},{}) is out of range, "
                                    + "which exceeds the maximum precision of {}, "
                                    + "it will be converted to decimal({},{})",
                            column.getName(),
                            decimalType.getPrecision(),
                            decimalType.getScale(),
                            MAX_PRECISION,
                            precision,
                            scale);
                }
                if (scale < 0) {
                    scale = 0;
                    log.warn(
                            "The decimal column {} type decimal({},{}) is out of range, "
                                    + "which is scale less than 0, "
                                    + "it will be converted to decimal({},{})",
                            column.getName(),
                            decimalType.getPrecision(),
                            decimalType.getScale(),
                            precision,
                            scale);
                } else if (scale > MAX_SCALE) {
                    scale = MAX_SCALE;
                    log.warn(
                            "The decimal column {} type decimal({},{}) is out of range, "
                                    + "which exceeds the maximum scale of {}, "
                                    + "it will be converted to decimal({},{})",
                            column.getName(),
                            decimalType.getPrecision(),
                            decimalType.getScale(),
                            MAX_SCALE,
                            precision,
                            scale);
                }

                builder.nativeType(MysqlType.DECIMAL);
                builder.columnType(String.format("%s(%s,%s)", MYSQL_DECIMAL, precision, scale));
                builder.dataType(MYSQL_DECIMAL);
                builder.precision(precision);
                builder.scale(scale);
                break;
            case BYTES:
                if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
                    builder.nativeType(MysqlType.VARBINARY);
                    builder.columnType(
                            String.format("%s(%s)", MYSQL_VARBINARY, MAX_VARBINARY_LENGTH / 2));
                    builder.dataType(MYSQL_VARBINARY);
                } else if (column.getColumnLength() < MAX_VARBINARY_LENGTH) {
                    builder.nativeType(MysqlType.VARBINARY);
                    builder.columnType(
                            String.format("%s(%s)", MYSQL_VARBINARY, column.getColumnLength()));
                    builder.dataType(MYSQL_VARBINARY);
                } else if (column.getColumnLength() < POWER_2_24) {
                    builder.nativeType(MysqlType.MEDIUMBLOB);
                    builder.columnType(MYSQL_MEDIUMBLOB);
                    builder.dataType(MYSQL_MEDIUMBLOB);
                } else {
                    builder.nativeType(MysqlType.LONGBLOB);
                    builder.columnType(MYSQL_LONGBLOB);
                    builder.dataType(MYSQL_LONGBLOB);
                }
                break;
            case STRING:
                if (column.getColumnLength() == null || column.getColumnLength() <= 0) {
                    builder.nativeType(MysqlType.LONGTEXT);
                    builder.columnType(MYSQL_LONGTEXT);
                    builder.dataType(MYSQL_LONGTEXT);
                } else if (column.getColumnLength() < POWER_2_8) {
                    builder.nativeType(MysqlType.VARCHAR);
                    builder.columnType(
                            String.format("%s(%s)", MYSQL_VARCHAR, column.getColumnLength()));
                    builder.dataType(MYSQL_VARCHAR);
                } else if (column.getColumnLength() < POWER_2_16) {
                    builder.nativeType(MysqlType.TEXT);
                    builder.columnType(MYSQL_TEXT);
                    builder.dataType(MYSQL_TEXT);
                } else if (column.getColumnLength() < POWER_2_24) {
                    builder.nativeType(MysqlType.MEDIUMTEXT);
                    builder.columnType(MYSQL_MEDIUMTEXT);
                    builder.dataType(MYSQL_MEDIUMTEXT);
                } else {
                    builder.nativeType(MysqlType.LONGTEXT);
                    builder.columnType(MYSQL_LONGTEXT);
                    builder.dataType(MYSQL_LONGTEXT);
                }
                break;
            case DATE:
                builder.nativeType(MysqlType.DATE);
                builder.columnType(MYSQL_DATE);
                builder.dataType(MYSQL_DATE);
                break;
            case TIME:
                builder.nativeType(MysqlType.TIME);
                builder.dataType(MYSQL_TIME);
                if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
                    builder.columnType(MYSQL_TIME);
                } else if (column.getScale() != null && column.getScale() > 0) {
                    int timeScale = column.getScale();
                    if (timeScale > MAX_TIME_SCALE) {
                        timeScale = MAX_TIME_SCALE;
                        log.warn(
                                "The time column {} type time({}) is out of range, "
                                        + "which exceeds the maximum scale of {}, "
                                        + "it will be converted to time({})",
                                column.getName(),
                                column.getScale(),
                                MAX_SCALE,
                                timeScale);
                    }
                    builder.columnType(String.format("%s(%s)", MYSQL_TIME, timeScale));
                    builder.scale(timeScale);
                } else {
                    builder.columnType(MYSQL_TIME);
                }
                break;
            case TIMESTAMP:
                builder.nativeType(MysqlType.DATETIME);
                builder.dataType(MYSQL_DATETIME);
                if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
                    builder.columnType(MYSQL_DATETIME);
                } else if (column.getScale() != null && column.getScale() > 0) {
                    int timestampScale = column.getScale();
                    if (timestampScale > MAX_TIMESTAMP_SCALE) {
                        timestampScale = MAX_TIMESTAMP_SCALE;
                        log.warn(
                                "The timestamp column {} type timestamp({}) is out of range, "
                                        + "which exceeds the maximum scale of {}, "
                                        + "it will be converted to timestamp({})",
                                column.getName(),
                                column.getScale(),
                                MAX_TIMESTAMP_SCALE,
                                timestampScale);
                    }
                    builder.columnType(String.format("%s(%s)", MYSQL_DATETIME, timestampScale));
                    builder.scale(timestampScale);
                } else {
                    builder.columnType(MYSQL_DATETIME);
                }
                break;
            default:
                throw CommonError.convertToConnectorTypeError(
                        DatabaseIdentifier.MYSQL,
                        column.getDataType().getSqlType().name(),
                        column.getName());
        }

        return builder.build();
    }
}
```
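The essential shape of convert() is a switch over the upper-cased native type name with a default branch that rejects anything unknown. The sketch below reproduces that shape in a self-contained form for a handful of types. `DemoTypeConverter` is hypothetical, the SeaTunnel types are represented by plain strings named after the constants in the listing, and a generic `UnsupportedOperationException` stands in for SeaTunnel's own error.

```java
import java.util.Locale;

// Hypothetical, string-based miniature of the convert() switch in MySqlTypeConverter.
final class DemoTypeConverter {

    // Maps a MySQL data type to the name of a SeaTunnel type.
    static String convert(String mysqlDataType, String columnType, long length) {
        switch (mysqlDataType.toUpperCase(Locale.ROOT)) {
            case "BIT":
                if (length <= 1) {
                    return "BOOLEAN_TYPE";
                }
                // BIT(M) needs ceil(M / 8) bytes.
                long byteLength = length / 8 + (length % 8 > 0 ? 1 : 0);
                return "BYTES(" + byteLength + ")";
            case "TINYINT":
                // tinyint(1) is treated as a boolean, any other tinyint as a byte.
                return "tinyint(1)".equalsIgnoreCase(columnType) ? "BOOLEAN_TYPE" : "BYTE_TYPE";
            case "INT":
                return "INT_TYPE";
            case "BIGINT":
                return "LONG_TYPE";
            case "VARCHAR":
                return "STRING_TYPE";
            default:
                // Adding support for a new type means adding a case above.
                throw new UnsupportedOperationException(
                        "Unsupported MySQL type: " + mysqlDataType);
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("bit", "bit(10)", 10));
        System.out.println(convert("tinyint", "tinyint(1)", 0));
    }
}
```

Supporting an extra type in a real connector follows the same pattern in both directions: a new case in convert() for reading, and a matching branch in reconvert() so that tables can be created with that type.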
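MySqlTypeMapper is the older type conversion interface; it also delegates to MySqlTypeConverter.
MySqlVersion represents the MySQL server version. The catalog and the type converter use it to handle differences between versions, for example skipping DATETIME_PRECISION on 5.5.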
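As an illustration of version-dependent behavior, here is a minimal sketch of how a version enum can gate the generated column type, following the TIME branch of reconvert(). Everything in it is hypothetical: `DemoMySqlVersion`, its constants, and the prefix-based `parse` are assumptions made for this example and do not claim to match the real MySqlVersion class. Only the rule itself (no fractional seconds at or before 5.5, scale capped at 6) is taken from the listing above.

```java
// Hypothetical version enum; declaration order doubles as version order.
enum DemoMySqlVersion {
    V_5_5,
    V_5_6,
    V_5_7,
    V_8;

    boolean isAtOrBefore(DemoMySqlVersion other) {
        return this.compareTo(other) <= 0;
    }

    // Assumed parsing rule: match on the version string prefix, fall back to 5.7.
    static DemoMySqlVersion parse(String versionString) {
        if (versionString.startsWith("5.5")) {
            return V_5_5;
        }
        if (versionString.startsWith("5.6")) {
            return V_5_6;
        }
        if (versionString.startsWith("8.")) {
            return V_8;
        }
        return V_5_7;
    }
}

final class DemoTimeColumn {
    private static final int MAX_TIME_SCALE = 6;

    // Returns the MySQL column type for a TIME column with the given fractional scale.
    static String timeColumnType(DemoMySqlVersion version, Integer scale) {
        if (version.isAtOrBefore(DemoMySqlVersion.V_5_5) || scale == null || scale <= 0) {
            return "TIME";
        }
        return "TIME(" + Math.min(scale, MAX_TIME_SCALE) + ")";
    }

    public static void main(String[] args) {
        DemoMySqlVersion version = DemoMySqlVersion.parse("8.0.33");
        System.out.println(timeColumnType(version, 3));
    }
}
```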