From ff71010ca795cbf6dc731968f8c1b500bd93e624 Mon Sep 17 00:00:00 2001
From: Vinish Reddy
Date: Tue, 30 Dec 2025 17:31:26 -0800
Subject: [PATCH] Handle nested map and array columns in MDT

---
 .../hudi/SparkHoodieTableFileIndex.scala      |  4 +-
 ...HoodieFileGroupReaderBasedFileFormat.scala |  1 +
 .../hudi/functional/TestCOWDataSource.scala   | 65 +++++++++++++++++++
 3 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index cd79b78ddb816..7b06d7edb6bf7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -33,7 +33,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.Types.RecordType
 import org.apache.hudi.internal.schema.utils.Conversions
-import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.hudi.util.JFunction
 
@@ -131,6 +131,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
     if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
       || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
+      // || classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
+      // || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
       val partitionFields: Array[StructField] = partitionColumns.get().map(column => StructField(column, StringType))
       StructType(partitionFields)
     } else {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 4dd3f66551de5..e52efa5ce4a35 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -214,6 +214,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     val exclusionFields = new java.util.HashSet[String]()
     exclusionFields.add("op")
     partitionSchema.fields.foreach(f => exclusionFields.add(f.name))
+    // QQ: How do we add nested partition fields to this without any field IDs?
     val requestedStructType = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
     val requestedSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(requestedStructType, sanitizedTableName), exclusionFields)
     val dataSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(dataStructType, sanitizedTableName), exclusionFields)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 6473bb3de3431..05948ee693bab 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -2453,6 +2453,71 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       writeToHudi(opt, firstUpdateDF, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     })
   }
+
+  @Test
+  def testNestedFieldPartition(): Unit = {
+    // Define schema with nested_record containing level field
+    val nestedSchema = StructType(Seq(
+      StructField("nested_int", IntegerType, nullable = false),
+      StructField("level", StringType, nullable = false)
+    ))
+
+    val schema = StructType(Seq(
+      StructField("key", StringType, nullable = false),
+      StructField("ts", LongType, nullable = false),
+      StructField("level", StringType, nullable = false),
+      StructField("int_field", IntegerType, nullable = false),
+      StructField("string_field", StringType, nullable = true),
+      StructField("nested_record", nestedSchema, nullable = true)
+    ))
+
+    // Create test data where top-level 'level' and 'nested_record.level' have DIFFERENT values
+    // This helps verify we're correctly partitioning/filtering on the nested field
+    val records = Seq(
+      Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")),
+      Row("key2", 2L, "L2", 2, "str2", Row(20, "ERROR")),
+      Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")),
+      Row("key4", 4L, "L4", 4, "str4", Row(40, "DEBUG")),
+      Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO"))
+    )
+
+    val inputDF = spark.createDataFrame(
+      spark.sparkContext.parallelize(records),
+      schema
+    )
+
+    // Write to Hudi partitioned by nested_record.level
+    inputDF.write.format("hudi")
+      .option("hoodie.insert.shuffle.parallelism", "4")
+      .option("hoodie.upsert.shuffle.parallelism", "4")
+      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "key")
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "nested_record.level")
+      .option(HoodieTableConfig.ORDERING_FIELDS.key, "ts")
+      .option(HoodieWriteConfig.TBL_NAME.key, "test_nested_partition")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    // Read and filter on nested_record.level = 'INFO'
+    val results = spark.read.format("hudi")
+      .load(basePath)
+      .filter("nested_record.level = 'INFO'")
+      .select("key", "ts", "level", "int_field", "string_field", "nested_record")
+      .orderBy("key")
+      .collect()
+
+    // Expected results - 3 records with nested_record.level = 'INFO'
+    val expectedResults = Array(
+      Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")),
+      Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")),
+      Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO"))
+    )
+
+    assertEquals(expectedResults.length, results.length)
+    expectedResults.zip(results).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+  }
 }
 
 object TestCOWDataSource {
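
Note (not part of the patch): a minimal sketch of an extra assertion that could follow the read-back check in testNestedFieldPartition, verifying the physical partition layout produced by partitioning on nested_record.level. It assumes the basePath and spark fixtures from HoodieSparkClientTestBase and assumes plain (non hive-style) partition directories named after the nested value; this is an assumption, not something the patch asserts.

// Sketch only: list the top-level partition directories under basePath and check
// they match the distinct nested_record.level values written by the test above.
// Assumes non hive-style partition paths (directories named "DEBUG", "ERROR", "INFO").
import org.apache.hadoop.fs.Path

val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
val partitionDirs = fs.listStatus(new Path(basePath))
  .filter(_.isDirectory)
  .map(_.getPath.getName)
  .filterNot(_.startsWith("."))   // skip the .hoodie metadata directory
  .sorted

assertEquals(Seq("DEBUG", "ERROR", "INFO"), partitionDirs.toSeq)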