From 686a827737a1f4797f34ede60f9d6c233add3230 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 23 Apr 2020 20:41:13 +0200 Subject: [PATCH 1/2] [SPARK-31535][SQL] Fix nested CTE substitution --- .../catalyst/analysis/CTESubstitution.scala | 60 +++++++------------ .../resources/sql-tests/inputs/cte-nested.sql | 8 +++ .../sql-tests/results/cte-legacy.sql.out | 15 ++++- .../sql-tests/results/cte-nested.sql.out | 16 ++++- .../sql-tests/results/cte-nonlegacy.sql.out | 15 ++++- 5 files changed, 71 insertions(+), 43 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index c8078e10fd04..5de2704cee2c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -31,28 +31,28 @@ object CTESubstitution extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { LegacyBehaviorPolicy.withName(SQLConf.get.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { case LegacyBehaviorPolicy.EXCEPTION => - assertNoNameConflictsInCTE(plan, inTraverse = false) - traverseAndSubstituteCTE(plan, inTraverse = false) + assertNoNameConflictsInCTE(plan) + traverseAndSubstituteCTE(plan) case LegacyBehaviorPolicy.LEGACY => legacyTraverseAndSubstituteCTE(plan) case LegacyBehaviorPolicy.CORRECTED => - traverseAndSubstituteCTE(plan, inTraverse = false) + traverseAndSubstituteCTE(plan) } } /** * Check the plan to be traversed has naming conflicts in nested CTE or not, traverse through - * child, innerChildren and subquery for the current plan. + * child, innerChildren and subquery expressions for the current plan. */ private def assertNoNameConflictsInCTE( plan: LogicalPlan, - inTraverse: Boolean, - cteNames: Set[String] = Set.empty): Unit = { - plan.foreach { + namesInChildren: Set[String] = Set.empty, + namesInExpressions: Set[String] = Set.empty): Unit = { + plan match { case w @ With(child, relations) => val newNames = relations.map { case (cteName, _) => - if (cteNames.contains(cteName)) { + if (namesInChildren.contains(cteName)) { throw new AnalysisException(s"Name $cteName is ambiguous in nested CTE. " + s"Please set ${LEGACY_CTE_PRECEDENCE_POLICY.key} to CORRECTED so that name " + "defined in inner CTE takes precedence. If set it to LEGACY, outer CTE " + @@ -60,24 +60,14 @@ object CTESubstitution extends Rule[LogicalPlan] { } else { cteName } - }.toSet - child.transformExpressions { - case e: SubqueryExpression => - assertNoNameConflictsInCTE(e.plan, inTraverse = true, cteNames ++ newNames) - e - } - w.innerChildren.foreach { p => - assertNoNameConflictsInCTE(p, inTraverse = true, cteNames ++ newNames) - } - - case other if inTraverse => - other.transformExpressions { - case e: SubqueryExpression => - assertNoNameConflictsInCTE(e.plan, inTraverse = true, cteNames) - e - } + }.toSet ++ namesInExpressions + assertNoNameConflictsInCTE(child, namesInChildren, newNames) + w.innerChildren.foreach(assertNoNameConflictsInCTE(_, newNames, newNames)) - case _ => + case other => + other.subqueries.foreach( + assertNoNameConflictsInCTE(_, namesInExpressions, namesInExpressions)) + other.children.foreach(assertNoNameConflictsInCTE(_, namesInChildren, namesInExpressions)) } } @@ -131,37 +121,27 @@ object CTESubstitution extends Rule[LogicalPlan] { * SELECT * FROM t * ) * @param plan the plan to be traversed - * @param inTraverse whether the current traverse is called from another traverse, only in this - * case name collision can occur * @return the plan where CTE substitution is applied */ - private def traverseAndSubstituteCTE(plan: LogicalPlan, inTraverse: Boolean): LogicalPlan = { + private def traverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = { plan.resolveOperatorsUp { case With(child: LogicalPlan, relations) => - // child might contain an inner CTE that has priority so traverse and substitute inner CTEs - // in child first - val traversedChild: LogicalPlan = child transformExpressions { - case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan, true)) - } - // Substitute CTE definitions from last to first as a CTE definition can reference a // previous one - relations.foldRight(traversedChild) { + relations.foldRight(child) { case ((cteName, ctePlan), currentPlan) => // A CTE definition might contain an inner CTE that has priority, so traverse and // substitute CTE defined in ctePlan. // A CTE definition might not be used at all or might be used multiple times. To avoid // computation if it is not used and to avoid multiple recomputation if it is used // multiple times we use a lazy construct with call-by-name parameter passing. - lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan, true) + lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan) substituteCTE(currentPlan, cteName, substitutedCTEPlan) } - // CTE name collision can occur only when inTraverse is true, it helps to avoid eager CTE - // substitution in a subquery expression. - case other if inTraverse => + case other => other.transformExpressions { - case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan, true)) + case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan)) } } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql index 5e5e3a51f1f5..134f88bf3302 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql @@ -103,3 +103,11 @@ SELECT ( SELECT * FROM t ) ); + +-- CTE in subquery expression shadows outer 4 +WITH t(c) AS (SELECT 1) +SELECT * FROM t +WHERE c IN ( + WITH t(c) AS (SELECT 2) + SELECT * FROM t +); diff --git a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out index e8020e1a454e..629daf7410e5 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 12 +-- Number of queries: 13 -- !query @@ -166,3 +166,16 @@ SELECT ( struct -- !query output 1 + + +-- !query +WITH t(c) AS (SELECT 1) +SELECT * FROM t +WHERE c IN ( + WITH t(c) AS (SELECT 2) + SELECT * FROM t +) +-- !query schema +struct +-- !query output +1 diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out index 64d635adae42..34e9be0ca000 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 12 +-- Number of queries: 13 -- !query @@ -172,3 +172,17 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.; + + +-- !query +WITH t(c) AS (SELECT 1) +SELECT * FROM t +WHERE c IN ( + WITH t(c) AS (SELECT 2) + SELECT * FROM t +) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.; diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out index 9422bb642d96..6eba1ad72523 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 12 +-- Number of queries: 13 -- !query @@ -166,3 +166,16 @@ SELECT ( struct -- !query output 3 + + +-- !query +WITH t(c) AS (SELECT 1) +SELECT * FROM t +WHERE c IN ( + WITH t(c) AS (SELECT 2) + SELECT * FROM t +) +-- !query schema +struct +-- !query output + From 4a3ebf746cc7efa2c7356c79a400b7bd04952764 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 24 Apr 2020 15:26:11 +0200 Subject: [PATCH 2/2] rename parameters --- .../sql/catalyst/analysis/CTESubstitution.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 5de2704cee2c..d9fdb56739e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -46,13 +46,13 @@ object CTESubstitution extends Rule[LogicalPlan] { */ private def assertNoNameConflictsInCTE( plan: LogicalPlan, - namesInChildren: Set[String] = Set.empty, - namesInExpressions: Set[String] = Set.empty): Unit = { + outerCTERelationNames: Set[String] = Set.empty, + namesInSubqueries: Set[String] = Set.empty): Unit = { plan match { case w @ With(child, relations) => val newNames = relations.map { case (cteName, _) => - if (namesInChildren.contains(cteName)) { + if (outerCTERelationNames.contains(cteName)) { throw new AnalysisException(s"Name $cteName is ambiguous in nested CTE. " + s"Please set ${LEGACY_CTE_PRECEDENCE_POLICY.key} to CORRECTED so that name " + "defined in inner CTE takes precedence. If set it to LEGACY, outer CTE " + @@ -60,14 +60,15 @@ object CTESubstitution extends Rule[LogicalPlan] { } else { cteName } - }.toSet ++ namesInExpressions - assertNoNameConflictsInCTE(child, namesInChildren, newNames) + }.toSet ++ namesInSubqueries + assertNoNameConflictsInCTE(child, outerCTERelationNames, newNames) w.innerChildren.foreach(assertNoNameConflictsInCTE(_, newNames, newNames)) case other => other.subqueries.foreach( - assertNoNameConflictsInCTE(_, namesInExpressions, namesInExpressions)) - other.children.foreach(assertNoNameConflictsInCTE(_, namesInChildren, namesInExpressions)) + assertNoNameConflictsInCTE(_, namesInSubqueries, namesInSubqueries)) + other.children.foreach( + assertNoNameConflictsInCTE(_, outerCTERelationNames, namesInSubqueries)) } }