diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 4f1fb566d809e..85f7351f4e5a0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -184,7 +184,17 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
 
   override def isSplitable(sparkSession: SparkSession,
                            options: Map[String, String],
-                           path: Path): Boolean = false
+                           path: Path): Boolean = {
+    // NOTE: When only base files need to be read in the normal reading mode (no log files),
+    // this format is effectively equivalent to `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`,
+    // so we can keep the same `isSplitable` logic as the parent format. This lets us leverage
+    // Spark's file-splitting capability: an overly large single file can be read by multiple
+    // concurrent tasks, reducing the overall read time of the job.
+    val superSplitable = super.isSplitable(sparkSession, options, path)
+    val splitable = !isMOR && !isIncremental && !isBootstrap && superSplitable
+    logInfo(s"isSplitable: $splitable, super.isSplitable: $superSplitable, isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap")
+    splitable
+  }
 
   override def buildReaderWithPartitionValues(spark: SparkSession,
                                               dataStructType: StructType,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index b0354b03a1011..28fd39e6022c6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -911,5 +911,52 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
     }
   }
 
+  test("Test Query CoW table with splitable file format") {
+    withTable(generateTableName) { tableName =>
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | tblproperties (
+           |   type = 'cow'
+           | )
+           | partitioned by (dt)
+       """.stripMargin
+      )
+
+      withSQLConf("hoodie.datasource.overwrite.mode" -> "dynamic") {
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt) values
+             | (0, 'a0', 10, 1000, '2023-12-06'),
+             | (1, 'a1', 10, 1000, '2023-12-06'),
+             | (2, 'a2', 11, 1000, '2023-12-06'),
+             | (3, 'a3', 10, 1000, '2023-12-06')
+         """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+          Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+          Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+          Seq(3, "a3", 10.0, 1000, "2023-12-06")
+        )
+      }
+
+      // force file splitting by setting a small max partition size
+      withSQLConf(s"${SQLConf.FILES_MAX_PARTITION_BYTES.key}" -> "10240") {
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+          Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+          Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+          Seq(3, "a3", 10.0, 1000, "2023-12-06")
+        )
+      }
+    }
+  }
+
 }