Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1251,9 +1251,14 @@ object OptimizeWindowFunctions extends Rule[LogicalPlan] {
* independent and are of the same window function type, collapse into the parent.
*/
object CollapseWindow extends Rule[LogicalPlan] {
private def specCompatible(s1: Seq[Expression], s2: Seq[Expression]): Boolean = {
s1.length == s2.length &&
s1.zip(s2).forall(e => e._1.semanticEquals(e._2))
}

private def windowsCompatible(w1: Window, w2: Window): Boolean = {
w1.partitionSpec == w2.partitionSpec &&
w1.orderSpec == w2.orderSpec &&
specCompatible(w1.partitionSpec, w2.partitionSpec) &&
specCompatible(w1.orderSpec, w2.orderSpec) &&
w1.references.intersect(w2.windowOutputSet).isEmpty &&
w1.windowExpressions.nonEmpty && w2.windowExpressions.nonEmpty &&
// This assumes Window contains the same type of window expressions. This is ensured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,24 @@ class CollapseWindowSuite extends PlanTest {

comparePlans(optimized, query)
}

test("SPARK-42525: collapse two adjacent windows with the same partition/order " +
"but qualifiers are different ") {

val query = testRelation
.window(Seq(min(a).as("_we0")), Seq(c.withQualifier(Seq("0"))), Seq(c.asc))
.select($"a", $"b", $"c", $"_we0" as "min_a")
.window(Seq(max(a).as("_we1")), Seq(c.withQualifier(Seq("1"))), Seq(c.asc))
.select($"a", $"b", $"c", $"min_a", $"_we1" as "max_a")
.analyze

val optimized = Optimize.execute(query)

val correctAnswer = testRelation
.window(Seq(min(a).as("_we0"), max(a).as("_we1")), Seq(c), Seq(c.asc))
.select(a, b, c, $"_we0" as "min_a", $"_we1" as "max_a")
.analyze

comparePlans(optimized, correctAnswer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.scalatest.matchers.must.Matchers.the
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral}
import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
Expand Down Expand Up @@ -1429,4 +1430,19 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}
}
}

test("SPARK-42525: collapse two adjacent windows with the same partition/order in subquery") {
withTempView("t1") {
Seq((1, 1), (2, 2)).toDF("a", "b").createOrReplaceTempView("t1")
val df = sql(
"""
|SELECT a, b, rk, row_number() OVER (PARTITION BY a ORDER BY b) AS rn
|FROM (SELECT a, b, rank() OVER (PARTITION BY a ORDER BY b) AS rk
| FROM t1) t2
|""".stripMargin)

val windows = df.queryExecution.optimizedPlan.collect { case w: LogicalWindow => w }
assert(windows.size === 1)
}
}
}