diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 51895751101cc..7a2f2427af817 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1390,6 +1390,10 @@ public boolean shouldAllowMultiWriteOnSameInstant() {
     return getBoolean(ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE);
   }
 
+  public boolean shouldDropPartitionColumns() {
+    return getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS);
+  }
+
   public String getWriteStatusClassName() {
     return getString(WRITE_STATUS_CLASS_NAME);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 7f6054b229666..0773e8a5a0ae3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
@@ -38,11 +39,16 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
 /**
  * Helper class for HoodieBulkInsertDataInternalWriter used by Spark datasource v2.
  */
@@ -124,7 +130,33 @@ public void write(InternalRow row) throws IOException {
         lastKnownPartitionPath = partitionPath.clone();
       }
 
-      handle.write(row);
+      boolean shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
+      if (shouldDropPartitionColumns) {
+        // Drop the partition columns from the row.
+        // Using the deprecated JavaConversions to stay compatible with Scala versions < 2.12. Once Hudi support for
+        // Scala versions < 2.12 is dropped, this can move to JavaConverters.seqAsJavaList(...).
+        List<String> partitionCols = JavaConversions.seqAsJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
+        Set<Integer> partitionIdx = new HashSet<>();
+        for (String col : partitionCols) {
+          partitionIdx.add(this.structType.fieldIndex(col));
+        }
+
+        // Relies on InternalRow::toSeq(...) preserving the column ordering based on the supplied schema.
+        // Using the deprecated JavaConversions to stay compatible with Scala versions < 2.12.
+        List<Object> cols = JavaConversions.seqAsJavaList(row.toSeq(structType));
+        int idx = 0;
+        List<Object> newCols = new ArrayList<>();
+        for (Object o : cols) {
+          if (!partitionIdx.contains(idx)) {
+            newCols.add(o);
+          }
+          idx += 1;
+        }
+        InternalRow newRow = InternalRow.fromSeq(JavaConverters.asScalaIteratorConverter(newCols.iterator()).asScala().toSeq());
+        handle.write(newRow);
+      } else {
+        handle.write(row);
+      }
     } catch (Throwable t) {
       LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
       throw t;
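Note on the new per-row path above: the loop is effectively a projection over the non-partition fields. A minimal Scala sketch of the same transformation, assuming structType and the partition column names resolved via getPartitionPathCols exactly as in the Java code (the sketch itself is not part of this diff):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType

    // Mirrors the Java loop above: relies on InternalRow.toSeq(schema) preserving field order.
    def dropPartitionFields(row: InternalRow, structType: StructType, partitionCols: Seq[String]): InternalRow = {
      val partitionIdx: Set[Int] = partitionCols.map(structType.fieldIndex).toSet
      val kept = row.toSeq(structType).zipWithIndex.collect { case (v, i) if !partitionIdx.contains(i) => v }
      InternalRow.fromSeq(kept)
    }

Since the dropped ordinals do not change from row to row, partitionIdx (or an equivalent projection over the kept ordinals) could be computed once per writer rather than per record, avoiding the repeated field lookups and Seq conversions in the hot write path.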
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 12e446d7be6e4..75ec069946d21 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -62,7 +62,6 @@ object HoodieDatasetBulkInsertHelper
   def prepareForBulkInsert(df: DataFrame,
                            config: HoodieWriteConfig,
                            partitioner: BulkInsertPartitioner[Dataset[Row]],
-                           shouldDropPartitionColumns: Boolean,
                            instantTime: String): Dataset[Row] = {
     val populateMetaFields = config.populateMetaFields()
     val schema = df.schema
@@ -128,16 +127,10 @@
       HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
     }
 
-    val trimmedDF = if (shouldDropPartitionColumns) {
-      dropPartitionColumns(updatedDF, config)
-    } else {
-      updatedDF
-    }
-
     val targetParallelism =
-      deduceShuffleParallelism(trimmedDF, config.getBulkInsertShuffleParallelism)
+      deduceShuffleParallelism(updatedDF, config.getBulkInsertShuffleParallelism)
 
-    partitioner.repartitionRecords(trimmedDF, targetParallelism)
+    partitioner.repartitionRecords(updatedDF, targetParallelism)
   }
 
   /**
@@ -243,21 +236,17 @@
     }
   }
 
-  private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
-    val partitionPathFields = getPartitionPathFields(config).toSet
-    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
-    if (nestedPartitionPathFields.nonEmpty) {
-      logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
-    }
-
-    val partitionPathCols = (partitionPathFields -- nestedPartitionPathFields).toSeq
-
-    df.drop(partitionPathCols: _*)
-  }
-
   private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = {
     val keyGeneratorClassName = config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME)
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  def getPartitionPathCols(config: HoodieWriteConfig): Seq[String] = {
+    val partitionPathFields = getPartitionPathFields(config).toSet
+    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+
+    (partitionPathFields -- nestedPartitionPathFields).toSeq
+  }
+
 }
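To make the contract of the relocated getPartitionPathCols explicit: nested partition path fields are filtered out, since only top-level columns can be dropped from a row. An illustration with hypothetical field names (not from this diff):

    // Mirrors the filtering in getPartitionPathCols above.
    val partitionPathFields = Set("city", "address.region")
    val nestedPartitionPathFields = partitionPathFields.filter(_.contains('.')) // Set("address.region")
    val droppable = (partitionPathFields -- nestedPartitionPathFields).toSeq    // Seq("city")

Note that the logWarning the removed dropPartitionColumns emitted for nested partition path fields is not carried over into the new helper.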
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index fb0218137d208..1e20e4ab663da 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -95,8 +95,7 @@ public final HoodieWriteResult execute(Dataset<Row> records, boolean isTablePart
     table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
 
     BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows = getPartitioner(populateMetaFields, isTablePartitioned);
-    boolean shouldDropPartitionColumns = writeConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS());
-    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns, instantTime);
+    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, instantTime);
     preExecute();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = buildHoodieWriteMetadata(doExecute(hoodieDF, bulkInsertPartitionerRows.arePartitionRecordsSorted()));
 
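With this change the executor no longer consults DataSourceWriteOptions.DROP_PARTITION_COLUMNS; the decision is made per row writer through writeConfig.shouldDropPartitionColumns(). A rough sketch of how the flag would be toggled from a write config, assuming the usual HoodieWriteConfig builder API and that the datasource key used in the new test below feeds HoodieTableConfig.DROP_PARTITION_COLUMNS (both assumptions, not verified by this diff):

    import scala.collection.JavaConverters._
    import org.apache.hudi.config.HoodieWriteConfig

    // Assumption: the datasource option key maps through to HoodieTableConfig.DROP_PARTITION_COLUMNS.
    val writeConfig = HoodieWriteConfig.newBuilder()
      .withPath("/tmp/trips_table")  // illustrative path
      .forTable("trips_table")
      .withProps(Map("hoodie.datasource.write.drop.partition.columns" -> "true").asJava)
      .build()

    // Now consulted inside BulkInsertDataInternalWriterHelper#write for every handle.
    writeConfig.shouldDropPartitionColumns()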
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 52c4568697832..50ec641c182fc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -128,7 +128,7 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKeyField)
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "0000000001");
+        new NonSortPartitionerWithRows(), "0000000001");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -172,7 +172,7 @@ public void testBulkInsertHelperNoMetaFields() {
         .build();
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -209,7 +209,7 @@ public void testBulkInsertPreCombine(boolean enablePreCombine) {
     rows.addAll(updates);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -313,7 +313,7 @@ public void testNoPropsSet() {
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -325,7 +325,7 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -337,7 +337,7 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
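For completeness, any caller of prepareForBulkInsert outside these tests migrates the same way: the boolean argument disappears and dropping partition columns is driven entirely by the write config. Shape of the updated call (locals are illustrative; the partitioner class is the one used in the tests above, import path assumed):

    import org.apache.hudi.HoodieDatasetBulkInsertHelper
    import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows

    // dataset: Dataset[Row], config: HoodieWriteConfig (with the drop-partition-columns
    // property set when trimming is desired), instant time as in the tests.
    val prepared = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
      new NonSortPartitionerWithRows(), "0000000001")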
+ option("hoodie.datasource.write.hive_style_partitioning", "true"). + option("hoodie.populate.meta.fields", "false"). + option("hoodie.datasource.write.drop.partition.columns", "true"). + mode(SaveMode.Overwrite). + save(basePath) + + // Ensure the partition column (i.e 'city') can be read back + val tripsDF = spark.read.format("hudi").load(basePath) + tripsDF.show() + tripsDF.select("city").foreach(row => { + assertNotNull(row) + }) + + // Peek into the raw parquet file and ensure partition column is not written to the file + val partitions = Seq("city=san_francisco", "city=chennai", "city=sao_paulo") + val partitionPaths = new Array[String](3) + for (i <- partitionPaths.indices) { + partitionPaths(i) = String.format("%s/%s/*", basePath, partitions(i)) + } + val rawFileDf = spark.sqlContext.read.parquet(partitionPaths(0), partitionPaths(1), partitionPaths(2)) + rawFileDf.show() + rawFileDf.select("city").foreach(row => { + assertNull(row.get(0)) + }) +} + /** * Test case for disable and enable meta fields. */