diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala index 8918bb3ea86f3..0980e87db283e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala @@ -42,6 +42,9 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_EXPRESSION) ++ nodePatternsInternal + override lazy val deterministic: Boolean = children.forall(_.deterministic) && + plan.deterministic + // Subclasses can override this function to provide more TreePatterns. def nodePatternsInternal(): Seq[TreePattern] = Seq() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala index 6bcbc9f821de6..1de300ef9c09d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala @@ -53,7 +53,7 @@ object InlineCTE extends Rule[LogicalPlan] { // 1) It is fine to inline a CTE if it references another CTE that is non-deterministic; // 2) Any `CTERelationRef` that contains `OuterReference` would have been inlined first. 
refCount == 1 || - cteDef.child.find(_.expressions.exists(!_.deterministic)).isEmpty || + cteDef.deterministic || cteDef.child.find(_.expressions.exists(_.isInstanceOf[OuterReference])).isDefined } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 3c9946ba3772b..2417ff904570b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -84,6 +84,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] AttributeSet.fromAttributeSets(expressions.map(_.references)) -- producedAttributes } + /** + * Returns true when all the expressions in the current node as well as all of its children + * are deterministic. + */ + lazy val deterministic: Boolean = expressions.forall(_.deterministic) && + children.forall(_.deterministic) + /** + * Attributes that are referenced by expressions but not provided by this node's children. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala index 404c8895c4d11..fb014bb8391f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression, Rand} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin} @@ -101,4 +101,32 @@ class QueryPlanSuite extends SparkFunSuite { val plan = t.select($"a", $"b").select($"a", $"b").select($"a", $"b").analyze assert(testRule(plan).resolved) } + + test("SPARK-37199: add a deterministic field to QueryPlan") { + val a: NamedExpression = AttributeReference("a", IntegerType)() + val aRand: NamedExpression = Alias(a + Rand(1), "aRand")() + val deterministicPlan = Project( + Seq(a), + Filter( + ListQuery(Project( + Seq(a), + UnresolvedRelation(TableIdentifier("t", None)) + )), + UnresolvedRelation(TableIdentifier("t", None)) + ) + ) + assert(deterministicPlan.deterministic) + + val nonDeterministicPlan = Project( + Seq(aRand), + Filter( + ListQuery(Project( + Seq(a), + UnresolvedRelation(TableIdentifier("t", None)) + )), + UnresolvedRelation(TableIdentifier("t", None)) + ) + ) + assert(!nonDeterministicPlan.deterministic) + } 
} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 9e7ce55639148..be0af80ff7879 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1945,4 +1945,15 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark correctAnswer) } } + + test("SPARK-37199: deterministic in QueryPlan considers subquery") { + val deterministicQueryPlan = sql("select (select 1 as b) as b") + .queryExecution.executedPlan + assert(deterministicQueryPlan.deterministic) + + val nonDeterministicQueryPlan = sql("select (select rand(1) as b) as b") + .queryExecution.executedPlan + assert(!nonDeterministicQueryPlan.deterministic) + } + }