Apache SeaTunnel: Enhancing Field Splitting When Reading CSV


Overview

This modification is based on SeaTunnel 2.3.4.

Our product needs to sync data via CSV, but SeaTunnel's CSV reader does not split strings according to CSV rules: it simply splits rows on newline characters and splits columns with String.split. As soon as a column value contains a newline or the delimiter, the result is wrong.

So here we rework Apache SeaTunnel's original CSV reading so that it can genuinely handle CSV.

Getting started

CSV handling lives in the connector-file-base module. In the enum org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat you can see that reading the CSV type uses org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy:

public enum FileFormat implements Serializable {
    CSV("csv") {
        @Override
        public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
            fileSinkConfig.setFieldDelimiter(",");
            return new TextWriteStrategy(fileSinkConfig);
        }

        @Override
        public ReadStrategy getReadStrategy() {
            // CSV read strategy
            return new TextReadStrategy();
        }
    },
    ......
}

Step into the read method of TextReadStrategy:

// org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
        throws FileConnectorException, IOException {
    ............
    // A BufferedReader reads the file stream
    try (BufferedReader reader =
            new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
        // Rows are identified by newline characters
        reader.lines()
                .skip(skipHeaderNumber)
                .forEach(
                        line -> {
                            try {
                                // deserializationSchema here is
                                // org.apache.seatunnel.format.text.TextDeserializationSchema;
                                // its deserialize method splits fields with String.split
                                SeaTunnelRow seaTunnelRow =
                                        deserializationSchema.deserialize(line.getBytes());
                                if (!readColumns.isEmpty()) {
                                    // need column projection
                                    Object[] fields;
                                    if (isMergePartition) {
                                        fields =
                                                new Object[
                                                        readColumns.size() + partitionsMap.size()];
                                    } else {
                                        fields = new Object[readColumns.size()];
                                    }
                                    for (int i = 0; i < indexes.length; i++) {
                                        fields[i] = seaTunnelRow.getField(indexes[i]);
                                    }
                                    seaTunnelRow = new SeaTunnelRow(fields);
                                }
                                if (isMergePartition) {
                                    int index = seaTunnelRowType.getTotalFields();
                                    for (String value : partitionsMap.values()) {
                                        seaTunnelRow.setField(index++, value);
                                    }
                                }
                                seaTunnelRow.setTableId(tableId);
                                output.collect(seaTunnelRow);
                            } catch (IOException e) {
                                String errorMsg =
                                        String.format(
                                                "Deserialize this data [%s] failed, please check the origin data",
                                                line);
                                throw new FileConnectorException(
                                        FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
                                        errorMsg,
                                        e);
                            }
                        });
    }
}

This strategy splits rows with BufferedReader.lines — effectively on newline characters — and splits fields with String.split on the delimiter character.

Here is the problem: if a field's value itself contains a newline, or contains the delimiter, the output is scrambled — things get cut that should not be cut.

So we need to rework this logic so that strings are split correctly.
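To see the failure concretely, here is a minimal, self-contained sketch (class name and data values are made up for illustration) of what String.split does to a quoted CSV field:

```java
public class NaiveSplitDemo {
    public static void main(String[] args) {
        // One CSV record with three fields; the middle field is quoted
        // per the CSV convention because it contains the delimiter:
        String line = "id-1,\"Smith, John\",42";

        // String.split knows nothing about quoting and cuts inside the
        // quoted field, yielding 4 pieces instead of 3:
        String[] naive = line.split(",");
        System.out.println(naive.length); // 4

        // naive[1] is a mangled half of the original field:
        System.out.println(naive[1]); // "Smith
    }
}
```

A spec-aware parser must treat "Smith, John" as a single field — this is exactly what the rework below delegates to a real CSV library.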

Adding dedicated CSV handling logic

What we will do is replace SeaTunnel's original CSV handling with logic purpose-built for CSV.

Introducing a third-party CSV library

I chose the opencsv library for CSV handling.

Add opencsv to the pom.xml of connector-file-base:

<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>5.7.1</version>
</dependency>

Adding a CSV DeserializationSchema

Create a new package org.apache.seatunnel.connectors.seatunnel.file.format.csv and, under it, a CSV deserialization class CsvDeserializationSchema implementing the DeserializationSchema interface (modeled on TextDeserializationSchema). Below is the complete code of CsvDeserializationSchema and of the StrToNumberUtil helper it uses; if you'd rather not read through it, you can copy it as-is.

New utility class: org.apache.seatunnel.connectors.seatunnel.file.utils.StrToNumberUtil

package org.apache.seatunnel.connectors.seatunnel.file.utils;

import org.apache.commons.lang3.StringUtils;

import java.math.BigDecimal;
import java.util.Optional;

public class StrToNumberUtil {

    public static Double str2Double(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        try {
            return Double.parseDouble(str.trim());
        } catch (Exception e) {
            return null;
        }
    }

    public static Long str2Long(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        str = str.trim();
        // More than one decimal point: not a number, give up
        if (str.indexOf('.') != str.lastIndexOf('.')) {
            return null;
        }
        // Keep the integer part only
        String sub = str.indexOf('.') >= 0 ? str.substring(0, str.indexOf('.')) : str;
        try {
            return Long.parseLong(sub);
        } catch (Exception e) {
            return null;
        }
    }

    public static Byte str2Byte(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::byteValue).orElse(null);
    }

    public static Short str2Short(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::shortValue).orElse(null);
    }

    public static Integer str2Int(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::intValue).orElse(null);
    }

    public static Float str2Float(String s) {
        return Optional.ofNullable(str2Double(s)).map(Double::floatValue).orElse(null);
    }

    public static BigDecimal str2BigDecimal(String s) {
        if (StringUtils.isBlank(s)) {
            return null;
        }
        try {
            return new BigDecimal(s.trim());
        } catch (Exception e) {
            return null;
        }
    }
}
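The point of these helpers is leniency: "12.9" becomes 12 rather than an exception, and garbage becomes null. A stdlib-only restatement of the str2Long rules (the class above depends on commons-lang3; here StringUtils.isBlank is replaced by a trim/isEmpty check) to illustrate the contract:

```java
public class LenientLongDemo {
    // Same rules as StrToNumberUtil.str2Long, minus the commons-lang3 dependency:
    // blank -> null, more than one '.' -> null, otherwise parse the integer part.
    static Long str2Long(String str) {
        if (str == null || str.trim().isEmpty()) {
            return null;
        }
        str = str.trim();
        if (str.indexOf('.') != str.lastIndexOf('.')) {
            return null; // "1.2.3" is not a number
        }
        String intPart = str.indexOf('.') >= 0 ? str.substring(0, str.indexOf('.')) : str;
        try {
            return Long.parseLong(intPart);
        } catch (Exception e) {
            return null; // e.g. "abc"
        }
    }

    public static void main(String[] args) {
        System.out.println(str2Long("42"));    // 42
        System.out.println(str2Long("12.9"));  // 12 (truncated, not rounded)
        System.out.println(str2Long("1.2.3")); // null
        System.out.println(str2Long("abc"));   // null
    }
}
```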

New CSV deserialization class: org.apache.seatunnel.connectors.seatunnel.file.format.csv.CsvDeserializationSchema

package org.apache.seatunnel.connectors.seatunnel.file.format.csv;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.utils.StrToNumberUtil;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;

import org.apache.commons.lang3.StringUtils;

import lombok.NonNull;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class CsvDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
    private final SeaTunnelRowType seaTunnelRowType;
    private final String[] separators;
    private final DateUtils.Formatter dateFormatter;
    private final DateTimeUtils.Formatter dateTimeFormatter;
    private final TimeUtils.Formatter timeFormatter;

    private CsvDeserializationSchema(
            @NonNull SeaTunnelRowType seaTunnelRowType,
            String[] separators,
            DateUtils.Formatter dateFormatter,
            DateTimeUtils.Formatter dateTimeFormatter,
            TimeUtils.Formatter timeFormatter) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.separators = separators;
        this.dateFormatter = dateFormatter;
        this.dateTimeFormatter = dateTimeFormatter;
        this.timeFormatter = timeFormatter;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private SeaTunnelRowType seaTunnelRowType;
        private String[] separators = TextFormatConstant.SEPARATOR.clone();
        private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
        private DateTimeUtils.Formatter dateTimeFormatter =
                DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
        private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;

        private Builder() {}

        public Builder seaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
            this.seaTunnelRowType = seaTunnelRowType;
            return this;
        }

        public Builder delimiter(String delimiter) {
            this.separators[0] = delimiter;
            return this;
        }

        public Builder separators(String[] separators) {
            this.separators = separators;
            return this;
        }

        public Builder dateFormatter(DateUtils.Formatter dateFormatter) {
            this.dateFormatter = dateFormatter;
            return this;
        }

        public Builder dateTimeFormatter(DateTimeUtils.Formatter dateTimeFormatter) {
            this.dateTimeFormatter = dateTimeFormatter;
            return this;
        }

        public Builder timeFormatter(TimeUtils.Formatter timeFormatter) {
            this.timeFormatter = timeFormatter;
            return this;
        }

        public CsvDeserializationSchema build() {
            return new CsvDeserializationSchema(
                    seaTunnelRowType, separators, dateFormatter, dateTimeFormatter, timeFormatter);
        }
    }

    /**
     * Deserialize a JSON string whose keys are column indexes (starting from 0)
     * and whose values are the column values.
     *
     * @param message the JSON string as bytes
     */
    @Override
    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        String content = new String(message);
        ObjectMapper objectMapper = new ObjectMapper();
        com.fasterxml.jackson.databind.type.MapType javaType =
                objectMapper
                        .getTypeFactory()
                        .constructMapType(HashMap.class, Integer.class, String.class);
        Map<Integer, String> splitsMap = objectMapper.readValue(content, javaType);
        Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
        for (int i = 0; i < objects.length; i++) {
            objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i), 0);
        }
        return new SeaTunnelRow(objects);
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return seaTunnelRowType;
    }

    private Map<Integer, String> splitLineBySeaTunnelRowType(
            String line, SeaTunnelRowType seaTunnelRowType, int level) {
        String[] splits = splitLineWithCsvMethod(line, separators[level].charAt(0));
        LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
        for (int i = 0; i < splits.length; i++) {
            splitsMap.put(i, splits[i]);
        }
        if (fieldTypes.length > splits.length) {
            // contains partition columns
            for (int i = splits.length; i < fieldTypes.length; i++) {
                splitsMap.put(i, null);
            }
        }
        return splitsMap;
    }

    private String[] splitLineWithCsvMethod(String line, char sep) {
        CSVParser csvParser = new CSVParserBuilder().withSeparator(sep).build();
        try (CSVReader reader =
                new CSVReaderBuilder(new StringReader(line)).withCSVParser(csvParser).build()) {
            Iterator<String[]> iterator = reader.iterator();
            if (iterator.hasNext()) {
                return iterator.next();
            }
            return new String[0];
        } catch (Exception e) {
            return new String[] {line};
        }
    }

    private Object convert(String field, SeaTunnelDataType<?> fieldType, int level) {
        if (StringUtils.isBlank(field)) {
            return null;
        }
        switch (fieldType.getSqlType()) {
            case ARRAY:
                BasicType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
                String[] elements = field.split(separators[level + 1]);
                ArrayList<Object> objectArrayList = new ArrayList<>();
                for (String element : elements) {
                    objectArrayList.add(convert(element, elementType, level + 1));
                }
                switch (elementType.getSqlType()) {
                    case STRING:
                        return objectArrayList.toArray(new String[0]);
                    case BOOLEAN:
                        return objectArrayList.toArray(new Boolean[0]);
                    case TINYINT:
                        return objectArrayList.toArray(new Byte[0]);
                    case SMALLINT:
                        return objectArrayList.toArray(new Short[0]);
                    case INT:
                        return objectArrayList.toArray(new Integer[0]);
                    case BIGINT:
                        return objectArrayList.toArray(new Long[0]);
                    case FLOAT:
                        return objectArrayList.toArray(new Float[0]);
                    case DOUBLE:
                        return objectArrayList.toArray(new Double[0]);
                    default:
                        throw new SeaTunnelTextFormatException(
                                CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                                String.format(
                                        "SeaTunnel array not support this data type [%s]",
                                        elementType.getSqlType()));
                }
            case MAP:
                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
                LinkedHashMap<Object, Object> objectMap = new LinkedHashMap<>();
                String[] kvs = field.split(separators[level + 1]);
                for (String kv : kvs) {
                    String[] splits = kv.split(separators[level + 2]);
                    if (splits.length < 2) {
                        objectMap.put(convert(splits[0], keyType, level + 1), null);
                    } else {
                        objectMap.put(
                                convert(splits[0], keyType, level + 1),
                                convert(splits[1], valueType, level + 1));
                    }
                }
                return objectMap;
            case STRING:
                return field;
            case BOOLEAN:
                return Boolean.parseBoolean(field);
            case TINYINT:
                return StrToNumberUtil.str2Byte(field);
            case SMALLINT:
                return StrToNumberUtil.str2Short(field);
            case INT:
                return StrToNumberUtil.str2Int(field);
            case BIGINT:
                return StrToNumberUtil.str2Long(field);
            case FLOAT:
                return StrToNumberUtil.str2Float(field);
            case DOUBLE:
                return StrToNumberUtil.str2Double(field);
            case DECIMAL:
                return StrToNumberUtil.str2BigDecimal(field);
            case NULL:
                return null;
            case BYTES:
                return field.getBytes();
            case DATE:
                return DateUtils.parse(field, dateFormatter);
            case TIME:
                return TimeUtils.parse(field, timeFormatter);
            case TIMESTAMP:
                return DateTimeUtils.parse(field, dateTimeFormatter);
            case ROW:
                Map<Integer, String> splitsMap =
                        splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType, level + 1);
                Object[] objects = new Object[splitsMap.size()];
                for (int i = 0; i < objects.length; i++) {
                    objects[i] =
                            convert(
                                    splitsMap.get(i),
                                    ((SeaTunnelRowType) fieldType).getFieldType(i),
                                    level + 1);
                }
                return new SeaTunnelRow(objects);
            default:
                throw new SeaTunnelTextFormatException(
                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                        String.format(
                                "SeaTunnel not support this data type [%s]",
                                fieldType.getSqlType()));
        }
    }
}

The main differences from TextDeserializationSchema are in the deserialize, splitLineBySeaTunnelRowType, and convert methods.

deserialize method:
  • TextDeserializationSchema receives the bytes of a raw line read by the caller, calls splitLineBySeaTunnelRowType to cut it on the delimiters into a column-index -> value map, and passes that map to convert.
  • CsvDeserializationSchema receives the bytes of a JSON string whose keys are already the column indexes (starting from 0) and whose values are the column values; once the JSON is parsed into a map, it can be passed straight to convert.
convert method:
  • When handling the TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, and DECIMAL types, CsvDeserializationSchema uses our custom string-to-number utility (StrToNumberUtil) instead of Java's built-in parsers, mainly so that numeric values in user files that don't match the built-in parsing format do not blow up with an exception.
splitLineBySeaTunnelRowType method:
  • TextDeserializationSchema splits directly with String.split on the delimiter; if a value contains the delimiter, the result is wrong.
  • CsvDeserializationSchema splits with opencsv, which follows the CSV spec to the letter — rock solid.
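To make the column-index JSON handoff concrete, here is a small self-contained sketch (class and method names are mine; the real CsvReadStrategy uses Jackson's ObjectMapper, which also handles string escaping) of the payload built for one parsed record:

```java
public class JsonHandoffDemo {
    // Build the {"0":v0,"1":v1,...} payload handed to deserialize.
    // NOTE: no escaping here — Jackson does this properly in the real code.
    static String toIndexJson(String[] row) {
        StringBuilder json = new StringBuilder("{");
        for (int i = 0; i < row.length; i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append('"').append(i).append("\":\"").append(row[i]).append('"');
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        // Fields produced by the CSV parser for one record:
        String[] row = {"1", "Smith, John", "42"};
        System.out.println(toIndexJson(row)); // {"0":"1","1":"Smith, John","2":"42"}
    }
}
```

Because the fields were already split by a spec-aware parser, the embedded comma in "Smith, John" survives intact all the way to convert.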

The remaining minor differences aren't worth spelling out; compare the two files with IDEA's file-diff feature if you're curious.

Adding a CSV read strategy class: CsvReadStrategy

Under the org.apache.seatunnel.connectors.seatunnel.file.source.reader package, add a CsvReadStrategy class, using org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy as a reference.

Here is the complete code of the CsvReadStrategy class:

package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.format.csv.CsvDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;

import io.airlift.compress.lzo.LzopCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

@Slf4j
public class CsvReadStrategy extends AbstractReadStrategy {
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private String fieldDelimiter = BaseSourceConfigOptions.FIELD_DELIMITER.defaultValue();
    private DateUtils.Formatter dateFormat = BaseSourceConfigOptions.DATE_FORMAT.defaultValue();
    private DateTimeUtils.Formatter datetimeFormat =
            BaseSourceConfigOptions.DATETIME_FORMAT.defaultValue();
    private TimeUtils.Formatter timeFormat = BaseSourceConfigOptions.TIME_FORMAT.defaultValue();
    private CompressFormat compressFormat = BaseSourceConfigOptions.COMPRESS_CODEC.defaultValue();
    private int[] indexes;

    @Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output)
            throws FileConnectorException, IOException {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
        InputStream inputStream;
        switch (compressFormat) {
            case LZO:
                LzopCodec lzo = new LzopCodec();
                inputStream = lzo.createInputStream(hadoopFileSystemProxy.getInputStream(path));
                break;
            case NONE:
                inputStream = hadoopFileSystemProxy.getInputStream(path);
                break;
            default:
                log.warn(
                        "Text file does not support this compress type: {}",
                        compressFormat.getCompressCodec());
                inputStream = hadoopFileSystemProxy.getInputStream(path);
                break;
        }
        char separator =
                (fieldDelimiter == null || fieldDelimiter.isEmpty())
                        ? ','
                        : fieldDelimiter.charAt(0);
        CSVParser csvParser = new CSVParserBuilder().withSeparator(separator).build();
        try (CSVReader reader =
                new CSVReaderBuilder(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
                        .withSkipLines((int) skipHeaderNumber)
                        .withCSVParser(csvParser)
                        .build()) {
            Iterator<String[]> iterator = reader.iterator();
            ObjectMapper objectMapper = new ObjectMapper();
            while (iterator.hasNext()) {
                String[] row = iterator.next();
                if (row.length == 0) {
                    continue;
                }
                try {
                    Map<Integer, String> rowMap = new HashMap<>();
                    for (int i = 0; i < row.length; i++) {
                        rowMap.put(i, row[i]);
                    }
                    byte[] bytes = objectMapper.writeValueAsBytes(rowMap);
                    SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(bytes);
                    if (!readColumns.isEmpty()) {
                        // need column projection
                        Object[] fields;
                        if (isMergePartition) {
                            fields = new Object[readColumns.size() + partitionsMap.size()];
                        } else {
                            fields = new Object[readColumns.size()];
                        }
                        for (int i = 0; i < indexes.length; i++) {
                            fields[i] = seaTunnelRow.getField(indexes[i]);
                        }
                        seaTunnelRow = new SeaTunnelRow(fields);
                    }
                    if (isMergePartition) {
                        int index = seaTunnelRowType.getTotalFields();
                        for (String value : partitionsMap.values()) {
                            seaTunnelRow.setField(index++, value);
                        }
                    }
                    seaTunnelRow.setTableId(tableId);
                    output.collect(seaTunnelRow);
                } catch (Exception e) {
                    String errorMsg =
                            String.format(
                                    "Deserialize this data [%s] failed, please check the origin data",
                                    String.join(separator + "", row));
                    throw new FileConnectorException(
                            FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, errorMsg, e);
                }
            }
        }
    }

    @Override
    public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
        this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
        this.seaTunnelRowTypeWithPartition =
                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
        initFormatter();
        if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
            throw new FileConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    "When reading json/text/csv files, if user has not specified schema information, "
                            + "SeaTunnel will not support column projection");
        }
        CsvDeserializationSchema.Builder builder =
                CsvDeserializationSchema.builder()
                        .delimiter(TextFormatConstant.PLACEHOLDER)
                        .dateFormatter(dateFormat)
                        .dateTimeFormatter(datetimeFormat)
                        .timeFormatter(timeFormat);
        if (isMergePartition) {
            deserializationSchema =
                    builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
        } else {
            deserializationSchema = builder.seaTunnelRowType(this.seaTunnelRowType).build();
        }
        return getActualSeaTunnelRowTypeInfo();
    }

    @Override
    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        SeaTunnelRowType userDefinedRowTypeWithPartition =
                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
        Optional<String> fieldDelimiterOptional =
                ReadonlyConfig.fromConfig(pluginConfig)
                        .getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
        if (fieldDelimiterOptional.isPresent()) {
            fieldDelimiter = fieldDelimiterOptional.get();
        } else {
            FileFormat fileFormat =
                    FileFormat.valueOf(
                            pluginConfig
                                    .getString(BaseSourceConfigOptions.FILE_FORMAT_TYPE.key())
                                    .toUpperCase());
            if (fileFormat == FileFormat.CSV) {
                fieldDelimiter = ",";
            }
        }
        initFormatter();
        CsvDeserializationSchema.Builder builder =
                CsvDeserializationSchema.builder()
                        .delimiter(fieldDelimiter)
                        .dateFormatter(dateFormat)
                        .dateTimeFormatter(datetimeFormat)
                        .timeFormatter(timeFormat);
        if (isMergePartition) {
            deserializationSchema =
                    builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
        } else {
            deserializationSchema = builder.seaTunnelRowType(seaTunnelRowType).build();
        }
        // column projection
        if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
            // get the read column index from user-defined row type
            indexes = new int[readColumns.size()];
            String[] fields = new String[readColumns.size()];
            SeaTunnelDataType<?>[] types = new SeaTunnelDataType[readColumns.size()];
            for (int i = 0; i < indexes.length; i++) {
                indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
                fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
                types[i] = seaTunnelRowType.getFieldType(indexes[i]);
            }
            this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
            this.seaTunnelRowTypeWithPartition =
                    mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
        } else {
            this.seaTunnelRowType = seaTunnelRowType;
            this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;
        }
    }

    private void initFormatter() {
        if (pluginConfig.hasPath(BaseSourceConfigOptions.DATE_FORMAT.key())) {
            dateFormat =
                    DateUtils.Formatter.parse(
                            pluginConfig.getString(BaseSourceConfigOptions.DATE_FORMAT.key()));
        }
        if (pluginConfig.hasPath(BaseSourceConfigOptions.DATETIME_FORMAT.key())) {
            datetimeFormat =
                    DateTimeUtils.Formatter.parse(
                            pluginConfig.getString(BaseSourceConfigOptions.DATETIME_FORMAT.key()));
        }
        if (pluginConfig.hasPath(BaseSourceConfigOptions.TIME_FORMAT.key())) {
            timeFormat =
                    TimeUtils.Formatter.parse(
                            pluginConfig.getString(BaseSourceConfigOptions.TIME_FORMAT.key()));
        }
        if (pluginConfig.hasPath(BaseSourceConfigOptions.COMPRESS_CODEC.key())) {
            String compressCodec =
                    pluginConfig.getString(BaseSourceConfigOptions.COMPRESS_CODEC.key());
            compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
        }
    }
}

The differences from TextReadStrategy:

read method:

  • TextReadStrategy splits rows with BufferedReader, i.e. on newlines
  • CsvReadStrategy splits rows with opencsv's CSVReader

getSeaTunnelRowTypeInfo and setSeaTunnelRowTypeInfo methods:

  • TextReadStrategy builds its deserializationSchema with TextDeserializationSchema.Builder
  • CsvReadStrategy builds its deserializationSchema with CsvDeserializationSchema.Builder

Registering the CSV strategy class

Finally, wire CsvReadStrategy into the org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat enum: in the CSV entry's getReadStrategy method, change new TextReadStrategy() to new CsvReadStrategy().
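After the change, the CSV entry of the enum looks like this (a fragment of the FileFormat enum shown earlier, not compilable on its own):

```java
CSV("csv") {
    @Override
    public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
        fileSinkConfig.setFieldDelimiter(",");
        return new TextWriteStrategy(fileSinkConfig);
    }

    @Override
    public ReadStrategy getReadStrategy() {
        // previously: return new TextReadStrategy();
        return new CsvReadStrategy();
    }
},
```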

Packaging

Applying the change globally

If you want every plugin based on connector-file-base to pick up the change, package the connector-file-base module:

mvn clean package -DskipTests=true -pl seatunnel-connectors-v2/connector-file/connector-file-base -am

Then put the connector-file-base JAR into the lib directory of your SeaTunnel deployment, and every plugin based on connector-file-base will pick up the change.

Applying the change to specific plugins

If you only want specific connector-file-base-based plugins to pick up the change, repackage just those plugins:

mvn clean package -DskipTests=true -pl [path of the plugin you want to rebuild] -am

Replace the old JAR with the new one.

Miscellaneous

Packaging may fail on the Maven Spotless plugin for one reason or another; if it does, try running mvn spotless:apply first, then package again.

That's it for this article!

This article was published with support from 白鲸开源科技 (WhaleOps).

