Debezium OracleDefaultValueConverter 分析
目录
- 1. 概述
- 2. 核心功能
- 3. 类型映射
- 4. 特殊处理
- 5. 最佳实践
- 6. 使用示例
- 7. 总结
1. 概述
OracleDefaultValueConverter 是 Debezium Oracle 连接器中负责处理列默认值转换的核心类。它主要用于将 Oracle 数据库中的列默认值转换为 Kafka Connect 兼容的格式。
2. 核心功能
2.1 默认值解析
通过 parseDefaultValue 方法解析列的默认值:
@Override
public Optional<Object> parseDefaultValue(Column column, String defaultValue) {final int dataType = column.jdbcType();final DefaultValueMapper mapper = defaultValueMappers.get(dataType);if (mapper == null) {LOGGER.warn("Mapper for type '{}' not found.", dataType);return Optional.empty();}try {Object rawDefaultValue = mapper.parse(column, defaultValue != null ? defaultValue.trim() : defaultValue);Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, column);if (convertedDefaultValue instanceof Struct) {// Workaround for KAFKA-12694LOGGER.warn("Struct can't be used as default value for column '{}'", column.name());return Optional.empty();}return Optional.ofNullable(convertedDefaultValue);}catch (Exception e) {LOGGER.warn("Cannot parse column default value '{}' to type '{}'", defaultValue, dataType);return Optional.empty();}
}
2.2 默认值转换
通过 convertDefaultValue 方法将解析后的默认值转换为目标类型:
private Object convertDefaultValue(Object defaultValue, Column column) {if (valueConverters != null && defaultValue != null) {final SchemaBuilder schemaBuilder = valueConverters.schemaBuilder(column);if (schemaBuilder != null) {final Schema schema = schemaBuilder.build();final Field field = new Field(column.name(), -1, schema);final ValueConverter valueConverter = valueConverters.converter(column, field