FileSourceStrategy.scala

@@ -135,7 +135,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
       case _ =>
         val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
-        logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes")
+        val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition
+        logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+          s"max #files: $maxFileNumInPartition")
 
         val splitFiles = selectedPartitions.flatMap { partition =>
           partition.files.flatMap { file =>
@@ -173,7 +175,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         // Assign files to partitions using "First Fit Decreasing" (FFD)
         // TODO: consider adding a slop factor here?
         splitFiles.foreach { file =>
-          if (currentSize + file.length > maxSplitBytes) {
+          if (currentSize + file.length > maxSplitBytes ||
+              currentFiles.length >= maxFileNumInPartition) {
            closePartition()
            addFile(file)
          } else {
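As a reading aid, here is a minimal, self-contained sketch of the size- and count-capped packing loop above. It is illustrative only: FileStub, pack, and the hard-coded sizes are made up for this sketch and are not part of the patch, which operates on the split files computed in the scan planning code.

  // A minimal sketch of the capped bin-packing loop above; illustrative only.
  // FileStub and the hard-coded sizes are made up for this example.
  object BinPackingSketch {
    case class FileStub(name: String, length: Long)

    def pack(files: Seq[FileStub], maxSplitBytes: Long, maxFileNum: Long): Seq[Seq[FileStub]] = {
      val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[FileStub]]
      var currentFiles = Vector.empty[FileStub]
      var currentSize = 0L

      def closePartition(): Unit = {
        if (currentFiles.nonEmpty) partitions += currentFiles
        currentFiles = Vector.empty
        currentSize = 0L
      }
      def addFile(file: FileStub): Unit = {
        currentFiles :+= file
        currentSize += file.length
      }

      // "First Fit Decreasing": walk the files from largest to smallest and close the
      // current partition whenever the byte cap or the new file-count cap would be hit.
      files.sortBy(-_.length).foreach { file =>
        if (currentSize + file.length > maxSplitBytes || currentFiles.length >= maxFileNum) {
          closePartition()
          addFile(file)
        } else {
          addFile(file)
        }
      }
      closePartition()
      partitions.toSeq
    }

    def main(args: Array[String]): Unit = {
      val files = Seq("file1" -> 2L, "file2" -> 2L, "file3" -> 1L,
        "file4" -> 1L, "file5" -> 1L, "file6" -> 1L).map((FileStub.apply _).tupled)
      // Prints: [(file1), (file2, file3), (file4, file5), (file6)]
      val layout = pack(files, maxSplitBytes = 3L, maxFileNum = 2L)
      println(layout.map(_.map(_.name).mkString("(", ", ", ")")).mkString("[", ", ", "]"))
    }
  }

Run with maxSplitBytes = 3 and maxFileNum = 2, the demo reproduces the layout asserted by the new test added further down.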
SQLConf.scala

@@ -524,6 +524,11 @@ object SQLConf {
     doc = "The maximum number of bytes to pack into a single partition when reading files.",
     isPublic = true)
 
+  val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition",
+    defaultValue = Some(32),
[Review thread on the defaultValue = Some(32) line]
Contributor: How is the default determined?
Member Author: Currently, I have no reason to set this default value.

doc = "The maximum number of files to pack into a single partition when reading files.",
isPublic = true)

val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
defaultValue = Some(true),
doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
Expand Down Expand Up @@ -581,6 +586,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin

   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
+  def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
+
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
   def useFileScan: Boolean = getConf(USE_FILE_SCAN)
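A small usage sketch, assuming this patch is applied and the new file-scan path is enabled: both caps can be set per SQLContext before a scan is planned. The chosen values and the Parquet path below are placeholders, not part of the patch.

  // Illustrative only: tighten both packing caps, then read a file-based table.
  sqlContext.setConf("spark.sql.files.maxPartitionBytes", (4 * 1024 * 1024).toString)
  sqlContext.setConf("spark.sql.files.maxNumInPartition", "16")  // cap added by this patch

  // Each scan partition should now hold at most 16 files and roughly 4 MB of input.
  val df = sqlContext.read.parquet("/path/to/table")  // placeholder path
  println(df.rdd.getNumPartitions)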
FileSourceStrategySuite.scala

@@ -121,6 +121,53 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
     }
   }
 
test("Unpartitioned table, many files that get split") {
val table =
createTable(
files = Seq(
"file1" -> 2,
"file2" -> 2,
"file3" -> 1,
"file4" -> 1,
"file5" -> 1,
"file6" -> 1))

withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3",
SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") {
checkScan(table.select('c1)) { partitions =>
// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
assert(partitions.size == 4, "when checking partitions")
assert(partitions(0).files.size == 1, "when checking partition 1")
assert(partitions(1).files.size == 2, "when checking partition 2")
assert(partitions(2).files.size == 2, "when checking partition 3")
assert(partitions(3).files.size == 1, "when checking partition 4")

// First partition reads (file1)
assert(partitions(0).files(0).start == 0)
assert(partitions(0).files(0).length == 2)

// Second partition reads (file2, file3)
assert(partitions(1).files(0).start == 0)
assert(partitions(1).files(0).length == 2)
assert(partitions(1).files(1).start == 0)
assert(partitions(1).files(1).length == 1)

// Third partition reads (file4, file5)
assert(partitions(2).files(0).start == 0)
assert(partitions(2).files(0).length == 1)
assert(partitions(2).files(1).start == 0)
assert(partitions(2).files(1).length == 1)

// Final partition reads (file6)
assert(partitions(3).files(0).start == 0)
assert(partitions(3).files(0).length == 1)
}

checkPartitionSchema(StructType(Nil))
checkDataSchema(StructType(Nil).add("c1", IntegerType))
}
}

test("partitioned table") {
val table =
createTable(
Expand Down
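For reference, the layout asserted in the new test follows directly from the capped FFD loop: with maxSplitBytes = 3 and maxNumInPartition = 2, the size-sorted files (2, 2, 1, 1, 1, 1) pack as follows. file1 opens a bin at 2 bytes; file2 would overflow it (2 + 2 > 3), so a new bin starts; file3 fits beside file2 (3 bytes, 2 files); file4 overflows that bin and starts another; file5 joins file4; file6 would still fit by size (2 + 1 = 3) but is pushed to a fourth bin by the two-file cap, giving [(file1), (file2, file3), (file4, file5), (file6)].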