@@ -81,7 +81,7 @@ case class HoodieFileIndex(spark: SparkSession,
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
- configProperties = getConfigProperties(spark, options, metaClient),
+ configProperties = getConfigProperties(spark, options),
queryPaths = HoodieFileIndex.getQueryPaths(options),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
@@ -445,7 +445,7 @@ object HoodieFileIndex extends Logging {
schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) }
}

- def getConfigProperties(spark: SparkSession, options: Map[String, String], metaClient: HoodieTableMetaClient) = {
+ def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = TypedProperties.fromMap(options.filter(p => p._2 != null).asJava)

@@ -463,16 +463,6 @@
if (listingModeOverride != null) {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
}
- val partitionColumns = metaClient.getTableConfig.getPartitionFields
Review comment (Member): Nice. And in our case, merging only this code would work well with #9862; then we would have more tests for it.

- if (partitionColumns.isPresent) {
-   // NOTE: Multiple partition fields could have non-encoded slashes in the partition value.
-   //       We might not be able to properly parse partition-values from the listed partition-paths.
-   //       Fallback to eager listing in this case.
-   if (partitionColumns.get().length > 1
-     && (listingModeOverride == null || DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY.equals(listingModeOverride))) {
-     properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
-   }
- }

properties
}
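A minimal call-site sketch of the new signature, assuming a running SparkSession and using only option keys that appear in this diff; since the metaClient-driven fallback above is removed, an eager-listing override now has to come from the caller:

import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
import org.apache.spark.sql.SparkSession

// Build file-index properties without a HoodieTableMetaClient (new signature).
val spark = SparkSession.builder().master("local[1]").appName("config-sketch").getOrCreate()
val opts = Map(
  // Opt into eager listing explicitly; the multi-partition-field fallback is gone.
  DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key ->
    DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER
)
val props = HoodieFileIndex.getConfigProperties(spark, opts)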
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.internal.schema.Types.RecordType
import org.apache.hudi.internal.schema.utils.Conversions
- import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+ import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
@@ -112,9 +112,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// Note that key generator class name could be null
val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-   || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-   || classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
-   || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
+   || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
Review comment (Member Author): Should also fix HUDI-6914.

val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
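For background on the branch above, a standard-library sketch (hypothetical values) of why the timestamp-based key generators force a string-typed partition schema: the generator renders the source epoch value into a formatted path segment, and that string no longer parses back into the original numeric column type:

import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}

// A LongType source field holding epoch millis for 2021-03-01T00:00:00Z ...
val epochMillis = 1614556800000L
// ... is written as the formatted segment "2021/03/01", so listed partition
// values can only be surfaced to Spark as strings.
val formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC)
val partitionPath = formatter.format(Instant.ofEpochMilli(epochMillis))
assert(partitionPath == "2021/03/01")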
@@ -86,7 +86,7 @@ class HoodieCDCRDD(

private val cdcSupplementalLoggingMode = metaClient.getTableConfig.cdcSupplementalLoggingMode

- private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty, metaClient)
+ private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)

protected val payloadProps: Properties = Option(metaClient.getTableConfig.getPreCombineField)
.map { preCombineField =>
Expand Up @@ -38,6 +38,7 @@ import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtil
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.HoodieWriteConfig
+ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -325,21 +326,29 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionSupport
EqualTo(attribute("dt"), literal("2021/03/01")),
EqualTo(attribute("hh"), literal("10"))
)
- val partitionAndFilesNoPruning = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+ // NOTE: If the file index is in lazy-listing mode and we can't parse partition values, there's no way
+ //       to recover, since Spark by default has to inject partition values parsed from the partition paths.
+ if (listingModeOverride == DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY) {
+   assertThrows(classOf[HoodieException]) {
+     fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+   }
+ } else {
+   val partitionAndFilesNoPruning = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)

- assertEquals(1, partitionAndFilesNoPruning.size)
- // The partition prune would not work for this case, so the partition value it
- // returns is a InternalRow.empty.
- assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
- // The returned file size should equal to the whole file size in all the partition paths.
- assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
-   partitionAndFilesNoPruning.flatMap(_.files).length)
+   assertEquals(1, partitionAndFilesNoPruning.size)
+   // Partition pruning does not work for this case, so the partition value
+   // returned is an InternalRow.empty.
+   assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
+   // The returned file count should equal the total file count across all the partition paths.
+   assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
+     partitionAndFilesNoPruning.flatMap(_.files).length)

- val readDF = spark.read.format("hudi").options(readerOpts).load()
+   val readDF = spark.read.format("hudi").options(readerOpts).load()

- assertEquals(10, readDF.count())
- // There are 5 rows in the dt = 2021/03/01 and hh = 10
- assertEquals(5, readDF.filter("dt = '2021/03/01' and hh ='10'").count())
+   assertEquals(10, readDF.count())
+   // There are 5 rows in the partition dt = 2021/03/01, hh = 10.
+   assertEquals(5, readDF.filter("dt = '2021/03/01' and hh ='10'").count())
+ }
}

{
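As a usage-level counterpart to this test: when a table's partition values contain unencoded slashes, a reader can sidestep the lazy-mode failure by forcing eager listing. A hedged sketch (hypothetical table path; option keys as used elsewhere in this diff):

// Force eager listing so partition paths are fully listed up front instead of
// being lazily parsed into partition values (which fails for slashed values).
val df = spark.read.format("hudi")
  .option(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
    DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
  .load("/tmp/hudi/partitioned_table")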
@@ -422,7 +431,7 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionSupport
val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilters), Seq.empty)
assertEquals(1, partitionAndFilesAfterPrune.size)

- assertTrue(fileIndex.areAllPartitionPathsCached())
+ assertEquals(fileIndex.areAllPartitionPathsCached(), !complexExpressionPushDown)

val PartitionDirectory(partitionActualValues, filesAfterPrune) = partitionAndFilesAfterPrune.head
val partitionExpectValues = Seq("default", "2021-03-01", "5", "CN")
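Reading the flipped assertion: with a complex expression pushed down, pruning is expected to happen against a partial listing, so not every partition path lands in the cache. An equivalent sketch of the expectation (same names as the test):

// Equivalent to the assertEquals above, spelled out per branch.
if (complexExpressionPushDown) {
  assertFalse(fileIndex.areAllPartitionPathsCached())
} else {
  assertTrue(fileIndex.areAllPartitionPathsCached())
}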
@@ -54,7 +54,7 @@ import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
- import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}

@@ -1006,6 +1006,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSupport
}
}

+ @Disabled("HUDI-6320")
Review comment (Member Author): This test will be useful when HUDI-6320 is fixed properly.

@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
def testSparkPartitionByWithCustomKeyGenerator(recordType: HoodieRecordType): Unit = {