Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Determine what level of persistence is used to cache write RDDs. "
+ "Refer to org.apache.spark.storage.StorageLevel for different values");

@Deprecated
public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
.key("hoodie.datasource.write.precombine.field")
.noDefaultValue()
Expand Down Expand Up @@ -1407,6 +1408,7 @@ public HoodieTableType getTableType() {
HoodieTableConfig.TYPE, HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase());
}

@Deprecated
public List<String> getPreCombineFields() {
return Option.ofNullable(getString(PRECOMBINE_FIELD_NAME))
.map(preCombine -> Arrays.asList(preCombine.split(",")))
Expand Down Expand Up @@ -3017,6 +3019,7 @@ public Builder forTable(String tableName) {
return this;
}

@Deprecated
public Builder withPreCombineField(String preCombineField) {
writeConfig.setValue(PRECOMBINE_FIELD_NAME, preCombineField);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public UpgradeDowngrade.TableConfigChangeSet upgrade(HoodieWriteConfig config,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
Set<ConfigProperty> tablePropsToRemove = new HashSet<>();
HoodieTableMetaClient metaClient = table.getMetaClient();
HoodieTableConfig tableConfig = metaClient.getTableConfig();
// Populate missing index versions indexes
Expand All @@ -117,7 +118,6 @@ public UpgradeDowngrade.TableConfigChangeSet upgrade(HoodieWriteConfig config,
indexMetadataOpt.get(),
metaClient.getTableConfig().getTableVersion());
}
Set<ConfigProperty> tablePropsToRemove = new HashSet<>();
// Handle merge mode config.
reconcileMergeModeConfig(tablePropsToAdd, tableConfig);
// Handle partial update mode config.
Expand All @@ -126,6 +126,8 @@ public UpgradeDowngrade.TableConfigChangeSet upgrade(HoodieWriteConfig config,
reconcileMergePropertiesConfig(tablePropsToAdd, tableConfig);
// Handle payload class configs.
reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
// Handle ordering fields config.
reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
return new UpgradeDowngrade.TableConfigChangeSet(tablePropsToAdd, tablePropsToRemove);
}

Expand Down Expand Up @@ -175,8 +177,7 @@ private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String> tableP
}
}

private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> tablePropsToAdd,
HoodieTableConfig tableConfig) {
private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> tablePropsToAdd, HoodieTableConfig tableConfig) {
String payloadClass = tableConfig.getPayloadClass();
String mergeStrategy = tableConfig.getRecordMergeStrategyId();
if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || StringUtils.isNullOrEmpty(payloadClass)) {
Expand All @@ -193,11 +194,22 @@ private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> tablePro
tablePropsToAdd.put(
ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue(), // // to be fixed once we land PR #13721.
DEBEZIUM_UNAVAILABLE_VALUE);
} else if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
tablePropsToAdd.put(HoodieTableConfig.PRECOMBINE_FIELDS, FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME);
}
}

/**
 * Reconciles the ordering-fields table property during upgrade.
 *
 * <p>If the table's payload class is {@code MySqlDebeziumAvroPayload}, the ordering fields are
 * forced to the flattened file/position columns; otherwise the existing ordering-fields string
 * from table config is reused. Whenever an ordering-fields value is resolved, it is scheduled
 * to be written under {@code ORDERING_FIELDS} and the legacy {@code PRECOMBINE_FIELD} property
 * is scheduled for removal.
 *
 * @param tablePropsToAdd    accumulator of table properties to write
 * @param tablePropsToRemove accumulator of table properties to clear
 * @param tableConfig        current table configuration
 */
private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String> tablePropsToAdd,
                                           Set<ConfigProperty> tablePropsToRemove,
                                           HoodieTableConfig tableConfig) {
  // MySQL Debezium payloads always order by the flattened file + position columns.
  final Option<String> resolvedOrderingFields;
  if (MySqlDebeziumAvroPayload.class.getName().equals(tableConfig.getPayloadClass())) {
    resolvedOrderingFields = Option.of(FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME);
  } else {
    resolvedOrderingFields = tableConfig.getOrderingFieldsStr();
  }
  resolvedOrderingFields.ifPresent(fields -> {
    tablePropsToAdd.put(HoodieTableConfig.ORDERING_FIELDS, fields);
    tablePropsToRemove.add(HoodieTableConfig.PRECOMBINE_FIELD);
  });
}

/**
* Populates missing version attributes in index definitions based on table version.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
Expand Down Expand Up @@ -81,17 +82,18 @@ public UpgradeDowngrade.TableConfigChangeSet downgrade(HoodieWriteConfig config,
// Update table properties.
Set<ConfigProperty> propertiesToRemove = new HashSet<>();
Map<ConfigProperty, String> propertiesToAdd = new HashMap<>();
reconcileMergeConfigs(propertiesToAdd, propertiesToRemove, metaClient);
HoodieTableConfig tableConfig = metaClient.getTableConfig();
reconcileMergeConfigs(propertiesToAdd, propertiesToRemove, tableConfig);
reconcileOrderingFieldsConfig(propertiesToAdd, propertiesToRemove, tableConfig);
return new UpgradeDowngrade.TableConfigChangeSet(propertiesToAdd, propertiesToRemove);
}

private void reconcileMergeConfigs(Map<ConfigProperty, String> propertiesToAdd,
Set<ConfigProperty> propertiesToRemove,
HoodieTableMetaClient metaClient) {
HoodieTableConfig tableConfig) {
// Update table properties.
propertiesToRemove.add(PARTIAL_UPDATE_MODE);
// For specified payload classes, add strategy id and custom merge mode.
HoodieTableConfig tableConfig = metaClient.getTableConfig();
String legacyPayloadClass = tableConfig.getLegacyPayloadClass();
if (!StringUtils.isNullOrEmpty(legacyPayloadClass) && (PAYLOAD_CLASSES_TO_HANDLE.contains(legacyPayloadClass))) {
propertiesToRemove.add(LEGACY_PAYLOAD_CLASS_NAME);
Expand All @@ -112,9 +114,18 @@ private void reconcileMergeConfigs(Map<ConfigProperty, String> propertiesToAdd,
propertiesToRemove.add(
ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue());
}
if (legacyPayloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
propertiesToAdd.put(HoodieTableConfig.PRECOMBINE_FIELDS, DebeziumConstants.ADDED_SEQ_COL_NAME);
}
}
}

/**
 * Reconciles the ordering-fields table property during downgrade.
 *
 * <p>For tables whose legacy payload class is {@code MySqlDebeziumAvroPayload}, the ordering
 * value reverts to the Debezium sequence column; otherwise the current ordering-fields string
 * is carried over. When a value is resolved, it is written back to the legacy
 * {@code PRECOMBINE_FIELD} property and the newer {@code ORDERING_FIELDS} property is removed.
 *
 * @param propertiesToAdd    accumulator of table properties to write
 * @param propertiesToRemove accumulator of table properties to clear
 * @param tableConfig        current table configuration
 */
private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String> propertiesToAdd,
                                           Set<ConfigProperty> propertiesToRemove,
                                           HoodieTableConfig tableConfig) {
  Option<String> fieldsOpt;
  if (MySqlDebeziumAvroPayload.class.getName().equals(tableConfig.getLegacyPayloadClass())) {
    // Debezium MySQL tables downgrade to the single added-sequence column.
    fieldsOpt = Option.of(DebeziumConstants.ADDED_SEQ_COL_NAME);
  } else {
    fieldsOpt = tableConfig.getOrderingFieldsStr();
  }
  if (fieldsOpt.isPresent()) {
    propertiesToAdd.put(HoodieTableConfig.PRECOMBINE_FIELD, fieldsOpt.get());
    propertiesToRemove.add(HoodieTableConfig.ORDERING_FIELDS);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ static void upgradePartitionFields(HoodieWriteConfig config, HoodieTableConfig t

static void upgradeMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
String payloadClass = tableConfig.getPayloadClass();
String preCombineFields = tableConfig.getPreCombineFieldsStr().orElse(null);
String orderingFields = tableConfig.getOrderingFieldsStr().orElse(null);
if (isCustomPayloadClass(payloadClass)) {
// This contains a special case: HoodieMetadataPayload.
tablePropsToAdd.put(
Expand All @@ -163,7 +163,7 @@ static void upgradeMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty,
} else if (tableConfig.getTableType() == HoodieTableType.COPY_ON_WRITE) {
setEventTimeOrCommitTimeBasedOnPayload(payloadClass, tablePropsToAdd);
} else { // MOR table
if (StringUtils.nonEmpty(preCombineFields)) {
if (StringUtils.nonEmpty(orderingFields)) {
// This contains a special case: OverwriteWithLatestPayload with preCombine field.
tablePropsToAdd.put(
HoodieTableConfig.PAYLOAD_CLASS_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,19 @@ public void run(HoodieTableVersion toVersion, String instantTime) {
if (metaClient.getTableConfig().isMetadataTableAvailable()) {
metaClient = HoodieTableMetaClient.reload(metaClient);
}
// Write out the current version in hoodie.properties.updated file

for (ConfigProperty configProperty : tablePropsToRemove) {
metaClient.getTableConfig().clearValue(configProperty);
}
for (Map.Entry<ConfigProperty, String> entry : tablePropsToAdd.entrySet()) {
// add alternate keys.
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
entry.getKey().getAlternatives().forEach(alternateKey -> {
metaClient.getTableConfig().setValue((String) alternateKey, entry.getValue());
});
}
for (ConfigProperty configProperty : tablePropsToRemove) {
metaClient.getTableConfig().clearValue(configProperty);
}

// Write out the current version in hoodie.properties.updated file
metaClient.getTableConfig().setTableVersion(toVersion);
// Update modified properties.
Set<String> propertiesToRemove =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void setUp() throws IOException {
when(metaClient.getTableConfig()).thenReturn(tableConfig);
when(metaClient.getStorage()).thenReturn(storage);
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.EIGHT);
when(tableConfig.getOrderingFieldsStr()).thenReturn(Option.empty());

// Use a temp file for index definition path
indexDefPath = new StoragePath(tempDir.resolve("index.json").toString());
Expand Down Expand Up @@ -287,15 +288,20 @@ void testUpgradeWithNoIndexMetadata() {
/**
 * Asserts the expected table-config change set produced when a built-in payload class is
 * migrated during upgrade.
 *
 * <p>In all cases, {@code PAYLOAD_CLASS_NAME} must be scheduled for removal and its value
 * preserved under {@code LEGACY_PAYLOAD_CLASS_NAME}. For {@code MySqlDebeziumAvroPayload},
 * the legacy {@code PRECOMBINE_FIELD} property must additionally be removed and the ordering
 * fields must be set to the flattened file/position columns.
 *
 * @param propertiesToAdd    properties the upgrade scheduled for addition
 * @param propertiesToRemove properties the upgrade scheduled for removal
 * @param payloadClass       fully-qualified payload class under test
 */
private void assertPayloadClassChange(Map<ConfigProperty, String> propertiesToAdd,
                                      Set<ConfigProperty> propertiesToRemove,
                                      String payloadClass) {
  boolean isMySqlDebezium = payloadClass.equals(MySqlDebeziumAvroPayload.class.getName());
  if (isMySqlDebezium) {
    assertEquals(2, propertiesToRemove.size());
    assertTrue(propertiesToRemove.contains(HoodieTableConfig.PRECOMBINE_FIELD));
  } else {
    assertEquals(1, propertiesToRemove.size());
  }
  assertTrue(propertiesToRemove.contains(PAYLOAD_CLASS_NAME));
  assertTrue(propertiesToAdd.containsKey(LEGACY_PAYLOAD_CLASS_NAME));
  assertEquals(
      payloadClass,
      propertiesToAdd.get(LEGACY_PAYLOAD_CLASS_NAME));
  if (isMySqlDebezium) {
    assertTrue(propertiesToAdd.containsKey(HoodieTableConfig.ORDERING_FIELDS));
    // Fixed argument order: JUnit expects (expected, actual) — was reversed, which produces
    // misleading failure messages. Matches the expected-first style used above.
    assertEquals(
        FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME,
        propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS));
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.ORDERING_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
Expand Down Expand Up @@ -98,6 +99,7 @@ public void setUp() {
when(upgradeDowngradeHelper.getTable(any(), any())).thenReturn(table);
when(table.getMetaClient()).thenReturn(metaClient);
when(metaClient.getTableConfig()).thenReturn(tableConfig);
when(tableConfig.getOrderingFieldsStr()).thenReturn(Option.empty());
}

static Stream<Arguments> payloadClassTestCases() {
Expand Down Expand Up @@ -136,8 +138,8 @@ static Stream<Arguments> payloadClassTestCases() {
// MySqlDebeziumAvroPayload - requires RECORD_MERGE_MODE and RECORD_MERGE_STRATEGY_ID
Arguments.of(
MySqlDebeziumAvroPayload.class.getName(),
2,
LEGACY_PAYLOAD_CLASS_NAME.key() + "," + PARTIAL_UPDATE_MODE.key(),
3,
LEGACY_PAYLOAD_CLASS_NAME.key() + "," + ORDERING_FIELDS.key() + "," + PARTIAL_UPDATE_MODE.key() + ",",
4,
true,
true,
Expand Down Expand Up @@ -225,8 +227,8 @@ void testDowngradeForPayloadClass(String payloadClassName, int expectedPropertie
propertiesToChange.propertiesToUpdate().get(RECORD_MERGE_STRATEGY_ID));
}
if (payloadClassName.equals(MySqlDebeziumAvroPayload.class.getName())) {
assertTrue(propertiesToChange.propertiesToUpdate().containsKey(HoodieTableConfig.PRECOMBINE_FIELDS));
assertEquals(propertiesToChange.propertiesToUpdate().get(HoodieTableConfig.PRECOMBINE_FIELDS).toString(), DebeziumConstants.ADDED_SEQ_COL_NAME);
assertTrue(propertiesToChange.propertiesToUpdate().containsKey(HoodieTableConfig.PRECOMBINE_FIELD));
assertEquals(propertiesToChange.propertiesToUpdate().get(HoodieTableConfig.PRECOMBINE_FIELD), DebeziumConstants.ADDED_SEQ_COL_NAME);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void testPropertyUpgrade() {
// Mock record merge mode configuration for merging behavior
when(tableConfig.contains(isA(ConfigProperty.class))).thenAnswer(i -> i.getArguments()[0].equals(PAYLOAD_CLASS_NAME));
when(tableConfig.getPayloadClass()).thenReturn(OverwriteWithLatestAvroPayload.class.getName());
when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.empty());
when(tableConfig.getOrderingFieldsStr()).thenReturn(Option.empty());
SevenToEightUpgradeHandler.upgradeMergeMode(tableConfig, tablePropsToAdd);
assertTrue(tablePropsToAdd.containsKey(RECORD_MERGE_MODE));
assertNotNull(tablePropsToAdd.get(RECORD_MERGE_MODE));
Expand Down Expand Up @@ -140,7 +140,7 @@ void testUpgradeMergeMode(String payloadClass, String preCombineField, String ex
Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();

when(tableConfig.getPayloadClass()).thenReturn(payloadClass);
when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.ofNullable(preCombineField));
when(tableConfig.getOrderingFieldsStr()).thenReturn(Option.ofNullable(preCombineField));

SevenToEightUpgradeHandler.upgradeMergeMode(tableConfig, tablePropsToAdd);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void commitToTable(List<HoodieRecord> recordList, String operation, boole
.setKeyGeneratorType(KeyGeneratorType.SIMPLE.name())
.setRecordKeyFields(writeConfigs.get(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()))
.setPartitionFields(writeConfigs.get(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()))
.setPreCombineFields(writeConfigs.get("hoodie.datasource.write.precombine.field"))
.setOrderingFields(ConfigUtils.getOrderingFieldsStrDuringWrite(writeConfigs))
.setBaseFileFormat(writeConfigs.get(HoodieTableConfig.BASE_FILE_FORMAT.key()))
.set(initConfigs);
if (writeConfigs.containsKey("hoodie.datasource.write.payload.class")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.engine.TaskContextSupplier
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.{ConfigUtils, OrderingValues, ReflectionUtils}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.{OrderingValues, ReflectionUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.exception.HoodieException
Expand Down Expand Up @@ -68,6 +69,7 @@ object HoodieDatasetBulkInsertHelper
*/
def prepareForBulkInsert(df: DataFrame,
config: HoodieWriteConfig,
tableConfig: HoodieTableConfig,
partitioner: BulkInsertPartitioner[Dataset[Row]],
instantTime: String): Dataset[Row] = {
val populateMetaFields = config.populateMetaFields()
Expand Down Expand Up @@ -121,7 +123,7 @@ object HoodieDatasetBulkInsertHelper
}

val dedupedRdd = if (config.shouldCombineBeforeInsert) {
dedupeRows(prependedRdd, updatedSchema, ConfigUtils.getOrderingFields(config.getProps).toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
dedupeRows(prependedRdd, updatedSchema, tableConfig.getOrderingFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
} else {
prependedRdd
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,13 @@ private void initRecordMerger(TypedProperties properties, boolean isIngestion) {
// If the provided payload class differs from the table's payload class, we need to infer the correct merging behavior.
if (isIngestion && writerPayloadClass.map(className -> !className.equals(tableConfig.getPayloadClass())).orElse(false)) {
Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferMergingConfigsForWrites(null, writerPayloadClass.get(), null,
tableConfig.getPreCombineFieldsStr().orElse(null), tableVersion);
tableConfig.getOrderingFieldsStr().orElse(null), tableVersion);
recordMergeMode = triple.getLeft();
mergeStrategyId = triple.getRight();
} else if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
Triple<RecordMergeMode, String, String> triple = inferMergingConfigsForPreV9Table(
recordMergeMode, tableConfig.getPayloadClass(),
mergeStrategyId, tableConfig.getPreCombineFieldsStr().orElse(null), tableVersion);
mergeStrategyId, tableConfig.getOrderingFieldsStr().orElse(null), tableVersion);
recordMergeMode = triple.getLeft();
mergeStrategyId = triple.getRight();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ default boolean isProjectionCompatible() {
}

/**
* Returns a list of fields required for mor merging. The default implementation will return the recordkey field and the precombine
* Returns a list of fields required for mor merging. The default implementation will return the recordKey field and the ordering fields.
*/
default String[] getMandatoryFieldsForMerging(Schema dataSchema, HoodieTableConfig cfg, TypedProperties properties) {
ArrayList<String> requiredFields = new ArrayList<>();
Expand All @@ -172,8 +172,8 @@ default String[] getMandatoryFieldsForMerging(Schema dataSchema, HoodieTableConf
}
}

List<String> preCombineFields = cfg.getPreCombineFields();
requiredFields.addAll(preCombineFields);
List<String> orderingFields = cfg.getOrderingFields();
requiredFields.addAll(orderingFields);
return requiredFields.toArray(new String[0]);
}

Expand Down
Loading
Loading