SparkHoodieTableFileIndex.scala
@@ -33,7 +33,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.Types.RecordType
import org.apache.hudi.internal.schema.utils.Conversions
-import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
import org.apache.hudi.util.JFunction

@@ -131,6 +131,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
|| classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
// || classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
// || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
val partitionFields: Array[StructField] = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
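
For readability, here is a hedged sketch (not code from this PR) of how the partition-schema branch above would read if the commented-out CustomKeyGenerator / CustomAvroKeyGenerator checks were enabled. The standalone helper name, its signature, and the Option return are illustrative assumptions; the key generator class names and the StringType fallback come from the hunk itself.

import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Illustrative helper: returns a string-typed partition schema when the table's key
// generator writes partition path values that may not cast back to the original
// column types (mirrors the check in SparkHoodieTableFileIndex above).
def stringPartitionSchemaIfNeeded(keyGeneratorClassName: String,
                                  partitionColumns: Array[String]): Option[StructType] = {
  val stringTypedKeyGens = Seq(
    classOf[TimestampBasedKeyGenerator].getName,
    classOf[TimestampBasedAvroKeyGenerator].getName,
    // Assumption: the commented-out branches would add these two once enabled.
    classOf[CustomKeyGenerator].getName,
    classOf[CustomAvroKeyGenerator].getName)
  if (stringTypedKeyGens.exists(_.equalsIgnoreCase(keyGeneratorClassName))) {
    Some(StructType(partitionColumns.map(column => StructField(column, StringType))))
  } else {
    None // caller falls back to resolving partition columns from the data schema
  }
}
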
HoodieFileGroupReaderBasedFileFormat.scala
@@ -210,6 +210,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
val exclusionFields = new java.util.HashSet[String]()
exclusionFields.add("op")
partitionSchema.fields.foreach(f => exclusionFields.add(f.name))
// QQ: How do we add nested partitioned fields to this without any field id's?
val requestedStructType = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
val requestedSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(requestedStructType, sanitizedTableName), exclusionFields)
val dataSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(dataStructType, sanitizedTableName), exclusionFields)
TestCOWDataSource.scala
@@ -2453,6 +2453,71 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSupport
writeToHudi(opt, firstUpdateDF, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
})
}

@Test
def testNestedFieldPartition(): Unit = {
// Define schema with nested_record containing level field
val nestedSchema = StructType(Seq(
StructField("nested_int", IntegerType, nullable = false),
StructField("level", StringType, nullable = false)
))

val schema = StructType(Seq(
StructField("key", StringType, nullable = false),
StructField("ts", LongType, nullable = false),
StructField("level", StringType, nullable = false),
StructField("int_field", IntegerType, nullable = false),
StructField("string_field", StringType, nullable = true),
StructField("nested_record", nestedSchema, nullable = true)
))

// Create test data where top-level 'level' and 'nested_record.level' have DIFFERENT values
// This helps verify we're correctly partitioning/filtering on the nested field
val records = Seq(
Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")),
Row("key2", 2L, "L2", 2, "str2", Row(20, "ERROR")),
Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")),
Row("key4", 4L, "L4", 4, "str4", Row(40, "DEBUG")),
Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO"))
)

val inputDF = spark.createDataFrame(
spark.sparkContext.parallelize(records),
schema
)

// Write to Hudi partitioned by nested_record.level
inputDF.write.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "4")
.option("hoodie.upsert.shuffle.parallelism", "4")
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "nested_record.level")
.option(HoodieTableConfig.ORDERING_FIELDS.key, "ts")
.option(HoodieWriteConfig.TBL_NAME.key, "test_nested_partition")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

// Read and filter on nested_record.level = 'INFO'
val results = spark.read.format("hudi")
.load(basePath)
.filter("nested_record.level = 'INFO'")
.select("key", "ts", "level", "int_field", "string_field", "nested_record")
.orderBy("key")
.collect()

// Expected results - 3 records with nested_record.level = 'INFO'
val expectedResults = Array(
Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")),
Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")),
Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO"))
)

assertEquals(expectedResults.length, results.length)
expectedResults.zip(results).foreach { case (expected, actual) =>
assertEquals(expected, actual)
}
}
}

object TestCOWDataSource {