@@ -184,7 +184,17 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
 
   override def isSplitable(sparkSession: SparkSession,
                            options: Map[String, String],
-                           path: Path): Boolean = false
+                           path: Path): Boolean = {
+    // NOTE: When only base files need to be read (the normal read path), this format is
+    // effectively equivalent to `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`,
+    // so we can keep the same `isSplitable` logic as the parent format and take advantage
+    // of Spark's file-splitting capability: an overly large single file can then be read
+    // by multiple concurrent tasks, reducing the overall read time of the job.
+    val superSplitable = super.isSplitable(sparkSession, options, path)
+    val splitable = !isMOR && !isIncremental && !isBootstrap && superSplitable
+    logInfo(s"isSplitable: $splitable, super.isSplitable: $superSplitable, isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap")
+    splitable
+  }
 
   override def buildReaderWithPartitionValues(spark: SparkSession,
                                               dataStructType: StructType,
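For context on what this buys us: once `isSplitable` returns true, Spark's scan planner may carve a large file into byte-range splits, each read by its own task. Below is a minimal, self-contained sketch of that mechanism — not Spark's actual internals; `planSplits`, `FileSlice`, and the file path are made up for illustration — assuming a `maxPartitionBytes`-style cap on split size.

```scala
// Simplified illustration of split planning once a format reports splitable = true.
object SplitSketch {
  final case class FileSlice(path: String, start: Long, length: Long)

  // If the file is splitable and larger than the cap, emit one byte-range
  // slice per `maxSplitBytes` chunk; otherwise a single task reads it whole.
  def planSplits(path: String, fileLen: Long, splitable: Boolean, maxSplitBytes: Long): Seq[FileSlice] =
    if (!splitable || fileLen <= maxSplitBytes) {
      Seq(FileSlice(path, 0L, fileLen))
    } else {
      (0L until fileLen by maxSplitBytes)
        .map(offset => FileSlice(path, offset, math.min(maxSplitBytes, fileLen - offset)))
    }

  def main(args: Array[String]): Unit = {
    // A 1 GiB CoW base file with a 128 MiB cap -> 8 concurrent read tasks.
    val splits = planSplits("dt=2023-12-06/base.parquet", 1L << 30, splitable = true, 128L * 1024 * 1024)
    println(s"${splits.size} splits, first: ${splits.head}")
  }
}
```

The guard `!isMOR && !isIncremental && !isBootstrap` matters because those read paths must see a whole file group in one task (e.g., a MOR base file merged with its log files), so byte-range splitting would break merging.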
@@ -911,5 +911,52 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Query CoW table with splitable file format") {
+    withTable(generateTableName) { tableName =>
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | tblproperties (
+           |  type = 'cow'
+           | )
+           | partitioned by (dt)
+        """.stripMargin
+      )
+
+      withSQLConf("hoodie.datasource.overwrite.mode" -> "dynamic") {
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt) values
+             | (0, 'a0', 10, 1000, '2023-12-06'),
+             | (1, 'a1', 10, 1000, '2023-12-06'),
+             | (2, 'a2', 11, 1000, '2023-12-06'),
+             | (3, 'a3', 10, 1000, '2023-12-06')
+          """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+          Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+          Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+          Seq(3, "a3", 10.0, 1000, "2023-12-06")
+        )
+      }
+
+      // Force file splitting by setting a small max partition size (10240 bytes = 10 KiB).
+      withSQLConf(s"${SQLConf.FILES_MAX_PARTITION_BYTES.key}" -> "10240") {
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+          Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+          Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+          Seq(3, "a3", 10.0, 1000, "2023-12-06")
+        )
+      }
+    }
+  }
+
 }
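The test above verifies correctness under a forced 10 KiB split cap, but not that splitting actually occurred. A possible follow-up assertion — a sketch only, assuming the same scaffolding (`spark`, `withSQLConf`, `tableName`) and a base file larger than the cap; not part of this PR — could check the planned read parallelism directly:

```scala
// Hypothetical extra check (not in the PR): each file split becomes one RDD
// partition, so more than one partition implies the base file was split.
withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10240") {
  val df = spark.sql(s"select id, name, price, ts, dt from $tableName")
  assert(df.rdd.getNumPartitions > 1,
    "expected the CoW base file to be split into multiple read tasks")
}
```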
