diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2664fd638062d..afbf73027277e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -989,7 +989,10 @@ object ColumnPruning extends Rule[LogicalPlan] { object CollapseProject extends Rule[LogicalPlan] with AliasHelper { def apply(plan: LogicalPlan): LogicalPlan = { - val alwaysInline = conf.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE) + apply(plan, conf.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE)) + } + + def apply(plan: LogicalPlan, alwaysInline: Boolean): LogicalPlan = { plan.transformUpWithPruning(_.containsPattern(PROJECT), ruleId) { case p1 @ Project(_, p2: Project) if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 9a1d20ed9b21d..6665d885554fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -730,7 +730,9 @@ object OptimizeOneRowRelationSubquery extends Rule[LogicalPlan] { object OneRowSubquery { def unapply(plan: LogicalPlan): Option[Seq[NamedExpression]] = { - CollapseProject(EliminateSubqueryAliases(plan)) match { + // SPARK-40800: always inline expressions to support a broader range of correlated + // subqueries and avoid expensive domain joins. + CollapseProject(EliminateSubqueryAliases(plan), alwaysInline = true) match { case Project(projectList, _: OneRowRelation) => Some(stripOuterReferences(projectList)) case _ => None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index ecb4bfd0ec41b..02437c4133f2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -2469,10 +2469,41 @@ class SubquerySuite extends QueryTest Row(2)) // Cannot use non-orderable data type in one row subquery that cannot be collapsed. - val error = intercept[AnalysisException] { - sql("select (select concat(a, a) from (select upper(x['a']) as a)) from v1").collect() - } - assert(error.getMessage.contains("Correlated column reference 'v1.x' cannot be map type")) + val error = intercept[AnalysisException] { + sql( + """ + |select ( + | select concat(a, a) from + | (select upper(x['a'] + rand()) as a) + |) from v1 + |""".stripMargin).collect() + } + assert(error.getMessage.contains("Correlated column reference 'v1.x' cannot be map type")) + } + } + + test("SPARK-40800: always inline expressions in OptimizeOneRowRelationSubquery") { + withTempView("t1") { + sql("CREATE TEMP VIEW t1 AS SELECT ARRAY('a', 'b') a") + // Scalar subquery. + checkAnswer(sql( + """ + |SELECT ( + | SELECT array_sort(a, (i, j) -> rank[i] - rank[j])[0] AS sorted + | FROM (SELECT MAP('a', 1, 'b', 2) rank) + |) FROM t1 + |""".stripMargin), + Row("a")) + // Lateral subquery. + checkAnswer( + sql(""" + |SELECT sorted[0] FROM t1 + |JOIN LATERAL ( + | SELECT array_sort(a, (i, j) -> rank[i] - rank[j]) AS sorted + | FROM (SELECT MAP('a', 1, 'b', 2) rank) + |) + |""".stripMargin), + Row("a")) } } }