From 0a9223f10157a08f26c92c44c2df6e16a82b15f9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 22 Jun 2020 19:17:04 -0700 Subject: [PATCH 1/7] Coalesce partitions for repartition by key when AQE is enabled. --- .../plans/logical/basicLogicalOperators.scala | 18 +++++-- .../scala/org/apache/spark/sql/Dataset.scala | 54 +++++++++++-------- .../spark/sql/execution/SparkStrategies.scala | 3 +- .../adaptive/AdaptiveQueryExecSuite.scala | 39 ++++++++++++-- 4 files changed, 86 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 79a8380826ab3..039fd9382000a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.random.RandomSampler @@ -953,16 +954,18 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) } /** - * This method repartitions data using [[Expression]]s into `numPartitions`, and receives + * This method repartitions data using [[Expression]]s into `optNumPartitions`, and receives * information about the number of partitions during execution. Used when a specific ordering or * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like - * `coalesce` and `repartition`. + * `coalesce` and `repartition`. If no `optNumPartitions` is given, by default it partitions data + * into `numShufflePartitions` defined in `SQLConf`, and could be coalesced by AQE. */ case class RepartitionByExpression( partitionExpressions: Seq[Expression], child: LogicalPlan, - numPartitions: Int) extends RepartitionOperation { + optNumPartitions: Option[Int]) extends RepartitionOperation { + val numPartitions = optNumPartitions.getOrElse(SQLConf.get.numShufflePartitions) require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") val partitioning: Partitioning = { @@ -990,6 +993,15 @@ case class RepartitionByExpression( override def shuffle: Boolean = true } +object RepartitionByExpression { + def apply( + partitionExpressions: Seq[Expression], + child: LogicalPlan, + numPartitions: Int): RepartitionByExpression = { + RepartitionByExpression(partitionExpressions, child, Some(numPartitions)) + } +} + /** * A relation with one row. This is used in "SELECT ..." without a from clause. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 524e231eb7eb9..64c7ac6154565 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2991,17 +2991,9 @@ class Dataset[T] private[sql]( Repartition(numPartitions, shuffle = true, logicalPlan) } - /** - * Returns a new Dataset partitioned by the given partitioning expressions into - * `numPartitions`. The resulting Dataset is hash partitioned. 
- * - * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). - * - * @group typedrel - * @since 2.0.0 - */ - @scala.annotation.varargs - def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { + private def repartitionByExpression( + numPartitions: Option[Int], + partitionExprs: Column*): Dataset[T] = { // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments. // However, we don't want to complicate the semantics of this API method. // Instead, let's give users a friendly error message, pointing them to the new method. @@ -3015,6 +3007,20 @@ class Dataset[T] private[sql]( } } + /** + * Returns a new Dataset partitioned by the given partitioning expressions into + * `numPartitions`. The resulting Dataset is hash partitioned. + * + * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). + * + * @group typedrel + * @since 2.0.0 + */ + @scala.annotation.varargs + def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { + repartitionByExpression(Some(numPartitions), partitionExprs: _*) + } + /** * Returns a new Dataset partitioned by the given partitioning expressions, using * `spark.sql.shuffle.partitions` as number of partitions. @@ -3027,7 +3033,20 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartition(partitionExprs: Column*): Dataset[T] = { - repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*) + repartitionByExpression(None, partitionExprs: _*) + } + + private def repartitionByRange( + numPartitions: Option[Int], + partitionExprs: Column*): Dataset[T] = { + require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") + val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { + case expr: SortOrder => expr + case expr: Expression => SortOrder(expr, Ascending) + }) + withTypedPlan { + RepartitionByExpression(sortOrder, logicalPlan, numPartitions) + } } /** @@ -3049,14 +3068,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { - require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") - val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { - case expr: SortOrder => expr - case expr: Expression => SortOrder(expr, Ascending) - }) - withTypedPlan { - RepartitionByExpression(sortOrder, logicalPlan, numPartitions) - } + repartitionByRange(Some(numPartitions), partitionExprs: _*) } /** @@ -3078,7 +3090,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(partitionExprs: Column*): Dataset[T] = { - repartitionByRange(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*) + repartitionByRange(None, partitionExprs: _*) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4d23e5e8a65b5..8d7a82061caa0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -685,8 +685,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case r: logical.Range => execution.RangeExec(r) :: Nil case r: logical.RepartitionByExpression => + val canChangeNumParts = r.optNumPartitions.isEmpty exchange.ShuffleExchangeExec( - r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil 
+ r.partitioning, planLater(r.child), canChangeNumPartitions = canChangeNumParts) :: Nil case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil case r: LogicalRDD => RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 9fa97bffa8910..22a44a489e3c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions._ @@ -1026,15 +1026,48 @@ class AdaptiveQueryExecSuite Seq(true, false).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", SQLConf.SHUFFLE_PARTITIONS.key -> "6", SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { - val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length + val df = spark.range(10).repartition($"id") + val partitionsNum = df.rdd.collectPartitions().length if (enableAQE) { - assert(partitionsNum === 7) + assert(partitionsNum < 6) + + val plan = df.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == 7) } else { assert(partitionsNum === 6) } } } } + + test("SPARK-32056 coalesce partitions for repartition by expressions when AQE is enabled") { + Seq(true, false).foreach { enableAQE => + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + val partitionsNum1 = (1 to 10).toDF.repartition($"value") + .rdd.collectPartitions().length + + val partitionsNum2 = (1 to 10).toDF.repartitionByRange($"value".asc) + .rdd.collectPartitions().length + if (enableAQE) { + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + } else { + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) + } + } + } + } } From 43c4726fa8b0de623d5563720c96632193262ec2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 22 Jun 2020 21:48:36 -0700 Subject: [PATCH 2/7] Address comments. 
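For reviewers, a minimal sketch of the user-facing behavior this series is
after (illustrative only: it assumes a spark-shell session named `spark`, and
the coalesced partition count depends on data size):

    import spark.implicits._

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    spark.conf.set("spark.sql.shuffle.partitions", "10")

    // No explicit count: optNumPartitions is None, the shuffle is planned
    // with canChangeNumPartitions = true, and AQE may coalesce it.
    spark.range(10).repartition($"id").rdd.getNumPartitions      // likely < 10

    // Explicit count: optNumPartitions is Some(10), so AQE must keep
    // exactly 10 partitions.
    spark.range(10).repartition(10, $"id").rdd.getNumPartitions  // exactly 10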
--- .../main/scala/org/apache/spark/sql/Dataset.scala | 12 ++++++------ .../apache/spark/sql/execution/SparkStrategies.scala | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 64c7ac6154565..6f97121d88ede 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2993,7 +2993,7 @@ class Dataset[T] private[sql]( private def repartitionByExpression( numPartitions: Option[Int], - partitionExprs: Column*): Dataset[T] = { + partitionExprs: Seq[Column]): Dataset[T] = { // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments. // However, we don't want to complicate the semantics of this API method. // Instead, let's give users a friendly error message, pointing them to the new method. @@ -3018,7 +3018,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { - repartitionByExpression(Some(numPartitions), partitionExprs: _*) + repartitionByExpression(Some(numPartitions), partitionExprs) } /** @@ -3033,12 +3033,12 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartition(partitionExprs: Column*): Dataset[T] = { - repartitionByExpression(None, partitionExprs: _*) + repartitionByExpression(None, partitionExprs) } private def repartitionByRange( numPartitions: Option[Int], - partitionExprs: Column*): Dataset[T] = { + partitionExprs: Seq[Column]): Dataset[T] = { require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { case expr: SortOrder => expr @@ -3068,7 +3068,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { - repartitionByRange(Some(numPartitions), partitionExprs: _*) + repartitionByRange(Some(numPartitions), partitionExprs) } /** @@ -3090,7 +3090,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(partitionExprs: Column*): Dataset[T] = { - repartitionByRange(None, partitionExprs: _*) + repartitionByRange(None, partitionExprs) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 8d7a82061caa0..5341b4778c539 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -687,7 +687,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case r: logical.RepartitionByExpression => val canChangeNumParts = r.optNumPartitions.isEmpty exchange.ShuffleExchangeExec( - r.partitioning, planLater(r.child), canChangeNumPartitions = canChangeNumParts) :: Nil + r.partitioning, planLater(r.child), canChangeNumParts) :: Nil case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil case r: LogicalRDD => RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil From 8e39ed7787c9f80591963de1e7ab4f0f2c24fda3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 22 Jun 2020 23:14:48 -0700 Subject: [PATCH 3/7] Add test. 
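The new assertions can also be reproduced interactively. A sketch, assuming a
spark-shell session `spark` with AQE and partition coalescing enabled as
above, plus the two configs this test pins down:

    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
    import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

    spark.conf.set("spark.sql.shuffle.partitions", "6")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "7")

    val df = spark.range(10).repartition($"id")
    df.collect()  // materialize so AQE finishes re-planning

    // The shuffle itself is still written with the initial partition
    // number; only the read side is coalesced down to fewer partitions.
    val adaptivePlan =
      df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
    val shuffles = adaptivePlan.executedPlan.collect {
      case s: ShuffleExchangeExec => s
    }
    shuffles.head.outputPartitioning.numPartitions  // 7 (initialPartitionNum)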
--- .../adaptive/AdaptiveQueryExecSuite.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 22a44a489e3c2..6cd82218230e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1029,12 +1029,15 @@ class AdaptiveQueryExecSuite SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", SQLConf.SHUFFLE_PARTITIONS.key -> "6", SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { - val df = spark.range(10).repartition($"id") - val partitionsNum = df.rdd.collectPartitions().length + val df1 = spark.range(10).repartition($"id") + val df2 = spark.range(10).repartition(10, $"id") + val df3 = spark.range(10).repartition(10) + + val partitionsNum1 = df1.rdd.collectPartitions().length if (enableAQE) { - assert(partitionsNum < 6) + assert(partitionsNum1 < 6) - val plan = df.queryExecution.executedPlan + val plan = df1.queryExecution.executedPlan assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { case s: ShuffleExchangeExec => s @@ -1042,8 +1045,14 @@ class AdaptiveQueryExecSuite assert(shuffle.size == 1) assert(shuffle(0).outputPartitioning.numPartitions == 7) } else { - assert(partitionsNum === 6) + assert(partitionsNum1 === 6) } + + val partitionsNum2 = df2.rdd.collectPartitions().length + assert(partitionsNum2 == 10) + + val partitionsNum3 = df3.rdd.collectPartitions().length + assert(partitionsNum3 == 10) } } } From 4b9b0e84f826f29d419dd329f353dcac26b28c74 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 23 Jun 2020 06:52:44 -0700 Subject: [PATCH 4/7] For comments. 
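For context on the API change in patch 1: both Dataset entry points now
funnel into the same logical operator, and the companion apply keeps the old
Int-based signature source-compatible. A hypothetical standalone sketch using
catalyst's expression DSL:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, RepartitionByExpression}

    val relation = LocalRelation('id.long)
    // Case-class apply: no explicit count, so the planner lets AQE
    // coalesce the resulting shuffle.
    val adaptive = RepartitionByExpression(Seq('id.long), relation, None)
    // Companion apply: existing three-arg Int callers keep compiling and
    // pin the partition count.
    val fixed = RepartitionByExpression(Seq('id.long), relation, 10)
    assert(adaptive.optNumPartitions.isEmpty)
    assert(fixed.optNumPartitions.contains(10))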
--- .../adaptive/AdaptiveQueryExecSuite.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 6cd82218230e3..e78b5c69ce8b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1048,11 +1048,8 @@ class AdaptiveQueryExecSuite assert(partitionsNum1 === 6) } - val partitionsNum2 = df2.rdd.collectPartitions().length - assert(partitionsNum2 == 10) - - val partitionsNum3 = df3.rdd.collectPartitions().length - assert(partitionsNum3 == 10) + assert(df2.rdd.collectPartitions().length == 10) + assert(df3.rdd.collectPartitions().length == 10) } } } @@ -1064,17 +1061,22 @@ class AdaptiveQueryExecSuite SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50", SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + val partitionsNum1 = (1 to 10).toDF.repartition($"value") .rdd.collectPartitions().length - val partitionsNum2 = (1 to 10).toDF.repartitionByRange($"value".asc) .rdd.collectPartitions().length + val partitionsNum3 = (1 to 10).toDF.repartition($"value" + 1) + .rdd.collectPartitions().length + if (enableAQE) { assert(partitionsNum1 < 10) assert(partitionsNum2 < 10) + assert(partitionsNum3 < 10) } else { assert(partitionsNum1 === 10) assert(partitionsNum2 === 10) + assert(partitionsNum3 === 10) } } } From 7ceaebcdff8c149a76106d789d0270205592ca68 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 23 Jun 2020 22:36:33 -0700 Subject: [PATCH 5/7] For comments. 
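The range path gets the same treatment. One detail worth noting for the test
below: a bare expression passed to repartitionByRange is wrapped in an
ascending SortOrder, so these two forms plan identically (a sketch, same
shell assumptions as before):

    import spark.implicits._

    val a = spark.range(10).repartitionByRange($"id")      // implicit asc
    val b = spark.range(10).repartitionByRange($"id".asc)  // explicit asc
    // Both produce RangePartitioning on id ASC with optNumPartitions = None,
    // so AQE may coalesce either one.
    a.explain()
    b.explain()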
--- .../adaptive/AdaptiveQueryExecSuite.scala | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index e78b5c69ce8b9..06ae738392169 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1032,6 +1032,7 @@ class AdaptiveQueryExecSuite val df1 = spark.range(10).repartition($"id") val df2 = spark.range(10).repartition(10, $"id") val df3 = spark.range(10).repartition(10) + val df4 = spark.range(10).repartitionByRange(10, $"id".asc) val partitionsNum1 = df1.rdd.collectPartitions().length if (enableAQE) { @@ -1050,6 +1051,7 @@ class AdaptiveQueryExecSuite assert(df2.rdd.collectPartitions().length == 10) assert(df3.rdd.collectPartitions().length == 10) + assert(df4.rdd.collectPartitions().length == 10) } } } @@ -1064,19 +1066,39 @@ class AdaptiveQueryExecSuite val partitionsNum1 = (1 to 10).toDF.repartition($"value") .rdd.collectPartitions().length - val partitionsNum2 = (1 to 10).toDF.repartitionByRange($"value".asc) + val partitionsNum2 = (1 to 10).toDF.repartition($"value" + 1) .rdd.collectPartitions().length - val partitionsNum3 = (1 to 10).toDF.repartition($"value" + 1) + + if (enableAQE) { + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + } else { + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) + } + } + } + } + + test("SPARK-32056 coalesce partitions for repartition by range when AQE is enabled") { + Seq(true, false).foreach { enableAQE => + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + + val partitionsNum1 = (1 to 10).toDF.repartitionByRange($"value".asc) + .rdd.collectPartitions().length + val partitionsNum2 = (1 to 10).toDF.repartitionByRange(($"value" + 1).asc) .rdd.collectPartitions().length if (enableAQE) { assert(partitionsNum1 < 10) assert(partitionsNum2 < 10) - assert(partitionsNum3 < 10) } else { assert(partitionsNum1 === 10) assert(partitionsNum2 === 10) - assert(partitionsNum3 === 10) } } } From df6a035beffad633f64fc9364e14afebb05a5123 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 23 Jun 2020 23:26:40 -0700 Subject: [PATCH 6/7] Refine test cases. 
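The refactored tests pin down the invariant in one place: an explicit
partition count always wins over AQE coalescing, for both APIs. A condensed
sketch (same shell assumptions; the no-count result depends on data size):

    import spark.implicits._

    // Explicit counts are never coalesced:
    spark.range(10).repartition(10, $"id").rdd.getNumPartitions            // 10
    spark.range(10).repartition(10).rdd.getNumPartitions                   // 10
    spark.range(10).repartitionByRange(10, $"id".asc).rdd.getNumPartitions // 10

    // Without a count, AQE may shrink the shuffle:
    spark.range(10).repartition($"id").rdd.getNumPartitions                // <= 10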
--- .../adaptive/AdaptiveQueryExecSuite.scala | 77 +++++++++---------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 06ae738392169..dfec7132bdddd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1022,84 +1022,83 @@ class AdaptiveQueryExecSuite } } - test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") { + test("SPARK-31220 and SPARK-32056 coalesce partitions for repartition by expressions " + + "when AQE is enabled") { Seq(true, false).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - SQLConf.SHUFFLE_PARTITIONS.key -> "6", - SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + val df1 = spark.range(10).repartition($"id") - val df2 = spark.range(10).repartition(10, $"id") - val df3 = spark.range(10).repartition(10) - val df4 = spark.range(10).repartitionByRange(10, $"id".asc) + val df2 = spark.range(10).repartition($"id" + 1) val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length + if (enableAQE) { - assert(partitionsNum1 < 6) + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled val plan = df1.queryExecution.executedPlan assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { case s: ShuffleExchangeExec => s } assert(shuffle.size == 1) - assert(shuffle(0).outputPartitioning.numPartitions == 7) + assert(shuffle(0).outputPartitioning.numPartitions == 10) } else { - assert(partitionsNum1 === 6) + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) } - assert(df2.rdd.collectPartitions().length == 10) + + // Don't coalesce partitions if the number of partitions is specified. 
+ val df3 = spark.range(10).repartition(10, $"id") + val df4 = spark.range(10).repartition(10) assert(df3.rdd.collectPartitions().length == 10) assert(df4.rdd.collectPartitions().length == 10) } } } - test("SPARK-32056 coalesce partitions for repartition by expressions when AQE is enabled") { + test("SPARK-31220 and SPARK-32056 coalesce partitions for repartition by range " + + "when AQE is enabled") { Seq(true, false).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", SQLConf.SHUFFLE_PARTITIONS.key -> "10") { - val partitionsNum1 = (1 to 10).toDF.repartition($"value") - .rdd.collectPartitions().length - val partitionsNum2 = (1 to 10).toDF.repartition($"value" + 1) - .rdd.collectPartitions().length + val df1 = spark.range(10).toDF.repartitionByRange($"id".asc) + val df2 = spark.range(10).toDF.repartitionByRange(($"id" + 1).asc) - if (enableAQE) { - assert(partitionsNum1 < 10) - assert(partitionsNum2 < 10) - } else { - assert(partitionsNum1 === 10) - assert(partitionsNum2 === 10) - } - } - } - } - - test("SPARK-32056 coalesce partitions for repartition by range when AQE is enabled") { - Seq(true, false).foreach { enableAQE => - withSQLConf( - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, - SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50", - SQLConf.SHUFFLE_PARTITIONS.key -> "10") { - - val partitionsNum1 = (1 to 10).toDF.repartitionByRange($"value".asc) - .rdd.collectPartitions().length - val partitionsNum2 = (1 to 10).toDF.repartitionByRange(($"value" + 1).asc) - .rdd.collectPartitions().length + val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length if (enableAQE) { assert(partitionsNum1 < 10) assert(partitionsNum2 < 10) + + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df1.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == 10) } else { assert(partitionsNum1 === 10) assert(partitionsNum2 === 10) } + + // Don't coalesce partitions if the number of partitions is specified. + val df3 = spark.range(10).repartitionByRange(10, $"id".asc) + assert(df3.rdd.collectPartitions().length == 10) } } } From 1ae1a8799ecf0a485cb4617a54d8d68feea0eaf8 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 24 Jun 2020 09:02:07 -0700 Subject: [PATCH 7/7] Modify test name. 
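For completeness, the four configs the final tests toggle, and what each one
does here (a spark-shell sketch; the values mirror the tests rather than
being recommendations):

    // Master switch for adaptive query execution.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    // Lets AQE merge small shuffle partitions after the map stage runs.
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    // Shuffles that AQE may still change start from this many partitions...
    spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "10")
    // ...while repartition(n, ...) always uses exactly n, and with AQE
    // disabled the shuffle falls back to this default.
    spark.conf.set("spark.sql.shuffle.partitions", "10")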
--- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index dfec7132bdddd..27d9748476c98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1022,8 +1022,7 @@ class AdaptiveQueryExecSuite } } - test("SPARK-31220 and SPARK-32056 coalesce partitions for repartition by expressions " + - "when AQE is enabled") { + test("SPARK-31220, SPARK-32056: repartition by expression with AQE") { Seq(true, false).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, @@ -1064,8 +1063,7 @@ class AdaptiveQueryExecSuite } } - test("SPARK-31220 and SPARK-32056 coalesce partitions for repartition by range " + - "when AQE is enabled") { + test("SPARK-31220, SPARK-32056: repartition by range with AQE") { Seq(true, false).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,