From 2dfc64811abcfb9a36ae5a2cf5060a5a28ef726b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 00:05:35 -0700 Subject: [PATCH 01/21] Add failing test illustrating bad exchange planning. --- .../apache/spark/sql/execution/ExchangeSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala index 79e903c2bbd4..0600bb2a2bbc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.execution.joins.SortMergeJoin class ExchangeSuite extends SparkPlanTest { test("shuffling UnsafeRows in exchange") { @@ -29,4 +30,14 @@ class ExchangeSuite extends SparkPlanTest { input.map(Row.fromTuple) ) } + + test("EnsureRequirements shouldn't add exchange to SMJ inputs if both are SinglePartition") { + val df = (1 to 10).map(Tuple1.apply).toDF("a").repartition(1) + val keys = Seq(df.col("a").expr) + val smj = SortMergeJoin(keys, keys, df.queryExecution.sparkPlan, df.queryExecution.sparkPlan) + val afterEnsureRequirements = EnsureRequirements(df.sqlContext).apply(smj) + if (afterEnsureRequirements.collect { case Exchange(_, _) => true }.nonEmpty) { + fail(s"No Exchanges should have been added:\n$afterEnsureRequirements") + } + } } From cc5669cd1cfd81df30d00901c65c79bc50c8d448 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 00:05:55 -0700 Subject: [PATCH 02/21] Adding outputPartitioning to Repartition does not fix the test. --- .../org/apache/spark/sql/execution/basicOperators.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index f4677b4ee86b..842dd7f08a48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -245,6 +245,11 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = { + if (numPartitions == 1) SinglePartition + else UnknownPartitioning(numPartitions) + } + protected override def doExecute(): RDD[InternalRow] = { child.execute().map(_.copy()).coalesce(numPartitions, shuffle) } From 067595637b3fea6eba43b43a87e274bb60e846e2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 00:06:23 -0700 Subject: [PATCH 03/21] Preserving ordering and partitioning in row format converters also does not help. 
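Note: a minimal, self-contained sketch (plain Scala, not the real catalyst classes) of why fixing operator metadata alone cannot remove the extra shuffle. The old EnsureRequirements compares the child's partitioning against a freshly built target partitioning with guarantees(), instead of asking whether the existing partitioning already satisfies() the required distribution. Every name below is illustrative only.

    // Toy model of the two predicates involved in the planning decision.
    sealed trait Distribution
    case class Clustered(keys: Seq[String]) extends Distribution

    sealed trait Partitioning {
      def satisfies(d: Distribution): Boolean
      def guarantees(other: Partitioning): Boolean
    }
    case object SinglePartition extends Partitioning {
      def satisfies(d: Distribution): Boolean = true                 // one partition trivially clusters anything
      def guarantees(other: Partitioning): Boolean = other == SinglePartition
    }
    case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning {
      def satisfies(d: Distribution): Boolean = d match {
        case Clustered(needed) => keys.forall(needed.contains)
      }
      def guarantees(other: Partitioning): Boolean = this == other
    }

    object WhyMetadataIsNotEnough extends App {
      val child  = SinglePartition                         // e.g. the output of repartition(1)
      val target = HashPartitioning(Seq("a"), 200)         // canonical partitioning built from the join keys
      println(child.guarantees(target))                    // false -> the old rule inserts an Exchange
      println(child.satisfies(Clustered(Seq("a"))))        // true  -> no shuffle is actually needed
    }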
--- .../org/apache/spark/sql/execution/rowFormatConverters.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala index 29f3beb3cb3c..855555dd1d4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala @@ -21,6 +21,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.rules.Rule /** @@ -33,6 +34,8 @@ case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode { require(UnsafeProjection.canSupport(child.schema), s"Cannot convert ${child.schema} to Unsafe") override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputsUnsafeRows: Boolean = true override def canProcessUnsafeRows: Boolean = false override def canProcessSafeRows: Boolean = true @@ -51,6 +54,8 @@ case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode { @DeveloperApi case class ConvertToSafe(child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputsUnsafeRows: Boolean = false override def canProcessUnsafeRows: Boolean = true override def canProcessSafeRows: Boolean = false From adcc742e3152bacbe7d0d642de9945b7441c2bbe Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 00:35:37 -0700 Subject: [PATCH 04/21] Move test to PlannerSuite. 
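Note: the forwarding added in the previous patch follows a general pattern: a unary operator that neither reorders nor repartitions rows should expose its child's outputPartitioning and outputOrdering, otherwise the defaults (UnknownPartitioning, no ordering) hide those properties from EnsureRequirements. A hypothetical pass-through operator, sketched as if it lived next to the operators in basicOperators.scala, would look like this (PassThrough is not part of any patch in this series):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
    import org.apache.spark.sql.catalyst.plans.physical.Partitioning

    // Hypothetical operator that inspects rows but leaves their order and
    // placement untouched, so it forwards both properties from its child.
    case class PassThrough(child: SparkPlan) extends UnaryNode {
      override def output: Seq[Attribute] = child.output
      override def outputPartitioning: Partitioning = child.outputPartitioning
      override def outputOrdering: Seq[SortOrder] = child.outputOrdering

      protected override def doExecute(): RDD[InternalRow] = child.execute()
    }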
--- .../apache/spark/sql/execution/ExchangeSuite.scala | 10 ---------- .../apache/spark/sql/execution/PlannerSuite.scala | 12 +++++++++++- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala index 0600bb2a2bbc..ad9f7697f680 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala @@ -30,14 +30,4 @@ class ExchangeSuite extends SparkPlanTest { input.map(Row.fromTuple) ) } - - test("EnsureRequirements shouldn't add exchange to SMJ inputs if both are SinglePartition") { - val df = (1 to 10).map(Tuple1.apply).toDF("a").repartition(1) - val keys = Seq(df.col("a").expr) - val smj = SortMergeJoin(keys, keys, df.queryExecution.sparkPlan, df.queryExecution.sparkPlan) - val afterEnsureRequirements = EnsureRequirements(df.sqlContext).apply(smj) - if (afterEnsureRequirements.collect { case Exchange(_, _) => true }.nonEmpty) { - fail(s"No Exchanges should have been added:\n$afterEnsureRequirements") - } - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 18b0e54dc7c5..6cb751c7bf73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin, SortMergeJoin} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext} import org.apache.spark.sql.test.TestSQLContext._ @@ -202,4 +202,14 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { } } } + + test("EnsureRequirements shouldn't add exchange to SMJ inputs if both are SinglePartition") { + val df = (1 to 10).map(Tuple1.apply).toDF("a").repartition(1) + val keys = Seq(df.col("a").expr) + val smj = SortMergeJoin(keys, keys, df.queryExecution.sparkPlan, df.queryExecution.sparkPlan) + val afterEnsureRequirements = EnsureRequirements(df.sqlContext).apply(smj) + if (afterEnsureRequirements.collect { case Exchange(_, _) => true }.nonEmpty) { + fail(s"No Exchanges should have been added:\n$afterEnsureRequirements") + } + } } From c9fb2315f6d85d7d70769a49eb6e7d40dbc6d0a1 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 02:04:39 -0700 Subject: [PATCH 05/21] Rewrite exchange to fix better handle this case. 
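Note: a condensed, plain-Scala sketch of the per-child decision procedure this rewrite moves to. Strings stand in for the operators being inserted, and the arguments stand in for Partitioning.satisfies and the child's existing output ordering; the real rule additionally canonicalizes the target partitioning and reuses the planner's sort operator.

    def ensure(
        satisfiesDistribution: Boolean,   // child.outputPartitioning.satisfies(requiredDistribution)
        childOrdering: Seq[String],       // child.outputOrdering, by column name for the sketch
        requiredOrdering: Seq[String]): List[String] = {
      val inserted = scala.collection.mutable.ListBuffer.empty[String]
      // Shuffle only when the required distribution is not already met.
      if (!satisfiesDistribution) inserted += "Exchange"
      // Same prefix test as the patch: skip the sort when the existing ordering
      // already agrees with the required one on their common prefix.
      if (requiredOrdering.nonEmpty) {
        val minSize = math.min(requiredOrdering.size, childOrdering.size)
        if (minSize == 0 || requiredOrdering.take(minSize) != childOrdering.take(minSize)) {
          inserted += "Sort"
        }
      }
      inserted.toList
    }

    // The SPARK-9703 case: a repartition(1) input of a sort merge join already
    // satisfies the distribution but not the ordering, so only a Sort is added:
    // ensure(satisfiesDistribution = true, childOrdering = Nil, requiredOrdering = Seq("a")) == List("Sort")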
--- .../apache/spark/sql/execution/Exchange.scala | 104 ++++++++++-------- .../spark/sql/execution/SparkPlan.scala | 6 + .../execution/joins/LeftSemiJoinHash.scala | 2 + .../execution/joins/ShuffledHashJoin.scala | 2 + .../joins/ShuffledHashOuterJoin.scala | 2 + .../sql/execution/joins/SortMergeJoin.scala | 2 + 6 files changed, 73 insertions(+), 45 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 6ea5eeedf1bb..21029861709a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -201,62 +201,76 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una */ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] { // TODO: Determine the number of partitions. - def numPartitions: Int = sqlContext.conf.numShufflePartitions + private def numPartitions: Int = sqlContext.conf.numShufflePartitions - def apply(plan: SparkPlan): SparkPlan = plan.transformUp { - case operator: SparkPlan => - // Adds Exchange or Sort operators as required - def addOperatorsIfNecessary( - partitioning: Partitioning, - rowOrdering: Seq[SortOrder], - child: SparkPlan): SparkPlan = { - - def addShuffleIfNecessary(child: SparkPlan): SparkPlan = { - if (!child.outputPartitioning.guarantees(partitioning)) { - Exchange(partitioning, child) - } else { - child - } - } - - def addSortIfNecessary(child: SparkPlan): SparkPlan = { + private def canonicalPartitioning(requiredDistribution: Distribution): Partitioning = { + requiredDistribution match { + case AllTuples => SinglePartition + case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions) + case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions) + case dist => sys.error(s"Do not know how to satisfy distribution $dist") + } + } - if (rowOrdering.nonEmpty) { - // If child.outputOrdering is [a, b] and rowOrdering is [a], we do not need to sort. 
- val minSize = Seq(rowOrdering.size, child.outputOrdering.size).min - if (minSize == 0 || rowOrdering.take(minSize) != child.outputOrdering.take(minSize)) { - sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child) - } else { + private def ensureChildNumPartitionsAgreementIfNecessary(operator: SparkPlan): SparkPlan = { + if (operator.requiresChildrenToProduceSameNumberOfPartitions) { + if (operator.children.map(_.outputPartitioning.numPartitions).distinct.size > 1) { + val newChildren = operator.children.zip(operator.requiredChildDistribution).map { + case (child, requiredDistribution) => + val targetPartitioning = canonicalPartitioning(requiredDistribution) + if (child.outputPartitioning.guarantees(targetPartitioning)) { child + } else { + Exchange(targetPartitioning, child) } - } else { - child - } } - - addSortIfNecessary(addShuffleIfNecessary(child)) + operator.withNewChildren(newChildren) + } else { + operator } + } else { + operator + } + } - val requirements = - (operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children) + private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { - val fixedChildren = requirements.zipped.map { - case (AllTuples, rowOrdering, child) => - addOperatorsIfNecessary(SinglePartition, rowOrdering, child) - case (ClusteredDistribution(clustering), rowOrdering, child) => - addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child) - case (OrderedDistribution(ordering), rowOrdering, child) => - addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child) + def addShuffleIfNecessary(child: SparkPlan, requiredDistribution: Distribution): SparkPlan = { + if (child.outputPartitioning.satisfies(requiredDistribution)) { + child + } else { + Exchange(canonicalPartitioning(requiredDistribution), child) + } + } - case (UnspecifiedDistribution, Seq(), child) => + def addSortIfNecessary(child: SparkPlan, requiredOrdering: Seq[SortOrder]): SparkPlan = { + if (requiredOrdering.nonEmpty) { + // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort. 
+ val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min + if (minSize == 0 || requiredOrdering.take(minSize) != child.outputOrdering.take(minSize)) { + sqlContext.planner.BasicOperators.getSortOperator(requiredOrdering, global = false, child) + } else { child - case (UnspecifiedDistribution, rowOrdering, child) => - sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child) - - case (dist, ordering, _) => - sys.error(s"Don't know how to ensure $dist with ordering $ordering") + } + } else { + child } + } + + val children = operator.children + val requiredChildDistribution = operator.requiredChildDistribution + val requiredChildOrdering = operator.requiredChildOrdering + assert(children.length == requiredChildDistribution.length) + assert(children.length == requiredChildOrdering.length) + val newChildren = (children, requiredChildDistribution, requiredChildOrdering).zipped.map { + case (child, requiredDistribution, requiredOrdering) => + addSortIfNecessary(addShuffleIfNecessary(child, requiredDistribution), requiredOrdering) + } + operator.withNewChildren(newChildren) + } - operator.withNewChildren(fixedChildren) + def apply(plan: SparkPlan): SparkPlan = plan.transformUp { + case operator: SparkPlan => + ensureDistributionAndOrdering(ensureChildNumPartitionsAgreementIfNecessary(operator)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 2f29067f5646..89703405a38b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -109,6 +109,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** Specifies sort order for each partition requirements on the input data for this operator. */ def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) + /** + * Specifies whether this operator requires all of its children to produce the same number of + * output partitions. 
+ */ + def requiresChildrenToProduceSameNumberOfPartitions: Boolean = false + /** Specifies whether this operator outputs UnsafeRows */ def outputsUnsafeRows: Boolean = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala index 68ccd34d8ed9..8c42e4f04f02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala @@ -42,6 +42,8 @@ case class LeftSemiJoinHash( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + override def requiresChildrenToProduceSameNumberOfPartitions: Boolean = true + protected override def doExecute(): RDD[InternalRow] = { right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) => if (condition.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index c923dc837c44..e709b07a336b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -46,6 +46,8 @@ case class ShuffledHashJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + override def requiresChildrenToProduceSameNumberOfPartitions: Boolean = true + protected override def doExecute(): RDD[InternalRow] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => val hashed = HashedRelation(buildIter, buildSideKeyGenerator) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala index eee8ad800f98..0965ff2bd684 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala @@ -44,6 +44,8 @@ case class ShuffledHashOuterJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + override def requiresChildrenToProduceSameNumberOfPartitions: Boolean = true + override def outputPartitioning: Partitioning = joinType match { case LeftOuter => left.outputPartitioning case RightOuter => right.outputPartitioning diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 4ae23c186cf7..12bb3cbc0982 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -48,6 +48,8 @@ case class SortMergeJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + override def requiresChildrenToProduceSameNumberOfPartitions: Boolean = true + override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys) override def requiredChildOrdering: Seq[Seq[SortOrder]] = From c628daf9575ed27b1d2bd62a21b5fb3ccb74bce4 Mon Sep 17 
00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 02:06:56 -0700 Subject: [PATCH 06/21] Revert accidental ExchangeSuite change. --- .../scala/org/apache/spark/sql/execution/ExchangeSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala index ad9f7697f680..79e903c2bbd4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.physical.SinglePartition -import org.apache.spark.sql.execution.joins.SortMergeJoin class ExchangeSuite extends SparkPlanTest { test("shuffling UnsafeRows in exchange") { From 752b8de2d6f0c4f28be01d53f35c75f0273a6aeb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 02:22:53 -0700 Subject: [PATCH 07/21] style fix --- .../main/scala/org/apache/spark/sql/execution/Exchange.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 21029861709a..e66768d10969 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -269,7 +269,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ operator.withNewChildren(newChildren) } - def apply(plan: SparkPlan): SparkPlan = plan.transformUp { + def apply(plan: SparkPlan): SparkPlan = plan.transformUp { case operator: SparkPlan => ensureDistributionAndOrdering(ensureChildNumPartitionsAgreementIfNecessary(operator)) } From 2e0f33acb20ab45d1c5e41ed2a943ed90ed12d14 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 09:46:25 -0700 Subject: [PATCH 08/21] Write a more generic test for EnsureRequirements. 
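Note: the point of the DummyPlan fixture introduced below is that EnsureRequirements only ever reads a plan's declared properties, so the rule can be unit tested without executing a query. A minimal usage sketch, written against the same API as the diff and meant to run inside PlannerSuite where sqlContext is in scope (the property values are illustrative):

    // Build a parent whose children each already produce a single partition,
    // apply the rule directly, and verify that no Exchange was introduced.
    val distribution = ClusteredDistribution(Literal(1) :: Nil)
    val inputPlan = DummyPlan(
      children = Seq(
        DummyPlan(outputPartitioning = SinglePartition),
        DummyPlan(outputPartitioning = SinglePartition)),
      requiredChildDistribution = Seq(distribution, distribution),
      requiredChildOrdering = Seq(Nil, Nil))
    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
    assert(outputPlan.collect { case Exchange(_, _) => true }.isEmpty)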
--- .../spark/sql/execution/PlannerSuite.scala | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 6cb751c7bf73..b9fed5c5294b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -18,10 +18,14 @@ package org.apache.spark.sql.execution import org.apache.spark.SparkFunSuite +import org.apache.spark.rdd.RDD import org.apache.spark.sql.TestData._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions .{Ascending, Literal, Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin, SortMergeJoin} +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext} import org.apache.spark.sql.test.TestSQLContext._ @@ -203,13 +207,38 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { } } - test("EnsureRequirements shouldn't add exchange to SMJ inputs if both are SinglePartition") { - val df = (1 to 10).map(Tuple1.apply).toDF("a").repartition(1) - val keys = Seq(df.col("a").expr) - val smj = SortMergeJoin(keys, keys, df.queryExecution.sparkPlan, df.queryExecution.sparkPlan) - val afterEnsureRequirements = EnsureRequirements(df.sqlContext).apply(smj) - if (afterEnsureRequirements.collect { case Exchange(_, _) => true }.nonEmpty) { - fail(s"No Exchanges should have been added:\n$afterEnsureRequirements") + // --- Unit tests of EnsureRequirements --------------------------------------------------------- + + test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") { + val outputOrdering = Seq(SortOrder(Literal(1), Ascending)) + val distribution = ClusteredDistribution(Literal(1) :: Nil) + val inputPlan = DummyPlan( + children = Seq( + DummyPlan(outputPartitioning = SinglePartition), + DummyPlan(outputPartitioning = SinglePartition) + ), + requiresChildrenToProduceSameNumberOfPartitions = true, + requiredChildDistribution = Seq(distribution, distribution), + requiredChildOrdering = Seq(outputOrdering, outputOrdering) + ) + val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan) + if (outputPlan.collect { case Exchange(_, _) => true }.nonEmpty) { + fail(s"No Exchanges should have been added:\n$outputPlan") } } + + // --------------------------------------------------------------------------------------------- +} + +// Used for unit-testing EnsureRequirements +private case class DummyPlan( + override val children: Seq[SparkPlan] = Nil, + override val outputOrdering: Seq[SortOrder] = Nil, + override val outputPartitioning: Partitioning = UnknownPartitioning(0), + override val requiresChildrenToProduceSameNumberOfPartitions: Boolean = false, + override val requiredChildDistribution: Seq[Distribution] = Nil, + override val requiredChildOrdering: Seq[Seq[SortOrder]] = Nil + ) extends SparkPlan { + override protected def doExecute(): RDD[InternalRow] = throw new NotImplementedError + override def output: Seq[Attribute] = Seq.empty } From 5172ac561def701754af8e955130f4b0f4e979dc Mon Sep 
17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 09:54:57 -0700 Subject: [PATCH 09/21] Add test for requiresChildrenToProduceSameNumberOfPartitions. --- .../spark/sql/execution/PlannerSuite.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index b9fed5c5294b..7ab20363e99e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -209,6 +209,22 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // --- Unit tests of EnsureRequirements --------------------------------------------------------- + test("EnsureRequirements ensures that children produce same number of partitions when required") { + val clustering = Literal(1) :: Nil + val distribution = ClusteredDistribution(clustering) + val inputPlan = DummyPlan( + children = Seq( + DummyPlan(outputPartitioning = HashPartitioning(clustering, 1)), + DummyPlan(outputPartitioning = HashPartitioning(clustering, 2)) + ), + requiresChildrenToProduceSameNumberOfPartitions = true, + requiredChildDistribution = Seq(distribution, distribution), + requiredChildOrdering = Seq(Seq.empty, Seq.empty) + ) + val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan) + assert (outputPlan.children.map(_.outputPartitioning.numPartitions).toSet.size === 1) + } + test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") { val outputOrdering = Seq(SortOrder(Literal(1), Ascending)) val distribution = ClusteredDistribution(Literal(1) :: Nil) From 0725a346e920d782124270ec2e8351009ea5af34 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 10:05:01 -0700 Subject: [PATCH 10/21] Small assertion cleanup. 
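Note: the assertion helper below still compares only partition counts when checking that siblings agree; the next two patches replace that with a full compatibility check. A short illustration of why matching counts alone is not enough, using the catalyst classes that already appear in these diffs (partition counts chosen arbitrarily):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

    // Two children can agree on the number of partitions while being hashed on
    // different keys, so equal counts alone do not make a join co-partitioned.
    val byA = HashPartitioning(Literal(1) :: Nil, 8)
    val byB = HashPartitioning(Literal(2) :: Nil, 8)
    assert(byA.numPartitions == byB.numPartitions)   // the count-based check passes
    assert(!byA.guarantees(byB))                     // but co-location is not guaranteed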
--- .../spark/sql/execution/PlannerSuite.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 7ab20363e99e..5cc87f5e12af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -209,6 +209,19 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // --- Unit tests of EnsureRequirements --------------------------------------------------------- + private def assertDistributionRequirementsAreSatisfied(outputPlan: SparkPlan): Unit = { + if (outputPlan.requiresChildrenToProduceSameNumberOfPartitions) { + if (outputPlan.children.map(_.outputPartitioning.numPartitions).toSet.size != 1) { + fail(s"Children did not produce the same number of partitions:\n$outputPlan") + } + } + outputPlan.children.zip(outputPlan.requiredChildDistribution).foreach { + case (child, requiredDist) => + assert(child.outputPartitioning.satisfies(requiredDist), + s"$child output partitioning does not satisfy $requiredDist:\n$outputPlan") + } + } + test("EnsureRequirements ensures that children produce same number of partitions when required") { val clustering = Literal(1) :: Nil val distribution = ClusteredDistribution(clustering) @@ -222,7 +235,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { requiredChildOrdering = Seq(Seq.empty, Seq.empty) ) val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan) - assert (outputPlan.children.map(_.outputPartitioning.numPartitions).toSet.size === 1) + assertDistributionRequirementsAreSatisfied(outputPlan) } test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") { @@ -238,6 +251,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { requiredChildOrdering = Seq(outputOrdering, outputOrdering) ) val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan) + assertDistributionRequirementsAreSatisfied(outputPlan) if (outputPlan.collect { case Exchange(_, _) => true }.nonEmpty) { fail(s"No Exchanges should have been added:\n$outputPlan") } From a1c12b98228a4c7157ebb0ec326023b815b141b0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 12:55:17 -0700 Subject: [PATCH 11/21] Add failing test to demonstrate allCompatible bug --- .../plans/physical/partitioning.scala | 16 ++++++ .../apache/spark/sql/execution/Exchange.scala | 2 +- .../spark/sql/execution/SparkPlan.scala | 6 +- .../execution/joins/LeftSemiJoinHash.scala | 2 +- .../execution/joins/ShuffledHashJoin.scala | 2 +- .../joins/ShuffledHashOuterJoin.scala | 2 +- .../sql/execution/joins/SortMergeJoin.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 56 ++++++++++++++++--- 8 files changed, 72 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index ec659ce789c2..97cf83d897a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -95,6 +95,22 @@ sealed trait Partitioning { def guarantees(other: Partitioning): Boolean } +object Partitioning { + def allCompatible(partitionings: Seq[Partitioning]): Boolean = { + 
// Note: this assumes transitivity + partitionings.sliding(2).map { + case Seq(a) => true + case Seq(a, b) => + if (a.numPartitions != b.numPartitions) { + assert(!a.guarantees(b) && !b.guarantees(a)) + false + } else { + a.guarantees(b) && b.guarantees(a) + } + }.forall(_ == true) + } +} + case class UnknownPartitioning(numPartitions: Int) extends Partitioning { override def satisfies(required: Distribution): Boolean = required match { case UnspecifiedDistribution => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index e66768d10969..6d4d4834dfc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -213,7 +213,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ } private def ensureChildNumPartitionsAgreementIfNecessary(operator: SparkPlan): SparkPlan = { - if (operator.requiresChildrenToProduceSameNumberOfPartitions) { + if (operator.requiresChildPartitioningsToBeCompatible) { if (operator.children.map(_.outputPartitioning.numPartitions).distinct.size > 1) { val newChildren = operator.children.zip(operator.requiredChildDistribution).map { case (child, requiredDistribution) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 89703405a38b..c6fe18c7202b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -110,10 +110,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) /** - * Specifies whether this operator requires all of its children to produce the same number of - * output partitions. + * Specifies whether this operator requires all of its children to have [[outputPartitioning]]s + * that are compatible with each other. 
*/ - def requiresChildrenToProduceSameNumberOfPartitions: Boolean = false + def requiresChildPartitioningsToBeCompatible: Boolean = false /** Specifies whether this operator outputs UnsafeRows */ def outputsUnsafeRows: Boolean = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala index 8c42e4f04f02..6790eaa85e88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala @@ -42,7 +42,7 @@ case class LeftSemiJoinHash( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def requiresChildrenToProduceSameNumberOfPartitions: Boolean = true + override def requiresChildPartitioningsToBeCompatible: Boolean = true protected override def doExecute(): RDD[InternalRow] = { right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index e709b07a336b..741ca402436c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -46,7 +46,7 @@ case class ShuffledHashJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def requiresChildrenToProduceSameNumberOfPartitions: Boolean = true + override def requiresChildPartitioningsToBeCompatible: Boolean = true protected override def doExecute(): RDD[InternalRow] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala index 0965ff2bd684..49e67fef86b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala @@ -44,7 +44,7 @@ case class ShuffledHashOuterJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def requiresChildrenToProduceSameNumberOfPartitions: Boolean = true + override def requiresChildPartitioningsToBeCompatible: Boolean = true override def outputPartitioning: Partitioning = joinType match { case LeftOuter => left.outputPartitioning diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 12bb3cbc0982..c928bea7059b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -48,7 +48,7 @@ case class SortMergeJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def requiresChildrenToProduceSameNumberOfPartitions: Boolean = true + override def 
requiresChildPartitioningsToBeCompatible: Boolean = true override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 5cc87f5e12af..2e6ea76ac65e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.rdd.RDD import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions .{Ascending, Literal, Attribute, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ @@ -210,9 +210,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // --- Unit tests of EnsureRequirements --------------------------------------------------------- private def assertDistributionRequirementsAreSatisfied(outputPlan: SparkPlan): Unit = { - if (outputPlan.requiresChildrenToProduceSameNumberOfPartitions) { - if (outputPlan.children.map(_.outputPartitioning.numPartitions).toSet.size != 1) { - fail(s"Children did not produce the same number of partitions:\n$outputPlan") + if (outputPlan.requiresChildPartitioningsToBeCompatible) { + val childPartitionings = outputPlan.children.map(_.outputPartitioning) + if (!Partitioning.allCompatible(childPartitionings)) { + fail(s"Partitionings are not compatible: $childPartitionings") } } outputPlan.children.zip(outputPlan.requiredChildDistribution).foreach { @@ -222,7 +223,42 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { } } - test("EnsureRequirements ensures that children produce same number of partitions when required") { + test("EnsureRequirements ensures that child partitionings guarantee each other, if required") { + // Consider an operator that requires inputs that are clustered by two expressions (e.g. 
+ // sort merge join where there are multiple columns in the equi-join condition) + val clusteringA = Literal(1) :: Nil + val clusteringB = Literal(2) :: Nil + val distribution = ClusteredDistribution(clusteringA ++ clusteringB) + // Say that the left and right inputs are each partitioned by _one_ of the two join columns: + val leftPartitioning = HashPartitioning(clusteringA, 1) + val rightPartitioning = HashPartitioning(clusteringB, 1) + // Individually, each input's partitioning satisfies the clustering distribution: + assert(leftPartitioning.satisfies(distribution)) + assert(rightPartitioning.satisfies(distribution)) + // However, these partitionings are not compatible with each other, so we still need to + // repartition both inputs prior to performing the join: + assert(!leftPartitioning.guarantees(rightPartitioning)) + assert(!rightPartitioning.guarantees(leftPartitioning)) + val inputPlan = DummyPlan( + children = Seq( + DummyPlan(outputPartitioning = HashPartitioning(clusteringA, 1)), + DummyPlan(outputPartitioning = HashPartitioning(clusteringB, 1)) + ), + requiresChildPartitioningsToBeCompatible = true, + requiredChildDistribution = Seq(distribution, distribution), + requiredChildOrdering = Seq(Seq.empty, Seq.empty) + ) + val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan) + assertDistributionRequirementsAreSatisfied(outputPlan) + if (outputPlan.collect { case Exchange(_, _) => true }.isEmpty) { + fail(s"Exchanges should have been added:\n$outputPlan") + } + } + + test("EnsureRequirements ensures that children produce same number of partitions, if required") { + // This is similar to the previous test, except it checks that partitionings are not compatible + // unless they produce the same number of partitions. This requirement is also enforced via + // assertions in Exchange. val clustering = Literal(1) :: Nil val distribution = ClusteredDistribution(clustering) val inputPlan = DummyPlan( @@ -230,7 +266,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { DummyPlan(outputPartitioning = HashPartitioning(clustering, 1)), DummyPlan(outputPartitioning = HashPartitioning(clustering, 2)) ), - requiresChildrenToProduceSameNumberOfPartitions = true, + requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), requiredChildOrdering = Seq(Seq.empty, Seq.empty) ) @@ -239,6 +275,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { } test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") { + // Consider an operator that imposes both output distribution and ordering requirements on its + // children, such as sort sort merge join. If the distribution requirements are satisfied but + // the output ordering requirements are unsatisfied, then the planner should only add sorts and + // should not need to add additional shuffles / exchanges. 
val outputOrdering = Seq(SortOrder(Literal(1), Ascending)) val distribution = ClusteredDistribution(Literal(1) :: Nil) val inputPlan = DummyPlan( @@ -246,7 +286,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { DummyPlan(outputPartitioning = SinglePartition), DummyPlan(outputPartitioning = SinglePartition) ), - requiresChildrenToProduceSameNumberOfPartitions = true, + requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), requiredChildOrdering = Seq(outputOrdering, outputOrdering) ) @@ -265,7 +305,7 @@ private case class DummyPlan( override val children: Seq[SparkPlan] = Nil, override val outputOrdering: Seq[SortOrder] = Nil, override val outputPartitioning: Partitioning = UnknownPartitioning(0), - override val requiresChildrenToProduceSameNumberOfPartitions: Boolean = false, + override val requiresChildPartitioningsToBeCompatible: Boolean = false, override val requiredChildDistribution: Seq[Distribution] = Nil, override val requiredChildOrdering: Seq[Seq[SortOrder]] = Nil ) extends SparkPlan { From 4f08278cd8fde941a2885ffd07de78c0e0b4aeea Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 12:56:47 -0700 Subject: [PATCH 12/21] Fix the test by adding the compatibility check to EnsureRequirements --- .../scala/org/apache/spark/sql/execution/Exchange.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 6d4d4834dfc7..4a53e90dabcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -212,9 +212,9 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ } } - private def ensureChildNumPartitionsAgreementIfNecessary(operator: SparkPlan): SparkPlan = { + private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): SparkPlan = { if (operator.requiresChildPartitioningsToBeCompatible) { - if (operator.children.map(_.outputPartitioning.numPartitions).distinct.size > 1) { + if (!Partitioning.allCompatible(operator.children.map(_.outputPartitioning))) { val newChildren = operator.children.zip(operator.requiredChildDistribution).map { case (child, requiredDistribution) => val targetPartitioning = canonicalPartitioning(requiredDistribution) @@ -271,6 +271,6 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ def apply(plan: SparkPlan): SparkPlan = plan.transformUp { case operator: SparkPlan => - ensureDistributionAndOrdering(ensureChildNumPartitionsAgreementIfNecessary(operator)) + ensureDistributionAndOrdering(ensureChildPartitioningsAreCompatible(operator)) } } From 8dbc84523b1920ace83065c150f2f6ed345def44 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 14:40:50 -0700 Subject: [PATCH 13/21] Add even more tests. 
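Note: the tests added below cover the full matrix of whether the child partitionings are compatible with each other and whether each child satisfies its required distribution. The corner case behind the first new test is worth spelling out; the assertions here simply mirror the ones inside that test:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}

    // A join clustered on two keys whose inputs are each hash-partitioned on just
    // one of them: each side satisfies the required distribution on its own, but
    // the sides are not co-partitioned, so both must still be shuffled.
    val required = ClusteredDistribution(Literal(1) :: Literal(2) :: Nil)
    val left = HashPartitioning(Literal(1) :: Nil, 1)
    val right = HashPartitioning(Literal(2) :: Nil, 1)
    assert(left.satisfies(required) && right.satisfies(required))
    assert(!left.guarantees(right) && !right.guarantees(left))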
--- .../spark/sql/execution/PlannerSuite.scala | 61 ++++++++++++++++--- 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 2e6ea76ac65e..1c8e3ae347bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -209,6 +209,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // --- Unit tests of EnsureRequirements --------------------------------------------------------- + // When it comes to testing whether EnsureRequirements properly ensures distribution requirements, + // there two dimensions that need to be considered: are the child partitionings compatible and + // do they satisfy the distribution requirements? As a result, we need at least four test cases. + private def assertDistributionRequirementsAreSatisfied(outputPlan: SparkPlan): Unit = { if (outputPlan.requiresChildPartitioningsToBeCompatible) { val childPartitionings = outputPlan.children.map(_.outputPartitioning) @@ -223,7 +227,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { } } - test("EnsureRequirements ensures that child partitionings guarantee each other, if required") { + test("EnsureRequirements with incompatible child partitionings which satisfy distribution") { // Consider an operator that requires inputs that are clustered by two expressions (e.g. // sort merge join where there are multiple columns in the equi-join condition) val clusteringA = Literal(1) :: Nil @@ -241,8 +245,8 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { assert(!rightPartitioning.guarantees(leftPartitioning)) val inputPlan = DummyPlan( children = Seq( - DummyPlan(outputPartitioning = HashPartitioning(clusteringA, 1)), - DummyPlan(outputPartitioning = HashPartitioning(clusteringB, 1)) + DummyPlan(outputPartitioning = leftPartitioning), + DummyPlan(outputPartitioning = rightPartitioning) ), requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), @@ -251,14 +255,13 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan) assertDistributionRequirementsAreSatisfied(outputPlan) if (outputPlan.collect { case Exchange(_, _) => true }.isEmpty) { - fail(s"Exchanges should have been added:\n$outputPlan") + fail(s"Exchange should have been added:\n$outputPlan") } } - test("EnsureRequirements ensures that children produce same number of partitions, if required") { + test("EnsureRequirements with child partitionings with different numbers of output partitions") { // This is similar to the previous test, except it checks that partitionings are not compatible - // unless they produce the same number of partitions. This requirement is also enforced via - // assertions in Exchange. + // unless they produce the same number of partitions. 
val clustering = Literal(1) :: Nil val distribution = ClusteredDistribution(clustering) val inputPlan = DummyPlan( @@ -274,6 +277,50 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { assertDistributionRequirementsAreSatisfied(outputPlan) } + test("EnsureRequirements with compatible child partitionings that do not satisfy distribution") { + val distribution = ClusteredDistribution(Literal(1) :: Nil) + // The left and right inputs have compatible partitionings but they do not satisfy the + // distribution because they are clustered on different columns. Thus, we need to shuffle. + val childPartitioning = HashPartitioning(Literal(2) :: Nil, 1) + assert(!childPartitioning.satisfies(distribution)) + val inputPlan = DummyPlan( + children = Seq( + DummyPlan(outputPartitioning = childPartitioning), + DummyPlan(outputPartitioning = childPartitioning) + ), + requiresChildPartitioningsToBeCompatible = true, + requiredChildDistribution = Seq(distribution, distribution), + requiredChildOrdering = Seq(Seq.empty, Seq.empty) + ) + val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan) + assertDistributionRequirementsAreSatisfied(outputPlan) + if (outputPlan.collect { case Exchange(_, _) => true }.isEmpty) { + fail(s"Exchange should have been added:\n$outputPlan") + } + } + + test("EnsureRequirements with compatible child partitionings that satisfy distribution") { + // In this case, all requirements are satisfied and no exchange should be added. + val distribution = ClusteredDistribution(Literal(1) :: Nil) + val childPartitioning = HashPartitioning(Literal(1) :: Nil, 5) + assert(childPartitioning.satisfies(distribution)) + val inputPlan = DummyPlan( + children = Seq( + DummyPlan(outputPartitioning = childPartitioning), + DummyPlan(outputPartitioning = childPartitioning) + ), + requiresChildPartitioningsToBeCompatible = true, + requiredChildDistribution = Seq(distribution, distribution), + requiredChildOrdering = Seq(Seq.empty, Seq.empty) + ) + val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan) + assertDistributionRequirementsAreSatisfied(outputPlan) + if (outputPlan.collect { case Exchange(_, _) => true }.nonEmpty) { + fail(s"Exchange should not have been added:\n$outputPlan") + } + } + + // This is a regression test for SPARK-9703 test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") { // Consider an operator that imposes both output distribution and ordering requirements on its // children, such as sort sort merge join. If the distribution requirements are satisfied but From 06aba0c98790d156892f1f5c0a6b506b0d68a1c9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 14:54:14 -0700 Subject: [PATCH 14/21] Add more comments --- .../apache/spark/sql/execution/Exchange.scala | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 4a53e90dabcb..01628ebe5f1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -197,12 +197,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una * of input data meets the * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for * each operator by inserting [[Exchange]] Operators where required. 
Also ensure that the - * required input partition ordering requirements are met. + * input partition ordering requirements are met. */ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] { // TODO: Determine the number of partitions. private def numPartitions: Int = sqlContext.conf.numShufflePartitions + /** + * Given a required distribution, returns a partitioning that satisfies that distribution. + */ private def canonicalPartitioning(requiredDistribution: Distribution): Partitioning = { requiredDistribution match { case AllTuples => SinglePartition @@ -212,6 +215,19 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ } } + /** + * Return true if all of the operator's children satisfy their output distribution requirements. + */ + private def childPartitioningsSatisfyDistributionRequirements(operator: SparkPlan): Boolean = { + operator.children.zip(operator.requiredChildDistribution).forall { + case (child, distribution) => child.outputPartitioning.satisfies(distribution) + } + } + + /** + * Given an operator, check whether the operator requires its children to have compatible + * output partitionings and add Exchanges to fix any detected incompatibilities. + */ private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): SparkPlan = { if (operator.requiresChildPartitioningsToBeCompatible) { if (!Partitioning.allCompatible(operator.children.map(_.outputPartitioning))) { @@ -224,7 +240,9 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ Exchange(targetPartitioning, child) } } - operator.withNewChildren(newChildren) + val newOperator = operator.withNewChildren(newChildren) + assert(childPartitioningsSatisfyDistributionRequirements(newOperator)) + newOperator } else { operator } @@ -235,6 +253,8 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { + // Precondition: joins' children have compatible partitionings. + def addShuffleIfNecessary(child: SparkPlan, requiredDistribution: Distribution): SparkPlan = { if (child.outputPartitioning.satisfies(requiredDistribution)) { child From 642b0bb52de9063df99af9de19344572b39b148c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 15:22:24 -0700 Subject: [PATCH 15/21] Further expand comment / reasoning --- .../scala/org/apache/spark/sql/execution/Exchange.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 01628ebe5f1d..32d2ff93db8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -253,9 +253,14 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { - // Precondition: joins' children have compatible partitionings. - def addShuffleIfNecessary(child: SparkPlan, requiredDistribution: Distribution): SparkPlan = { + // A pre-condition of ensureDistributionAndOrdering is that joins' children have compatible + // partitionings. Thus, we only need to check whether the output partitionings satisfy + // the required distribution. 
In the case where the children are all compatible, then they + // will either all satisfy the required distribution or will all fail to satisfy it:, since + // (A.guarantees(B) && B.satisfies(C)) => A.satisfies(C). + // Therefore, if all children are compatible then either all or none of them will shuffled to + // ensure that the distribution requirements are met. if (child.outputPartitioning.satisfies(requiredDistribution)) { child } else { From fee65c4b304c7c7bb5dfcc1f18507d12314f53e0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 6 Aug 2015 15:56:10 -0700 Subject: [PATCH 16/21] Further refinement to comments / reasoning --- .../scala/org/apache/spark/sql/execution/Exchange.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 32d2ff93db8a..b5b9390b8812 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -257,10 +257,13 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ // A pre-condition of ensureDistributionAndOrdering is that joins' children have compatible // partitionings. Thus, we only need to check whether the output partitionings satisfy // the required distribution. In the case where the children are all compatible, then they - // will either all satisfy the required distribution or will all fail to satisfy it:, since - // (A.guarantees(B) && B.satisfies(C)) => A.satisfies(C). + // will either all satisfy the required distribution or will all fail to satisfy it, since + // A.guarantees(B) implies that A and B satisfy the same set of distributions. // Therefore, if all children are compatible then either all or none of them will shuffled to // ensure that the distribution requirements are met. + // + // Note that this reasoning implicitly assumes that operators which require compatible + // child partitionings have equivalent required distributions for those children. if (child.outputPartitioning.satisfies(requiredDistribution)) { child } else { From 18cddeb90856fb2448cdc3380065eadc4fe6092b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Aug 2015 19:44:00 -0700 Subject: [PATCH 17/21] Rename DummyPlan to DummySparkPlan. 
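Note: the rename below is mechanical; the substantive reasoning sits in the comment-only patches just above. The invariant they rely on can be stated concretely: once siblings have been made compatible they share one partitioning up to guarantees(), so they all reach the same shuffle-or-not decision. A small check of that property with the real classes (the particular values are illustrative):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}

    val dist = ClusteredDistribution(Literal(1) :: Nil)
    val p = HashPartitioning(Literal(1) :: Nil, 8)
    val q = HashPartitioning(Literal(1) :: Nil, 8)
    assert(p.guarantees(q) && q.guarantees(p))       // compatible siblings
    assert(p.satisfies(dist) == q.satisfies(dist))   // hence the same shuffle decision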
--- .../spark/sql/execution/PlannerSuite.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 1c8e3ae347bc..d0e173d9508a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -243,10 +243,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // repartition both inputs prior to performing the join: assert(!leftPartitioning.guarantees(rightPartitioning)) assert(!rightPartitioning.guarantees(leftPartitioning)) - val inputPlan = DummyPlan( + val inputPlan = DummySparkPlan( children = Seq( - DummyPlan(outputPartitioning = leftPartitioning), - DummyPlan(outputPartitioning = rightPartitioning) + DummySparkPlan(outputPartitioning = leftPartitioning), + DummySparkPlan(outputPartitioning = rightPartitioning) ), requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), @@ -264,10 +264,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // unless they produce the same number of partitions. val clustering = Literal(1) :: Nil val distribution = ClusteredDistribution(clustering) - val inputPlan = DummyPlan( + val inputPlan = DummySparkPlan( children = Seq( - DummyPlan(outputPartitioning = HashPartitioning(clustering, 1)), - DummyPlan(outputPartitioning = HashPartitioning(clustering, 2)) + DummySparkPlan(outputPartitioning = HashPartitioning(clustering, 1)), + DummySparkPlan(outputPartitioning = HashPartitioning(clustering, 2)) ), requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), @@ -283,10 +283,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // distribution because they are clustered on different columns. Thus, we need to shuffle. val childPartitioning = HashPartitioning(Literal(2) :: Nil, 1) assert(!childPartitioning.satisfies(distribution)) - val inputPlan = DummyPlan( + val inputPlan = DummySparkPlan( children = Seq( - DummyPlan(outputPartitioning = childPartitioning), - DummyPlan(outputPartitioning = childPartitioning) + DummySparkPlan(outputPartitioning = childPartitioning), + DummySparkPlan(outputPartitioning = childPartitioning) ), requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), @@ -304,10 +304,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { val distribution = ClusteredDistribution(Literal(1) :: Nil) val childPartitioning = HashPartitioning(Literal(1) :: Nil, 5) assert(childPartitioning.satisfies(distribution)) - val inputPlan = DummyPlan( + val inputPlan = DummySparkPlan( children = Seq( - DummyPlan(outputPartitioning = childPartitioning), - DummyPlan(outputPartitioning = childPartitioning) + DummySparkPlan(outputPartitioning = childPartitioning), + DummySparkPlan(outputPartitioning = childPartitioning) ), requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), @@ -328,10 +328,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // should not need to add additional shuffles / exchanges. 
val outputOrdering = Seq(SortOrder(Literal(1), Ascending)) val distribution = ClusteredDistribution(Literal(1) :: Nil) - val inputPlan = DummyPlan( + val inputPlan = DummySparkPlan( children = Seq( - DummyPlan(outputPartitioning = SinglePartition), - DummyPlan(outputPartitioning = SinglePartition) + DummySparkPlan(outputPartitioning = SinglePartition), + DummySparkPlan(outputPartitioning = SinglePartition) ), requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), @@ -348,7 +348,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { } // Used for unit-testing EnsureRequirements -private case class DummyPlan( +private case class DummySparkPlan( override val children: Seq[SparkPlan] = Nil, override val outputOrdering: Seq[SortOrder] = Nil, override val outputPartitioning: Partitioning = UnknownPartitioning(0), From 1307c50ec58551ae03339a65875df9d22840ea1a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Aug 2015 19:55:16 -0700 Subject: [PATCH 18/21] Update conditions for requiring child compatibility. --- .../scala/org/apache/spark/sql/execution/Exchange.scala | 5 ++++- .../scala/org/apache/spark/sql/execution/SparkPlan.scala | 6 ------ .../spark/sql/execution/joins/LeftSemiJoinHash.scala | 2 -- .../spark/sql/execution/joins/ShuffledHashJoin.scala | 2 -- .../sql/execution/joins/ShuffledHashOuterJoin.scala | 2 -- .../apache/spark/sql/execution/joins/SortMergeJoin.scala | 2 -- .../org/apache/spark/sql/execution/PlannerSuite.scala | 9 ++------- 7 files changed, 6 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 06cb997e21ab..82867b466627 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -222,7 +222,10 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ * output partitionings and add Exchanges to fix any detected incompatibilities. */ private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): SparkPlan = { - if (operator.requiresChildPartitioningsToBeCompatible) { + // If an operator has multiple children and the operator requires a specific child output + // distribution then we need to ensure that all children have compatible output partitionings. + if (operator.children.length > 1 + && operator.requiredChildDistribution.toSet != Set(UnspecifiedDistribution)) { if (!Partitioning.allCompatible(operator.children.map(_.outputPartitioning))) { val newChildren = operator.children.zip(operator.requiredChildDistribution).map { case (child, requiredDistribution) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 16a1e4c3e61f..1915496d1620 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -124,12 +124,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** Specifies sort order for each partition requirements on the input data for this operator. */ def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) - /** - * Specifies whether this operator requires all of its children to have [[outputPartitioning]]s - * that are compatible with each other. 
- */ - def requiresChildPartitioningsToBeCompatible: Boolean = false - /** Specifies whether this operator outputs UnsafeRows */ def outputsUnsafeRows: Boolean = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala index 6790eaa85e88..68ccd34d8ed9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala @@ -42,8 +42,6 @@ case class LeftSemiJoinHash( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def requiresChildPartitioningsToBeCompatible: Boolean = true - protected override def doExecute(): RDD[InternalRow] = { right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) => if (condition.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index 741ca402436c..c923dc837c44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -46,8 +46,6 @@ case class ShuffledHashJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def requiresChildPartitioningsToBeCompatible: Boolean = true - protected override def doExecute(): RDD[InternalRow] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => val hashed = HashedRelation(buildIter, buildSideKeyGenerator) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala index 8689ae2ef660..6a8c35efca8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala @@ -44,8 +44,6 @@ case class ShuffledHashOuterJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def requiresChildPartitioningsToBeCompatible: Boolean = true - override def outputPartitioning: Partitioning = joinType match { case LeftOuter => left.outputPartitioning case RightOuter => right.outputPartitioning diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index c928bea7059b..4ae23c186cf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -48,8 +48,6 @@ case class SortMergeJoin( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def requiresChildPartitioningsToBeCompatible: Boolean = true - override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys) override def requiredChildOrdering: Seq[Seq[SortOrder]] = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index d0e173d9508a..f7aac569ad46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -214,7 +214,8 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { // do they satisfy the distribution requirements? As a result, we need at least four test cases. private def assertDistributionRequirementsAreSatisfied(outputPlan: SparkPlan): Unit = { - if (outputPlan.requiresChildPartitioningsToBeCompatible) { + if (outputPlan.children.length > 1 + && outputPlan.requiredChildDistribution.toSet != Set(UnspecifiedDistribution)) { val childPartitionings = outputPlan.children.map(_.outputPartitioning) if (!Partitioning.allCompatible(childPartitionings)) { fail(s"Partitionings are not compatible: $childPartitionings") @@ -248,7 +249,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { DummySparkPlan(outputPartitioning = leftPartitioning), DummySparkPlan(outputPartitioning = rightPartitioning) ), - requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), requiredChildOrdering = Seq(Seq.empty, Seq.empty) ) @@ -269,7 +269,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { DummySparkPlan(outputPartitioning = HashPartitioning(clustering, 1)), DummySparkPlan(outputPartitioning = HashPartitioning(clustering, 2)) ), - requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), requiredChildOrdering = Seq(Seq.empty, Seq.empty) ) @@ -288,7 +287,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { DummySparkPlan(outputPartitioning = childPartitioning), DummySparkPlan(outputPartitioning = childPartitioning) ), - requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), requiredChildOrdering = Seq(Seq.empty, Seq.empty) ) @@ -309,7 +307,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { DummySparkPlan(outputPartitioning = childPartitioning), DummySparkPlan(outputPartitioning = childPartitioning) ), - requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), requiredChildOrdering = Seq(Seq.empty, Seq.empty) ) @@ -333,7 +330,6 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { DummySparkPlan(outputPartitioning = SinglePartition), DummySparkPlan(outputPartitioning = SinglePartition) ), - requiresChildPartitioningsToBeCompatible = true, requiredChildDistribution = Seq(distribution, distribution), requiredChildOrdering = Seq(outputOrdering, outputOrdering) ) @@ -352,7 +348,6 @@ private case class DummySparkPlan( override val children: Seq[SparkPlan] = Nil, override val outputOrdering: Seq[SortOrder] = Nil, override val outputPartitioning: Partitioning = UnknownPartitioning(0), - override val requiresChildPartitioningsToBeCompatible: Boolean = false, override val requiredChildDistribution: Seq[Distribution] = Nil, override val requiredChildOrdering: Seq[Seq[SortOrder]] = Nil ) extends SparkPlan { From 8784bd9671627a5a16c2284dc9cccda2acf0f94c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 7 Aug 2015 20:40:36 -0700 Subject: [PATCH 19/21] Giant comment explaining compatibleWith vs. 
guarantees --- .../plans/physical/partitioning.scala | 91 ++++++++++++++++--- .../apache/spark/sql/execution/Exchange.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 4 +- 3 files changed, 83 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 97cf83d897a8..d36947e6d538 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { def clustering: Set[Expression] = ordering.map(_.child).toSet } +/** + * Describes how an operator's output is split across partitions. The `satisfies`, + * `compatibleWith`, and `guarantees` methods describe relationships between child partitionings, + * target partitionings, and [[Distribution]]s. These relations are described more precisely in + * their individual method docs, but at a high level: + * + * - `satisfies` is a relationship between partitionings and distributions. + * - `compatibleWith` is relationships between an operator's child output partitionings. + * - `guarantees` is a relationship between a child's existing output partitioning and a target + * output partitioning. + * + * Diagrammatically: + * + * +--------------+ + * | Distribution | + * +--------------+ + * ^ + * | + * satisfies + * | + * +--------------+ +--------------+ + * | Child | | Target | + * +----| Partitioning |----guarantees--->| Partitioning | + * | +--------------+ +--------------+ + * | ^ + * | | + * | compatibleWith + * | | + * +------------+ + * + */ sealed trait Partitioning { /** Returns the number of partitions that the data is split across */ val numPartitions: Int @@ -87,12 +118,50 @@ sealed trait Partitioning { */ def satisfies(required: Distribution): Boolean + /** + * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees + * the same partitioning scheme described by `other`. If a `A.guarantees(B)`, then repartitioning + * the child's output according to `B` will be unnecessary. `guarantees` is used as a performance + * optimization to allow the exchange planner to avoid redundant repartitionings. By default, + * a partitioning only guarantees partitionings that are equal to itself (i.e. the same number + * of partitions, same strategy (range or hash), etc). + * + * In order to enable more aggressive optimization, this strict equality check can be relaxed. + * For example, say that the planner needs to repartition all of an operator's children so that + * they satisfy the [[AllTuples]] distribution. One way to do this is to repartition all children + * to have the [[SinglePartition]] partitioning. If one of the operator's children already happens + * to be hash-partitioned with a single partition then we do not need to re-shuffle this child; + * this repartitioning can be avoided if a single-partition [[HashPartitioning]] `guarantees` + * [[SinglePartition]]. + * + * The SinglePartition example given above is not particularly interesting; guarantees' real + * value occurs for more advanced partitioning strategies. 
SPARK-7871 will introduce a notion + * of null-safe partitionings, under which partitionings can specify whether rows whose + * partitioning keys contain null values will be grouped into the same partition or whether they + * will have an unknown / random distribution. If a partitioning does not require nulls to be + * clustered then a partitioning which _does_ cluster nulls will guarantee the null clustered + * partitioning. The converse is not true, however: a partitioning which clusters nulls cannot + * be guaranteed by one which does not cluster them. Thus, in general `guarantees` is not a + * symmetric relation. + * + * Another way to think about `guarantees`: if `A.guarantees(B)`, then any partitioning of rows + * produced by `A` could have also been produced by `B`. + */ + def guarantees(other: Partitioning): Boolean = this == other + /** * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] * guarantees the same partitioning scheme described by `other`. + * + * Compatibility of partitionings is only checked for operators that have multiple children + * and that require a specific child output [[Distribution]], such as joins. + * + * Intuitively, partitionings are compatible if they route the same partitioning key to the same + * partition. For instance, two hash partitionings are only compatible if they produce the same + * number of output partitionings and hash records according to the same hash function and + * same partitioning key schema. */ - // TODO: Add an example once we have the `nullSafe` concept. - def guarantees(other: Partitioning): Boolean + def compatibleWith(other: Partitioning): Boolean } object Partitioning { @@ -102,10 +171,10 @@ object Partitioning { case Seq(a) => true case Seq(a, b) => if (a.numPartitions != b.numPartitions) { - assert(!a.guarantees(b) && !b.guarantees(a)) + assert(!a.compatibleWith(b) && !b.compatibleWith(a)) false } else { - a.guarantees(b) && b.guarantees(a) + a.compatibleWith(b) && b.compatibleWith(a) } }.forall(_ == true) } @@ -117,7 +186,7 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning { case _ => false } - override def guarantees(other: Partitioning): Boolean = false + override def compatibleWith(other: Partitioning): Boolean = false } case object SinglePartition extends Partitioning { @@ -125,7 +194,7 @@ case object SinglePartition extends Partitioning { override def satisfies(required: Distribution): Boolean = true - override def guarantees(other: Partitioning): Boolean = other match { + override def compatibleWith(other: Partitioning): Boolean = other match { case SinglePartition => true case _ => false } @@ -136,7 +205,7 @@ case object BroadcastPartitioning extends Partitioning { override def satisfies(required: Distribution): Boolean = true - override def guarantees(other: Partitioning): Boolean = other match { + override def compatibleWith(other: Partitioning): Boolean = other match { case BroadcastPartitioning => true case _ => false } @@ -163,7 +232,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) case _ => false } - override def guarantees(other: Partitioning): Boolean = other match { + override def compatibleWith(other: Partitioning): Boolean = other match { case o: HashPartitioning => this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions case _ => false @@ -201,7 +270,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) case _ => false } - override def guarantees(other: 
Partitioning): Boolean = other match { + override def compatibleWith(other: Partitioning): Boolean = other match { case o: RangePartitioning => this == o case _ => false } @@ -248,8 +317,8 @@ case class PartitioningCollection(partitionings: Seq[Partitioning]) * Returns true if any `partitioning` of this collection guarantees * the given [[Partitioning]]. */ - override def guarantees(other: Partitioning): Boolean = - partitionings.exists(_.guarantees(other)) + override def compatibleWith(other: Partitioning): Boolean = + partitionings.exists(_.compatibleWith(other)) override def toString: String = { partitionings.map(_.toString).mkString("(", " or ", ")") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 82867b466627..123417e28e37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -230,7 +230,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ val newChildren = operator.children.zip(operator.requiredChildDistribution).map { case (child, requiredDistribution) => val targetPartitioning = canonicalPartitioning(requiredDistribution) - if (child.outputPartitioning.guarantees(targetPartitioning)) { + if (child.outputPartitioning.compatibleWith(targetPartitioning)) { child } else { Exchange(targetPartitioning, child) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index f7aac569ad46..5582caa0d366 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -242,8 +242,8 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { assert(rightPartitioning.satisfies(distribution)) // However, these partitionings are not compatible with each other, so we still need to // repartition both inputs prior to performing the join: - assert(!leftPartitioning.guarantees(rightPartitioning)) - assert(!rightPartitioning.guarantees(leftPartitioning)) + assert(!leftPartitioning.compatibleWith(rightPartitioning)) + assert(!rightPartitioning.compatibleWith(leftPartitioning)) val inputPlan = DummySparkPlan( children = Seq( DummySparkPlan(outputPartitioning = leftPartitioning), From 0983f75d8156a69ca1c04af49dd6f3eb57c27a88 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 8 Aug 2015 23:29:16 -0700 Subject: [PATCH 20/21] More guarantees vs. compatibleWith cleanup; delete BroadcastPartitioning. --- .../plans/physical/partitioning.scala | 73 +++++++++++-------- .../apache/spark/sql/execution/Exchange.scala | 2 +- 2 files changed, 43 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index d36947e6d538..5a89a90b735a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -76,8 +76,8 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { } /** - * Describes how an operator's output is split across partitions. 
The `satisfies`, - * `compatibleWith`, and `guarantees` methods describe relationships between child partitionings, + * Describes how an operator's output is split across partitions. The `compatibleWith`, + * `guarantees`, and `satisfies` methods describe relationships between child partitionings, * target partitionings, and [[Distribution]]s. These relations are described more precisely in * their individual method docs, but at a high level: * @@ -118,6 +118,23 @@ sealed trait Partitioning { */ def satisfies(required: Distribution): Boolean + /** + * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] + * guarantees the same partitioning scheme described by `other`. + * + * Compatibility of partitionings is only checked for operators that have multiple children + * and that require a specific child output [[Distribution]], such as joins. + * + * Intuitively, partitionings are compatible if they route the same partitioning key to the same + * partition. For instance, two hash partitionings are only compatible if they produce the same + * number of output partitionings and hash records according to the same hash function and + * same partitioning key schema. + * + * Put another way, two partitionings are compatible with each other if they satisfy all of the + * same distribution guarantees. + */ + def compatibleWith(other: Partitioning): Boolean + /** * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees * the same partitioning scheme described by `other`. If a `A.guarantees(B)`, then repartitioning @@ -148,20 +165,6 @@ sealed trait Partitioning { * produced by `A` could have also been produced by `B`. */ def guarantees(other: Partitioning): Boolean = this == other - - /** - * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] - * guarantees the same partitioning scheme described by `other`. - * - * Compatibility of partitionings is only checked for operators that have multiple children - * and that require a specific child output [[Distribution]], such as joins. - * - * Intuitively, partitionings are compatible if they route the same partitioning key to the same - * partition. For instance, two hash partitionings are only compatible if they produce the same - * number of output partitionings and hash records according to the same hash function and - * same partitioning key schema. 
- */ - def compatibleWith(other: Partitioning): Boolean } object Partitioning { @@ -187,6 +190,8 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning { } override def compatibleWith(other: Partitioning): Boolean = false + + override def guarantees(other: Partitioning): Boolean = false } case object SinglePartition extends Partitioning { @@ -194,21 +199,9 @@ case object SinglePartition extends Partitioning { override def satisfies(required: Distribution): Boolean = true - override def compatibleWith(other: Partitioning): Boolean = other match { - case SinglePartition => true - case _ => false - } -} - -case object BroadcastPartitioning extends Partitioning { - val numPartitions = 1 - - override def satisfies(required: Distribution): Boolean = true + override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1 - override def compatibleWith(other: Partitioning): Boolean = other match { - case BroadcastPartitioning => true - case _ => false - } + override def guarantees(other: Partitioning): Boolean = other.numPartitions == 1 } /** @@ -237,6 +230,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions case _ => false } + + override def guarantees(other: Partitioning): Boolean = other match { + case o: HashPartitioning => + this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions + case _ => false + } } /** @@ -274,6 +273,11 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) case o: RangePartitioning => this == o case _ => false } + + override def guarantees(other: Partitioning): Boolean = other match { + case o: RangePartitioning => this == o + case _ => false + } } /** @@ -314,12 +318,19 @@ case class PartitioningCollection(partitionings: Seq[Partitioning]) partitionings.exists(_.satisfies(required)) /** - * Returns true if any `partitioning` of this collection guarantees + * Returns true if any `partitioning` of this collection is compatible with * the given [[Partitioning]]. */ override def compatibleWith(other: Partitioning): Boolean = partitionings.exists(_.compatibleWith(other)) + /** + * Returns true if any `partitioning` of this collection guarantees + * the given [[Partitioning]]. 
+ */ + override def guarantees(other: Partitioning): Boolean = + partitionings.exists(_.guarantees(other)) + override def toString: String = { partitionings.map(_.toString).mkString("(", " or ", ")") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 123417e28e37..82867b466627 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -230,7 +230,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ val newChildren = operator.children.zip(operator.requiredChildDistribution).map { case (child, requiredDistribution) => val targetPartitioning = canonicalPartitioning(requiredDistribution) - if (child.outputPartitioning.compatibleWith(targetPartitioning)) { + if (child.outputPartitioning.guarantees(targetPartitioning)) { child } else { Exchange(targetPartitioning, child) From 38006e75650f7f0a827dd4b00fc2984786a2244f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 9 Aug 2015 00:05:07 -0700 Subject: [PATCH 21/21] Rewrite EnsureRequirements _yet again_ to make things even simpler --- .../apache/spark/sql/execution/Exchange.scala | 89 ++++++------------- 1 file changed, 25 insertions(+), 64 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 82867b466627..b89e634761eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -208,66 +208,37 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ } } - /** - * Return true if all of the operator's children satisfy their output distribution requirements. - */ - private def childPartitioningsSatisfyDistributionRequirements(operator: SparkPlan): Boolean = { - operator.children.zip(operator.requiredChildDistribution).forall { - case (child, distribution) => child.outputPartitioning.satisfies(distribution) - } - } + private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { + val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution + val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering + var children: Seq[SparkPlan] = operator.children - /** - * Given an operator, check whether the operator requires its children to have compatible - * output partitionings and add Exchanges to fix any detected incompatibilities. - */ - private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): SparkPlan = { - // If an operator has multiple children and the operator requires a specific child output - // distribution then we need to ensure that all children have compatible output partitionings. 
- if (operator.children.length > 1 - && operator.requiredChildDistribution.toSet != Set(UnspecifiedDistribution)) { - if (!Partitioning.allCompatible(operator.children.map(_.outputPartitioning))) { - val newChildren = operator.children.zip(operator.requiredChildDistribution).map { - case (child, requiredDistribution) => - val targetPartitioning = canonicalPartitioning(requiredDistribution) - if (child.outputPartitioning.guarantees(targetPartitioning)) { - child - } else { - Exchange(targetPartitioning, child) - } - } - val newOperator = operator.withNewChildren(newChildren) - assert(childPartitioningsSatisfyDistributionRequirements(newOperator)) - newOperator + // Ensure that the operator's children satisfy their output distribution requirements: + children = children.zip(requiredChildDistributions).map { case (child, distribution) => + if (child.outputPartitioning.satisfies(distribution)) { + child } else { - operator + Exchange(canonicalPartitioning(distribution), child) } - } else { - operator } - } - - private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { - def addShuffleIfNecessary(child: SparkPlan, requiredDistribution: Distribution): SparkPlan = { - // A pre-condition of ensureDistributionAndOrdering is that joins' children have compatible - // partitionings. Thus, we only need to check whether the output partitionings satisfy - // the required distribution. In the case where the children are all compatible, then they - // will either all satisfy the required distribution or will all fail to satisfy it, since - // A.guarantees(B) implies that A and B satisfy the same set of distributions. - // Therefore, if all children are compatible then either all or none of them will shuffled to - // ensure that the distribution requirements are met. - // - // Note that this reasoning implicitly assumes that operators which require compatible - // child partitionings have equivalent required distributions for those children. - if (child.outputPartitioning.satisfies(requiredDistribution)) { - child - } else { - Exchange(canonicalPartitioning(requiredDistribution), child) + // If the operator has multiple children and specifies child output distributions (e.g. join), + // then the children's output partitionings must be compatible: + if (children.length > 1 + && requiredChildDistributions.toSet != Set(UnspecifiedDistribution) + && !Partitioning.allCompatible(children.map(_.outputPartitioning))) { + children = children.zip(requiredChildDistributions).map { case (child, distribution) => + val targetPartitioning = canonicalPartitioning(distribution) + if (child.outputPartitioning.guarantees(targetPartitioning)) { + child + } else { + Exchange(targetPartitioning, child) + } } } - def addSortIfNecessary(child: SparkPlan, requiredOrdering: Seq[SortOrder]): SparkPlan = { + // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings: + children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) => if (requiredOrdering.nonEmpty) { // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort. 
val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min @@ -281,20 +252,10 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ } } - val children = operator.children - val requiredChildDistribution = operator.requiredChildDistribution - val requiredChildOrdering = operator.requiredChildOrdering - assert(children.length == requiredChildDistribution.length) - assert(children.length == requiredChildOrdering.length) - val newChildren = (children, requiredChildDistribution, requiredChildOrdering).zipped.map { - case (child, requiredDistribution, requiredOrdering) => - addSortIfNecessary(addShuffleIfNecessary(child, requiredDistribution), requiredOrdering) - } - operator.withNewChildren(newChildren) + operator.withNewChildren(children) } def apply(plan: SparkPlan): SparkPlan = plan.transformUp { - case operator: SparkPlan => - ensureDistributionAndOrdering(ensureChildPartitioningsAreCompatible(operator)) + case operator: SparkPlan => ensureDistributionAndOrdering(operator) } }
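
The compatibleWith / guarantees split that the last patches settle on can be exercised directly against the catalyst partitioning API. The following standalone sketch is not part of the patch series; the object name PartitioningRelationsSketch is made up for illustration, and it assumes only the public classes as they stand after PATCH 20 (HashPartitioning, SinglePartition, ClusteredDistribution and the satisfies / compatibleWith / guarantees methods shown above).

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, SinglePartition}

object PartitioningRelationsSketch {
  def main(args: Array[String]): Unit = {
    val key = Literal(1) :: Nil
    val hash5 = HashPartitioning(key, 5)
    val hash1 = HashPartitioning(key, 1)
    val clustered = ClusteredDistribution(key)

    // Both hash partitionings satisfy a distribution clustered on `key` ...
    assert(hash5.satisfies(clustered) && hash1.satisfies(clustered))

    // ... but they are not compatible with each other: the same key can land in different
    // partition ids, so a join whose children carry these two partitionings still needs a shuffle.
    assert(!hash5.compatibleWith(hash1) && !hash1.compatibleWith(hash5))

    // An identical hash partitioning is both compatible with and guaranteed by hash5, so no
    // exchange is needed between two such children.
    assert(hash5.guarantees(HashPartitioning(key, 5)))

    // guarantees is asymmetric: SinglePartition guarantees any one-partition output, but a
    // one-partition HashPartitioning only guarantees other hash partitionings.
    assert(SinglePartition.guarantees(hash1))
    assert(!hash1.guarantees(SinglePartition))
  }
}

Note the asymmetry this sketch relies on: guarantees may hold in one direction only (SinglePartition.guarantees accepts any one-partition partitioning, while HashPartitioning.guarantees matches only other hash partitionings), whereas Partitioning.allCompatible checks compatibleWith in both directions between adjacent children.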