diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java index c5f8f408be310..445f8f678921e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.StringUtils; import java.io.File; import java.io.FileReader; @@ -94,7 +95,9 @@ public Builder fromProperties(Properties props) { } public Builder withPayloadOrderingFields(String payloadOrderingFields) { - payloadConfig.setValue(ORDERING_FIELDS, String.valueOf(payloadOrderingFields)); + if (StringUtils.nonEmpty(payloadOrderingFields)) { + payloadConfig.setValue(ORDERING_FIELDS, payloadOrderingFields); + } return this; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java index ac4609983427b..472b4157d313b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java @@ -46,6 +46,8 @@ import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY; import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER; +import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME; +import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME; import static org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID; import static org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID; import static org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID; @@ -191,6 +193,8 @@ private void reconcileMergePropertiesConfig(Map tablePro tablePropsToAdd.put( ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue(), // // to be fixed once we land PR #13721. 
DEBEZIUM_UNAVAILABLE_VALUE); + } else if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) { + tablePropsToAdd.put(HoodieTableConfig.PRECOMBINE_FIELDS, FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java index 730cdc84d4008..2936a158bc507 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java @@ -25,10 +25,12 @@ import org.apache.hudi.common.model.AWSDmsAvroPayload; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.EventTimeAvroPayload; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.debezium.DebeziumConstants; +import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload; import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -42,8 +44,8 @@ import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER; import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID; import static org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME; -import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE; import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE; +import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE; import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME; import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE; import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX; @@ -63,6 +65,7 @@ * set hoodie.record.merge.mode=CUSTOM * set hoodie.record.merge.strategy.id accordingly * remove any properties with prefix hoodie.record.merge.property. + * fix precombine field value for MySqlDebeziumAvroPayload. */ public class NineToEightDowngradeHandler implements DowngradeHandler { @Override @@ -99,7 +102,6 @@ private void reconcileMergeConfigs(Map propertiesToAdd, propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, PAYLOAD_BASED_MERGE_STRATEGY_UUID); propertiesToAdd.put(RECORD_MERGE_MODE, RecordMergeMode.CUSTOM.name()); } - // don't we need to fix merge strategy Id for OverwriteWithLatestAvroPayload and DefaultHoodieRecordPayload ? 
if (legacyPayloadClass.equals(AWSDmsAvroPayload.class.getName())) { propertiesToRemove.add( ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY).noDefaultValue()); @@ -110,6 +112,9 @@ private void reconcileMergeConfigs(Map propertiesToAdd, propertiesToRemove.add( ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue()); } + if (legacyPayloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) { + propertiesToAdd.put(HoodieTableConfig.PRECOMBINE_FIELDS, DebeziumConstants.ADDED_SEQ_COL_NAME); + } } } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java index 4242730824130..a61f2b1e4ca72 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java @@ -67,6 +67,8 @@ import static org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING; import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY; import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER; +import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME; +import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME; import static org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE; import static org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME; import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE; @@ -196,6 +198,13 @@ static Stream payloadClassTestCases() { COMMIT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(), "OverwriteNonDefaultsWithLatestAvroPayload" + ), + Arguments.of( + MySqlDebeziumAvroPayload.class.getName(), + "", + EVENT_TIME_ORDERING.name(), + null, + "MySqlDebeziumAvroPayload" ) ); } @@ -284,6 +293,10 @@ private void assertPayloadClassChange(Map propertiesToAd assertEquals( payloadClass, propertiesToAdd.get(LEGACY_PAYLOAD_CLASS_NAME)); + if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) { + assertTrue(propertiesToAdd.containsKey(HoodieTableConfig.PRECOMBINE_FIELDS)); + assertEquals(propertiesToAdd.get(HoodieTableConfig.PRECOMBINE_FIELDS).toString(), FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME); + } } @Test diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java index 0e37b9c8a717e..9971985aa5f5c 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.PartialUpdateAvroPayload; +import org.apache.hudi.common.model.debezium.DebeziumConstants; import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload; import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload; import 
org.apache.hudi.common.table.HoodieTableConfig; @@ -137,7 +138,7 @@ static Stream payloadClassTestCases() { MySqlDebeziumAvroPayload.class.getName(), 2, LEGACY_PAYLOAD_CLASS_NAME.key() + "," + PARTIAL_UPDATE_MODE.key(), - 3, + 4, true, true, "MySqlDebeziumAvroPayload" @@ -185,7 +186,7 @@ static Stream payloadClassTestCases() { ); } - @ParameterizedTest(name = "testDowngradeFor{5}") + @ParameterizedTest(name = "testDowngradeFor{6}") @MethodSource("payloadClassTestCases") void testDowngradeForPayloadClass(String payloadClassName, int expectedPropertiesToRemoveSize, String expectedPropsToRemove, int expectedPropertiesToAddSize, boolean hasRecordMergeMode, @@ -223,6 +224,10 @@ void testDowngradeForPayloadClass(String payloadClassName, int expectedPropertie assertEquals(PAYLOAD_BASED_MERGE_STRATEGY_UUID, propertiesToChange.propertiesToUpdate().get(RECORD_MERGE_STRATEGY_ID)); } + if (payloadClassName.equals(MySqlDebeziumAvroPayload.class.getName())) { + assertTrue(propertiesToChange.propertiesToUpdate().containsKey(HoodieTableConfig.PRECOMBINE_FIELDS)); + assertEquals(propertiesToChange.propertiesToUpdate().get(HoodieTableConfig.PRECOMBINE_FIELDS).toString(), DebeziumConstants.ADDED_SEQ_COL_NAME); + } } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index e2f6cba93bde5..3f8e832abe00a 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -25,7 +25,7 @@ import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.data.HoodieData import org.apache.hudi.common.engine.TaskContextSupplier import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.common.util.{OrderingValues, ReflectionUtils} +import org.apache.hudi.common.util.{ConfigUtils, OrderingValues, ReflectionUtils} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.data.HoodieJavaRDD import org.apache.hudi.exception.HoodieException @@ -121,7 +121,7 @@ object HoodieDatasetBulkInsertHelper } val dedupedRdd = if (config.shouldCombineBeforeInsert) { - dedupeRows(prependedRdd, updatedSchema, config.getPreCombineFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism) + dedupeRows(prependedRdd, updatedSchema, ConfigUtils.getOrderingFields(config.getProps).toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism) } else { prependedRdd } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java index 70cf9cb376b2d..d03182a8dcd1e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java @@ -90,7 +90,7 @@ public static String[] getOrderingFields(Properties properties) { } else if (properties.containsKey(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY)) { orderField = properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); } - return orderField == null ? null : orderField.split(","); + return StringUtils.isNullOrEmpty(orderField) ? 
null : orderField.split(","); } /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index f1ab811b07673..6e2177f539fb8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -1031,9 +1031,6 @@ object DataSourceOptionsHelper { if (!params.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key()) && tableConfig.getKeyGeneratorClassName != null) { missingWriteConfigs ++= Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> tableConfig.getKeyGeneratorClassName) } - if (!params.contains(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key()) && tableConfig.getPreCombineFieldsStr.isPresent) { - missingWriteConfigs ++= Map(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> tableConfig.getPreCombineFieldsStr.orElse(null)) - } if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) { missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala index 5466a790772f9..1a39c8e083279 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala @@ -23,7 +23,7 @@ import org.apache.hudi.avro.{AvroSchemaCache, HoodieAvroUtils} import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model._ -import org.apache.hudi.common.util.{OrderingValues, StringUtils} +import org.apache.hudi.common.util.{ConfigUtils, OrderingValues, StringUtils} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface} import org.apache.hudi.keygen.constant.KeyGeneratorOptions @@ -125,7 +125,7 @@ object HoodieCreateRecordUtils { val consistentLogicalTimestampEnabled = parameters.getOrElse( DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean - val precombineFields = config.getPreCombineFields() + val precombineFields = ConfigUtils.getOrderingFields(config.getProps) // handle dropping partition columns it.map { avroRec => @@ -143,7 +143,7 @@ object HoodieCreateRecordUtils { avroRecWithoutMeta } - val hoodieRecord = if (shouldCombine && !precombineFields.isEmpty) { + val hoodieRecord = if (shouldCombine && precombineFields != null && precombineFields.nonEmpty) { val orderingVal = OrderingValues.create( precombineFields, JFunction.toJavaFunction[String, Comparable[_]]( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala index 2839646250472..2202a63f203e1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala @@ -19,23 +19,27 @@ package org.apache.hudi.functional -import org.apache.hudi.DataSourceWriteOptions.{INSERT_OVERWRITE_OPERATION_OPT_VAL, PARTITIONPATH_FIELD, PAYLOAD_CLASS_NAME, RECORD_MERGE_IMPL_CLASSES, TABLE_TYPE, UPSERT_OPERATION_OPT_VAL} -import org.apache.hudi.common.config.{HoodieStorageConfig, RecordMergeMode} +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.DataSourceWriteOptions.{INSERT_OVERWRITE_OPERATION_OPT_VAL, OPERATION, PARTITIONPATH_FIELD, PAYLOAD_CLASS_NAME, RECORD_MERGE_IMPL_CLASSES, RECORDKEY_FIELD, TABLE_TYPE} +import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig, RecordMergeMode} import org.apache.hudi.common.model.{AWSDmsAvroPayload, EventTimeAvroPayload, HoodieRecordMerger, HoodieTableType, OverwriteNonDefaultsWithLatestAvroPayload, PartialUpdateAvroPayload} import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, DELETE_MARKER} -import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload +import org.apache.hudi.common.model.debezium.{DebeziumConstants, MySqlDebeziumAvroPayload, PostgresDebeziumAvroPayload} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion, PartialUpdateMode} import org.apache.hudi.common.table.HoodieTableConfig.{DEBEZIUM_UNAVAILABLE_VALUE, PARTIAL_UPDATE_UNAVAILABLE_VALUE, RECORD_MERGE_PROPERTY_PREFIX} import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig} import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade} import org.apache.spark.sql.SaveMode -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkAnswer +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} class TestEightToNineUpgrade extends RecordLevelIndexTestBase { + @ParameterizedTest @MethodSource(Array("payloadConfigs")) def testUpgradeDowngradeBetweenEightAndNine(tableType: HoodieTableType, @@ -44,7 +48,7 @@ class TestEightToNineUpgrade extends RecordLevelIndexTestBase { val mergerClasses = "org.apache.hudi.DefaultSparkRecordMerger," + "org.apache.hudi.OverwriteWithLatestSparkRecordMerger," + "org.apache.hudi.common.model.HoodieAvroRecordMerger" - var hudiOpts= commonOpts ++ Map( + var hudiOpts = commonOpts ++ Map( TABLE_TYPE.key -> tableType.name(), PARTITIONPATH_FIELD.key -> partitionFields, PAYLOAD_CLASS_NAME.key -> payloadClass, @@ -107,6 +111,92 @@ class TestEightToNineUpgrade extends RecordLevelIndexTestBase { checkResultForVersion8(payloadClass) } + @Test + def testUpgradeDowngradeMySqlDebeziumPayload(): Unit = { + val payloadClass = classOf[MySqlDebeziumAvroPayload].getName + var opts: Map[String, String] = Map( + HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClass, + HoodieMetadataConfig.ENABLE.key() -> "false" + ) + val columns = Seq("ts", "key", "rider", "driver", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME, + DebeziumConstants.ADDED_SEQ_COL_NAME) + + // 1. Add an insert. 
+ val data = Seq( + (10, "1", "rider-A", "driver-A", 1, 1, "1.1"), + (10, "2", "rider-B", "driver-B", 2, 5, "2.5"), + (10, "3", "rider-C", "driver-C", 3, 10, "3.10"), + (10, "4", "rider-D", "driver-D", 4, 8, "4.8"), + (10, "5", "rider-E", "driver-E", 5, 4, "5.4")) + val inserts = spark.createDataFrame(data).toDF(columns: _*) + inserts.write.format("hudi"). + option(RECORDKEY_FIELD.key(), "key"). + option(TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()). + option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), DebeziumConstants.ADDED_SEQ_COL_NAME). + option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). + option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false"). + option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8"). + options(opts). + mode(SaveMode.Overwrite). + save(basePath) + checkResultForVersion8(payloadClass) + + // 2. Add an update and upgrade the table to v9 + // first two records with larger ordering values based on debezium payload + // last two records with smaller ordering values based on debezium payload, below updates should be rejected + var updateData = Seq( + (9, "1", "rider-X", "driver-X", 1, 2, "1.2"), + (9, "2", "rider-Y", "driver-Y", 3, 2, "3.2"), + (9, "3", "rider-C", "driver-C", 2, 10, "2.10"), + (9, "4", "rider-D", "driver-D", 4, 7, "4.7") + ) + var update = spark.createDataFrame(updateData).toDF(columns: _*) + update.write.format("hudi"). + option(OPERATION.key(), "upsert"). + option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false"). + mode(SaveMode.Append). + save(basePath) + checkResultForVersion9("", payloadClass) + + // Downgrade to table version 8 explicitly. + // Note that downgrade is NOT automatic. + // It has to be triggered explicitly. + opts = opts ++ Map(HoodieWriteConfig.WRITE_TABLE_VERSION.key -> "8") + new UpgradeDowngrade(metaClient, getWriteConfig(opts), context, SparkUpgradeDowngradeHelper.getInstance) + .run(HoodieTableVersion.EIGHT, null) + + // 3. Add an update after downgrade and validate the data + // first two records with larger ordering values based on debezium payload + // last two records with smaller ordering values based on debezium payload, below updates should be rejected + updateData = Seq( + (8, "1", "rider-X", "driver-X", 1, 3, "1.3"), + (8, "2", "rider-Y", "driver-Y", 4, 2, "4.2"), + (8, "3", "rider-C", "driver-C", 2, 10, "2.10"), + (8, "4", "rider-D", "driver-D", 4, 7, "4.7") + ) + update = spark.createDataFrame(updateData).toDF(columns: _*) + update.write.format("hudi"). + option(OPERATION.key(), "upsert"). + option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false"). + option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8"). + mode(SaveMode.Append). 
+ save(basePath) + checkResultForVersion8(payloadClass) + + tableName = "testUpgradeDowngradeMySqlDebeziumPayload" + spark.sql(s"create table testUpgradeDowngradeMySqlDebeziumPayload using hudi location '$basePath'") + checkAnswer(spark, s"select ts, key, rider, driver, ${DebeziumConstants.FLATTENED_FILE_COL_NAME}, ${DebeziumConstants.FLATTENED_POS_COL_NAME}," + + s" ${DebeziumConstants.ADDED_SEQ_COL_NAME} from default.$tableName")( + Seq(8, "1", "rider-X", "driver-X", 1, 3, "1.3"), + Seq(8, "2", "rider-Y", "driver-Y", 4, 2, "4.2"), + Seq(10, "3", "rider-C", "driver-C", 3, 10, "3.10"), + Seq(10, "4", "rider-D", "driver-D", 4, 8, "4.8"), + Seq(10, "5", "rider-E", "driver-E", 5, 4, "5.4") + ) + + spark.sql(s"drop table default.$tableName") + } + def checkResultForVersion8(payloadClass: String): Unit = { metaClient = HoodieTableMetaClient.reload(metaClient) assertEquals(HoodieTableVersion.EIGHT, metaClient.getTableConfig.getTableVersion) @@ -120,6 +210,10 @@ class TestEightToNineUpgrade extends RecordLevelIndexTestBase { // The merge mode should be CUSTOM. assertEquals(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID, metaClient.getTableConfig.getRecordMergeStrategyId) } + if (payloadClass.equals(classOf[MySqlDebeziumAvroPayload].getName)) { + assertFalse(metaClient.getTableConfig.getPreCombineFieldsStr.isEmpty) + assertEquals(DebeziumConstants.ADDED_SEQ_COL_NAME, metaClient.getTableConfig.getPreCombineFieldsStr.get()) + } } def checkResultForVersion9(partitionFields: String, payloadClass: String): Unit = { @@ -160,6 +254,14 @@ class TestEightToNineUpgrade extends RecordLevelIndexTestBase { assertEquals(AWSDmsAvroPayload.OP_FIELD, deleteField) val deleteMarker = metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${DELETE_MARKER}") assertEquals(AWSDmsAvroPayload.DELETE_OPERATION_VALUE, deleteMarker) + } else if (payloadClass.equals(classOf[MySqlDebeziumAvroPayload].getName)) { + assertEquals( + HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, + metaClient.getTableConfig.getRecordMergeStrategyId) + assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, metaClient.getTableConfig.getRecordMergeMode) + assertTrue(metaClient.getTableConfig.getPartialUpdateMode.isEmpty) + assertEquals(DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME, + metaClient.getTableConfig.getPreCombineFieldsStr.get()) } else { assertTrue(metaClient.getTableConfig.getPartialUpdateMode.isEmpty) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala index b99c1f4159d63..376d8f069af8c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala @@ -23,7 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE} import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model.{AWSDmsAvroPayload, DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload} -import org.apache.hudi.common.model.debezium.{MySqlDebeziumAvroPayload, PostgresDebeziumAvroPayload} +import 
org.apache.hudi.common.model.debezium.{DebeziumConstants, MySqlDebeziumAvroPayload, PostgresDebeziumAvroPayload} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException @@ -48,14 +48,14 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness { expectedConfigs: Map[String, String]): Unit = { val opts: Map[String, String] = Map( HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz) - val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq") + val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME) // 1. Add an insert. val data = Seq( - (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1"), - (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1"), - (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1"), - (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1"), - (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1")) + (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1), + (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1), + (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1), + (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1), + (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) val precombineField = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) { "_event_seq" @@ -83,8 +83,8 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness { assertTrue(metaClient.getActiveTimeline.firstInstant().isPresent) // 2. Add an update. val firstUpdateData = Seq( - (11, 1L, "rider-X", "driver-X", 19.10, "D", "11.1"), - (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1")) + (11, 1L, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1), + (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1)) val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*) firstUpdate.write.format("hudi"). option(OPERATION.key(), "upsert"). @@ -93,15 +93,15 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness { mode(SaveMode.Append). save(basePath) // Validate table version. - metaClient = HoodieTableMetaClient.reload(metaClient); + metaClient = HoodieTableMetaClient.reload(metaClient) assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode()) val firstUpdateInstantTime = metaClient.getActiveTimeline.getInstants.get(1).requestedTime() // 3. Add an update. This is expected to trigger the upgrade val secondUpdateData = Seq( - (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1"), - (9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1"), - (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1")) + (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1), + (9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1), + (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1)) val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdate.write.format("hudi"). option(OPERATION.key(), "upsert"). @@ -118,7 +118,7 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness { // 4. Add a trivial update to trigger payload class mismatch. 
val thirdUpdateData = Seq( - (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1")) + (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1)) val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*) if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) { assertThrows[HoodieException] { @@ -145,7 +145,8 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness { } // Validate snapshot query. val df = spark.read.format("hudi").load(basePath) - val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq").sort("_event_lsn") + val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME) + .sort("_event_lsn") val expectedData = getExpectedResultForSnapshotQuery(payloadClazz) val expectedDf = spark.createDataFrame(spark.sparkContext.parallelize(expectedData)).toDF(columns: _*).sort("_event_lsn") expectedDf.show(false) @@ -154,7 +155,8 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness { // Validate time travel query. val timeTravelDf = spark.read.format("hudi") .option("as.of.instant", firstUpdateInstantTime).load(basePath) - .select("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq").sort("_event_lsn") + .select("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME) + .sort("_event_lsn") timeTravelDf.show(false) val expectedTimeTravelData = getExpectedResultForTimeTravelQuery(payloadClazz) val expectedTimeTravelDf = spark.createDataFrame( @@ -174,7 +176,7 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness { .build() } - def getExpectedResultForSnapshotQuery(payloadClazz: String): Seq[(Int, Long, String, String, Double, String, String)] = { + def getExpectedResultForSnapshotQuery(payloadClazz: String): Seq[(Int, Long, String, String, Double, String, String, Int, Int)] = { if (!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) { if (payloadClazz.equals(classOf[PartialUpdateAvroPayload].getName) || payloadClazz.equals(classOf[EventTimeAvroPayload].getName) @@ -182,49 +184,47 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness { || payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName) || payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) { Seq( - (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"), - (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"), - (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"), - (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"), - (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1")) + (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1), + (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1), + (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1), + (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1), + (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1)) } else { Seq( - (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"), - (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"), - (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"), - (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1"), - (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1")) + (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1), + (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1), + (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1), + 
(9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1), + (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1)) } } else { Seq( - (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"), - (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"), - (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1"), - (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1")) + (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1), + (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1), + (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1), + (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1)) } } def getExpectedResultForTimeTravelQuery(payloadClazz: String): - Seq[(Int, Long, String, String, Double, String, String)] = { + Seq[(Int, Long, String, String, Double, String, String, Int, Int)] = { if (!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) { Seq( - (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"), - (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"), - (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1"), - (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"), - (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1")) + (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1), + (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1), + (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1), + (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1), + (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1)) } else { Seq( - (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"), - (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1"), - (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"), - (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1")) + (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1), + (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1), + (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1), + (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1)) } } } -// TODO: Add COPY_ON_WRITE table type tests when write path is updated accordingly. -// TODO: Add Test for MySqlDebeziumAvroPayload. 
object TestPayloadDeprecationFlow { def providePayloadClassTestCases(): java.util.List[Arguments] = { java.util.Arrays.asList( @@ -262,6 +262,14 @@ object TestPayloadDeprecationFlow { HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS", HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE -> "__debezium_unavailable_value"), + Arguments.of( + "COPY_ON_WRITE", + classOf[MySqlDebeziumAvroPayload].getName, + Map( + HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING", + HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName, + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, + HoodieTableConfig.PRECOMBINE_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))), Arguments.of( "COPY_ON_WRITE", classOf[AWSDmsAvroPayload].getName, @@ -324,6 +332,14 @@ object TestPayloadDeprecationFlow { HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS", HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE -> "__debezium_unavailable_value"), + Arguments.of( + "MERGE_ON_READ", + classOf[MySqlDebeziumAvroPayload].getName, + Map( + HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING", + HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName, + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, + HoodieTableConfig.PRECOMBINE_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))), Arguments.of( "MERGE_ON_READ", classOf[AWSDmsAvroPayload].getName, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala index 8435028c154bd..5f857c575e4d8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala @@ -35,7 +35,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex import org.apache.hudi.storage.{HoodieStorage, StoragePath} import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest} -import org.apache.hudi.testutils.HoodieSparkClientTestHarness import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf @@ -48,6 +47,7 @@ import org.joda.time.DateTimeZone import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.scalactic.source import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag} +import org.scalatest.Assertions.assertResult import org.slf4j.LoggerFactory import java.io.File @@ -133,7 +133,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { } protected def checkAnswer(sql: String)(expects: Seq[Any]*): Unit = { - assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(sql).collect().sortBy(_.toString())) + HoodieSparkSqlTestBase.checkAnswer(spark, sql)(expects: _*) } protected def checkAnswer(array: Array[Row])(expects: Seq[Any]*): Unit = { @@ -435,6 +435,10 @@ object HoodieSparkSqlTestBase { storage.createNewFile(filePath) } + def 
checkAnswer(spark: SparkSession, sql: String)(expects: Seq[Any]*): Unit = { + assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(sql).collect().sortBy(_.toString())) + } + def validateDeleteLogBlockPrecombineNullOrZero(basePath: String): Unit = { val (metaClient, fsView) = getMetaClientAndFileSystemView(basePath) val fileSlice: Optional[FileSlice] = fsView.getAllFileSlices("").findFirst()
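
Note on the ordering-fields handling changed above (HoodiePayloadConfig.withPayloadOrderingFields and ConfigUtils.getOrderingFields): the new guards treat an empty string the same as an absent value. A minimal, self-contained sketch of why that matters, relying only on standard Java string behavior; the class and field names below are illustrative and are not Hudi code.

public class OrderingFieldsGuardSketch {

  // Mirrors the guarded lookup: null or empty input means "no ordering fields",
  // instead of letting "".split(",") surface a single blank field.
  static String[] getOrderingFields(String orderField) {
    return (orderField == null || orderField.isEmpty()) ? null : orderField.split(",");
  }

  public static void main(String[] args) {
    System.out.println("".split(",").length);          // 1 -> a single "" entry
    System.out.println(getOrderingFields("") == null); // true with the guard in place
    System.out.println(String.join("|",
        getOrderingFields("binlog_file,binlog_pos"))); // binlog_file|binlog_pos
  }
}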
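
Note on the MySQL Debezium precombine handling above: the upgrade handler sets HoodieTableConfig.PRECOMBINE_FIELDS to FLATTENED_FILE_COL_NAME,FLATTENED_POS_COL_NAME, the downgrade handler restores the single ADDED_SEQ_COL_NAME column, and the test data encodes that sequence as "<binlog file>.<position>" strings such as "3.10". Below is a rough sketch of the difference between ordering on the two components as separate fields and ordering on the combined string; it is illustrative only and does not reproduce MySqlDebeziumAvroPayload's actual comparison logic.

import java.util.Comparator;

public class DebeziumOrderingSketch {

  public static void main(String[] args) {
    // Mirrors record key "3" in TestEightToNineUpgrade: stored at (file=3, pos=10),
    // an update arrives at (file=2, pos=10) and should be rejected as older.
    int[] incoming = {2, 10};
    int[] current = {3, 10};

    // Two ordering fields compared in declared order: binlog file first, then position.
    Comparator<int[]> byFileThenPos =
        Comparator.<int[]>comparingInt(e -> e[0]).thenComparingInt(e -> e[1]);
    System.out.println(byFileThenPos.compare(incoming, current) < 0); // true -> update rejected

    // The combined "<file>.<pos>" strings happen to compare the same way for this pair...
    System.out.println("2.10".compareTo("3.10") < 0); // true

    // ...but plain lexical comparison of such strings is fragile once a component
    // gains a digit: "10.2" sorts before "9.1" even though it is the newer event.
    System.out.println("10.2".compareTo("9.1") < 0); // true
  }
}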