2 changes: 2 additions & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -147,6 +147,8 @@ license: |

- Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed during later phases of the recursive directory listing, due to either concurrent file deletions or object store consistency issues) then the listing will fail with an exception unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these missing files or subdirectories would be ignored. Note that this change of behavior only applies during initial table file listing (or during `REFRESH TABLE`), not during query execution: the net change is that `spark.sql.files.ignoreMissingFiles` is now obeyed during table file listing / query planning, not only at query execution time.

- Since Spark 3.0, the substitution order of nested WITH clauses has changed, and an inner CTE definition takes precedence over an outer one. In version 2.4 and earlier, `WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2` returns `1`, while in version 3.0 it returns `2`. The previous behaviour can be restored by setting `spark.sql.legacy.ctePrecedence.enabled` to `true`, as illustrated in the sketch below.
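
A minimal sketch of the change, assuming a live `SparkSession` named `spark` (for example in `spark-shell`):

```scala
// The nested-CTE query from the note above.
val query =
  "WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2"

spark.sql(query).show()   // Spark 3.0 default: the inner definition shadows the outer -> 2

// Restore the 2.4-and-earlier precedence via the legacy flag.
spark.sql("SET spark.sql.legacy.ctePrecedence.enabled=true")
spark.sql(query).show()   // legacy behaviour -> 1
```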

## Upgrading from Spark SQL 2.4 to 2.4.1

- The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -209,38 +209,6 @@ class Analyzer(
CleanupAliases)
)

/**
* Analyze cte definitions and substitute child plan with analyzed cte definitions.
*/
object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case With(child, relations) =>
// substitute CTE expressions right-to-left to resolve references to previous CTEs:
// with a as (select * from t), b as (select * from a) select * from b
relations.foldRight(child) {
case ((cteName, ctePlan), currentPlan) =>
substituteCTE(currentPlan, cteName, ctePlan)
}
case other => other
}

private def substituteCTE(
plan: LogicalPlan,
cteName: String,
ctePlan: LogicalPlan): LogicalPlan = {
plan resolveOperatorsUp {
case UnresolvedRelation(Seq(table)) if resolver(cteName, table) =>
ctePlan
case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other transformExpressions {
case e: SubqueryExpression =>
e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan))
}
}
}
}

/**
* Substitute child plan with WindowSpecDefinitions.
*/
136 changes: 136 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, With}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LEGACY_CTE_PRECEDENCE_ENABLED

/**
* Analyze WITH nodes and substitute child plan with CTE definitions.
*/
object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
if (SQLConf.get.getConf(LEGACY_CTE_PRECEDENCE_ENABLED)) {
legacyTraverseAndSubstituteCTE(plan)
} else {
traverseAndSubstituteCTE(plan, false)
}
}

private def legacyTraverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsUp {
case With(child, relations) =>
// substitute CTE expressions right-to-left to resolve references to previous CTEs:
// with a as (select * from t), b as (select * from a) select * from b
relations.foldRight(child) {
case ((cteName, ctePlan), currentPlan) => substituteCTE(currentPlan, cteName, ctePlan)
}
}
}

/**
* Traverse the plan and expression nodes as a tree and replace matching references to CTE
* definitions.
* - If the rule encounters a WITH node then it substitutes the child of the node with the CTE
* definitions of the node in right-to-left order, as a definition can reference a previous
* one.
* For example the following query is valid:
* WITH
* t AS (SELECT 1),
* t2 AS (SELECT * FROM t)
* SELECT * FROM t2
* - If a CTE definition contains an inner WITH node then the substitution of the inner one
* should take precedence because it can shadow an outer CTE definition.
* For example the following query should return 2:
* WITH
* t AS (SELECT 1),
* t2 AS (
* WITH t AS (SELECT 2)
* SELECT * FROM t
* )
* SELECT * FROM t2
* - If a CTE definition contains a subquery that contains an inner WITH node then the
* substitution of the inner one should take precedence because it can shadow an outer CTE
* definition.
* For example the following query should return 2:
* WITH t AS (SELECT 1 AS c)
* SELECT max(c) FROM (
* WITH t AS (SELECT 2 AS c)
* SELECT * FROM t
* )
* - If a CTE definition contains a subquery expression that contains an inner WITH node then
* the substitution of the inner one should take precedence because it can shadow an outer
* CTE definition.
* For example the following query should return 2:
* WITH t AS (SELECT 1)
* SELECT (
* WITH t AS (SELECT 2)
* SELECT * FROM t
* )
* @param plan the plan to be traversed
* @param inTraverse whether the current traversal is called from another traversal; only in
* this case can a name collision occur
* @return the plan where CTE substitution is applied
*/
private def traverseAndSubstituteCTE(plan: LogicalPlan, inTraverse: Boolean): LogicalPlan = {
plan.resolveOperatorsUp {
case With(child: LogicalPlan, relations) =>
// child might contain an inner CTE that has priority, so traverse and substitute inner CTEs
// in child first
val traversedChild: LogicalPlan = child transformExpressions {
case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan, true))
Contributor:
The subquery expression seems not correctly handled.

with t1 as (select 1 i) select * from t1 where i in (with t1 as (select 2 i) select * from t1) returns 1 in Spark, but empty row in pgsql.

Contributor Author:
Yes, it probably should be transformAllExpressions. Will look into it soon...

Member:
Oh. Thanks, @cloud-fan.

Contributor Author:
I've opened #28318 to fix it.
}

// Substitute CTE definitions from last to first as a CTE definition can reference a
// previous one
relations.foldRight(traversedChild) {
case ((cteName, ctePlan), currentPlan) =>
// A CTE definition might contain an inner CTE that has priority, so traverse and
// substitute the CTEs defined in ctePlan first.
// A CTE definition might not be used at all or might be used multiple times. To avoid
// computing it when it is not used, and to avoid recomputing it when it is used multiple
// times, we use a lazy construct with call-by-name parameter passing.
lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan, true)
substituteCTE(currentPlan, cteName, substitutedCTEPlan)
}

// A CTE name collision can occur only when inTraverse is true; this check helps to avoid
// eager CTE substitution in a subquery expression.
case other if inTraverse =>
other.transformExpressions {
case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan, true))
}
}
}

private def substituteCTE(
plan: LogicalPlan,
cteName: String,
ctePlan: => LogicalPlan): LogicalPlan =
plan resolveOperatorsUp {
case UnresolvedRelation(Seq(table)) if plan.conf.resolver(cteName, table) => ctePlan

case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other transformExpressions {
case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan))
}
}
}
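
The `lazy val` plus call-by-name combination used above is the part worth spelling out: a definition that is never referenced is never expanded, and one that is referenced several times is expanded only once. A minimal standalone sketch of the pattern (illustrative names only, not code from the PR):

```scala
// Stand-alone sketch of the lazy val + call-by-name pattern.
object LazyCteSketch {
  // Stands in for the expensive expansion step (traverseAndSubstituteCTE in the rule).
  def expand(name: String): String = {
    println(s"expanding $name")
    s"<expanded $name>"
  }

  // `ctePlan` is call-by-name, so it is not evaluated at the call site;
  // the lazy val memoizes it on first use.
  def substitute(refs: Seq[String], cteName: String, ctePlan: => String): Seq[String] = {
    lazy val expanded = ctePlan
    refs.map(r => if (r == cteName) expanded else r)
  }

  def main(args: Array[String]): Unit = {
    // "t" is referenced twice, but "expanding t" is printed only once.
    println(substitute(Seq("t", "x", "t"), "t", expand("t")))
    // "t" is never referenced, so expand is never called at all.
    println(substitute(Seq("x", "y"), "t", expand("t")))
  }
}
```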
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1838,6 +1838,12 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_CTE_PRECEDENCE_ENABLED = buildConf("spark.sql.legacy.ctePrecedence.enabled")
.internal()
.doc("When true, outer CTE definitions takes precedence over inner definitions.")
.booleanConf
.createWithDefault(false)

val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC =
buildConf("spark.sql.legacy.arrayExistsFollowsThreeValuedLogic")
.doc("When true, the ArrayExists will follow the three-valued boolean logic.")
115 changes: 115 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/cte-legacy.sql
@@ -0,0 +1,115 @@
create temporary view t as select * from values 0, 1, 2 as t(id);
create temporary view t2 as select * from values 0, 1 as t(id);

-- CTE legacy substitution
SET spark.sql.legacy.ctePrecedence.enabled=true;

-- CTE in CTE definition
WITH t as (
WITH t2 AS (SELECT 1)
SELECT * FROM t2
)
SELECT * FROM t;

-- CTE in subquery
SELECT max(c) FROM (
WITH t(c) AS (SELECT 1)
SELECT * FROM t
);

-- CTE in subquery expression
SELECT (
WITH t AS (SELECT 1)
SELECT * FROM t
);

-- CTE in CTE definition shadows outer
WITH
t AS (SELECT 1),
t2 AS (
WITH t AS (SELECT 2)
SELECT * FROM t
)
SELECT * FROM t2;

-- CTE in CTE definition shadows outer 2
WITH
t(c) AS (SELECT 1),
t2 AS (
SELECT (
SELECT max(c) FROM (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
)
)
)
SELECT * FROM t2;

-- CTE in CTE definition shadows outer 3
WITH
t AS (SELECT 1),
t2 AS (
WITH t AS (SELECT 2),
t2 AS (
WITH t AS (SELECT 3)
SELECT * FROM t
)
SELECT * FROM t2
)
SELECT * FROM t2;

-- CTE in subquery shadows outer
WITH t(c) AS (SELECT 1)
SELECT max(c) FROM (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
);

-- CTE in subquery shadows outer 2
WITH t(c) AS (SELECT 1)
SELECT sum(c) FROM (
SELECT max(c) AS c FROM (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
)
);

-- CTE in subquery shadows outer 3
WITH t(c) AS (SELECT 1)
SELECT sum(c) FROM (
WITH t(c) AS (SELECT 2)
SELECT max(c) AS c FROM (
WITH t(c) AS (SELECT 3)
SELECT * FROM t
)
);

-- CTE in subquery expression shadows outer
WITH t AS (SELECT 1)
SELECT (
WITH t AS (SELECT 2)
SELECT * FROM t
);

-- CTE in subquery expression shadows outer 2
WITH t AS (SELECT 1)
SELECT (
SELECT (
WITH t AS (SELECT 2)
SELECT * FROM t
)
);

-- CTE in subquery expression shadows outer 3
WITH t AS (SELECT 1)
SELECT (
WITH t AS (SELECT 2)
SELECT (
WITH t AS (SELECT 3)
SELECT * FROM t
)
);

-- Clean up
DROP VIEW IF EXISTS t;
DROP VIEW IF EXISTS t2;