diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index c920f0641cad..ed7e01e5bfb4 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -147,6 +147,8 @@ license: |
 
   - Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed during later phases of the recursive directory listing, due to either concurrent file deletions or object store consistency issues) then the listing will fail with an exception unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these missing files or subdirectories would be ignored. Note that this change of behavior only applies during initial table file listing (or during `REFRESH TABLE`), not during query execution: the net change is that `spark.sql.files.ignoreMissingFiles` is now obeyed during table file listing / query planning, not only at query execution time.
 
+  - Since Spark 3.0, the substitution order of nested WITH clauses has changed and an inner CTE definition now takes precedence over an outer one. In version 2.4 and earlier, `WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2` returns `1`, while in version 3.0 it returns `2`. The previous behaviour can be restored by setting `spark.sql.legacy.ctePrecedence.enabled` to `true`.
+
 ## Upgrading from Spark SQL 2.4 to 2.4.1
 
   - The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was
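The behaviour change documented above can be observed from a short Spark application. A minimal sketch, assuming a build that already contains this patch (the session setup and object name below are illustrative and not part of the change):

    import org.apache.spark.sql.SparkSession

    object NestedCtePrecedenceExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("nested-cte-precedence")
          .master("local[1]")
          .getOrCreate()

        // The query from the migration note: the inner definition of `t` now shadows the outer one.
        val result = spark.sql(
          """WITH t AS (SELECT 1),
            |     t2 AS (WITH t AS (SELECT 2) SELECT * FROM t)
            |SELECT * FROM t2""".stripMargin)

        // Prints [2] on Spark 3.0; the same query returned 1 on Spark 2.4 and earlier.
        println(result.head())

        spark.stop()
      }
    }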
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5d37e909f80a..b95aaf073379 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -209,38 +209,6 @@ class Analyzer(
       CleanupAliases)
   )
 
-  /**
-   * Analyze cte definitions and substitute child plan with analyzed cte definitions.
-   */
-  object CTESubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case With(child, relations) =>
-        // substitute CTE expressions right-to-left to resolve references to previous CTEs:
-        // with a as (select * from t), b as (select * from a) select * from b
-        relations.foldRight(child) {
-          case ((cteName, ctePlan), currentPlan) =>
-            substituteCTE(currentPlan, cteName, ctePlan)
-        }
-      case other => other
-    }
-
-    private def substituteCTE(
-        plan: LogicalPlan,
-        cteName: String,
-        ctePlan: LogicalPlan): LogicalPlan = {
-      plan resolveOperatorsUp {
-        case UnresolvedRelation(Seq(table)) if resolver(cteName, table) =>
-          ctePlan
-        case other =>
-          // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
-          other transformExpressions {
-            case e: SubqueryExpression =>
-              e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan))
-          }
-      }
-    }
-  }
-
   /**
    * Substitute child plan with WindowSpecDefinitions.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
new file mode 100644
index 000000000000..60e6bf8db06d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, With}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LEGACY_CTE_PRECEDENCE_ENABLED
+
+/**
+ * Analyze WITH nodes and substitute child plan with CTE definitions.
+ */
+object CTESubstitution extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (SQLConf.get.getConf(LEGACY_CTE_PRECEDENCE_ENABLED)) {
+      legacyTraverseAndSubstituteCTE(plan)
+    } else {
+      traverseAndSubstituteCTE(plan, false)
+    }
+  }
+
+  private def legacyTraverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsUp {
+      case With(child, relations) =>
+        // substitute CTE expressions right-to-left to resolve references to previous CTEs:
+        // with a as (select * from t), b as (select * from a) select * from b
+        relations.foldRight(child) {
+          case ((cteName, ctePlan), currentPlan) => substituteCTE(currentPlan, cteName, ctePlan)
+        }
+    }
+  }
+
+  /**
+   * Traverse the plan and expression nodes as a tree and replace matching references to CTE
+   * definitions.
+   * - If the rule encounters a WITH node then it substitutes the child of the node with the CTE
+   *   definitions of the node in right-to-left order, as a definition can reference a previous
+   *   one.
+   *   For example, the following query is valid:
+   *   WITH
+   *     t AS (SELECT 1),
+   *     t2 AS (SELECT * FROM t)
+   *   SELECT * FROM t2
+   * - If a CTE definition contains an inner WITH node then substitution of the inner one should
+   *   take precedence because it can shadow an outer CTE definition.
+   *   For example, the following query should return 2:
+   *   WITH
+   *     t AS (SELECT 1),
+   *     t2 AS (
+   *       WITH t AS (SELECT 2)
+   *       SELECT * FROM t
+   *     )
+   *   SELECT * FROM t2
+   * - If a CTE definition contains a subquery that contains an inner WITH node then substitution
+   *   of the inner one should take precedence because it can shadow an outer CTE definition.
+   *   For example, the following query should return 2:
+   *   WITH t AS (SELECT 1 AS c)
+   *   SELECT max(c) FROM (
+   *     WITH t AS (SELECT 2 AS c)
+   *     SELECT * FROM t
+   *   )
+   * - If a CTE definition contains a subquery expression that contains an inner WITH node then
+   *   substitution of the inner one should take precedence because it can shadow an outer CTE
+   *   definition.
+   *   For example, the following query should return 2:
+   *   WITH t AS (SELECT 1)
+   *   SELECT (
+   *     WITH t AS (SELECT 2)
+   *     SELECT * FROM t
+   *   )
+   * @param plan the plan to be traversed
+   * @param inTraverse whether the current traversal was started from another traversal; a name
+   *                   collision can occur only in this case
+   * @return the plan where CTE substitution is applied
+   */
+  private def traverseAndSubstituteCTE(plan: LogicalPlan, inTraverse: Boolean): LogicalPlan = {
+    plan.resolveOperatorsUp {
+      case With(child: LogicalPlan, relations) =>
+        // The child might contain an inner CTE that has priority, so traverse and substitute
+        // inner CTEs in the child first.
+        val traversedChild: LogicalPlan = child transformExpressions {
+          case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan, true))
+        }
+
+        // Substitute CTE definitions from last to first, as a CTE definition can reference a
+        // previous one.
+        relations.foldRight(traversedChild) {
+          case ((cteName, ctePlan), currentPlan) =>
+            // A CTE definition might contain an inner CTE that has priority, so traverse and
+            // substitute CTEs defined in ctePlan first.
+            // A CTE definition might not be used at all or might be used multiple times. To avoid
+            // computation if it is not used, and to avoid recomputing it if it is used multiple
+            // times, we use a lazy construct with call-by-name parameter passing.
+            lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan, true)
+            substituteCTE(currentPlan, cteName, substitutedCTEPlan)
+        }
+
+      // A CTE name collision can occur only when inTraverse is true; this check helps to avoid
+      // eager CTE substitution in a subquery expression.
+      case other if inTraverse =>
+        other.transformExpressions {
+          case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan, true))
+        }
+    }
+  }
+
+  private def substituteCTE(
+      plan: LogicalPlan,
+      cteName: String,
+      ctePlan: => LogicalPlan): LogicalPlan =
+    plan resolveOperatorsUp {
+      case UnresolvedRelation(Seq(table)) if plan.conf.resolver(cteName, table) => ctePlan
+
+      case other =>
+        // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
+        other transformExpressions {
+          case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan))
+        }
+    }
+}
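To make the substitution order easier to follow, here is a standalone, much-simplified model of the rule (plain strings instead of logical plans; my own sketch, not Spark code). It mirrors the three ideas documented above: the right-to-left fold, expanding a definition's own scope first so that inner definitions shadow outer ones, and the call-by-name parameter with `lazy val` that avoids expanding unused definitions:

    object CteSubstitutionSketch {
      // A toy "CTE scope": a child query text plus its named definitions (which may nest).
      final case class Query(text: String, defs: Seq[(String, Query)] = Nil)

      // Replace whole-word references to `name` in `text` with the expanded definition body.
      // The by-name parameter plus `lazy val` means the body is expanded at most once, and only
      // if it is actually referenced, mirroring the rule's call-by-name `ctePlan` parameter.
      private def substitute(text: String, name: String, body: => String): String = {
        lazy val expanded = "(" + body + ")"
        ("\\b" + name + "\\b").r.replaceAllIn(text, _ => expanded)
      }

      // Expand a scope: each definition's own (inner) scope is expanded before it is substituted
      // into the child, so inner definitions shadow outer ones; the fold runs right-to-left so a
      // definition can reference a previously defined one.
      def expand(q: Query): String =
        q.defs.foldRight(q.text) { case ((name, defQuery), current) =>
          substitute(current, name, expand(defQuery))
        }

      def main(args: Array[String]): Unit = {
        // Models: WITH t AS (SELECT 1),
        //              t2 AS (WITH t AS (SELECT 2) SELECT * FROM t)
        //         SELECT * FROM t2
        val plan = Query(
          "SELECT * FROM t2",
          Seq(
            "t" -> Query("SELECT 1"),
            "t2" -> Query("SELECT * FROM t", Seq("t" -> Query("SELECT 2")))))

        // Prints: SELECT * FROM (SELECT * FROM (SELECT 2))
        // i.e. the inner definition of `t` wins, matching the new non-legacy behaviour.
        println(expand(plan))
      }
    }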
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index af67632706df..0caa416f4aad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1838,6 +1838,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_CTE_PRECEDENCE_ENABLED = buildConf("spark.sql.legacy.ctePrecedence.enabled")
+    .internal()
+    .doc("When true, outer CTE definitions take precedence over inner definitions.")
+    .booleanConf
+    .createWithDefault(false)
+
   val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC =
     buildConf("spark.sql.legacy.arrayExistsFollowsThreeValuedLogic")
       .doc("When true, the ArrayExists will follow the three-valued boolean logic.")
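Since the new entry is an ordinary (if internal) SQL config, the 2.4 resolution order can be restored per session at runtime. A brief spark-shell style sketch, assuming the shell's preexisting `spark` session and a build containing this patch:

    // The nested query from the migration note returns 2 with the default (false).
    // Turning the legacy flag on makes the outer definition of `t` win again, so it returns 1.
    spark.conf.set("spark.sql.legacy.ctePrecedence.enabled", "true")
    spark.sql(
      "WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2"
    ).show()
    // The SQL equivalent, as used by the new cte-legacy.sql test below, is:
    //   SET spark.sql.legacy.ctePrecedence.enabled=true;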
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-legacy.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-legacy.sql
new file mode 100644
index 000000000000..2f2606d44d91
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte-legacy.sql
@@ -0,0 +1,115 @@
+create temporary view t as select * from values 0, 1, 2 as t(id);
+create temporary view t2 as select * from values 0, 1 as t(id);
+
+-- CTE legacy substitution
+SET spark.sql.legacy.ctePrecedence.enabled=true;
+
+-- CTE in CTE definition
+WITH t as (
+  WITH t2 AS (SELECT 1)
+  SELECT * FROM t2
+)
+SELECT * FROM t;
+
+-- CTE in subquery
+SELECT max(c) FROM (
+  WITH t(c) AS (SELECT 1)
+  SELECT * FROM t
+);
+
+-- CTE in subquery expression
+SELECT (
+  WITH t AS (SELECT 1)
+  SELECT * FROM t
+);
+
+-- CTE in CTE definition shadows outer
+WITH
+  t AS (SELECT 1),
+  t2 AS (
+    WITH t AS (SELECT 2)
+    SELECT * FROM t
+  )
+SELECT * FROM t2;
+
+-- CTE in CTE definition shadows outer 2
+WITH
+  t(c) AS (SELECT 1),
+  t2 AS (
+    SELECT (
+      SELECT max(c) FROM (
+        WITH t(c) AS (SELECT 2)
+        SELECT * FROM t
+      )
+    )
+  )
+SELECT * FROM t2;
+
+-- CTE in CTE definition shadows outer 3
+WITH
+  t AS (SELECT 1),
+  t2 AS (
+    WITH t AS (SELECT 2),
+    t2 AS (
+      WITH t AS (SELECT 3)
+      SELECT * FROM t
+    )
+    SELECT * FROM t2
+  )
+SELECT * FROM t2;
+
+-- CTE in subquery shadows outer
+WITH t(c) AS (SELECT 1)
+SELECT max(c) FROM (
+  WITH t(c) AS (SELECT 2)
+  SELECT * FROM t
+);
+
+-- CTE in subquery shadows outer 2
+WITH t(c) AS (SELECT 1)
+SELECT sum(c) FROM (
+  SELECT max(c) AS c FROM (
+    WITH t(c) AS (SELECT 2)
+    SELECT * FROM t
+  )
+);
+
+-- CTE in subquery shadows outer 3
+WITH t(c) AS (SELECT 1)
+SELECT sum(c) FROM (
+  WITH t(c) AS (SELECT 2)
+  SELECT max(c) AS c FROM (
+    WITH t(c) AS (SELECT 3)
+    SELECT * FROM t
+  )
+);
+
+-- CTE in subquery expression shadows outer
+WITH t AS (SELECT 1)
+SELECT (
+  WITH t AS (SELECT 2)
+  SELECT * FROM t
+);
+
+-- CTE in subquery expression shadows outer 2
+WITH t AS (SELECT 1)
+SELECT (
+  SELECT (
+    WITH t AS (SELECT 2)
+    SELECT * FROM t
+  )
+);
+
+-- CTE in subquery expression shadows outer 3
+WITH t AS (SELECT 1)
+SELECT (
+  WITH t AS (SELECT 2)
+  SELECT (
+    WITH t AS (SELECT 3)
+    SELECT * FROM t
+  )
+);
+
+-- Clean up
+DROP VIEW IF EXISTS t;
+DROP VIEW IF EXISTS t2;
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
new file mode 100644
index 000000000000..5193e2536c0c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
@@ -0,0 +1,208 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 17
+
+
+-- !query 0
+create temporary view t as select * from values 0, 1, 2 as t(id)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+create temporary view t2 as select * from values 0, 1 as t(id)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SET spark.sql.legacy.ctePrecedence.enabled=true
+-- !query 2 schema
+struct<key:string,value:string>
+-- !query 2 output
+spark.sql.legacy.ctePrecedence.enabled	true
+
+
+-- !query 3
+WITH t as (
+  WITH t2 AS (SELECT 1)
+  SELECT * FROM t2
+)
+SELECT * FROM t
+-- !query 3 schema
+struct<1:int>
+-- !query 3 output
+1
+
+
+-- !query 4
+SELECT max(c) FROM (
+  WITH t(c) AS (SELECT 1)
+  SELECT * FROM t
+)
+-- !query 4 schema
+struct<max(c):int>
+-- !query 4 output
+1
+
+
+-- !query 5
+SELECT (
+  WITH t AS (SELECT 1)
+  SELECT * FROM t
+)
+-- !query 5 schema
+struct<scalarsubquery():int>
+-- !query 5 output
+1
+
+
+-- !query 6
+WITH
+  t AS (SELECT 1),
+  t2 AS (
+    WITH t AS (SELECT 2)
+    SELECT * FROM t
+  )
+SELECT * FROM t2
+-- !query 6 schema
+struct<1:int>
+-- !query 6 output
+1
+
+
+-- !query 7
+WITH
+  t(c) AS (SELECT 1),
+  t2 AS (
+    SELECT (
+      SELECT max(c) FROM (
+        WITH t(c) AS (SELECT 2)
+        SELECT * FROM t
+      )
+    )
+  )
+SELECT * FROM t2
+-- !query 7 schema
+struct<scalarsubquery():int>
+-- !query 7 output
+1
+
+
+-- !query 8
+WITH
+  t AS (SELECT 1),
+  t2 AS (
+    WITH t AS (SELECT 2),
+    t2 AS (
+      WITH t AS (SELECT 3)
+      SELECT * FROM t
+    )
+    SELECT * FROM t2
+  )
+SELECT * FROM t2
+-- !query 8 schema
+struct<2:int>
+-- !query 8 output
+2
+
+
+-- !query 9
+WITH t(c) AS (SELECT 1)
+SELECT max(c) FROM (
+  WITH t(c) AS (SELECT 2)
+  SELECT * FROM t
+)
+-- !query 9 schema
+struct<max(c):int>
+-- !query 9 output
+2
+
+
+-- !query 10
+WITH t(c) AS (SELECT 1)
+SELECT sum(c) FROM (
+  SELECT max(c) AS c FROM (
+    WITH t(c) AS (SELECT 2)
+    SELECT * FROM t
+  )
+)
+-- !query 10 schema
+struct<sum(c):bigint>
+-- !query 10 output
+2
+
+
+-- !query 11
+WITH t(c) AS (SELECT 1)
+SELECT sum(c) FROM (
+  WITH t(c) AS (SELECT 2)
+  SELECT max(c) AS c FROM (
+    WITH t(c) AS (SELECT 3)
+    SELECT * FROM t
+  )
+)
+-- !query 11 schema
+struct<sum(c):bigint>
+-- !query 11 output
+3
+
+
+-- !query 12
+WITH t AS (SELECT 1)
+SELECT (
+  WITH t AS (SELECT 2)
+  SELECT * FROM t
+)
+-- !query 12 schema
+struct<scalarsubquery():int>
+-- !query 12 output
+1
+
+
+-- !query 13
+WITH t AS (SELECT 1)
+SELECT (
+  SELECT (
+    WITH t AS (SELECT 2)
+    SELECT * FROM t
+  )
+)
+-- !query 13 schema
+struct<scalarsubquery():int>
+-- !query 13 output
+1
+
+
+-- !query 14
+WITH t AS (SELECT 1)
+SELECT (
+  WITH t AS (SELECT 2)
+  SELECT (
+    WITH t AS (SELECT 3)
+    SELECT * FROM t
+  )
+)
+-- !query 14 schema
+struct<scalarsubquery():int>
+-- !query 14 output
+1
+
+
+-- !query 15
+DROP VIEW IF EXISTS t
+-- !query 15 schema
+struct<>
+-- !query 15 output
+
+
+
+-- !query 16
+DROP VIEW IF EXISTS t2
+-- !query 16 schema
+struct<>
+-- !query 16 output
+
diff --git a/sql/core/src/test/resources/sql-tests/results/cte.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
index 9e90908d92fa..b7dd76c72520 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
@@ -204,9 +204,9 @@ WITH
 )
 SELECT * FROM t2
 -- !query 16 schema
-struct<1:int>
+struct<2:int>
 -- !query 16 output
-1
+2
 
 
 -- !query 17
@@ -224,7 +224,7 @@ SELECT * FROM t2
 -- !query 17 schema
 struct<scalarsubquery():int>
 -- !query 17 output
-1
+2
 
 
 -- !query 18
@@ -240,9 +240,9 @@ WITH
 )
 SELECT * FROM t2
 -- !query 18 schema
-struct<2:int>
+struct<3:int>
 -- !query 18 output
-2
+3
 
 
 -- !query 19
@@ -295,7 +295,7 @@ SELECT (
 -- !query 22 schema
 struct<scalarsubquery():int>
 -- !query 22 output
-1
+2
 
 
 -- !query 23
@@ -309,7 +309,7 @@ SELECT (
 -- !query 23 schema
 struct<scalarsubquery():int>
 -- !query 23 output
-1
+2
 
 
 -- !query 24
@@ -324,7 +324,7 @@ SELECT (
 -- !query 24 schema
 struct<scalarsubquery():int>
 -- !query 24 output
-1
+3
 
 
 -- !query 25