diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 8a7c06b1d15ce..60b134a5cd378 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -81,7 +81,7 @@ case class HoodieFileIndex(spark: SparkSession,
       spark = spark,
       metaClient = metaClient,
       schemaSpec = schemaSpec,
-      configProperties = getConfigProperties(spark, options, metaClient),
+      configProperties = getConfigProperties(spark, options),
       queryPaths = HoodieFileIndex.getQueryPaths(options),
       specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
       fileStatusCache = fileStatusCache
@@ -445,7 +445,7 @@ object HoodieFileIndex extends Logging {
     schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) }
   }
 
-  def getConfigProperties(spark: SparkSession, options: Map[String, String], metaClient: HoodieTableMetaClient) = {
+  def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
     val sqlConf: SQLConf = spark.sessionState.conf
     val properties = TypedProperties.fromMap(options.filter(p => p._2 != null).asJava)
 
@@ -463,16 +463,6 @@ object HoodieFileIndex extends Logging {
     if (listingModeOverride != null) {
       properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
     }
-    val partitionColumns = metaClient.getTableConfig.getPartitionFields
-    if (partitionColumns.isPresent) {
-      // NOTE: Multiple partition fields could have non-encoded slashes in the partition value.
-      //       We might not be able to properly parse partition-values from the listed partition-paths.
-      //       Fallback to eager listing in this case.
-      if (partitionColumns.get().length > 1
-        && (listingModeOverride == null || DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY.equals(listingModeOverride))) {
-        properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
-      }
-    }
 
     properties
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index d1b6df6619da2..c9a69a5210e8a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.internal.schema.Types.RecordType
 import org.apache.hudi.internal.schema.utils.Conversions
-import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.util.JFunction
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
@@ -112,9 +112,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     // Note that key generator class name could be null
     val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
     if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-      || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-      || classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-      || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
+      || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
       val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
       StructType(partitionFields)
     } else {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 521fb7f3a5fbf..839b02828d0e9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -86,7 +86,7 @@ class HoodieCDCRDD(
 
   private val cdcSupplementalLoggingMode = metaClient.getTableConfig.cdcSupplementalLoggingMode
 
-  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty, metaClient)
+  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
 
   protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField)
     .map { preCombineField =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 1ccb4081fb8ea..a6c9300b7d439 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -38,6 +38,7 @@ import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtil
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -325,21 +326,29 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
       EqualTo(attribute("dt"), literal("2021/03/01")),
       EqualTo(attribute("hh"), literal("10"))
     )
-    val partitionAndFilesNoPruning = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+    // NOTE: If the file index is in lazy-listing mode and we can't parse partition values, there's no way
+    //       to recover, since Spark by default has to inject partition values parsed from the partition paths.
+    if (listingModeOverride == DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY) {
+      assertThrows(classOf[HoodieException]) {
+        fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+      }
+    } else {
+      val partitionAndFilesNoPruning = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
 
-    assertEquals(1, partitionAndFilesNoPruning.size)
-    // The partition prune would not work for this case, so the partition value it
-    // returns is a InternalRow.empty.
-    assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
-    // The returned file size should equal to the whole file size in all the partition paths.
-    assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
-      partitionAndFilesNoPruning.flatMap(_.files).length)
+      assertEquals(1, partitionAndFilesNoPruning.size)
+      // Partition pruning does not work for this case, so the partition value it
+      // returns is an InternalRow.empty.
+      assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
+      // The returned file count should equal the total file count across all the partition paths.
+      assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
+        partitionAndFilesNoPruning.flatMap(_.files).length)
 
-    val readDF = spark.read.format("hudi").options(readerOpts).load()
+      val readDF = spark.read.format("hudi").options(readerOpts).load()
 
-    assertEquals(10, readDF.count())
-    // There are 5 rows in the dt = 2021/03/01 and hh = 10
-    assertEquals(5, readDF.filter("dt = '2021/03/01' and hh ='10'").count())
+      assertEquals(10, readDF.count())
+      // There are 5 rows in the partition dt = 2021/03/01, hh = 10
+      assertEquals(5, readDF.filter("dt = '2021/03/01' and hh = '10'").count())
+    }
   }
 
   {
@@ -422,7 +431,7 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
     val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilters), Seq.empty)
     assertEquals(1, partitionAndFilesAfterPrune.size)
 
-    assertTrue(fileIndex.areAllPartitionPathsCached())
+    assertEquals(!complexExpressionPushDown, fileIndex.areAllPartitionPathsCached())
 
     val PartitionDirectory(partitionActualValues, filesAfterPrune) = partitionAndFilesAfterPrune.head
     val partitionExpectValues = Seq("default", "2021-03-01", "5", "CN")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 68227ba074ef7..ece1deacd7a25 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -54,7 +54,7 @@ import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
@@ -1006,6 +1006,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     }
   }
 
+  @Disabled("HUDI-6320")
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
   def testSparkPartitionByWithCustomKeyGenerator(recordType: HoodieRecordType): Unit = {
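
Note on the behavior change above: with the automatic eager-listing fallback removed from getConfigProperties, a file index in lazy-listing mode that cannot parse partition values out of the listed partition paths (for example, multiple partition fields containing non-encoded slashes) now surfaces a HoodieException, as the updated test asserts, instead of being silently switched to eager listing. Callers can still opt into eager listing explicitly through the existing FILE_INDEX_LISTING_MODE_OVERRIDE option. A minimal sketch, assuming an active SparkSession named spark and a table partitioned on dt/hh as in the test; the base path /tmp/hudi_trips is hypothetical:

    import org.apache.hudi.DataSourceReadOptions

    // Force eager listing so the file index does not have to lazily parse
    // partition values out of the partition paths.
    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
        DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
      .load("/tmp/hudi_trips") // hypothetical base path

    df.filter("dt = '2021/03/01' and hh = '10'").count()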