Across different sources you can run into four confusingly similar precombine-field configuration options:
- precombine.field
- write.precombine.field
- hoodie.table.precombine.field
- hoodie.datasource.write.precombine.field
Are they exactly the same thing, or related in some other way?
- hoodie.datasource.write.precombine.field
HoodieWriteConfig.java
public class HoodieWriteConfig extends HoodieConfig {

  public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
      .key("hoodie.datasource.write.precombine.field")
      .defaultValue("ts")
      .withDocumentation("Field used in preCombining before actual write. When two records have the same key value, "
          + "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)");
}
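To make the key concrete, here is a minimal sketch of a Spark datasource write in Java that supplies it, assuming the hudi-spark bundle is on the classpath; the table name, path, and columns are hypothetical.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PrecombineWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-precombine-demo")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.sql("SELECT 1 AS id, 'a' AS name, 1000L AS ts");

    df.write().format("hudi")
        // Record key plus the precombine field: with duplicate keys in a batch,
        // the row with the larger "ts" wins, per the documentation above.
        .option("hoodie.datasource.write.recordkey.field", "id")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.table.name", "demo_table")
        .mode(SaveMode.Append)
        .save("/tmp/hudi/demo_table");
  }
}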
- hoodie.table.precombine.field
HoodieTableConfig.java
/**
 * Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc.
 * Configurations are loaded from hoodie.properties, these properties are usually set during
 * initializing a path as hoodie base path and never changes during the lifetime of a hoodie table.
 *
 * @see HoodieTableMetaClient
 * @since 0.3.0
 */
public class HoodieTableConfig extends HoodieConfig {

  public static final ConfigProperty<String> PRECOMBINE_FIELD = ConfigProperty
      .key("hoodie.table.precombine.field")
      .noDefaultValue()
      .withDocumentation("Field used in preCombining before actual write. By default, when two records have the same key value, "
          + "the largest value for the precombine field determined by Object.compareTo(..), is picked.");
}
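Because hoodie.table.precombine.field is persisted in hoodie.properties, it can be inspected without any Hudi API at all. A minimal sketch, assuming a local table at the hypothetical path used above:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class ReadTablePrecombine {
  public static void main(String[] args) throws Exception {
    // hoodie.properties is written under <basePath>/.hoodie when the table is initialized.
    Properties props = new Properties();
    try (InputStream in = Files.newInputStream(
        Paths.get("/tmp/hudi/demo_table/.hoodie/hoodie.properties"))) {
      props.load(in);
    }
    // Prints the persisted table-level precombine field, e.g. "ts".
    System.out.println(props.getProperty("hoodie.table.precombine.field"));
  }
}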
- precombine.field
This key is Flink SQL specific and cannot be used with Spark SQL and the like; the same goes for write.precombine.field.
FlinkOptions.java
/**
 * Hoodie Flink config options.
 *
 * <p>It has the options for Hoodie table read and write. It also defines some utilities.
 */
@ConfigClassProperty(name = "Flink Options",
    groupName = ConfigGroups.Names.FLINK_SQL,
    description = "Flink jobs using the SQL can be configured through the options in WITH clause."
        + " The actual datasource level configs are listed below.")
public class FlinkOptions extends HoodieConfig {

  public static final String NO_PRE_COMBINE = "no_precombine";

  public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
      .key("precombine.field")
      .stringType()
      .defaultValue("ts")
      // HoodieWriteConfig.PRECOMBINE_FIELD_NAME is hoodie.datasource.write.precombine.field
      .withFallbackKeys("write.precombine.field", HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key())
      .withDescription("Field used in preCombining before actual write. When two records have the same\n"
          + "key value, we will pick the one with the largest value for the precombine field,\n"
          + "determined by Object.compareTo(..)");
}
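As a usage illustration, here is a hedged sketch of a Flink SQL WITH clause carrying precombine.field, assuming the hudi Flink bundle is on the classpath; the table name and path are made up. Thanks to the fallback keys above, write.precombine.field or hoodie.datasource.write.precombine.field would set the very same option.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkPrecombineExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    tEnv.executeSql(
        "CREATE TABLE t1 ("
      + "  id INT,"
      + "  name STRING,"
      + "  ts BIGINT,"
      + "  PRIMARY KEY (id) NOT ENFORCED"
      + ") WITH ("
      + "  'connector' = 'hudi',"
      + "  'path' = 'file:///tmp/hudi/t1',"
        // 'write.precombine.field' or the hoodie.datasource key would be
        // resolved to the same option via the fallback keys above.
      + "  'precombine.field' = 'ts'"
      + ")");
  }
}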
From the definition of precombine.field above, we can see that precombine.field, write.precombine.field and hoodie.datasource.write.precombine.field are one and the same option: at the bottom they all resolve to hoodie.datasource.write.precombine.field.
- write.precombine.field
Exactly equivalent to precombine.field.
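The fallback resolution is easy to confirm directly against Flink's Configuration; a minimal sketch:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class FallbackKeyDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Set only the fallback key, not the primary key precombine.field.
    conf.setString("write.precombine.field", "event_time");
    // Reading through the primary option resolves the fallback key.
    System.out.println(conf.get(FlinkOptions.PRECOMBINE_FIELD)); // event_time
  }
}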
None of the above shows how hoodie.table.precombine.field relates to the other three. In fact it carries the same field as well, as the implementation in HoodieTableFactory.java shows.
/**
 * Hoodie data source/sink factory.
 */
public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

  /**
   * Supplement the table config options if not specified.
   */
  private void setupTableOptions(String basePath, Configuration conf) {
    StreamerUtil.getTableConfig(basePath, HadoopConfigurations.getHadoopConf(conf))
        .ifPresent(tableConfig -> {
          // HoodieTableConfig.RECORDKEY_FIELDS is hoodie.table.recordkey.fields
          // FlinkOptions.RECORD_KEY_FIELD is hoodie.datasource.write.recordkey.field
          if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
              && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
            conf.setString(FlinkOptions.RECORD_KEY_FIELD, tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
          }
          // HoodieTableConfig.PRECOMBINE_FIELD is hoodie.table.precombine.field
          // FlinkOptions.PRECOMBINE_FIELD is precombine.field, with fallbacks
          // write.precombine.field and hoodie.datasource.write.precombine.field
          if (tableConfig.contains(HoodieTableConfig.PRECOMBINE_FIELD)
              && !conf.contains(FlinkOptions.PRECOMBINE_FIELD)) {
            conf.setString(FlinkOptions.PRECOMBINE_FIELD, tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD));
          }
          if (tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)
              && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
            conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
          }
        });
  }
}
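Conversely, when a Flink job points at an existing Hudi table, precombine.field can simply be omitted from the WITH clause: setupTableOptions fills it in from the hoodie.table.precombine.field persisted in hoodie.properties. A hedged sketch, reusing the tEnv and the hypothetical table path from the earlier examples:

// Assuming /tmp/hudi/demo_table already exists and its hoodie.properties
// contains hoodie.table.precombine.field=ts.
tEnv.executeSql(
    "CREATE TABLE t2 ("
  + "  id INT,"
  + "  name STRING,"
  + "  ts BIGINT,"
  + "  PRIMARY KEY (id) NOT ENFORCED"
  + ") WITH ("
  + "  'connector' = 'hudi',"
    // No 'precombine.field' here: HoodieTableFactory#setupTableOptions
    // copies it from the table's hoodie.properties.
  + "  'path' = 'file:///tmp/hudi/demo_table'"
  + ")");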
- Summary
All four options ultimately name the same precombine field: hoodie.datasource.write.precombine.field is the underlying datasource-level key, hoodie.table.precombine.field is the table-level value persisted in hoodie.properties, and precombine.field together with write.precombine.field are aliases that exist only in Flink SQL and cannot be used with Spark SQL and the like.
- Appendix
HoodieConfig.java
/**
 * This class deals with {@link ConfigProperty} and provides get/set functionalities.
 */
public class HoodieConfig implements Serializable {

  public <T> String getString(ConfigProperty<T> configProperty) {
    Option<Object> rawValue = getRawValue(configProperty);
    return rawValue.map(Object::toString).orElse(null);
  }

  private <T> Option<Object> getRawValue(ConfigProperty<T> configProperty) {
    if (props.containsKey(configProperty.key())) {
      // The primary key is set: use its value.
      return Option.ofNullable(props.get(configProperty.key()));
    }
    // The primary key is not set: walk through all the deprecated alternative keys.
    for (String alternative : configProperty.getAlternatives()) {
      if (props.containsKey(alternative)) {
        LOG.warn(String.format("The configuration key '%s' has been deprecated "
                + "and may be removed in the future. Please use the new key '%s' instead.",
            alternative, configProperty.key()));
        return Option.ofNullable(props.get(alternative));
      }
    }
    return Option.empty();
  }
}
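To round off the appendix, a small sketch of the alternatives mechanism in action; the two config keys below are made up purely for illustration, and a HoodieConfig constructor taking Properties is assumed.

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;

public class AlternativeKeyDemo {
  // Hypothetical property with a deprecated alternative key.
  private static final ConfigProperty<String> DEMO_FIELD = ConfigProperty
      .key("hoodie.example.new.key")
      .defaultValue("ts")
      .withAlternatives("hoodie.example.old.key");

  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Only the deprecated alternative key is set.
    props.setProperty("hoodie.example.old.key", "event_time");

    HoodieConfig config = new HoodieConfig(props);
    // getRawValue misses the primary key, finds the alternative,
    // logs the deprecation warning, and returns its value.
    System.out.println(config.getString(DEMO_FIELD)); // event_time
  }
}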