Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,10 @@ object ColumnPruning extends Rule[LogicalPlan] {
object CollapseProject extends Rule[LogicalPlan] with AliasHelper {

def apply(plan: LogicalPlan): LogicalPlan = {
val alwaysInline = conf.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE)
apply(plan, conf.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE))
}

def apply(plan: LogicalPlan, alwaysInline: Boolean): LogicalPlan = {
plan.transformUpWithPruning(_.containsPattern(PROJECT), ruleId) {
case p1 @ Project(_, p2: Project)
if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,9 @@ object OptimizeOneRowRelationSubquery extends Rule[LogicalPlan] {

object OneRowSubquery {
def unapply(plan: LogicalPlan): Option[Seq[NamedExpression]] = {
CollapseProject(EliminateSubqueryAliases(plan)) match {
// SPARK-40800: always inline expressions to support a broader range of correlated
// subqueries and avoid expensive domain joins.
CollapseProject(EliminateSubqueryAliases(plan), alwaysInline = true) match {
case Project(projectList, _: OneRowRelation) => Some(stripOuterReferences(projectList))
case _ => None
}
Expand Down
39 changes: 35 additions & 4 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2469,10 +2469,41 @@ class SubquerySuite extends QueryTest
Row(2))

// Cannot use non-orderable data type in one row subquery that cannot be collapsed.
val error = intercept[AnalysisException] {
sql("select (select concat(a, a) from (select upper(x['a']) as a)) from v1").collect()
}
assert(error.getMessage.contains("Correlated column reference 'v1.x' cannot be map type"))
val error = intercept[AnalysisException] {
sql(
"""
|select (
| select concat(a, a) from
| (select upper(x['a'] + rand()) as a)
|) from v1
|""".stripMargin).collect()
}
assert(error.getMessage.contains("Correlated column reference 'v1.x' cannot be map type"))
}
}

test("SPARK-40800: always inline expressions in OptimizeOneRowRelationSubquery") {
withTempView("t1") {
sql("CREATE TEMP VIEW t1 AS SELECT ARRAY('a', 'b') a")
// Scalar subquery.
checkAnswer(sql(
"""
|SELECT (
| SELECT array_sort(a, (i, j) -> rank[i] - rank[j])[0] AS sorted
| FROM (SELECT MAP('a', 1, 'b', 2) rank)
|) FROM t1
|""".stripMargin),
Row("a"))
// Lateral subquery.
checkAnswer(
sql("""
|SELECT sorted[0] FROM t1
|JOIN LATERAL (
| SELECT array_sort(a, (i, j) -> rank[i] - rank[j]) AS sorted
| FROM (SELECT MAP('a', 1, 'b', 2) rank)
|)
|""".stripMargin),
Row("a"))
}
}
}