Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see

## Upgrading From Spark SQL 2.3 to 2.4

- Since Spark 2.4, Spark will evaluate the set operations referenced in a query by following a precedence rule as per the SQL standard. If the order is not specified by parentheses, set operations are performed from left to right with the exception that all INTERSECT operations are performed before any UNION, EXCEPT or MINUS operations. The old behaviour of giving equal precedence to all the set operations is preserved under a newly added configuration `spark.sql.legacy.setopsPrecedence.enabled` with a default value of `false`. When this property is set to `true`, Spark will evaluate the set operators from left to right as they appear in the query, given that no explicit ordering is enforced by the use of parentheses.
- Since Spark 2.4, Spark will display table description column Last Access value as UNKNOWN when the value was Jan 01 1970.
- Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
grammar SqlBase;

@members {
/**
* Selects which set-operation alternatives of the queryTerm rule are active.
* When false, INTERSECT is given the greater precedence over the other set
* operations (UNION, EXCEPT and MINUS) as per the SQL standard.
* When true, a single alternative accepting INTERSECT, UNION, EXCEPT and
* SETMINUS is used instead, so all set operations share one precedence level.
*/
public boolean legacy_setops_precedence_enbled = false;

/**
* Verify whether current token is a valid decimal token (which contains dot).
* Returns true if the character that follows the token is not a digit or letter or underscore.
Expand Down Expand Up @@ -352,8 +358,13 @@ multiInsertQueryBody
;

queryTerm
: queryPrimary #queryTermDefault
| left=queryTerm operator=(INTERSECT | UNION | EXCEPT | SETMINUS) setQuantifier? right=queryTerm #setOperation
: queryPrimary #queryTermDefault
| left=queryTerm {legacy_setops_precedence_enbled}?
operator=(INTERSECT | UNION | EXCEPT | SETMINUS) setQuantifier? right=queryTerm #setOperation
| left=queryTerm {!legacy_setops_precedence_enbled}?
operator=INTERSECT setQuantifier? right=queryTerm #setOperation
| left=queryTerm {!legacy_setops_precedence_enbled}?
operator=(UNION | EXCEPT | SETMINUS) setQuantifier? right=queryTerm #setOperation
;

queryPrimary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,11 @@ package object dsl {

def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)

def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan)
// Builds an Except node with this plan on the left and `otherPlan` on the right.
// `isAll` is forwarded unchanged to the node and defaults to false; the parser
// tests in this change pair `isAll = true` with the `EXCEPT ALL` form.
def except(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
Except(logicalPlan, otherPlan, isAll)

def intersect(otherPlan: LogicalPlan): LogicalPlan = Intersect(logicalPlan, otherPlan)
// Builds an Intersect node with this plan on the left and `otherPlan` on the right.
// `isAll` is forwarded unchanged to the node and defaults to false; the parser
// tests in this change pair `isAll = true` with the `INTERSECT ALL` form.
def intersect(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
Intersect(logicalPlan, otherPlan, isAll)

def union(otherPlan: LogicalPlan): LogicalPlan = Union(logicalPlan, otherPlan)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

val tokenStream = new CommonTokenStream(lexer)
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

try {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ object SetOperation {
}

case class Intersect(
left: LogicalPlan,
right: LogicalPlan,
isAll: Boolean = false) extends SetOperation(left, right) {
left: LogicalPlan,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related to the current PR. This addresses a comment from @HyukjinKwon in 21886

right: LogicalPlan,
isAll: Boolean = false) extends SetOperation(left, right) {

override def nodeName: String = getClass.getSimpleName + ( if ( isAll ) "All" else "" )

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,16 @@ object SQLConf {
.intConf
.checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
.createWithDefault(Deflater.DEFAULT_COMPRESSION)

// Internal legacy switch for set-operation precedence. Defaults to false, i.e.
// INTERSECT is evaluated before UNION, EXCEPT and MINUS when no parentheses
// are given; true restores plain left-to-right evaluation.
val LEGACY_SETOPS_PRECEDENCE_ENABLED =
buildConf("spark.sql.legacy.setopsPrecedence.enabled")
.internal()
.doc("When set to true and the order of evaluation is not specified by parentheses, the " +
"set operations are performed from left to right as they appear in the query. When set " +
"to false and order of evaluation is not specified by parentheses, INTERSECT operations " +
"are performed before any UNION, EXCEPT and MINUS operations.")
.booleanConf
.createWithDefault(false)
}

/**
Expand Down Expand Up @@ -1841,6 +1851,8 @@ class SQLConf extends Serializable with Logging {

def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)

/**
* Returns the value of `spark.sql.legacy.setopsPrecedence.enabled`.
*
* NOTE(review): the method name reads as the opposite of what is returned.
* `true` means the legacy behaviour is enabled (set operations are evaluated
* left to right with equal precedence); `false` means INTERSECT precedence
* is enforced. Callers should treat the result as a "legacy enabled" flag.
*/
def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

/**
Expand Down Expand Up @@ -676,4 +677,48 @@ class PlanParserSuite extends AnalysisTest {
OneRowRelation().select('rtrim.function("c&^,.", "bc...,,,&&&ccc"))
)
}

test("precedence of set operations") {
val a = table("a").select(star())
val b = table("b").select(star())
val c = table("c").select(star())
val d = table("d").select(star())

val query1 =
"""
|SELECT * FROM a
|UNION
|SELECT * FROM b
|EXCEPT
|SELECT * FROM c
|INTERSECT
|SELECT * FROM d
""".stripMargin

val query2 =
"""
|SELECT * FROM a
|UNION
|SELECT * FROM b
|EXCEPT ALL
|SELECT * FROM c
|INTERSECT ALL
|SELECT * FROM d
""".stripMargin

assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add withSQLConf(SQLConf.SETOPS_PRECEDENCE_ENFORCED.key -> "true") {

assertEqual(query2, Distinct(a.union(b)).except(c.intersect(d, isAll = true), isAll = true))

// Now disable precedence enforcement to verify the old behaviour.
withSQLConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED.key -> "true") {
assertEqual(query1, Distinct(a.union(b)).except(c).intersect(d))
assertEqual(query2, Distinct(a.union(b)).except(c, isAll = true).intersect(d, isAll = true))
}

// Explicitly enable the precedence enforcement
withSQLConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED.key -> "false") {
assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
assertEqual(query2, Distinct(a.union(b)).except(c.intersect(d, isAll = true), isAll = true))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,14 +535,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Intersect(left, right, true) =>
throw new IllegalStateException(
"logical intersect operator should have been replaced by union, aggregate" +
"and generate operators in the optimizer")
" and generate operators in the optimizer")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related to the current PR. This addresses a comment from @HyukjinKwon in 21886

case logical.Except(left, right, false) =>
throw new IllegalStateException(
"logical except operator should have been replaced by anti-join in the optimizer")
case logical.Except(left, right, true) =>
throw new IllegalStateException(
"logical except (all) operator should have been replaced by union, aggregate" +
"and generate operators in the optimizer")
" and generate operators in the optimizer")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related to the current PR. This addresses a comment from @HyukjinKwon in 21886


case logical.DeserializeToObject(deserializer, objAttr, child) =>
execution.DeserializeToObjectExec(deserializer, objAttr, planLater(child)) :: Nil
Expand Down
51 changes: 44 additions & 7 deletions sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
Original file line number Diff line number Diff line change
Expand Up @@ -59,29 +59,40 @@ INTERSECT ALL
SELECT * FROM tab2;

-- Chain of different set operations
-- We need to parenthesize the following two queries to enforce
-- certain order of evaluation of operators. After fix to
-- SPARK-24966 this can be removed.
SELECT * FROM tab1
EXCEPT
SELECT * FROM tab2
UNION ALL
(
SELECT * FROM tab1
INTERSECT ALL
SELECT * FROM tab2
);
;

-- Chain of different set operations
SELECT * FROM tab1
EXCEPT
SELECT * FROM tab2
EXCEPT
(
SELECT * FROM tab1
INTERSECT ALL
SELECT * FROM tab2
);
;

-- test use parenthesis to control order of evaluation
(
(
(
SELECT * FROM tab1
EXCEPT
SELECT * FROM tab2
)
EXCEPT
SELECT * FROM tab1
)
INTERSECT ALL
SELECT * FROM tab2
)
;

-- Join under intersect all
SELECT *
Expand Down Expand Up @@ -118,6 +129,32 @@ SELECT v FROM tab1 GROUP BY v
INTERSECT ALL
SELECT k FROM tab2 GROUP BY k;

-- Test pre-Spark 2.4 behaviour of set operation precedence
-- All the set operators are given equal precedence and are evaluated
-- from left to right as they appear in the query.

-- Set the property
SET spark.sql.legacy.setopsPrecedence.enabled= true;

SELECT * FROM tab1
EXCEPT
SELECT * FROM tab2
UNION ALL
SELECT * FROM tab1
INTERSECT ALL
SELECT * FROM tab2;

SELECT * FROM tab1
EXCEPT
SELECT * FROM tab2
UNION ALL
SELECT * FROM tab1
INTERSECT
SELECT * FROM tab2;

-- Restore the property
SET spark.sql.legacy.setopsPrecedence.enabled = false;

-- Clean-up
DROP VIEW IF EXISTS tab1;
DROP VIEW IF EXISTS tab2;
Loading