10 changes: 10 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -155,6 +155,16 @@ private[spark] object HiveUtils extends Logging {
.booleanConf
.createWithDefault(true)

val HIVE_TABLE_SCAN_MAX_PARALLELISM = buildConf("spark.sql.hive.tableScan.maxParallelism")
Contributor

Is this really useful? The parallelism should depend on the data size, and it's hard to tune this config by hand.

Member Author
@viirya viirya Nov 11, 2019

When reading a Hive partitioned table, users can end up with an unreasonable number of partitions, e.g. tens of thousands.

The Hive scan node returns a UnionRDD over the Hive table partitions. Each Hive table partition is read as a HadoopRDD, and for each of those the parallelism depends on its data size. But the final UnionRDD sums up the parallelism of all the Hive table partitions.
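
For illustration only (not from the PR), a minimal sketch of that summing behavior, assuming a `spark` session as in `spark-shell` and made-up partition counts:

```scala
import org.apache.spark.rdd.{RDD, UnionRDD}

val sc = spark.sparkContext

// Pretend the table has 1000 Hive partitions, each read as a HadoopRDD with 20 splits.
val hivePartitionRDDs: Seq[RDD[Int]] =
  (1 to 1000).map(_ => sc.parallelize(1 to 10, numSlices = 20))

// UnionRDD keeps one Spark partition per child partition, so the counts add up.
val unionRDD = new UnionRDD(sc, hivePartitionRDDs)
println(unionRDD.partitions.length)  // 1000 * 20 = 20000
```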

Contributor

Is it possible to get the size of each HadoopRDD and do the coalesce automatically?

Member Author

I think we can get the split sizes and the total number of splits of a HadoopRDD.
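
A rough sketch of how the number and sizes of splits could be inspected with the old `mapred` API (the input path and format here are hypothetical, not what the PR uses):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

val jobConf = new JobConf()
// Hypothetical location of one Hive table partition.
FileInputFormat.setInputPaths(jobConf, new Path("/warehouse/some_table/part=1"))

val inputFormat = new TextInputFormat()
inputFormat.configure(jobConf)

// One HadoopRDD partition per split; summing getLength gives the data size read.
val splits = inputFormat.getSplits(jobConf, 1)
val totalBytes = splits.map(_.getLength).sum
println(s"${splits.length} splits, $totalBytes bytes")
```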

.doc("When reading Hive partitioned table, the default parallelism is the sum of Hive " +
"partition RDDs' parallelism. For Hive table of many partitions with many files, " +
"the parallelism could be very big and not good for Spark job scheduling. This optional " +
"config can set a maximum parallelism for reading Hive partitioned table. If the result " +
"RDD of reading such table is larger than this value, Spark will reduce the partition " +
"number by doing a coalesce on the RDD.")
.intConf
.createOptional
Member

Do we need a default value?

Member Author

The reason I leave it optional is to allow us to keep the current behavior.
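
For reference, a sketch of how a user would opt in (the value and table name below are hypothetical):

```scala
// When unset, the scan keeps the current behavior (no coalesce).
spark.conf.set("spark.sql.hive.tableScan.maxParallelism", "2000")

// Scanning a heavily partitioned Hive table now yields at most 2000 RDD partitions.
val df = spark.table("some_partitioned_hive_table")
df.count()
```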


/**
* The version of the hive client that will be used to communicate with the metastore. Note that
* this does not necessarily need to be the same version of Hive that is used internally by
@@ -267,7 +267,17 @@ class HadoopTableReader(
if (hivePartitionRDDs.size == 0) {
new EmptyRDD[InternalRow](sparkSession.sparkContext)
} else {
new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
val unionRDD = new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
val partNums = unionRDD.partitions.length
val maxPartNums = SQLConf.get.getConf(HiveUtils.HIVE_TABLE_SCAN_MAX_PARALLELISM)
if (maxPartNums.isDefined && partNums > maxPartNums.get) {
logWarning(s"Union of Hive partitions' HadoopRDDs has ${partNums} partitions " +
"which exceeds the config `spark.sql.hive.tableScan.maxParallelism`. " +
s"Coalesces the Union RDD to ${maxPartNums.get} partitions.")
unionRDD.coalesce(maxPartNums.get)
} else {
unionRDD
}
}
}

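As background for the change above: `coalesce(n)` without a shuffle merges existing partitions rather than redistributing data, so the cap mainly cuts task-scheduling overhead. A minimal sketch (assuming a `spark` session; numbers are illustrative):

```scala
val rdd = spark.sparkContext.parallelize(1 to 100000, numSlices = 20000)

// Narrow dependency: partitions are merged locally, no shuffle is triggered.
val merged = rdd.coalesce(100)
assert(merged.partitions.length == 100)

// repartition(n) is coalesce(n, shuffle = true) and would move data around instead.
val reshuffled = rdd.repartition(100)
assert(reshuffled.partitions.length == 100)
```
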
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -187,6 +188,35 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestHiveSingleton {
}
}

test("HiveTableScanExec should not increase data parallelism") {
Member

should not increase data parallelism -> should respect HIVE_TABLE_SCAN_MAX_PARALLELISM?

withSQLConf(HiveUtils.HIVE_TABLE_SCAN_MAX_PARALLELISM.key -> "1") {
val view = "src"
withTempView(view) {
spark.range(1, 5).createOrReplaceTempView(view)
val table = "hive_tbl_part"
withTable(table) {
sql(
s"""
|CREATE TABLE $table (id int)
|PARTITIONED BY (a int, b int)
""".stripMargin)
sql(
s"""
|FROM $view v
|INSERT INTO TABLE $table
|PARTITION (a=1, b=2)
|SELECT v.id
|INSERT INTO TABLE $table
|PARTITION (a=2, b=3)
|SELECT v.id
""".stripMargin)
val scanRdd = getHiveTableScanExec(s"SELECT * FROM $table").execute()
assert(scanRdd.partitions.length == 1)
}
}
}
}

private def getHiveTableScanExec(query: String): HiveTableScanExec = {
sql(query).queryExecution.sparkPlan.collectFirst {
case p: HiveTableScanExec => p