diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e6a39669fb80e..a2576684d6899 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -434,9 +434,10 @@ object SQLConf { val COALESCE_PARTITIONS_MIN_PARTITION_NUM = buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum") - .doc("The minimum number of shuffle partitions after coalescing. If not set, the default " + - "value is the default parallelism of the Spark cluster. This configuration only " + - s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " + + .doc("The suggested (not guaranteed) minimum number of shuffle partitions after " + + "coalescing. If not set, the default value is the default parallelism of the " + + "Spark cluster. This configuration only has an effect when " + + s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " + s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.") .version("3.0.0") .intConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index 6450d49b050ba..e7f3bf17d62ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -49,7 +49,8 @@ case class CustomShuffleReaderExec private( // If it is a local shuffle reader with one mapper per task, then the output partitioning is // the same as the plan before shuffle. // TODO this check is based on assumptions of callers' behavior but is sufficient for now. - if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) && + if (partitionSpecs.nonEmpty && + partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) && partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size == partitionSpecs.length) { child match { @@ -98,7 +99,7 @@ case class CustomShuffleReaderExec private( } @transient private lazy val partitionDataSizes: Option[Seq[Long]] = { - if (!isLocalReader && shuffleStage.get.mapStats.isDefined) { + if (partitionSpecs.nonEmpty && !isLocalReader && shuffleStage.get.mapStats.isDefined) { val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId Some(partitionSpecs.map { case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index a5b3cac4dfcd0..5416fde222cb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -141,8 +141,8 @@ object OptimizeLocalShuffleReader { def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match { case s: ShuffleQueryStageExec => s.shuffle.canChangeNumPartitions - case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) => - s.shuffle.canChangeNumPartitions + case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) => + s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty case _ => false } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 58e07fabc3973..396c9c9d6b4e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -88,9 +88,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { private def targetSize(sizes: Seq[Long], medianSize: Long): Long = { val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize)) - // It's impossible that all the partitions are skewed, as we use median size to define skew. - assert(nonSkewSizes.nonEmpty) - math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length) + if (nonSkewSizes.isEmpty) { + advisorySize + } else { + math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length) + } } /** @@ -297,7 +299,7 @@ private object ShuffleStage { Some(ShuffleStageInfo(s, mapStats, partitions)) case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) - if s.mapStats.isDefined => + if s.mapStats.isDefined && partitionSpecs.nonEmpty => val mapStats = s.mapStats.get val sizes = mapStats.bytesByPartitionId val partitions = partitionSpecs.map { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index e10ed4f481cf7..d6e44b780d772 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -91,6 +91,14 @@ object ShufflePartitionsUtil extends Logging { var latestSplitPoint = 0 var coalescedSize = 0L var i = 0 + + def createPartitionSpec(): Unit = { + // Skip empty inputs, as it is a waste to launch an empty task. + if (coalescedSize > 0) { + partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i) + } + } + while (i < numPartitions) { // We calculate the total size of i-th shuffle partitions from all shuffles. var totalSizeOfCurrentPartition = 0L @@ -103,7 +111,7 @@ object ShufflePartitionsUtil extends Logging { // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a // new coalesced partition. if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) { - partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i) + createPartitionSpec() latestSplitPoint = i // reset postShuffleInputSize. coalescedSize = totalSizeOfCurrentPartition @@ -112,8 +120,7 @@ object ShufflePartitionsUtil extends Logging { } i += 1 } - partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions) - + createPartitionSpec() partitionSpecs } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala index 7acc33c43b19d..f5c3b7816f5ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala @@ -41,13 +41,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite { test("1 shuffle") { val targetSize = 100 - { - // All bytes per partition are 0. - val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0) - val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5)) - checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize) - } - { // Some bytes per partition are 0 and total size is less than the target size. // 1 coalesced partition is expected. @@ -70,8 +63,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite { CoalescedPartitionSpec(0, 1), CoalescedPartitionSpec(1, 2), CoalescedPartitionSpec(2, 3), - CoalescedPartitionSpec(3, 4), - CoalescedPartitionSpec(4, 5)) + CoalescedPartitionSpec(3, 4)) checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize) } @@ -108,17 +100,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite { } } - { - // All bytes per partition are 0. - val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0) - val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0) - val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5)) - checkEstimation( - Array(bytesByPartitionId1, bytesByPartitionId2), - expectedPartitionSpecs, - targetSize) - } - { // Some bytes per partition are 0. // 1 coalesced partition is expected. @@ -217,7 +198,18 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite { // the size of data is 0. val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0) val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0) - val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5)) + checkEstimation( + Array(bytesByPartitionId1, bytesByPartitionId2), + Seq.empty, targetSize, minNumPartitions) + } + + + { + // The minimal number of coalesced partitions is not enforced because + // there are too many 0-size partitions. + val bytesByPartitionId1 = Array[Long](200, 0, 0) + val bytesByPartitionId2 = Array[Long](100, 0, 0) + val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 1)) checkEstimation( Array(bytesByPartitionId1, bytesByPartitionId2), expectedPartitionSpecs, @@ -251,6 +243,37 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite { } } + test("do not create partition spec for 0-size partitions") { + val targetSize = 100 + val minNumPartitions = 2 + + { + // 1 shuffle: All bytes per partition are 0, no partition spec created. + val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0) + checkEstimation(Array(bytesByPartitionId), Seq.empty, targetSize) + } + + { + // 2 shuffles: All bytes per partition are 0, no partition spec created. + val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0) + val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0) + checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Seq.empty, targetSize) + } + + { + // No partition spec created for the 0-size partitions. + val bytesByPartitionId1 = Array[Long](200, 0, 0, 0, 0) + val bytesByPartitionId2 = Array[Long](100, 0, 300, 0, 0) + val expectedPartitionSpecs = Seq( + CoalescedPartitionSpec(0, 1), + CoalescedPartitionSpec(2, 3)) + checkEstimation( + Array(bytesByPartitionId1, bytesByPartitionId2), + expectedPartitionSpecs, + targetSize, minNumPartitions) + } + } + test("splitSizeListByTargetSize") { val targetSize = 100 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 694be983d9699..e82ccdabdba98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -157,17 +157,17 @@ class AdaptiveQueryExecSuite val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD] val localShuffleRDD1 = localReaders(1).execute().asInstanceOf[ShuffledRowRDD] // The pre-shuffle partition size is [0, 0, 0, 72, 0] - // And the partitionStartIndices is [0, 3, 4], so advisoryParallelism = 3. + // We exclude the 0-size partitions, so only one partition, advisoryParallelism = 1 // the final parallelism is - // math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1 + // math.max(1, advisoryParallelism / numMappers): math.max(1, 1/2) = 1 // and the partitions length is 1 * numMappers = 2 assert(localShuffleRDD0.getPartitions.length == 2) // The pre-shuffle partition size is [0, 72, 0, 72, 126] - // And the partitionStartIndices is [0, 1, 2, 3, 4], so advisoryParallelism = 5. + // We exclude the 0-size partitions, so only 3 partition, advisoryParallelism = 3 // the final parallelism is - // math.max(1, advisoryParallelism / numMappers): math.max(1, 5/2) = 2 - // and the partitions length is 2 * numMappers = 4 - assert(localShuffleRDD1.getPartitions.length == 4) + // math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1 + // and the partitions length is 1 * numMappers = 2 + assert(localShuffleRDD1.getPartitions.length == 2) } } @@ -197,6 +197,38 @@ class AdaptiveQueryExecSuite } } + test("Empty stage coalesced to 0-partition RDD") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") { + val df1 = spark.range(10).withColumn("a", 'id) + val df2 = spark.range(10).withColumn("b", 'id) + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val testDf = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count() + checkAnswer(testDf, Seq()) + val plan = testDf.queryExecution.executedPlan + assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined) + val coalescedReaders = collect(plan) { + case r: CustomShuffleReaderExec => r + } + assert(coalescedReaders.length == 2) + coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty)) + } + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { + val testDf = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count() + checkAnswer(testDf, Seq()) + val plan = testDf.queryExecution.executedPlan + assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) + val coalescedReaders = collect(plan) { + case r: CustomShuffleReaderExec => r + } + assert(coalescedReaders.length == 2, s"$plan") + coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty)) + } + } + } + test("Scalar subquery") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", @@ -647,12 +679,13 @@ class AdaptiveQueryExecSuite // Partition 0: both left and right sides are skewed, left side is divided // into 2 splits and right side is divided into 4 splits, so // 2 x 4 sub-partitions. - // Partition 1, 2, 3: not skewed, and coalesced into 1 partition. + // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the + // size is 0. // Partition 4: only left side is skewed, and divide into 2 splits, so // 2 sub-partitions. - // So total (8 + 1 + 3) partitions. + // So total (8 + 0 + 2) partitions. val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan) - checkSkewJoin(innerSmj, 8 + 1 + 2) + checkSkewJoin(innerSmj, 8 + 0 + 2) // skewed left outer join optimization val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult( @@ -661,12 +694,13 @@ class AdaptiveQueryExecSuite // right stats:[6292, 0, 0, 0, 0] // Partition 0: both left and right sides are skewed, but left join can't split right side, // so only left side is divided into 2 splits, and thus 2 sub-partitions. - // Partition 1, 2, 3: not skewed, and coalesced into 1 partition. + // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the + // size is 0. // Partition 4: only left side is skewed, and divide into 2 splits, so // 2 sub-partitions. - // So total (2 + 1 + 2) partitions. + // So total (2 + 0 + 2) partitions. val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan) - checkSkewJoin(leftSmj, 2 + 1 + 2) + checkSkewJoin(leftSmj, 2 + 0 + 2) // skewed right outer join optimization val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult( @@ -675,12 +709,13 @@ class AdaptiveQueryExecSuite // right stats:[6292, 0, 0, 0, 0] // Partition 0: both left and right sides are skewed, but right join can't split left side, // so only right side is divided into 4 splits, and thus 4 sub-partitions. - // Partition 1, 2, 3: not skewed, and coalesced into 1 partition. + // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the + // size is 0. // Partition 4: only left side is skewed, but right join can't split left side, so just // 1 partition. - // So total (4 + 1 + 1) partitions. + // So total (4 + 0 + 1) partitions. val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan) - checkSkewJoin(rightSmj, 4 + 1 + 1) + checkSkewJoin(rightSmj, 4 + 0 + 1) } } } @@ -852,7 +887,7 @@ class AdaptiveQueryExecSuite }.head assert(!reader.isLocalReader) assert(reader.hasSkewedPartition) - assert(reader.hasCoalescedPartition) + assert(!reader.hasCoalescedPartition) // 0-size partitions are ignored. assert(reader.metrics.contains("numSkewedPartitions")) assert(reader.metrics("numSkewedPartitions").value > 0) }