diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1233f2207f54e..a0d49c29470df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1251,9 +1251,14 @@ object OptimizeWindowFunctions extends Rule[LogicalPlan] {
  * independent and are of the same window function type, collapse into the parent.
  */
 object CollapseWindow extends Rule[LogicalPlan] {
+  private def specCompatible(s1: Seq[Expression], s2: Seq[Expression]): Boolean = {
+    s1.length == s2.length &&
+      s1.zip(s2).forall(e => e._1.semanticEquals(e._2))
+  }
+
   private def windowsCompatible(w1: Window, w2: Window): Boolean = {
-    w1.partitionSpec == w2.partitionSpec &&
-      w1.orderSpec == w2.orderSpec &&
+    specCompatible(w1.partitionSpec, w2.partitionSpec) &&
+      specCompatible(w1.orderSpec, w2.orderSpec) &&
       w1.references.intersect(w2.windowOutputSet).isEmpty &&
       w1.windowExpressions.nonEmpty && w2.windowExpressions.nonEmpty &&
       // This assumes Window contains the same type of window expressions. This is ensured
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 63cc3554564b2..515203da7caf6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -148,4 +148,24 @@ class CollapseWindowSuite extends PlanTest {
 
     comparePlans(optimized, query)
   }
+
+  test("SPARK-42525: collapse two adjacent windows with the same partition/order " +
+    "but qualifiers are different ") {
+
+    val query = testRelation
+      .window(Seq(min(a).as("_we0")), Seq(c.withQualifier(Seq("0"))), Seq(c.asc))
+      .select($"a", $"b", $"c", $"_we0" as "min_a")
+      .window(Seq(max(a).as("_we1")), Seq(c.withQualifier(Seq("1"))), Seq(c.asc))
+      .select($"a", $"b", $"c", $"min_a", $"_we1" as "max_a")
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val correctAnswer = testRelation
+      .window(Seq(min(a).as("_we0"), max(a).as("_we1")), Seq(c), Seq(c.asc))
+      .select(a, b, c, $"_we0" as "min_a", $"_we1" as "max_a")
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index b9421f8b13dd4..1fb937e93b843 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.matchers.must.Matchers.the
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral}
 import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
@@ -1429,4 +1430,19 @@ class DataFrameWindowFunctionsSuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-42525: collapse two adjacent windows with the same partition/order in subquery") {
+    withTempView("t1") {
+      Seq((1, 1), (2, 2)).toDF("a", "b").createOrReplaceTempView("t1")
+      val df = sql(
+        """
+          |SELECT a, b, rk, row_number() OVER (PARTITION BY a ORDER BY b) AS rn
+          |FROM (SELECT a, b, rank() OVER (PARTITION BY a ORDER BY b) AS rk
+          |      FROM t1) t2
+          |""".stripMargin)
+
+      val windows = df.queryExecution.optimizedPlan.collect { case w: LogicalWindow => w }
+      assert(windows.size === 1)
+    }
+  }
 }
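
A minimal standalone sketch (not part of the patch; the object name SpecCompatibleSketch is made up for illustration, and it assumes only spark-catalyst on the classpath) of the equality behavior the new specCompatible helper relies on: two references to the same attribute that differ only in qualifier are not structurally equal, but they are semantically equal.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

object SpecCompatibleSketch {
  def main(args: Array[String]): Unit = {
    // One attribute, referenced through two different qualifiers (e.g. subquery aliases).
    val c = AttributeReference("c", IntegerType)()
    val c0 = c.withQualifier(Seq("0"))
    val c1 = c.withQualifier(Seq("1"))

    // Structural equality compares qualifiers too, so the old
    // `w1.partitionSpec == w2.partitionSpec` check saw these as different specs ...
    println(c0 == c1)                 // false
    // ... whereas semanticEquals compares the canonicalized expressions (same exprId,
    // qualifier ignored), which is what specCompatible uses per element.
    println(c0.semanticEquals(c1))    // true
  }
}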