From 5cb559dbee44721a3f439583492c24f301e04a1c Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Sat, 8 May 2021 15:18:45 +0800 Subject: [PATCH 1/4] [SPARK-34775][SQL] Push down limit through window when partitionSpec is not empty --- .../LimitPushDownThroughWindow.scala | 14 +- .../LimitPushdownThroughWindowSuite.scala | 146 ++++++++++++++++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 43 ++++++ 3 files changed, 188 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala index eaea167ee9ff2..16180c6890814 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, IntegerLiteral, NamedExpression, RankLike, RowFrame, RowNumberLike, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, CurrentRow, IntegerLiteral, NamedExpression, RankLike, RowFrame, RowNumberLike, SortOrder, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition} import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, WINDOW} @@ -33,7 +33,7 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] { // The window frame of RankLike and RowNumberLike can only be UNBOUNDED PRECEDING to CURRENT ROW. 
private def supportsPushdownThroughWindow( windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall { - case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(Nil, _, + case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(_, _, SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true case _ => false } @@ -42,17 +42,19 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] { _.containsAllPatterns(WINDOW, LIMIT), ruleId) { // Adding an extra Limit below WINDOW when the partitionSpec of all window functions is empty. case LocalLimit(limitExpr @ IntegerLiteral(limit), - window @ Window(windowExpressions, Nil, orderSpec, child)) + window @ Window(windowExpressions, partitionSpec, orderSpec, child)) if supportsPushdownThroughWindow(windowExpressions) && child.maxRows.forall(_ > limit) && limit < conf.topKSortFallbackThreshold => // Sort is needed here because we need global sort. - window.copy(child = Limit(limitExpr, Sort(orderSpec, true, child))) + window.copy(child = Limit(limitExpr, + Sort(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec, true, child))) // There is a Project between LocalLimit and Window if they do not have the same output. case LocalLimit(limitExpr @ IntegerLiteral(limit), project @ Project(_, - window @ Window(windowExpressions, Nil, orderSpec, child))) + window @ Window(windowExpressions, partitionSpec, orderSpec, child))) if supportsPushdownThroughWindow(windowExpressions) && child.maxRows.forall(_ > limit) && limit < conf.topKSortFallbackThreshold => // Sort is needed here because we need global sort. 
- project.copy(child = window.copy(child = Limit(limitExpr, Sort(orderSpec, true, child)))) + project.copy(child = window.copy(child = Limit(limitExpr, + Sort(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec, true, child)))) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala index f2c1f452d0203..ddf0bdb0d5824 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala @@ -155,36 +155,164 @@ class LimitPushdownThroughWindowSuite extends PlanTest { WithoutOptimize.execute(correctAnswer.analyze)) } - test("Should not push down if partitionSpec is not empty") { + test("Should not push down when child's maxRows smaller than limit value") { val originalQuery = testRelation .select(a, b, c, - windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) - .limit(2) + windowExpr(RowNumber(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) + .limit(20) comparePlans( Optimize.execute(originalQuery.analyze), WithoutOptimize.execute(originalQuery.analyze)) } - test("Should not push down when child's maxRows smaller than limit value") { + test("Should not push down if it is not RankLike/RowNumberLike window function") { val originalQuery = testRelation .select(a, b, c, - windowExpr(RowNumber(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) - .limit(20) + windowExpr(count(b), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) + .limit(2) comparePlans( Optimize.execute(originalQuery.analyze), WithoutOptimize.execute(originalQuery.analyze)) } - test("Should not push down if it is not RankLike/RowNumberLike window function") { + + test("Should push down if partitionSpec is not empty") { 
val originalQuery = testRelation .select(a, b, c, - windowExpr(count(b), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) + .limit(2) + val correctAnswer = testRelation + .select(a, b, c) + .orderBy(a.asc, c.desc) .limit(2) + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) comparePlans( Optimize.execute(originalQuery.analyze), - WithoutOptimize.execute(originalQuery.analyze)) + WithoutOptimize.execute(correctAnswer.analyze)) + } + + test("Should push down if partitionSpec is not empty and with multi partitionSpec") { + val originalQuery = testRelation + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: b :: Nil, c.desc :: Nil, windowFrame)).as("rn")) + .limit(2) + val correctAnswer = testRelation + .select(a, b, c) + .orderBy(a.asc, b.asc, c.desc) + .limit(2) + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: b :: Nil, c.desc :: Nil, windowFrame)).as("rn")) + + comparePlans( + Optimize.execute(originalQuery.analyze), + WithoutOptimize.execute(correctAnswer.analyze)) + } + + test("Push down limit through window for multiple window functions " + + "when all partitionSpec is not empty and same") { + val originalQuery = testRelation + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), + windowExpr(new Rank(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rk")) + .limit(2) + val correctAnswer = testRelation + .select(a, b, c) + .orderBy(a.asc, c.desc) + .limit(2) + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), + windowExpr(new Rank(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rk")) + + comparePlans( + Optimize.execute(originalQuery.analyze), + WithoutOptimize.execute(correctAnswer.analyze)) + } + + test("Push down limit through window for multiple window functions " + + "when 
partitionSpec is not empty and not same") { + val originalQuery = testRelation + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"), + windowExpr(new Rank(), windowSpec(b :: Nil, c.desc :: Nil, windowFrame)).as("rk")) + .limit(2) + val correctAnswer = testRelation + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) + .orderBy(b.asc, c.desc) + .limit(2) + .select(a, b, c, $"rn".attr, + windowExpr(new Rank(), windowSpec(b :: Nil, c.desc :: Nil, windowFrame)).as("rk")) + + comparePlans( + Optimize.execute(originalQuery.analyze), + WithoutOptimize.execute(correctAnswer.analyze)) + } + + test("Push down limit through window respect spark.sql.execution.topKSortFallbackThreshold " + + "when partitionSpec is not empty") { + Seq(1, 100).foreach { threshold => + withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> threshold.toString) { + val originalQuery = testRelation + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) + .limit(2) + val correctAnswer = if (threshold == 1) { + originalQuery + } else { + testRelation + .select(a, b, c) + .orderBy(a.asc, c.desc) + .limit(2) + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) + } + + comparePlans( + Optimize.execute(originalQuery.analyze), + WithoutOptimize.execute(correctAnswer.analyze)) + } + } + } + + test("Push down to first window if order column is different " + + "when partitionSpec is not empty") { + val originalQuery = testRelation + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, b.desc :: Nil, windowFrame)).as("rn"), + windowExpr(new Rank(), windowSpec(a :: Nil, c.asc :: Nil, windowFrame)).as("rk")) + .limit(2) + val correctAnswer = testRelation + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, b.desc :: Nil, windowFrame)).as("rn")) + .orderBy(a.asc, c.asc) + .limit(2) + 
.select(a, b, c, $"rn".attr, + windowExpr(new Rank(), windowSpec(a :: Nil, c.asc :: Nil, windowFrame)).as("rk")) + + comparePlans( + Optimize.execute(originalQuery.analyze), + WithoutOptimize.execute(correctAnswer.analyze)) + } + + test("Should push down if is a Project between LocalLimit and Window " + + "when partitionSpec is not empty") { + val originalQuery = testRelation + .select(a, b, + windowExpr(RowNumber(), windowSpec(a :: Nil, b.desc :: Nil, windowFrame)).as("rn")) + .select(a, $"rn".attr) + .limit(2) + val correctAnswer = testRelation + .select(a, b) + .orderBy(a.asc, b.desc) + .limit(2) + .select(a, windowExpr(RowNumber(), windowSpec(a :: Nil, b.desc :: Nil, windowFrame)).as("rn")) + + comparePlans( + Optimize.execute(originalQuery.analyze), + WithoutOptimize.execute(correctAnswer.analyze)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9add98c6ebf12..0cc0130fe9fd7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -4100,6 +4100,49 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } } + + test("SPARK-34775 Push down limit through window when partitionSpec is not empty") { + withTable("t1", "t2") { + var numRows = 20 + spark.range(numRows) + .selectExpr("id % 10 AS a", s"$numRows - id AS b") + .write + .saveAsTable("t1") + + val df1 = spark.sql( + """ + |SELECT a, b, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn, + |RANK() OVER(PARTITION BY a ORDER BY b) AS rk, + |DENSE_RANK() OVER(PARTITION BY a ORDER BY b) AS drk + |FROM t1 LIMIT 3 + |""".stripMargin) + val pushedLocalLimits1 = df1.queryExecution.optimizedPlan.collect { + case l @ LocalLimit(_, _: Sort) => l + } + assert(pushedLocalLimits1.length === 1) + checkAnswer(df1, Seq(Row(0, 10, 1, 1, 1), Row(0, 20, 2, 2, 2), Row(1, 9, 1, 1, 1))) + + + numRows = 10 
+ spark.range(numRows) + .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b") + .write + .saveAsTable("t2") + val df2 = spark.sql( + """ + |SELECT a, b, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn, + |RANK() OVER(PARTITION BY a ORDER BY b) AS rk, + |DENSE_RANK() OVER(PARTITION BY a ORDER BY b) AS drk + |FROM t2 LIMIT 3 + |""".stripMargin) + val pushedLocalLimits2 = df2.queryExecution.optimizedPlan.collect { + case l @ LocalLimit(_, _: Sort) => l + } + assert(pushedLocalLimits2.length === 1) + checkAnswer(df2, + Seq(Row(null, 2, 1, 1, 1), Row(null, 4, 2, 2, 2), Row(null, 6, 3, 3, 3))) + } + } } case class Foo(bar: Option[String]) From c4f8220b35ca3fb6133d2707d1ce5131a7cdb67a Mon Sep 17 00:00:00 2001 From: Luan Date: Sat, 8 May 2021 15:21:30 +0800 Subject: [PATCH 2/4] Empty From b1be48befea805b1c87acb58e0222b035d30fa22 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 17 May 2021 15:09:42 +0800 Subject: [PATCH 3/4] Fix whitespace in LimitPushDownThroughWindow patterns and remove a blank line in the suite --- .../sql/catalyst/optimizer/LimitPushDownThroughWindow.scala | 4 ++-- .../catalyst/optimizer/LimitPushdownThroughWindowSuite.scala | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala index 16180c6890814..b7c5a851f7bbd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala @@ -42,7 +42,7 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] { _.containsAllPatterns(WINDOW, LIMIT), ruleId) { // Adding an extra Limit below WINDOW when the partitionSpec of all window functions is empty. 
case LocalLimit(limitExpr @ IntegerLiteral(limit), - window @ Window(windowExpressions, partitionSpec, orderSpec, child)) + window @ Window(windowExpressions, partitionSpec, orderSpec, child)) if supportsPushdownThroughWindow(windowExpressions) && child.maxRows.forall(_ > limit) && limit < conf.topKSortFallbackThreshold => // Sort is needed here because we need global sort. @@ -50,7 +50,7 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] { Sort(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec, true, child))) // There is a Project between LocalLimit and Window if they do not have the same output. case LocalLimit(limitExpr @ IntegerLiteral(limit), project @ Project(_, - window @ Window(windowExpressions, partitionSpec, orderSpec, child))) + window @ Window(windowExpressions, partitionSpec, orderSpec, child))) if supportsPushdownThroughWindow(windowExpressions) && child.maxRows.forall(_ > limit) && limit < conf.topKSortFallbackThreshold => // Sort is needed here because we need global sort. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala index ddf0bdb0d5824..3ed9d9d74b23d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala @@ -177,7 +177,6 @@ class LimitPushdownThroughWindowSuite extends PlanTest { WithoutOptimize.execute(originalQuery.analyze)) } - test("Should push down if partitionSpec is not empty") { val originalQuery = testRelation .select(a, b, c, From 78dd97a6d759d2bc2ee42969ef5d4fb1bba1d941 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 17 May 2021 15:12:41 +0800 Subject: [PATCH 4/4] Move the non-empty partitionSpec push-down test ahead of the negative tests in LimitPushdownThroughWindowSuite --- .../LimitPushdownThroughWindowSuite.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala index 3ed9d9d74b23d..402faad1bf801 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala @@ -155,43 +155,43 @@ class LimitPushdownThroughWindowSuite extends PlanTest { WithoutOptimize.execute(correctAnswer.analyze)) } - test("Should not push down when child's maxRows smaller than limit value") { + test("Should push down if partitionSpec is not empty") { val originalQuery = testRelation .select(a, b, c, - windowExpr(RowNumber(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) - .limit(20) + windowExpr(RowNumber(), windowSpec(a :: Nil, 
c.desc :: Nil, windowFrame)).as("rn")) + .limit(2) + val correctAnswer = testRelation + .select(a, b, c) + .orderBy(a.asc, c.desc) + .limit(2) + .select(a, b, c, + windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) comparePlans( Optimize.execute(originalQuery.analyze), - WithoutOptimize.execute(originalQuery.analyze)) + WithoutOptimize.execute(correctAnswer.analyze)) } - test("Should not push down if it is not RankLike/RowNumberLike window function") { + test("Should not push down when child's maxRows smaller than limit value") { val originalQuery = testRelation .select(a, b, c, - windowExpr(count(b), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) - .limit(2) + windowExpr(RowNumber(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) + .limit(20) comparePlans( Optimize.execute(originalQuery.analyze), WithoutOptimize.execute(originalQuery.analyze)) } - test("Should push down if partitionSpec is not empty") { + test("Should not push down if it is not RankLike/RowNumberLike window function") { val originalQuery = testRelation .select(a, b, c, - windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) - .limit(2) - val correctAnswer = testRelation - .select(a, b, c) - .orderBy(a.asc, c.desc) + windowExpr(count(b), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn")) .limit(2) - .select(a, b, c, - windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn")) comparePlans( Optimize.execute(originalQuery.analyze), - WithoutOptimize.execute(correctAnswer.analyze)) + WithoutOptimize.execute(originalQuery.analyze)) } test("Should push down if partitionSpec is not empty and with multi partitionSpec") {