Skip to content

Commit

Permalink
Change AQE default minPartitionNum to defaultParallelism (#674)
Browse files Browse the repository at this point in the history
* Make estimatePartitionStartIndices static and pass advisory size and min partitions as parameters

* Make SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS optional and default to defaultParallelism

* Test that defaultParallelism is used as minimum

* Fix 'Change merge join to broadcast join and reduce number of shuffle partitions'

* Fix the mix

* Add issue reference for revert todo
  • Loading branch information
rshkv authored May 1, 2020
1 parent e52e8c5 commit da8a7ff
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,12 @@ object SQLConf {

val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution. " +
"If not set, the default value is the default parallelism of the Spark cluster.")
.intConf
.checkValue(_ > 0, "The minimum shuffle partition number " +
"must be a positive integer.")
.createWithDefault(1)
.createOptional

val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
Expand Down Expand Up @@ -1830,9 +1831,6 @@ class SQLConf extends Serializable with Logging {

def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

def maxNumPostShufflePartitions: Int =
getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, subqueryCache),
ReduceNumShufflePartitions(conf),
ReduceNumShufflePartitions(queryExecution.sparkSession),
CollapseCodegenStages(conf)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration

import org.apache.spark.MapOutputStatistics
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
Expand Down Expand Up @@ -51,7 +53,8 @@ import org.apache.spark.util.ThreadUtils
* - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
* - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
*/
case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
case class ReduceNumShufflePartitions(session: SparkSession) extends Rule[SparkPlan] {
private def conf = session.sessionState.conf

override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.reducePostShufflePartitionsEnabled) {
Expand Down Expand Up @@ -88,7 +91,14 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
val distinctNumPreShufflePartitions =
validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
// We fall back to Spark default parallelism if the minimum number of coalesced partitions
// is not set, so as to avoid performance regressions compared to no coalescing.
val minPartitionNum = conf.getConf(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
.getOrElse(session.sparkContext.defaultParallelism)
val partitionStartIndices = ReduceNumShufflePartitions.estimatePartitionStartIndices(
validMetrics.toArray,
conf.targetPostShuffleInputSize,
minPartitionNum)
// This transformation adds new nodes, so we must use `transformUp` here.
plan.transformUp {
// even for shuffle exchange whose input RDD has 0 partition, we should still update its
Expand All @@ -102,19 +112,20 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
}
}
}
}

object ReduceNumShufflePartitions extends Logging {
/**
* Estimates partition start indices for post-shuffle partitions based on
* mapOutputStatistics provided by all pre-shuffle stages.
*/
// visible for testing.
private[sql] def estimatePartitionStartIndices(
mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize
// If minNumPostShufflePartitions is defined, it is possible that we need to use a
// value less than advisoryTargetPostShuffleInputSize as the target input size of
// a post shuffle task.
mapOutputStatistics: Array[MapOutputStatistics],
advisoryTargetPostShuffleInputSize: Long,
minNumPostShufflePartitions: Int): Array[Int] = {
// If `minNumPostShufflePartitions` is very large, it is possible that we need to use a value
// less than `advisoryTargetPostShuffleInputSize` as the target size of a coalesced task.
val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
// The max here is to make sure that when we have an empty table, we
// only have a single post-shuffle partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,87 +52,79 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
}

private def checkEstimation(
rule: ReduceNumShufflePartitions,
bytesByPartitionIdArray: Array[Array[Long]],
expectedPartitionStartIndices: Array[Int]): Unit = {
expectedPartitionStartIndices: Seq[Int],
targetSize: Long,
minNumPartitions: Int = 1): Unit = {
val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
case (bytesByPartitionId, index) =>
new MapOutputStatistics(index, bytesByPartitionId)
}
val estimatedPartitionStartIndices =
rule.estimatePartitionStartIndices(mapOutputStatistics)
val estimatedPartitionStartIndices = ReduceNumShufflePartitions.estimatePartitionStartIndices(
mapOutputStatistics,
targetSize,
minNumPartitions)
assert(estimatedPartitionStartIndices === expectedPartitionStartIndices)
}

private def createReduceNumShufflePartitionsRule(
advisoryTargetPostShuffleInputSize: Long,
minNumPostShufflePartitions: Int = 1): ReduceNumShufflePartitions = {
val conf = new SQLConf().copy(
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE -> advisoryTargetPostShuffleInputSize,
SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS -> minNumPostShufflePartitions)
ReduceNumShufflePartitions(conf)
}

test("test estimatePartitionStartIndices - 1 Exchange") {
val rule = createReduceNumShufflePartitionsRule(100L)
val targetSize = 100

{
// All bytes per partition are 0.
val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// Some bytes per partition are 0 and total size is less than the target size.
// 1 post-shuffle partition is needed.
val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// 2 post-shuffle partitions are needed.
val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0)
val expectedPartitionStartIndices = Array[Int](0, 3)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// There are a few large pre-shuffle partitions.
val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// All pre-shuffle partitions are larger than the targeted size.
val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// The last pre-shuffle partition is in a single post-shuffle partition.
val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110)
val expectedPartitionStartIndices = Array[Int](0, 4)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}
}

test("test estimatePartitionStartIndices - 2 Exchanges") {
val rule = createReduceNumShufflePartitionsRule(100L)
val targetSize = 100

{
// If there are multiple values of the number of pre-shuffle partitions,
// we should see an assertion error.
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0, 0)
val mapOutputStatistics =
Array(
new MapOutputStatistics(0, bytesByPartitionId1),
new MapOutputStatistics(1, bytesByPartitionId2))
intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics))
intercept[AssertionError]{
checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Seq.empty, targetSize)
}
}

{
Expand All @@ -141,9 +133,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
Expand All @@ -153,9 +145,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
Expand All @@ -164,9 +156,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 2, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
Expand All @@ -175,9 +167,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
Expand All @@ -186,9 +178,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
Expand All @@ -197,9 +189,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
Expand All @@ -208,14 +200,15 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}
}

test("test estimatePartitionStartIndices and enforce minimal number of reducers") {
val rule = createReduceNumShufflePartitionsRule(100L, 2)
val targetSize = 100
val minNumPartitions = 2

{
// The minimal number of post-shuffle partitions is not enforced because
Expand All @@ -224,9 +217,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize,
minNumPartitions)
}

{
Expand All @@ -235,9 +229,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5)
val expectedPartitionStartIndices = Array[Int](0, 3)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize,
minNumPartitions)
}

{
Expand All @@ -246,9 +241,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize,
minNumPartitions)
}
}

Expand All @@ -268,7 +264,8 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
def withSparkSession(
f: SparkSession => Unit,
targetPostShuffleInputSize: Int,
minNumPostShufflePartitions: Option[Int]): Unit = {
minNumPostShufflePartitions: Option[Int],
unsetMinMax: Boolean = false): Unit = {
val sparkConf =
new SparkConf(false)
.setMaster("local[*]")
Expand All @@ -288,6 +285,11 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
}

if (unsetMinMax) {
sparkConf.remove(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key)
sparkConf.remove(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key)
}

val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
Expand Down Expand Up @@ -523,6 +525,38 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
}
}

// TODO(rshkv): Remove after taking SPARK-31124 (#676)
test("number of reducers is lower-bound by default parallelism without configured minimum") {
val test = { spark: SparkSession =>
val df =
spark
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 20 as key", "id as value")
val agg = df.groupBy("key").count()

agg.collect()

val finalPlan = agg.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case reader: CoalescedShuffleReaderExec => reader
}

assert(shuffleReaders.length === 1)
shuffleReaders.foreach { reader =>
// Assert that there is no configured minimum
assert(!spark.conf.contains(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key))
// The number of output partitions will be slightly larger than defaultParallelism because
// input partitions don't exactly fit. The key here is that we have more than one partition.
assert(reader.outputPartitioning.numPartitions >= spark.sparkContext.defaultParallelism)
}
}
// Pick an advisory partition size such that we'd have one partition
// if min = defaultParallelism didn't work
val targetSizeForOnePartition = 1000000000
withSparkSession(test, targetSizeForOnePartition, None, unsetMinMax = true)
}

test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
val test: SparkSession => Unit = { spark: SparkSession =>
spark.sql("SET spark.sql.exchange.reuse=true")
Expand Down
Loading

0 comments on commit da8a7ff

Please sign in to comment.