From a3519a64e2c69299bb14a1ba62091d0c1bea80c1 Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Mon, 19 Oct 2020 10:55:38 -0700 Subject: [PATCH 1/7] fix --- .../sql/catalyst/optimizer/Optimizer.scala | 14 ++++- .../optimizer/EliminateSortsSuite.scala | 59 +++++++++++++++++-- .../spark/sql/execution/PlannerSuite.scala | 13 ---- 3 files changed, 64 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f3f64031843e..4fc9ed54ad0a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1020,7 +1020,9 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { * Note that changes in the final output ordering may affect the file size (SPARK-32318). * This rule handles the following cases: * 1) if the sort order is empty or the sort order does not have any reference - * 2) if the child is already sorted + * 2) if the Sort operator is a local sort and the child is already sorted, or + * the Sort operator is a global sort with the child being another global Sort operator or + * a Range operator that satisfies the parent sort orders. * 3) if there is another Sort operator separated by 0...n Project, Filter, Repartition or * RepartitionByExpression (with deterministic expressions) operators * 4) if the Sort operator is within Join separated by 0...n Project, Filter, Repartition or @@ -1035,8 +1037,14 @@ object EliminateSorts extends Rule[LogicalPlan] { case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) => val newOrders = orders.filterNot(_.child.foldable) if (newOrders.isEmpty) child else s.copy(order = newOrders) - case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) => - child + case s @ Sort(orders, global, child) + if SortOrder.orderingSatisfies(child.outputOrdering, orders) => + (global, child) match { + case (false, _) => child + case (true, r: Range) => r + case (true, s @ Sort(_, true, _)) => s + case (true, _) => s.copy(child = recursiveRemoveSort(child)) + } case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child)) case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) => j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index cc351e365113..385a6e4f8087 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -99,14 +99,22 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("remove redundant order by") { + test("remove redundant local sort") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) - val unnecessaryReordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst) + val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst) val optimized = Optimize.execute(unnecessaryReordered.analyze) val correctAnswer = orderedPlan.limit(2).select('a).analyze comparePlans(Optimize.execute(optimized), correctAnswer) } + test("should not remove global sort") { + val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) + val reordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst) + val optimized = Optimize.execute(reordered.analyze) + val correctAnswer = reordered.analyze + comparePlans(Optimize.execute(optimized), correctAnswer) + } + test("do not remove sort if the order is different") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) val reorderedDifferently = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc) @@ -115,22 +123,39 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("filters don't affect order") { + test("filters don't affect order for local sort") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) - val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc) + val filteredAndReordered = orderedPlan.where('a > Literal(10)).sortBy('a.asc, 'b.desc) val optimized = Optimize.execute(filteredAndReordered.analyze) val correctAnswer = orderedPlan.where('a > Literal(10)).analyze comparePlans(optimized, correctAnswer) } - test("limits don't affect order") { + test("should keep global sort when child is a filter operator with the same ordering") { + val projectPlan = testRelation.select('a, 'b) + val orderedPlan = projectPlan.orderBy('a.asc, 'b.desc) + val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc) + val optimized = Optimize.execute(filteredAndReordered.analyze) + val correctAnswer = projectPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc).analyze + comparePlans(optimized, correctAnswer) + } + + test("limits don't affect order for local sort") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) - val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc) + val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc) val optimized = Optimize.execute(filteredAndReordered.analyze) val correctAnswer = orderedPlan.limit(Literal(10)).analyze comparePlans(optimized, correctAnswer) } + test("should keep global sort when child is a limit operator with the same ordering") { + val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) + val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc) + val optimized = Optimize.execute(filteredAndReordered.analyze) + val correctAnswer = filteredAndReordered.analyze + comparePlans(optimized, correctAnswer) + } + test("different sorts are not simplified if limit is in between") { val orderedPlan = testRelation.select('a, 'b).orderBy('b.desc).limit(Literal(10)) .orderBy('a.asc) @@ -333,4 +358,26 @@ class EliminateSortsSuite extends AnalysisTest { val correctAnswer = PushDownOptimizer.execute(noOrderByPlan.analyze) comparePlans(optimized, correctAnswer) } + + test("remove two consecutive global sorts with same ordering") { + Seq( + (testRelation.orderBy('a.asc).orderBy('a.asc), testRelation.orderBy('a.asc)), + (testRelation.orderBy('a.asc, 'b.desc).orderBy('a.asc), + testRelation.orderBy('a.asc, 'b.desc)) + ).foreach { case (ordered, answer) => + val optimized = Optimize.execute(ordered.analyze) + comparePlans(optimized, answer.analyze) + } + } + + test("should keep global sort when child is a local sort with the same ordering") { + val correctAnswer = testRelation.orderBy('a.asc).analyze + Seq( + testRelation.sortBy('a.asc).orderBy('a.asc), + testRelation.orderBy('a.asc).sortBy('a.asc).orderBy('a.asc) + ).foreach { ordered => + val optimized = Optimize.execute(ordered.analyze) + comparePlans(optimized, correctAnswer) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 048466b3d863..be29acb6d3a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -234,19 +234,6 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } } - test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") { - val query = testData.select('key, 'value).sort('key.desc).cache() - assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation]) - val resorted = query.sort('key.desc) - assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty) - assert(resorted.select('key).collect().map(_.getInt(0)).toSeq == - (1 to 100).reverse) - // with a different order, the sort is needed - val sortedAsc = query.sort('key) - assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s}.size == 1) - assert(sortedAsc.select('key).collect().map(_.getInt(0)).toSeq == (1 to 100)) - } - test("PartitioningCollection") { withTempView("normal", "small", "tiny") { testData.createOrReplaceTempView("normal") From 52a191b68c2283f0762bc53b8963ae8b5bf7377d Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Tue, 20 Oct 2020 14:11:28 -0700 Subject: [PATCH 2/7] address comments --- .../sql/catalyst/optimizer/Optimizer.scala | 16 ++--- .../optimizer/EliminateSortsSuite.scala | 58 ++++++++++++++----- 2 files changed, 47 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 4fc9ed54ad0a..4b115e37de3e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1020,9 +1020,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { * Note that changes in the final output ordering may affect the file size (SPARK-32318). * This rule handles the following cases: * 1) if the sort order is empty or the sort order does not have any reference - * 2) if the Sort operator is a local sort and the child is already sorted, or - * the Sort operator is a global sort with the child being another global Sort operator or - * a Range operator that satisfies the parent sort orders. + * 2) if the Sort operator is a local sort and the child is already sorted * 3) if there is another Sort operator separated by 0...n Project, Filter, Repartition or * RepartitionByExpression (with deterministic expressions) operators * 4) if the Sort operator is within Join separated by 0...n Project, Filter, Repartition or @@ -1033,18 +1031,12 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { * function is order irrelevant */ object EliminateSorts extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) => val newOrders = orders.filterNot(_.child.foldable) if (newOrders.isEmpty) child else s.copy(order = newOrders) - case s @ Sort(orders, global, child) - if SortOrder.orderingSatisfies(child.outputOrdering, orders) => - (global, child) match { - case (false, _) => child - case (true, r: Range) => r - case (true, s @ Sort(_, true, _)) => s - case (true, _) => s.copy(child = recursiveRemoveSort(child)) - } + case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) => + child case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child)) case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) => j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index 385a6e4f8087..4385ea18a892 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -99,20 +99,27 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("remove redundant local sort") { + test("SPARK-33183: remove redundant sortBy") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst) val optimized = Optimize.execute(unnecessaryReordered.analyze) val correctAnswer = orderedPlan.limit(2).select('a).analyze - comparePlans(Optimize.execute(optimized), correctAnswer) + comparePlans(optimized, correctAnswer) + } + + test("SPARK-33183: remove all redundant local sorts") { + val orderedPlan = testRelation.sortBy('a.asc).orderBy('a.asc).sortBy('a.asc) + val optimized = Optimize.execute(orderedPlan.analyze) + val correctAnswer = testRelation.orderBy('a.asc).analyze + comparePlans(optimized, correctAnswer) } - test("should not remove global sort") { + test("SPARK-33183: should not remove global sort") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) val reordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst) val optimized = Optimize.execute(reordered.analyze) val correctAnswer = reordered.analyze - comparePlans(Optimize.execute(optimized), correctAnswer) + comparePlans(optimized, correctAnswer) } test("do not remove sort if the order is different") { @@ -123,7 +130,7 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("filters don't affect order for local sort") { + test("SPARK-33183: filters should not affect order for local sort") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) val filteredAndReordered = orderedPlan.where('a > Literal(10)).sortBy('a.asc, 'b.desc) val optimized = Optimize.execute(filteredAndReordered.analyze) @@ -131,7 +138,7 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("should keep global sort when child is a filter operator with the same ordering") { + test("SPARK-33183: should not remove global sort with filter operators") { val projectPlan = testRelation.select('a, 'b) val orderedPlan = projectPlan.orderBy('a.asc, 'b.desc) val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc) @@ -140,7 +147,7 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("limits don't affect order for local sort") { + test("SPARK-33183: limits should not affect order for local sort") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc) val optimized = Optimize.execute(filteredAndReordered.analyze) @@ -148,7 +155,7 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("should keep global sort when child is a limit operator with the same ordering") { + test("SPARK-33183: should not remove global sort with limit operators") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc) val optimized = Optimize.execute(filteredAndReordered.analyze) @@ -164,11 +171,11 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("range is already sorted") { + test("SPARK-33183: should not remove global sort with range operator") { val inputPlan = Range(1L, 1000L, 1, 10) val orderedPlan = inputPlan.orderBy('id.asc) val optimized = Optimize.execute(orderedPlan.analyze) - val correctAnswer = inputPlan.analyze + val correctAnswer = orderedPlan.analyze comparePlans(optimized, correctAnswer) val reversedPlan = inputPlan.orderBy('id.desc) @@ -179,10 +186,18 @@ class EliminateSortsSuite extends AnalysisTest { val negativeStepInputPlan = Range(10L, 1L, -1, 10) val negativeStepOrderedPlan = negativeStepInputPlan.orderBy('id.desc) val negativeStepOptimized = Optimize.execute(negativeStepOrderedPlan.analyze) - val negativeStepCorrectAnswer = negativeStepInputPlan.analyze + val negativeStepCorrectAnswer = negativeStepOrderedPlan.analyze comparePlans(negativeStepOptimized, negativeStepCorrectAnswer) } + test("SPARK-33183: remove local sort with range operator") { + val inputPlan = Range(1L, 1000L, 1, 10) + val orderedPlan = inputPlan.sortBy('id.asc) + val optimized = Optimize.execute(orderedPlan.analyze) + val correctAnswer = inputPlan.analyze + comparePlans(optimized, correctAnswer) + } + test("sort should not be removed when there is a node which doesn't guarantee any order") { val orderedPlan = testRelation.select('a, 'b) val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc) @@ -359,18 +374,31 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("remove two consecutive global sorts with same ordering") { + test("SPARK-33183: remove consecutive global sorts with the same ordering") { Seq( (testRelation.orderBy('a.asc).orderBy('a.asc), testRelation.orderBy('a.asc)), - (testRelation.orderBy('a.asc, 'b.desc).orderBy('a.asc), - testRelation.orderBy('a.asc, 'b.desc)) + (testRelation.orderBy('a.asc, 'b.desc).orderBy('a.asc), testRelation.orderBy('a.asc)) ).foreach { case (ordered, answer) => val optimized = Optimize.execute(ordered.analyze) comparePlans(optimized, answer.analyze) } } - test("should keep global sort when child is a local sort with the same ordering") { + test("SPARK-33183: remove consecutive local sorts with the same ordering") { + val orderedPlan = testRelation.sortBy('a.asc).sortBy('a.asc).sortBy('a.asc) + val optimized = Optimize.execute(orderedPlan.analyze) + val correctAnswer = testRelation.sortBy('a.asc).analyze + comparePlans(optimized, correctAnswer) + } + + test("SPARK-33183: remove consecutive local sorts with different ordering") { + val orderedPlan = testRelation.sortBy('b.asc).sortBy('a.desc).sortBy('a.asc) + val optimized = Optimize.execute(orderedPlan.analyze) + val correctAnswer = testRelation.sortBy('a.asc).analyze + comparePlans(optimized, correctAnswer) + } + + test("SPARK-33183: should keep global sort when child is a local sort with the same ordering") { val correctAnswer = testRelation.orderBy('a.asc).analyze Seq( testRelation.sortBy('a.asc).orderBy('a.asc), From 118ba4ccab7d12b1e7e0936ce03ac51dc87ffff2 Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Tue, 20 Oct 2020 14:24:17 -0700 Subject: [PATCH 3/7] indentation --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 4b115e37de3e..9043eb4a36c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1036,7 +1036,7 @@ object EliminateSorts extends Rule[LogicalPlan] { val newOrders = orders.filterNot(_.child.foldable) if (newOrders.isEmpty) child else s.copy(order = newOrders) case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) => - child + child case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child)) case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) => j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight)) From 5997320faa8ad0d6ebbeb13c926b092ab2653c62 Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Thu, 22 Oct 2020 10:58:25 -0700 Subject: [PATCH 4/7] add physical rule --- .../sql/catalyst/optimizer/Optimizer.scala | 2 + .../apache/spark/sql/internal/SQLConf.scala | 7 + .../optimizer/EliminateSortsSuite.scala | 4 +- .../spark/sql/execution/QueryExecution.scala | 1 + .../sql/execution/RemoveRedundantSorts.scala | 43 ++++++ .../adaptive/AdaptiveSparkPlanExec.scala | 2 + .../execution/RemoveRedundantSortsSuite.scala | 136 ++++++++++++++++++ 7 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9043eb4a36c8..75cc4174d3f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1031,6 +1031,8 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { * function is order irrelevant */ object EliminateSorts extends Rule[LogicalPlan] { + // transformUp is needed here to ensure idempotency of this rule when removing consecutive + // local sorts. def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) => val newOrders = orders.filterNot(_.child.foldable) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 302439839996..d84dfcc8f308 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1253,6 +1253,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val REMOVE_REDUNDANT_SORTS_ENABLED = buildConf("spark.sql.execution.removeRedundantSorts") + .internal() + .doc("Whether to remove redundant physical sort node") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + val STATE_STORE_PROVIDER_CLASS = buildConf("spark.sql.streaming.stateStore.providerClass") .internal() diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index 4385ea18a892..0225411668f4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -130,7 +130,7 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("SPARK-33183: filters should not affect order for local sort") { + test("SPARK-33183: remove top level local sort with filter operators") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) val filteredAndReordered = orderedPlan.where('a > Literal(10)).sortBy('a.asc, 'b.desc) val optimized = Optimize.execute(filteredAndReordered.analyze) @@ -138,7 +138,7 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("SPARK-33183: should not remove global sort with filter operators") { + test("SPARK-33183: keep top level global sort with filter operators") { val projectPlan = testRelation.select('a, 'b) val orderedPlan = projectPlan.orderBy('a.asc, 'b.desc) val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index c37e1e92c857..b998430c1602 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -343,6 +343,7 @@ object QueryExecution { PlanDynamicPruningFilters, PlanSubqueries, RemoveRedundantProjects, + RemoveRedundantSorts, EnsureRequirements, DisableUnnecessaryBucketedScan, ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala new file mode 100644 index 000000000000..40601d9e6478 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.SortOrder +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf + +/** + * Remove redundant SortExec node from the spark plan. A sort node is redundant when + * its child satisfies both its sort orders and its required child distribution. + */ +object RemoveRedundantSorts extends Rule[SparkPlan] { + def apply(plan: SparkPlan): SparkPlan = { + if (!conf.getConf(SQLConf.REMOVE_REDUNDANT_SORTS_ENABLED)) { + plan + } else { + removeSorts(plan) + } + } + + private def removeSorts(plan: SparkPlan): SparkPlan = plan transform { + case s @ SortExec(orders, _, child, _) + if SortOrder.orderingSatisfies(child.outputOrdering, orders) && + child.outputPartitioning.satisfies(s.requiredChildDistribution.head) => + child + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index d30e16276b9f..a4a58dfe1de5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -83,6 +83,7 @@ case class AdaptiveSparkPlanExec( @transient private val optimizer = new AQEOptimizer(conf) @transient private val removeRedundantProjects = RemoveRedundantProjects + @transient private val removeRedundantSorts = RemoveRedundantSorts @transient private val ensureRequirements = EnsureRequirements // A list of physical plan rules to be applied before creation of query stages. The physical @@ -90,6 +91,7 @@ case class AdaptiveSparkPlanExec( // Exchange nodes) after running these rules. private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( removeRedundantProjects, + removeRedundantSorts, ensureRequirements ) ++ context.session.sessionState.queryStagePrepRules diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala new file mode 100644 index 000000000000..d5966dcd59f1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + + +abstract class RemoveRedundantSortsSuiteBase + extends QueryTest + with SharedSparkSession + with AdaptiveSparkPlanHelper { + import testImplicits._ + + private def checkNumSorts(df: DataFrame, count: Int): Unit = { + val plan = df.queryExecution.executedPlan + assert(collectWithSubqueries(plan) { case s: SortExec => s }.length == count) + } + + private def checkSorts(query: String, enabledCount: Int, disabledCount: Int): Unit = { + withSQLConf(SQLConf.REMOVE_REDUNDANT_SORTS_ENABLED.key -> "true") { + val df = sql(query) + checkNumSorts(df, enabledCount) + val result = df.collect() + withSQLConf(SQLConf.REMOVE_REDUNDANT_SORTS_ENABLED.key -> "false") { + val df = sql(query) + checkNumSorts(df, disabledCount) + checkAnswer(df, result) + } + } + } + + test("remove redundant sorts with limit") { + withTempView("t") { + spark.range(100).select('id as "key").createOrReplaceTempView("t") + val query = + """ + |SELECT key FROM + | (SELECT key FROM t WHERE key > 10 ORDER BY key DESC LIMIT 10) + |ORDER BY key DESC + |""".stripMargin + checkSorts(query, 0, 1) + } + } + + test("remove redundant sorts with broadcast hash join") { + withTempView("t1", "t2") { + spark.range(1000).select('id as "key").createOrReplaceTempView("t1") + spark.range(1000).select('id as "key").createOrReplaceTempView("t2") + val queryTemplate = """ + |SELECT t1.key FROM + | (SELECT key FROM t1 WHERE key > 10 ORDER BY key DESC LIMIT 10) t1 + |%s + | (SELECT key FROM t2 WHERE key > 50 ORDER BY key DESC LIMIT 100) t2 + |ON t1.key = t2.key + |ORDER BY %s + """.stripMargin + + val innerJoinAsc = queryTemplate.format("JOIN", "t2.key ASC") + checkSorts(innerJoinAsc, 1, 1) + + val innerJoinDesc = queryTemplate.format("JOIN", "t2.key DESC") + checkSorts(innerJoinDesc, 0, 1) + + val innerJoinDesc1 = queryTemplate.format("JOIN", "t1.key DESC") + checkSorts(innerJoinDesc1, 1, 1) + + val leftOuterJoinDesc = queryTemplate.format("LEFT JOIN", "t1.key DESC") + checkSorts(leftOuterJoinDesc, 0, 1) + } + } + + test("remove redundant sorts with sort merge join") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTempView("t1", "t2") { + spark.range(1000).select('id as "key").createOrReplaceTempView("t1") + spark.range(1000).select('id as "key").createOrReplaceTempView("t2") + val query = """ + |SELECT t1.key FROM + | (SELECT key FROM t1 WHERE key > 10 ORDER BY key DESC LIMIT 10) t1 + |JOIN + | (SELECT key FROM t2 WHERE key > 50 ORDER BY key DESC LIMIT 100) t2 + |ON t1.key = t2.key + |ORDER BY t1.key + """.stripMargin + + val queryAsc = query + " ASC" + checkSorts(queryAsc, 2, 3) + + // Top level sort should only be eliminated if it's order is descending with SMJ. + val queryDesc = query + " DESC" + checkSorts(queryDesc, 3, 3) + } + } + } + + test("cached sorted data doesn't need to be re-sorted") { + withSQLConf(SQLConf.REMOVE_REDUNDANT_SORTS_ENABLED.key -> "true") { + val df = spark.range(1000).select('id as "key").sort('key.desc).cache() + val resorted = df.sort('key.desc) + val sortedAsc = df.sort('key.asc) + checkNumSorts(df, 0) + checkNumSorts(resorted, 0) + checkNumSorts(sortedAsc, 1) + val result = resorted.collect() + withSQLConf(SQLConf.REMOVE_REDUNDANT_SORTS_ENABLED.key -> "false") { + val resorted = df.sort('key.desc) + checkNumSorts(resorted, 1) + checkAnswer(resorted, result) + } + } + } +} + +class RemoveRedundantSortsSuite extends RemoveRedundantSortsSuiteBase + with DisableAdaptiveExecutionSuite + +class RemoveRedundantSortsSuiteAE extends RemoveRedundantSortsSuiteBase + with EnableAdaptiveExecutionSuite From cec266f4dbc4c34e9e94f7cd4cde5fe7cc114495 Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Thu, 22 Oct 2020 23:35:59 -0700 Subject: [PATCH 5/7] update --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 14 +++++++++----- .../catalyst/optimizer/EliminateSortsSuite.scala | 9 ++++++++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 75cc4174d3f2..02552c442f1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1031,14 +1031,18 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { * function is order irrelevant */ object EliminateSorts extends Rule[LogicalPlan] { - // transformUp is needed here to ensure idempotency of this rule when removing consecutive - // local sorts. - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally + + val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = { case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) => val newOrders = orders.filterNot(_.child.foldable) - if (newOrders.isEmpty) child else s.copy(order = newOrders) + if (newOrders.isEmpty) { + applyLocally.lift(child).getOrElse(child) + } else { + s.copy(order = newOrders) + } case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) => - child + applyLocally.lift(child).getOrElse(child) case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child)) case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) => j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index 0225411668f4..62deebd93075 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -99,7 +99,14 @@ class EliminateSortsSuite extends AnalysisTest { comparePlans(optimized, correctAnswer) } - test("SPARK-33183: remove redundant sortBy") { + test("SPARK-33183: remove consecutive no-op sorts") { + val plan = testRelation.orderBy().orderBy().orderBy() + val optimized = Optimize.execute(plan.analyze) + val correctAnswer = testRelation.analyze + comparePlans(optimized, correctAnswer) + } + + test("SPARK-33183: remove redundant sort by") { val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst) val optimized = Optimize.execute(unnecessaryReordered.analyze) From 290cb4d8a92c1a6a9632b6fd430b89dd58bac32b Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Fri, 23 Oct 2020 17:08:51 -0700 Subject: [PATCH 6/7] address comments --- .../execution/RemoveRedundantSortsSuite.scala | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala index d5966dcd59f1..ed10537023a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala @@ -64,50 +64,49 @@ abstract class RemoveRedundantSortsSuiteBase withTempView("t1", "t2") { spark.range(1000).select('id as "key").createOrReplaceTempView("t1") spark.range(1000).select('id as "key").createOrReplaceTempView("t2") + val queryTemplate = """ - |SELECT t1.key FROM + |SELECT /*+ BROADCAST(%s) */ t1.key FROM | (SELECT key FROM t1 WHERE key > 10 ORDER BY key DESC LIMIT 10) t1 - |%s + |JOIN | (SELECT key FROM t2 WHERE key > 50 ORDER BY key DESC LIMIT 100) t2 |ON t1.key = t2.key |ORDER BY %s """.stripMargin - val innerJoinAsc = queryTemplate.format("JOIN", "t2.key ASC") + val innerJoinAsc = queryTemplate.format("t1", "t2.key ASC") checkSorts(innerJoinAsc, 1, 1) - val innerJoinDesc = queryTemplate.format("JOIN", "t2.key DESC") + val innerJoinDesc = queryTemplate.format("t1", "t2.key DESC") checkSorts(innerJoinDesc, 0, 1) - val innerJoinDesc1 = queryTemplate.format("JOIN", "t1.key DESC") + val innerJoinDesc1 = queryTemplate.format("t1", "t1.key DESC") checkSorts(innerJoinDesc1, 1, 1) - val leftOuterJoinDesc = queryTemplate.format("LEFT JOIN", "t1.key DESC") + val leftOuterJoinDesc = queryTemplate.format("t2", "t1.key DESC") checkSorts(leftOuterJoinDesc, 0, 1) } } test("remove redundant sorts with sort merge join") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - withTempView("t1", "t2") { - spark.range(1000).select('id as "key").createOrReplaceTempView("t1") - spark.range(1000).select('id as "key").createOrReplaceTempView("t2") - val query = """ - |SELECT t1.key FROM - | (SELECT key FROM t1 WHERE key > 10 ORDER BY key DESC LIMIT 10) t1 - |JOIN - | (SELECT key FROM t2 WHERE key > 50 ORDER BY key DESC LIMIT 100) t2 - |ON t1.key = t2.key - |ORDER BY t1.key - """.stripMargin - - val queryAsc = query + " ASC" - checkSorts(queryAsc, 2, 3) - - // Top level sort should only be eliminated if it's order is descending with SMJ. - val queryDesc = query + " DESC" - checkSorts(queryDesc, 3, 3) - } + withTempView("t1", "t2") { + spark.range(1000).select('id as "key").createOrReplaceTempView("t1") + spark.range(1000).select('id as "key").createOrReplaceTempView("t2") + val query = """ + |SELECT /*+ MERGE(t1) */ t1.key FROM + | (SELECT key FROM t1 WHERE key > 10 ORDER BY key DESC LIMIT 10) t1 + |JOIN + | (SELECT key FROM t2 WHERE key > 50 ORDER BY key DESC LIMIT 100) t2 + |ON t1.key = t2.key + |ORDER BY t1.key + """.stripMargin + + val queryAsc = query + " ASC" + checkSorts(queryAsc, 2, 3) + + // Top level sort should only be eliminated if it's order is descending with SMJ. + val queryDesc = query + " DESC" + checkSorts(queryDesc, 3, 3) } } From cdc7dbe1f69d2cec912c536d388eb68756107d61 Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Mon, 26 Oct 2020 11:36:07 -0700 Subject: [PATCH 7/7] update --- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../sql/execution/RemoveRedundantSorts.scala | 5 ++- .../execution/RemoveRedundantSortsSuite.scala | 35 ++++++++++++------- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 02552c442f1d..9519a56c2817 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1033,7 +1033,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { object EliminateSorts extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally - val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = { + private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = { case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) => val newOrders = orders.filterNot(_.child.foldable) if (newOrders.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala index 40601d9e6478..87c08ec865fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala @@ -23,7 +23,10 @@ import org.apache.spark.sql.internal.SQLConf /** * Remove redundant SortExec node from the spark plan. A sort node is redundant when - * its child satisfies both its sort orders and its required child distribution. + * its child satisfies both its sort orders and its required child distribution. Note + * this rule differs from the Optimizer rule EliminateSorts in that this rule also checks + * if the child satisfies the required distribution so that it is safe to remove not only a + * local sort but also a global sort when its child already satisfies required sort orders. */ object RemoveRedundantSorts extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala index ed10537023a6..54c5a3344190 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala @@ -64,7 +64,7 @@ abstract class RemoveRedundantSortsSuiteBase withTempView("t1", "t2") { spark.range(1000).select('id as "key").createOrReplaceTempView("t1") spark.range(1000).select('id as "key").createOrReplaceTempView("t2") - + val queryTemplate = """ |SELECT /*+ BROADCAST(%s) */ t1.key FROM | (SELECT key FROM t1 WHERE key > 10 ORDER BY key DESC LIMIT 10) t1 @@ -74,17 +74,25 @@ abstract class RemoveRedundantSortsSuiteBase |ORDER BY %s """.stripMargin - val innerJoinAsc = queryTemplate.format("t1", "t2.key ASC") - checkSorts(innerJoinAsc, 1, 1) - - val innerJoinDesc = queryTemplate.format("t1", "t2.key DESC") - checkSorts(innerJoinDesc, 0, 1) - - val innerJoinDesc1 = queryTemplate.format("t1", "t1.key DESC") - checkSorts(innerJoinDesc1, 1, 1) - - val leftOuterJoinDesc = queryTemplate.format("t2", "t1.key DESC") - checkSorts(leftOuterJoinDesc, 0, 1) + // No sort should be removed since the stream side (t2) order DESC + // does not satisfy the required sort order ASC. + val buildLeftOrderByRightAsc = queryTemplate.format("t1", "t2.key ASC") + checkSorts(buildLeftOrderByRightAsc, 1, 1) + + // The top sort node should be removed since the stream side (t2) order DESC already + // satisfies the required sort order DESC. + val buildLeftOrderByRightDesc = queryTemplate.format("t1", "t2.key DESC") + checkSorts(buildLeftOrderByRightDesc, 0, 1) + + // No sort should be removed since the sort ordering from broadcast-hash join is based + // on the stream side (t2) and the required sort order is from t1. + val buildLeftOrderByLeftDesc = queryTemplate.format("t1", "t1.key DESC") + checkSorts(buildLeftOrderByLeftDesc, 1, 1) + + // The top sort node should be removed since the stream side (t1) order DESC already + // satisfies the required sort order DESC. + val buildRightOrderByLeftDesc = queryTemplate.format("t2", "t1.key DESC") + checkSorts(buildRightOrderByLeftDesc, 0, 1) } } @@ -104,7 +112,8 @@ abstract class RemoveRedundantSortsSuiteBase val queryAsc = query + " ASC" checkSorts(queryAsc, 2, 3) - // Top level sort should only be eliminated if it's order is descending with SMJ. + // The top level sort should not be removed since the child output ordering is ASC and + // the required ordering is DESC. val queryDesc = query + " DESC" checkSorts(queryDesc, 3, 3) }