@@ -23,6 +23,7 @@
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.StringUtils;
 
 import java.io.File;
 import java.io.FileReader;
@@ -94,7 +95,9 @@ public Builder fromProperties(Properties props) {
     }
 
     public Builder withPayloadOrderingFields(String payloadOrderingFields) {
-      payloadConfig.setValue(ORDERING_FIELDS, String.valueOf(payloadOrderingFields));
+      if (StringUtils.nonEmpty(payloadOrderingFields)) {
+        payloadConfig.setValue(ORDERING_FIELDS, payloadOrderingFields);
+      }
       return this;
     }
 
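Why the guard matters: `String.valueOf(...)` applied to a null reference returns the literal string "null", so the old builder could persist ORDERING_FIELDS as the text "null" when no ordering fields were supplied. A minimal standalone illustration (not Hudi code):

```java
public class StringValueOfPitfall {
  public static void main(String[] args) {
    String orderingFields = null;
    // String.valueOf(Object) turns a null reference into the 4-char string "null"
    System.out.println(String.valueOf(orderingFields)); // prints: null
    // The new StringUtils.nonEmpty guard instead skips the setValue call
    // entirely for null or empty input, so no bogus value is stored.
  }
}
```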
@@ -46,6 +46,8 @@
 
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME;
+import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME;
 import static org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
@@ -191,6 +193,8 @@ private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> tablePropsToAdd,
       tablePropsToAdd.put(
           ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue(), // to be fixed once we land PR #13721.
           DEBEZIUM_UNAVAILABLE_VALUE);
+    } else if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
+      tablePropsToAdd.put(HoodieTableConfig.PRECOMBINE_FIELDS, FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME);
     }
   }
 
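On the upgrade path, the precombine field for MySqlDebeziumAvroPayload becomes the pair of flattened binlog source columns rather than the single derived sequence column. The reasoning: MySQL totally orders binlog events by file and then by position within the file, which is exactly what a two-field ordering key expresses. A minimal sketch of that ordering (illustrative only; the field names mirror the DebeziumConstants referenced in the diff, and this is not Hudi's merge logic):

```java
import java.util.Comparator;

public class BinlogOrderingSketch {
  record BinlogEvent(String file, long pos) {}

  public static void main(String[] args) {
    // Compare by binlog file first, then by position -- the same order the
    // upgraded precombine fields ("file,pos") impose on Debezium records.
    Comparator<BinlogEvent> byFileThenPos =
        Comparator.comparing(BinlogEvent::file).thenComparingLong(BinlogEvent::pos);

    BinlogEvent older = new BinlogEvent("mysql-bin.000001", 4521L);
    BinlogEvent newer = new BinlogEvent("mysql-bin.000002", 120L);
    System.out.println(byFileThenPos.compare(older, newer) < 0); // true: later file wins
  }
}
```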
@@ -25,10 +25,12 @@
 import org.apache.hudi.common.model.AWSDmsAvroPayload;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.debezium.DebeziumConstants;
 import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
 import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -42,8 +44,8 @@
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
 import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
 import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
@@ -63,6 +65,7 @@
  * set hoodie.record.merge.mode=CUSTOM
  * set hoodie.record.merge.strategy.id accordingly
  * remove any properties with prefix hoodie.record.merge.property.
+ * fix precombine field value for MySqlDebeziumAvroPayload.
  */
 public class NineToEightDowngradeHandler implements DowngradeHandler {
   @Override
@@ -99,7 +102,6 @@ private void reconcileMergeConfigs(Map<ConfigProperty, String> propertiesToAdd,
       propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, PAYLOAD_BASED_MERGE_STRATEGY_UUID);
       propertiesToAdd.put(RECORD_MERGE_MODE, RecordMergeMode.CUSTOM.name());
     }
-    // don't we need to fix merge strategy Id for OverwriteWithLatestAvroPayload and DefaultHoodieRecordPayload ?
     if (legacyPayloadClass.equals(AWSDmsAvroPayload.class.getName())) {
       propertiesToRemove.add(
           ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY).noDefaultValue());
@@ -110,6 +112,9 @@
       propertiesToRemove.add(
           ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue());
     }
+    if (legacyPayloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
+      propertiesToAdd.put(HoodieTableConfig.PRECOMBINE_FIELDS, DebeziumConstants.ADDED_SEQ_COL_NAME);
+    }
   }
 }
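Taken together, the upgrade handler above and this downgrade handler form a round trip on the precombine field of a MySQL Debezium table. A minimal sketch that only prints the mapping, assuming nothing beyond the DebeziumConstants fields already referenced in the diff (this is not the handler logic itself):

```java
import org.apache.hudi.common.model.debezium.DebeziumConstants;

public class PrecombineRoundTrip {
  public static void main(String[] args) {
    // Upgrade to table version 9: the single derived sequence column is
    // replaced by the two flattened binlog source columns.
    String upgraded = DebeziumConstants.FLATTENED_FILE_COL_NAME + ","
        + DebeziumConstants.FLATTENED_POS_COL_NAME;
    // Downgrade to table version 8: back to the derived sequence column
    // that MySqlDebeziumAvroPayload historically ordered by.
    String downgraded = DebeziumConstants.ADDED_SEQ_COL_NAME;
    System.out.println(downgraded + " -> " + upgraded + " -> " + downgraded);
  }
}
```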
@@ -67,6 +67,8 @@
 import static org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME;
+import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
 import static org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
@@ -196,6 +198,13 @@ static Stream<Arguments> payloadClassTestCases() {
             COMMIT_TIME_ORDERING.name(),
             PartialUpdateMode.IGNORE_DEFAULTS.name(),
             "OverwriteNonDefaultsWithLatestAvroPayload"
+        ),
+        Arguments.of(
+            MySqlDebeziumAvroPayload.class.getName(),
+            "",
+            EVENT_TIME_ORDERING.name(),
+            null,
+            "MySqlDebeziumAvroPayload"
         )
     );
   }
@@ -284,6 +293,10 @@ private void assertPayloadClassChange(Map<ConfigProperty, String> propertiesToAdd,
     assertEquals(
         payloadClass,
         propertiesToAdd.get(LEGACY_PAYLOAD_CLASS_NAME));
+    if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
+      assertTrue(propertiesToAdd.containsKey(HoodieTableConfig.PRECOMBINE_FIELDS));
+      assertEquals(FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME, propertiesToAdd.get(HoodieTableConfig.PRECOMBINE_FIELDS));
+    }
   }
 
   @Test
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.DebeziumConstants;
 import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
 import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -137,7 +138,7 @@ static Stream<Arguments> payloadClassTestCases() {
         MySqlDebeziumAvroPayload.class.getName(),
         2,
         LEGACY_PAYLOAD_CLASS_NAME.key() + "," + PARTIAL_UPDATE_MODE.key(),
-        3,
+        4,
         true,
         true,
         "MySqlDebeziumAvroPayload"
@@ -185,7 +186,7 @@ static Stream<Arguments> payloadClassTestCases() {
     );
   }
 
-  @ParameterizedTest(name = "testDowngradeFor{5}")
+  @ParameterizedTest(name = "testDowngradeFor{6}")
   @MethodSource("payloadClassTestCases")
   void testDowngradeForPayloadClass(String payloadClassName, int expectedPropertiesToRemoveSize, String expectedPropsToRemove,
                                     int expectedPropertiesToAddSize, boolean hasRecordMergeMode,
@@ -223,6 +224,10 @@ void testDowngradeForPayloadClass(String payloadClassName, int expectedPropertiesToRemoveSize,
       assertEquals(PAYLOAD_BASED_MERGE_STRATEGY_UUID,
           propertiesToChange.propertiesToUpdate().get(RECORD_MERGE_STRATEGY_ID));
     }
+    if (payloadClassName.equals(MySqlDebeziumAvroPayload.class.getName())) {
+      assertTrue(propertiesToChange.propertiesToUpdate().containsKey(HoodieTableConfig.PRECOMBINE_FIELDS));
+      assertEquals(DebeziumConstants.ADDED_SEQ_COL_NAME, propertiesToChange.propertiesToUpdate().get(HoodieTableConfig.PRECOMBINE_FIELDS));
+    }
   }
 }
 
@@ -25,7 +25,7 @@ import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.data.HoodieData
 import org.apache.hudi.common.engine.TaskContextSupplier
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.{OrderingValues, ReflectionUtils}
+import org.apache.hudi.common.util.{ConfigUtils, OrderingValues, ReflectionUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.exception.HoodieException
@@ -121,7 +121,7 @@ object HoodieDatasetBulkInsertHelper
     }
 
     val dedupedRdd = if (config.shouldCombineBeforeInsert) {
-      dedupeRows(prependedRdd, updatedSchema, config.getPreCombineFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
+      dedupeRows(prependedRdd, updatedSchema, ConfigUtils.getOrderingFields(config.getProps).toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
     } else {
       prependedRdd
     }
@@ -90,7 +90,7 @@ public static String[] getOrderingFields(Properties properties) {
     } else if (properties.containsKey(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY)) {
       orderField = properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
     }
-    return orderField == null ? null : orderField.split(",");
+    return StringUtils.isNullOrEmpty(orderField) ? null : orderField.split(",");
   }
 
   /**
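The empty-string case matters because of how Java's String.split behaves: splitting "" on "," yields a one-element array containing "", not an empty array, so without this guard a present-but-empty ordering config would masquerade as a single ordering field with an empty name. A standalone demonstration (not Hudi code):

```java
public class SplitPitfall {
  public static void main(String[] args) {
    String[] parts = "".split(",");
    System.out.println(parts.length);         // 1, not 0
    System.out.println("'" + parts[0] + "'"); // '' -- a single empty field name
  }
}
```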
@@ -1031,9 +1031,6 @@ object DataSourceOptionsHelper {
     if (!params.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key()) && tableConfig.getKeyGeneratorClassName != null) {
       missingWriteConfigs ++= Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> tableConfig.getKeyGeneratorClassName)
     }
-    if (!params.contains(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key()) && tableConfig.getPreCombineFieldsStr.isPresent) {
-      missingWriteConfigs ++= Map(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> tableConfig.getPreCombineFieldsStr.orElse(null))
-    }
Comment on lines -1034 to -1036:

Collaborator: There was one issue I was facing: when the precombine config is not explicitly configured, we populate it from the table config. The write config then keeps pointing at the older table config, while the table config itself is updated to the newer value during the upgrade. To avoid that mismatch, this change removes the addition of the write config here.

Collaborator: With the deprecation of the write config in #13718, this should be further streamlined.
     if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) {
       missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass)
     }
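A hypothetical illustration of the staleness the reviewer describes: copying a table-config value into the write config freezes it, so a later upgrade that rewrites the table config leaves the copy behind. Plain maps stand in for Hudi's config objects here, and the key and values are assumed for the example, not taken from Hudi:

```java
import java.util.HashMap;
import java.util.Map;

public class StaleCopyExample {
  public static void main(String[] args) {
    Map<String, String> tableConfig = new HashMap<>();
    tableConfig.put("precombine.field", "_event_seq"); // assumed key/value

    // The write config copies the value at read time...
    Map<String, String> writeConfig = new HashMap<>(tableConfig);

    // ...then the upgrade rewrites the table config.
    tableConfig.put("precombine.field", "_event_bin_file,_event_pos");

    // The copy is now stale -- the mismatch the PR avoids by not copying.
    System.out.println(writeConfig.get("precombine.field")); // _event_seq
  }
}
```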
@@ -23,7 +23,7 @@ import org.apache.hudi.avro.{AvroSchemaCache, HoodieAvroUtils}
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
-import org.apache.hudi.common.util.{OrderingValues, StringUtils}
+import org.apache.hudi.common.util.{ConfigUtils, OrderingValues, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -125,7 +125,7 @@ object HoodieCreateRecordUtils {
     val consistentLogicalTimestampEnabled = parameters.getOrElse(
       DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
       DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
-    val precombineFields = config.getPreCombineFields()
+    val precombineFields = ConfigUtils.getOrderingFields(config.getProps)
 
     // handle dropping partition columns
     it.map { avroRec =>
@@ -143,7 +143,7 @@
         avroRecWithoutMeta
       }
 
-      val hoodieRecord = if (shouldCombine && !precombineFields.isEmpty) {
+      val hoodieRecord = if (shouldCombine && precombineFields != null && precombineFields.nonEmpty) {
         val orderingVal = OrderingValues.create(
           precombineFields,
           JFunction.toJavaFunction[String, Comparable[_]](
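The added null guard mirrors the ConfigUtils.getOrderingFields change above: the helper now returns null, rather than an empty array, when no ordering field is configured, so callers that previously checked only isEmpty need an explicit null test first. (HoodieDatasetBulkInsertHelper calls the same helper on the shouldCombineBeforeInsert path, where ordering fields are presumably expected to be present.)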