Overview
This change is based on SeaTunnel 2.3.4.
Our product needs a feature that synchronizes data via CSV, but SeaTunnel's CSV reader does not split strings according to the CSV rules: it simply splits rows on newline characters and splits columns with String.split. As soon as a column value contains a newline or the delimiter, the result is wrong.
So here we rework Apache SeaTunnel's original CSV reading feature so that it can genuinely handle CSV.
Getting started
CSV handling lives in the connector-file-base module. In the org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat enum you can see that reading the CSV type uses org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.
public enum FileFormat implements Serializable {
    CSV("csv") {
        @Override
        public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
            fileSinkConfig.setFieldDelimiter(",");
            return new TextWriteStrategy(fileSinkConfig);
        }

        @Override
        public ReadStrategy getReadStrategy() {
            // CSV read strategy
            return new TextReadStrategy();
        }
    },
    ......
}
Step into the read method of TextReadStrategy:
// org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
        throws FileConnectorException, IOException {
    ............
    // Reads the file stream with a BufferedReader
    try (BufferedReader reader =
            new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
        // Splits rows on newline characters
        reader.lines()
                .skip(skipHeaderNumber)
                .forEach(
                        line -> {
                            try {
                                // deserializationSchema is
                                // org.apache.seatunnel.format.text.TextDeserializationSchema;
                                // its deserialize method splits fields with String.split
                                SeaTunnelRow seaTunnelRow =
                                        deserializationSchema.deserialize(line.getBytes());
                                if (!readColumns.isEmpty()) {
                                    // need column projection
                                    Object[] fields;
                                    if (isMergePartition) {
                                        fields =
                                                new Object[
                                                        readColumns.size() + partitionsMap.size()];
                                    } else {
                                        fields = new Object[readColumns.size()];
                                    }
                                    for (int i = 0; i < indexes.length; i++) {
                                        fields[i] = seaTunnelRow.getField(indexes[i]);
                                    }
                                    seaTunnelRow = new SeaTunnelRow(fields);
                                }
                                if (isMergePartition) {
                                    int index = seaTunnelRowType.getTotalFields();
                                    for (String value : partitionsMap.values()) {
                                        seaTunnelRow.setField(index++, value);
                                    }
                                }
                                seaTunnelRow.setTableId(tableId);
                                output.collect(seaTunnelRow);
                            } catch (IOException e) {
                                String errorMsg =
                                        String.format(
                                                "Deserialize this data [%s] failed, please check the origin data",
                                                line);
                                throw new FileConnectorException(
                                        FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
                                        errorMsg,
                                        e);
                            }
                        });
    }
}
This strategy splits rows with the BufferedReader.lines method, which simply splits on newline characters, and splits fields with the String.split method, which splits on a single character.
Here is the problem: if a field's content itself contains a newline or the delimiter, the output is garbled, because things get cut that should not be.
So we need to rework this logic so that strings are split correctly.
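To see the failure mode concretely, here is a minimal, self-contained sketch (not SeaTunnel code): it contrasts a naive String.split with a simplified quote-aware splitter of the kind opencsv implements internally. The quoteAwareSplit helper is hypothetical and for illustration only; it ignores escaped quotes and other corner cases that a real CSV parser must handle.

```java
import java.util.ArrayList;
import java.util.List;

public class CsvSplitDemo {
    // A deliberately simplified quote-aware splitter: a delimiter inside double
    // quotes does not start a new field. Real CSV handling (escaped quotes,
    // embedded newlines) should be left to a library such as opencsv.
    public static String[] quoteAwareSplit(String line, char sep) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes; // toggle quoted state, drop the quote itself
            } else if (c == sep && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String line = "1,\"Smith, John\",NY";
        System.out.println(line.split(",").length);            // 4 fields: wrong
        System.out.println(quoteAwareSplit(line, ',').length); // 3 fields: correct
    }
}
```

The quoted value "Smith, John" is one logical column, but String.split cuts it in two; that is exactly the bug described above.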
Adding dedicated CSV handling logic
What we want to do is replace SeaTunnel's original CSV handling with logic that is purpose-built for CSV.
Introducing a third-party CSV library
I chose the opencsv library to handle CSV.
Add opencsv to the pom.xml of connector-file-base:
<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>5.7.1</version>
</dependency>
Adding the CSV deserialization class CsvDeserializationSchema
Create a new package org.apache.seatunnel.connectors.seatunnel.file.format.csv and, inside it, a CSV deserialization class CsvDeserializationSchema that implements the DeserializationSchema interface (modeled on TextDeserializationSchema). Below is the complete code of CsvDeserializationSchema and of the StrToNumberUtil helper it relies on; if you do not want to study it in detail, you can copy it as-is.
Add the utility class org.apache.seatunnel.connectors.seatunnel.file.utils.StrToNumberUtil
package org.apache.seatunnel.connectors.seatunnel.file.utils;

import org.apache.commons.lang3.StringUtils;

import java.math.BigDecimal;
import java.util.Optional;

public class StrToNumberUtil {

    public static Double str2Double(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        try {
            return Double.parseDouble(str.trim());
        } catch (Exception e) {
            return null;
        }
    }

    public static Long str2Long(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        str = str.trim();
        // More than one decimal point: not a number, skip it
        if (str.indexOf('.') != str.lastIndexOf('.')) {
            return null;
        }
        // Take the integer part
        String sub = str.indexOf('.') >= 0 ? str.substring(0, str.indexOf('.')) : str;
        try {
            return Long.parseLong(sub);
        } catch (Exception e) {
            return null;
        }
    }

    public static Byte str2Byte(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::byteValue).orElse(null);
    }

    public static Short str2Short(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::shortValue).orElse(null);
    }

    public static Integer str2Int(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::intValue).orElse(null);
    }

    public static Float str2Float(String s) {
        return Optional.ofNullable(str2Double(s)).map(Double::floatValue).orElse(null);
    }

    public static BigDecimal str2BigDecimal(String s) {
        if (StringUtils.isBlank(s)) {
            return null;
        }
        try {
            return new BigDecimal(s.trim());
        } catch (Exception e) {
            return null;
        }
    }
}
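A quick usage sketch of the lenient parsing behaviour. This is a standalone copy of the str2Long logic above, with StringUtils.isBlank replaced by a plain null/empty check so the snippet has no Commons Lang dependency:

```java
public class LenientParseDemo {
    // Standalone copy of StrToNumberUtil.str2Long: trims the input, rejects
    // strings with more than one decimal point, and parses only the integer part.
    public static Long str2Long(String str) {
        if (str == null || str.trim().isEmpty()) {
            return null;
        }
        str = str.trim();
        if (str.indexOf('.') != str.lastIndexOf('.')) {
            return null; // e.g. "1.2.3" is not a number
        }
        String intPart = str.indexOf('.') >= 0 ? str.substring(0, str.indexOf('.')) : str;
        try {
            return Long.parseLong(intPart);
        } catch (Exception e) {
            return null; // never throw on dirty input, just yield null
        }
    }

    public static void main(String[] args) {
        System.out.println(str2Long(" 42 "));  // 42
        System.out.println(str2Long("42.9"));  // 42 (integer part only, no rounding)
        System.out.println(str2Long("1.2.3")); // null
        System.out.println(str2Long("abc"));   // null
    }
}
```

Note the design choice: dirty numeric input never throws, it just becomes null, which matches how the connector wants to treat malformed user files.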
Add the CSV deserialization class org.apache.seatunnel.connectors.seatunnel.file.format.csv.CsvDeserializationSchema
package org.apache.seatunnel.connectors.seatunnel.file.format.csv;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.utils.StrToNumberUtil;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;

import org.apache.commons.lang3.StringUtils;

import lombok.NonNull;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class CsvDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {

    private final SeaTunnelRowType seaTunnelRowType;
    private final String[] separators;
    private final DateUtils.Formatter dateFormatter;
    private final DateTimeUtils.Formatter dateTimeFormatter;
    private final TimeUtils.Formatter timeFormatter;

    private CsvDeserializationSchema(
            @NonNull SeaTunnelRowType seaTunnelRowType,
            String[] separators,
            DateUtils.Formatter dateFormatter,
            DateTimeUtils.Formatter dateTimeFormatter,
            TimeUtils.Formatter timeFormatter) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.separators = separators;
        this.dateFormatter = dateFormatter;
        this.dateTimeFormatter = dateTimeFormatter;
        this.timeFormatter = timeFormatter;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private SeaTunnelRowType seaTunnelRowType;
        private String[] separators = TextFormatConstant.SEPARATOR.clone();
        private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
        private DateTimeUtils.Formatter dateTimeFormatter =
                DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
        private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;

        private Builder() {}

        public Builder seaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
            this.seaTunnelRowType = seaTunnelRowType;
            return this;
        }

        public Builder delimiter(String delimiter) {
            this.separators[0] = delimiter;
            return this;
        }

        public Builder separators(String[] separators) {
            this.separators = separators;
            return this;
        }

        public Builder dateFormatter(DateUtils.Formatter dateFormatter) {
            this.dateFormatter = dateFormatter;
            return this;
        }

        public Builder dateTimeFormatter(DateTimeUtils.Formatter dateTimeFormatter) {
            this.dateTimeFormatter = dateTimeFormatter;
            return this;
        }

        public Builder timeFormatter(TimeUtils.Formatter timeFormatter) {
            this.timeFormatter = timeFormatter;
            return this;
        }

        public CsvDeserializationSchema build() {
            return new CsvDeserializationSchema(
                    seaTunnelRowType, separators, dateFormatter, dateTimeFormatter, timeFormatter);
        }
    }

    /**
     * Accepts a JSON string whose keys are column indexes (starting from 0) and whose values are
     * the column values.
     *
     * @param message the JSON string as bytes
     */
    @Override
    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        String content = new String(message);
        ObjectMapper objectMapper = new ObjectMapper();
        com.fasterxml.jackson.databind.type.MapType javaType =
                objectMapper
                        .getTypeFactory()
                        .constructMapType(HashMap.class, Integer.class, String.class);
        Map<Integer, String> splitsMap = objectMapper.readValue(content, javaType);
        Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
        for (int i = 0; i < objects.length; i++) {
            objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i), 0);
        }
        return new SeaTunnelRow(objects);
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return seaTunnelRowType;
    }

    private Map<Integer, String> splitLineBySeaTunnelRowType(
            String line, SeaTunnelRowType seaTunnelRowType, int level) {
        String[] splits = splitLineWithCsvMethod(line, separators[level].charAt(0));
        LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
        for (int i = 0; i < splits.length; i++) {
            splitsMap.put(i, splits[i]);
        }
        if (fieldTypes.length > splits.length) {
            // contains partition columns
            for (int i = splits.length; i < fieldTypes.length; i++) {
                splitsMap.put(i, null);
            }
        }
        return splitsMap;
    }

    private String[] splitLineWithCsvMethod(String line, char sep) {
        CSVParser csvParser = new CSVParserBuilder().withSeparator(sep).build();
        try (CSVReader reader =
                new CSVReaderBuilder(new StringReader(line)).withCSVParser(csvParser).build()) {
            Iterator<String[]> iterator = reader.iterator();
            if (iterator.hasNext()) {
                return iterator.next();
            }
            return new String[0];
        } catch (Exception e) {
            return new String[] {line};
        }
    }

    private Object convert(String field, SeaTunnelDataType<?> fieldType, int level) {
        if (StringUtils.isBlank(field)) {
            return null;
        }
        switch (fieldType.getSqlType()) {
            case ARRAY:
                BasicType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
                String[] elements = field.split(separators[level + 1]);
                ArrayList<Object> objectArrayList = new ArrayList<>();
                for (String element : elements) {
                    objectArrayList.add(convert(element, elementType, level + 1));
                }
                switch (elementType.getSqlType()) {
                    case STRING:
                        return objectArrayList.toArray(new String[0]);
                    case BOOLEAN:
                        return objectArrayList.toArray(new Boolean[0]);
                    case TINYINT:
                        return objectArrayList.toArray(new Byte[0]);
                    case SMALLINT:
                        return objectArrayList.toArray(new Short[0]);
                    case INT:
                        return objectArrayList.toArray(new Integer[0]);
                    case BIGINT:
                        return objectArrayList.toArray(new Long[0]);
                    case FLOAT:
                        return objectArrayList.toArray(new Float[0]);
                    case DOUBLE:
                        return objectArrayList.toArray(new Double[0]);
                    default:
                        throw new SeaTunnelTextFormatException(
                                CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                                String.format(
                                        "SeaTunnel array not support this data type [%s]",
                                        elementType.getSqlType()));
                }
            case MAP:
                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
                LinkedHashMap<Object, Object> objectMap = new LinkedHashMap<>();
                String[] kvs = field.split(separators[level + 1]);
                for (String kv : kvs) {
                    String[] splits = kv.split(separators[level + 2]);
                    if (splits.length < 2) {
                        objectMap.put(convert(splits[0], keyType, level + 1), null);
                    } else {
                        objectMap.put(
                                convert(splits[0], keyType, level + 1),
                                convert(splits[1], valueType, level + 1));
                    }
                }
                return objectMap;
            case STRING:
                return field;
            case BOOLEAN:
                return Boolean.parseBoolean(field);
            case TINYINT:
                return StrToNumberUtil.str2Byte(field);
            case SMALLINT:
                return StrToNumberUtil.str2Short(field);
            case INT:
                return StrToNumberUtil.str2Int(field);
            case BIGINT:
                return StrToNumberUtil.str2Long(field);
            case FLOAT:
                return StrToNumberUtil.str2Float(field);
            case DOUBLE:
                return StrToNumberUtil.str2Double(field);
            case DECIMAL:
                return StrToNumberUtil.str2BigDecimal(field);
            case NULL:
                return null;
            case BYTES:
                return field.getBytes();
            case DATE:
                return DateUtils.parse(field, dateFormatter);
            case TIME:
                return TimeUtils.parse(field, timeFormatter);
            case TIMESTAMP:
                return DateTimeUtils.parse(field, dateTimeFormatter);
            case ROW:
                Map<Integer, String> splitsMap =
                        splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType, level + 1);
                Object[] objects = new Object[splitsMap.size()];
                for (int i = 0; i < objects.length; i++) {
                    objects[i] =
                            convert(
                                    splitsMap.get(i),
                                    ((SeaTunnelRowType) fieldType).getFieldType(i),
                                    level + 1);
                }
                return new SeaTunnelRow(objects);
            default:
                throw new SeaTunnelTextFormatException(
                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                        String.format(
                                "SeaTunnel not support this data type [%s]",
                                fieldType.getSqlType()));
        }
    }
}
Its main differences from TextDeserializationSchema are in the deserialize, splitLineBySeaTunnelRowType, and convert methods.
deserialize method: TextDeserializationSchema receives the bytes of a raw line read by the caller, calls splitLineBySeaTunnelRowType to split it by delimiter into a column-index -> value map, and passes that to convert. CsvDeserializationSchema receives the bytes of a JSON string whose keys are already the column indexes (starting from 0) and whose values are the column values; once the JSON is turned into a map, it can be passed straight to convert.
convert method: when handling the TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, and DECIMAL types, CsvDeserializationSchema uses our custom string-to-number utility class (StrToNumberUtil), mainly to avoid exceptions when numeric values in user-supplied files do not match the formats Java's built-in parsing methods expect.
splitLineBySeaTunnelRowType method: TextDeserializationSchema splits directly with String.split on the delimiter, which goes wrong when a value itself contains the delimiter; CsvDeserializationSchema splits with opencsv, which follows the CSV rules exactly.
The remaining minor differences are not worth spelling out; you can compare the two files with IDEA's diff feature.
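To make the handoff between the two classes concrete: CsvReadStrategy turns each parsed row into a small JSON object keyed by column index, and CsvDeserializationSchema reads it back. Below is a hand-rolled sketch of that payload shape; the real code uses Jackson's ObjectMapper, and this toy version does no escaping and no null handling:

```java
public class RowPayloadDemo {
    // Build the {"0":"v0","1":"v1",...} JSON payload that CsvDeserializationSchema
    // expects from the read strategy. Illustration only: no escaping, no nulls.
    public static String toJson(String[] row) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < row.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(i).append("\":\"").append(row[i]).append('"');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson(new String[] {"Tom", "25"}));
    }
}
```

For the row ["Tom", "25"] this produces {"0":"Tom","1":"25"}, which deserialize then maps back to column 0 and column 1.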
Adding the CSV read strategy class CsvReadStrategy
Add a CsvReadStrategy class under the org.apache.seatunnel.connectors.seatunnel.file.source.reader package, modeled on org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.
Here is the complete code of the CsvReadStrategy class:
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.format.csv.CsvDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;

import io.airlift.compress.lzo.LzopCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

@Slf4j
public class CsvReadStrategy extends AbstractReadStrategy {

    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private String fieldDelimiter = BaseSourceConfigOptions.FIELD_DELIMITER.defaultValue();
    private DateUtils.Formatter dateFormat = BaseSourceConfigOptions.DATE_FORMAT.defaultValue();
    private DateTimeUtils.Formatter datetimeFormat =
            BaseSourceConfigOptions.DATETIME_FORMAT.defaultValue();
    private TimeUtils.Formatter timeFormat = BaseSourceConfigOptions.TIME_FORMAT.defaultValue();
    private CompressFormat compressFormat = BaseSourceConfigOptions.COMPRESS_CODEC.defaultValue();
    private int[] indexes;

    @Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output)
            throws FileConnectorException, IOException {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
        InputStream inputStream;
        switch (compressFormat) {
            case LZO:
                LzopCodec lzo = new LzopCodec();
                inputStream = lzo.createInputStream(hadoopFileSystemProxy.getInputStream(path));
                break;
            case NONE:
                inputStream = hadoopFileSystemProxy.getInputStream(path);
                break;
            default:
                log.warn(
                        "Text file does not support this compress type: {}",
                        compressFormat.getCompressCodec());
                inputStream = hadoopFileSystemProxy.getInputStream(path);
                break;
        }
        char separator =
                (fieldDelimiter == null || fieldDelimiter.isEmpty())
                        ? ','
                        : fieldDelimiter.charAt(0);
        CSVParser csvParser = new CSVParserBuilder().withSeparator(separator).build();
        try (CSVReader reader =
                new CSVReaderBuilder(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
                        .withSkipLines((int) skipHeaderNumber)
                        .withCSVParser(csvParser)
                        .build()) {
            Iterator<String[]> iterator = reader.iterator();
            ObjectMapper objectMapper = new ObjectMapper();
            while (iterator.hasNext()) {
                String[] row = iterator.next();
                if (row.length == 0) {
                    continue;
                }
                try {
                    Map<Integer, String> rowMap = new HashMap<>();
                    for (int i = 0; i < row.length; i++) {
                        rowMap.put(i, row[i]);
                    }
                    byte[] bytes = objectMapper.writeValueAsBytes(rowMap);
                    SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(bytes);
                    if (!readColumns.isEmpty()) {
                        // need column projection
                        Object[] fields;
                        if (isMergePartition) {
                            fields = new Object[readColumns.size() + partitionsMap.size()];
                        } else {
                            fields = new Object[readColumns.size()];
                        }
                        for (int i = 0; i < indexes.length; i++) {
                            fields[i] = seaTunnelRow.getField(indexes[i]);
                        }
                        seaTunnelRow = new SeaTunnelRow(fields);
                    }
                    if (isMergePartition) {
                        int index = seaTunnelRowType.getTotalFields();
                        for (String value : partitionsMap.values()) {
                            seaTunnelRow.setField(index++, value);
                        }
                    }
                    seaTunnelRow.setTableId(tableId);
                    output.collect(seaTunnelRow);
                } catch (Exception e) {
                    String errorMsg =
                            String.format(
                                    "Deserialize this data [%s] failed, please check the origin data",
                                    String.join(separator + "", row));
                    throw new FileConnectorException(
                            FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, errorMsg, e);
                }
            }
        }
    }

    @Override
    public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
        this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
        this.seaTunnelRowTypeWithPartition =
                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
        initFormatter();
        if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
            throw new FileConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    "When reading json/text/csv files, if user has not specified schema information, "
                            + "SeaTunnel will not support column projection");
        }
        CsvDeserializationSchema.Builder builder =
                CsvDeserializationSchema.builder()
                        .delimiter(TextFormatConstant.PLACEHOLDER)
                        .dateFormatter(dateFormat)
                        .dateTimeFormatter(datetimeFormat)
                        .timeFormatter(timeFormat);
        if (isMergePartition) {
            deserializationSchema =
                    builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
        } else {
            deserializationSchema = builder.seaTunnelRowType(this.seaTunnelRowType).build();
        }
        return getActualSeaTunnelRowTypeInfo();
    }

    @Override
    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        SeaTunnelRowType userDefinedRowTypeWithPartition =
                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
        Optional<String> fieldDelimiterOptional =
                ReadonlyConfig.fromConfig(pluginConfig)
                        .getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
        if (fieldDelimiterOptional.isPresent()) {
            fieldDelimiter = fieldDelimiterOptional.get();
        } else {
            FileFormat fileFormat =
                    FileFormat.valueOf(
                            pluginConfig
                                    .getString(BaseSourceConfigOptions.FILE_FORMAT_TYPE.key())
                                    .toUpperCase());
            if (fileFormat == FileFormat.CSV) {
                fieldDelimiter = ",";
            }
        }
        initFormatter();
        CsvDeserializationSchema.Builder builder =
                CsvDeserializationSchema.builder()
                        .delimiter(fieldDelimiter)
                        .dateFormatter(dateFormat)
                        .dateTimeFormatter(datetimeFormat)
                        .timeFormatter(timeFormat);
        if (isMergePartition) {
            deserializationSchema =
                    builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
        } else {
            deserializationSchema = builder.seaTunnelRowType(seaTunnelRowType).build();
        }
        // column projection
        if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
            // get the read column index from user-defined row type
            indexes = new int[readColumns.size()];
            String[] fields = new String[readColumns.size()];
            SeaTunnelDataType<?>[] types = new SeaTunnelDataType[readColumns.size()];
            for (int i = 0; i < indexes.length; i++) {
                indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
                fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
                types[i] = seaTunnelRowType.getFieldType(indexes[i]);
            }
            this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
            this.seaTunnelRowTypeWithPartition =
                    mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
        } else {
            this.seaTunnelRowType = seaTunnelRowType;
            this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;
        }
    }

    private void initFormatter() {
        if (pluginConfig.hasPath(BaseSourceConfigOptions.DATE_FORMAT.key())) {
            dateFormat =
                    DateUtils.Formatter.parse(
                            pluginConfig.getString(BaseSourceConfigOptions.DATE_FORMAT.key()));
        }
        if (pluginConfig.hasPath(BaseSourceConfigOptions.DATETIME_FORMAT.key())) {
            datetimeFormat =
                    DateTimeUtils.Formatter.parse(
                            pluginConfig.getString(BaseSourceConfigOptions.DATETIME_FORMAT.key()));
        }
        if (pluginConfig.hasPath(BaseSourceConfigOptions.TIME_FORMAT.key())) {
            timeFormat =
                    TimeUtils.Formatter.parse(
                            pluginConfig.getString(BaseSourceConfigOptions.TIME_FORMAT.key()));
        }
        if (pluginConfig.hasPath(BaseSourceConfigOptions.COMPRESS_CODEC.key())) {
            String compressCodec =
                    pluginConfig.getString(BaseSourceConfigOptions.COMPRESS_CODEC.key());
            compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
        }
    }
}
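For reference, a SeaTunnel job source block that would exercise this read path might look like the sketch below. The exact option names and the shape of the schema block should be checked against your 2.3.4 deployment's connector documentation; the file path and field names here are made up.

```
source {
  LocalFile {
    path = "/tmp/demo.csv"
    file_format_type = "csv"
    field_delimiter = ","
    skip_header_row_number = 1
    schema {
      fields {
        name = string
        age = int
      }
    }
  }
}
```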
The differences from TextReadStrategy:
read method: TextReadStrategy splits rows with a BufferedReader; CsvReadStrategy splits rows with opencsv's CSVReader.
getSeaTunnelRowTypeInfo and setSeaTunnelRowTypeInfo methods: TextReadStrategy builds its deserializationSchema with TextDeserializationSchema.Builder; CsvReadStrategy builds it with CsvDeserializationSchema.Builder.
Registering the CSV strategy class
Finally, wire CsvReadStrategy into the org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat enum: in the CSV entry, change the getReadStrategy method from new TextReadStrategy() to new CsvReadStrategy().
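The shape of that change, reduced to a self-contained sketch (the real enum lives in connector-file-base and returns SeaTunnel's ReadStrategy implementations; the types below are stand-ins for illustration):

```java
public class FileFormatSketch {
    interface ReadStrategy {}

    static class TextReadStrategy implements ReadStrategy {}

    static class CsvReadStrategy implements ReadStrategy {}

    // Mirrors the FileFormat enum: the CSV constant now hands out the
    // CSV-aware strategy instead of the old TextReadStrategy.
    enum FileFormat {
        CSV {
            @Override
            public ReadStrategy getReadStrategy() {
                return new CsvReadStrategy();
            }
        },
        TEXT {
            @Override
            public ReadStrategy getReadStrategy() {
                return new TextReadStrategy();
            }
        };

        public abstract ReadStrategy getReadStrategy();
    }

    public static void main(String[] args) {
        System.out.println(FileFormat.CSV.getReadStrategy().getClass().getSimpleName());
    }
}
```

Because the enum constant is the single place that chooses the strategy, every file source built on connector-file-base picks up the new behavior from this one-line change.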
Packaging
Taking effect globally
If you want every plugin based on connector-file-base to pick up the change, package the connector-file-base module:
mvn clean package -DskipTests=true -pl seatunnel-connectors-v2/connector-file/connector-file-base -am
Then put the connector-file-base JAR into the lib directory of your SeaTunnel deployment; every plugin based on connector-file-base will pick up the change.
Taking effect for selected plugins
If you only want certain connector-file-base based plugins to pick up the change, repackage just those plugins:
mvn clean package -DskipTests=true -pl [path of the plugin to rebuild] -am
Then replace the old JAR with the new one.
Miscellaneous
Packaging may fail because of Maven's Spotless plugin for one reason or another; try running mvn spotless:apply first, then package again.
That's all!
This article is published with support from 白鲸开源科技 (WhaleOps).