12 changes: 7 additions & 5 deletions docs/sql-migration-guide-upgrade.md
@@ -9,9 +9,9 @@ license: |
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -120,7 +120,7 @@ license: |
- The `weekofyear`, `weekday`, `dayofweek`, `date_trunc`, `from_utc_timestamp`, `to_utc_timestamp`, and `unix_timestamp` functions use the java.time API for calculating the week number of the year and the day number of the week, as well as for conversion from/to TimestampType values in the UTC time zone.

- The JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on the Proleptic Gregorian calendar and the time zone defined by the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion is based on the hybrid calendar (Julian + Gregorian) and on the default system time zone.
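  For illustration, a minimal sketch of a partitioned JDBC read where the bounds are timestamp strings; the JDBC URL, table, and column names below are hypothetical:

  ```scala
  // Sketch only: the URL, table, and column names are made up.
  // Since 3.0, lowerBound/upperBound are parsed like CAST('...' AS TIMESTAMP),
  // i.e. with the Proleptic Gregorian calendar and spark.sql.session.timeZone.
  val events = spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/shop")
    .option("dbtable", "events")
    .option("partitionColumn", "event_time")
    .option("numPartitions", "4")
    .option("lowerBound", "2019-01-01 00:00:00")
    .option("upperBound", "2019-12-31 23:59:59")
    .load()
  ```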

- Formatting of `TIMESTAMP` and `DATE` literals.

- In Spark version 2.4 and earlier, invalid time zone ids are silently ignored and replaced by the GMT time zone, for example, in the `from_utc_timestamp` function. Since Spark 3.0, such time zone ids are rejected, and Spark throws `java.time.DateTimeException`.
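  A minimal sketch of the behaviour change, run in `spark-shell` (the zone id and timestamp are illustrative):

  ```scala
  // In Spark 2.4 the bogus zone id silently fell back to GMT; since 3.0 this
  // fails with java.time.DateTimeException instead of returning a wrong answer.
  spark.sql("SELECT from_utc_timestamp(TIMESTAMP '2020-01-01 00:00:00', 'Not/A_Zone')").show()
  ```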
@@ -143,7 +143,7 @@ license: |

- Since Spark 3.0, when Avro files are written with a user-provided non-nullable schema, Spark is still able to write the files even if the catalyst schema is nullable. However, Spark will throw a runtime NPE if any of the records contains null.
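  A hedged sketch of that situation, assuming the built-in Avro data source is on the classpath; the schema string and output path are illustrative:

  ```scala
  import spark.implicits._

  // The catalyst schema of `df` has a nullable "id", but the user-provided Avro
  // schema declares it non-nullable. Since 3.0 the write is accepted, and the
  // null record then triggers a NullPointerException at runtime.
  val avroSchema =
    """{"type":"record","name":"rec","fields":[{"name":"id","type":"long"}]}"""
  val df = Seq(Option(1L), None).toDF("id")
  df.write.format("avro").option("avroSchema", avroSchema).save("/tmp/rec_avro")
  ```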

- Since Spark 3.0, a new protocol is used for fetching shuffle blocks. If you use the external shuffle service, you need to upgrade the server accordingly; otherwise, you will get the error message `UnsupportedOperationException: Unexpected message: FetchShuffleBlocks`. If it is hard to upgrade the shuffle service right now, you can still use the old protocol by setting `spark.shuffle.useOldFetchProtocol` to `true`.
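  A minimal sketch of the fallback setting, assuming you configure it programmatically rather than in `spark-defaults.conf`:

  ```scala
  import org.apache.spark.SparkConf

  // Keep the pre-3.0 shuffle fetch protocol while the external shuffle service
  // has not been upgraded yet.
  val conf = new SparkConf()
    .set("spark.shuffle.useOldFetchProtocol", "true")
  ```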

- Since Spark 3.0, the higher-order function `exists` follows three-valued boolean logic, i.e., if the `predicate` returns any `null`s and no `true` is obtained, then `exists` returns `null` instead of `false`. For example, `exists(array(1, null, 3), x -> x % 2 == 0)` will be `null`. The previous behaviour can be restored by setting `spark.sql.legacy.arrayExistsFollowsThreeValuedLogic` to `false`.
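  A small sketch of both behaviours, run in `spark-shell` (the config name is taken from the note above):

  ```scala
  // Since 3.0: no element matches, but a null is present, so the result is NULL.
  spark.sql("SELECT exists(array(1, null, 3), x -> x % 2 == 0)").show()

  // Restore the 2.4 behaviour (returning false instead of NULL):
  spark.conf.set("spark.sql.legacy.arrayExistsFollowsThreeValuedLogic", "false")
  ```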

@@ -157,12 +157,14 @@ license: |

- The results of `java.lang.Math`'s `log`, `log1p`, `exp`, `expm1`, and `pow` may vary across platforms. In Spark 3.0, the equivalent SQL functions (including related SQL functions like `LOG10`) return values consistent with `java.lang.StrictMath`. In virtually all cases this makes no difference in the return value, and the difference is very small, but the result may not exactly match `java.lang.Math` on x86 platforms in cases like, for example, `log(3.0)`, whose value varies between `Math.log()` and `StrictMath.log()`.
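  A quick way to see whether your platform is affected (a sketch; on many JVMs the two values are identical):

  ```scala
  // Compare the platform-dependent and the strict implementations directly.
  println(java.lang.Math.log(3.0))
  println(java.lang.StrictMath.log(3.0))

  // Since 3.0 the SQL function follows StrictMath.
  spark.sql("SELECT log(3.0)").show()
  ```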

- Since Spark 3.0, a Dataset query fails if it contains an ambiguous column reference caused by a self-join. A typical example: `val df1 = ...; val df2 = df1.filter(...);`, then `df1.join(df2, df1("a") > df2("a"))` returns an empty result, which is quite confusing. This is because Spark cannot resolve Dataset column references that point to tables being self-joined, and `df1("a")` is exactly the same as `df2("a")` in Spark. To restore the behavior before Spark 3.0, you can set `spark.sql.analyzer.failAmbiguousSelfJoin` to `false`.
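  A runnable sketch of that scenario, run in `spark-shell` (the column name and data are illustrative):

  ```scala
  import spark.implicits._

  val df1 = spark.range(5).toDF("a")
  val df2 = df1.filter($"a" > 2)

  // df1("a") and df2("a") resolve to the same attribute, so the condition is
  // effectively a > a. Before 3.0 this silently returned an empty result;
  // since 3.0 the query fails at analysis time.
  df1.join(df2, df1("a") > df2("a")).show()

  // Restore the pre-3.0 behaviour:
  spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")
  ```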

## Upgrading from Spark SQL 2.4 to 2.4.1

- The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was
inconsistently interpreted as both seconds and milliseconds in Spark 2.4.0 in different parts of the code.
Unitless values are now consistently interpreted as milliseconds. Applications that set values like "30"
need to specify a value with units like "30s" now, to avoid being interpreted as milliseconds; otherwise,
the extremely short interval that results will likely cause applications to fail.
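  For example, a sketch of setting the interval with explicit units, assuming programmatic configuration:

  ```scala
  import org.apache.spark.SparkConf

  // A bare "30" would now be read as 30 milliseconds; always spell out the unit.
  val conf = new SparkConf()
    .set("spark.executor.heartbeatInterval", "30s")
  ```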

- When converting a Dataset to another Dataset, Spark up-casts the fields in the original Dataset to the types of the corresponding fields in the target Dataset. In version 2.4 and earlier, this up-cast is not very strict, e.g. `Seq("str").toDS.as[Int]` fails, but `Seq("str").toDS.as[Boolean]` works and throws an NPE during execution. In Spark 3.0, the up-cast is stricter and turning a String into something else is not allowed, i.e. `Seq("str").toDS.as[Boolean]` will fail during analysis.
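  A tiny sketch of the difference, using the expressions from the note above (run in `spark-shell`):

  ```scala
  import spark.implicits._

  Seq("str").toDS.as[Int]      // fails during analysis in 2.4 and 3.0
  Seq("str").toDS.as[Boolean]  // 2.4: passes analysis, NPE at execution; 3.0: fails during analysis
  ```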
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -323,7 +323,8 @@ class Analyzer(
gid: Expression): Expression = {
expr transform {
case e: GroupingID =>
if (e.groupByExprs.isEmpty || e.groupByExprs == groupByExprs) {
if (e.groupByExprs.isEmpty ||
e.groupByExprs.map(_.canonicalized) == groupByExprs.map(_.canonicalized)) {
Alias(gid, toPrettySQL(e))()
} else {
throw new AnalysisException(
@@ -1164,6 +1165,8 @@ class Analyzer(
// To resolve duplicate expression IDs for Join and Intersect
case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
j.copy(right = dedupRight(left, right))
// intersect/except will be rewritten to join at the beginning of the optimizer. Here we need to
// deduplicate the right side plan, so that we won't produce an invalid self-join later.
case i @ Intersect(left, right, _) if !i.duplicateResolved =>
i.copy(right = dedupRight(left, right))
case e @ Except(left, right, _) if !e.duplicateResolved =>
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -794,6 +794,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val FAIL_AMBIGUOUS_SELF_JOIN =

Contributor:
Would `FAIL_AMBIGUOUS_DATASET_COLUMN_REF` be more accurate? And in the doc below, we can describe when this usually happens (e.g., with a self-join).

Contributor Author:
A self-join that contains an ambiguous column reference is an ambiguous self-join. I think "ambiguous self-join" is shorter and easier to understand as a config name.

Contributor Author:
BTW, there are no other places that can make a column reference ambiguous; self-join is the only place.


buildConf("spark.sql.analyzer.failAmbiguousSelfJoin")
.doc("When true, fail the Dataset query if it contains ambiguous self-join.")
.internal()
.booleanConf
.createWithDefault(true)

// Whether to retain group by columns or not in GroupedData.agg.

Review comment:
I think moving this comment to `.doc()` like the above is better.

Contributor Author:
This is unrelated to this patch; we can fix it and other similar configs in another PR.


val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns")
.internal()
19 changes: 16 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -48,6 +48,15 @@ private[sql] object Column {
case expr => toPrettySQL(expr)
}
}

private[sql] def stripColumnReferenceMetadata(a: AttributeReference): AttributeReference = {
val metadataWithoutId = new MetadataBuilder()
.withMetadata(a.metadata)
.remove(Dataset.DATASET_ID_KEY)
.remove(Dataset.COL_POS_KEY)
.build()
a.withMetadata(metadataWithoutId)
}
}

/**
@@ -144,11 +153,15 @@ class Column(val expr: Expression) extends Logging {
override def toString: String = toPrettySQL(expr)

override def equals(that: Any): Boolean = that match {
case that: Column => that.expr.equals(this.expr)
case that: Column => that.normalizedExpr() == this.normalizedExpr()
case _ => false
}

override def hashCode: Int = this.expr.hashCode()
override def hashCode: Int = this.normalizedExpr().hashCode()

private def normalizedExpr(): Expression = expr transform {
case a: AttributeReference => Column.stripColumnReferenceMetadata(a)
}

/** Creates a column based on the given expression. */
private def withExpr(newExpr: Expression): Column = new Column(newExpr)
@@ -1008,7 +1021,7 @@ class Column(val expr: Expression) extends Logging {
* @since 2.0.0
*/
def name(alias: String): Column = withExpr {
expr match {
normalizedExpr() match {
case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
case other => Alias(other, alias)()
}
42 changes: 37 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -30,7 +30,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.annotation.{DeveloperApi, Evolving, Experimental, Stable, Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function._
import org.apache.spark.api.python.{PythonEvalType, PythonRDD, SerDeUtil}
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.api.r.RRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -45,13 +45,15 @@ import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
@@ -61,6 +63,11 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils

private[sql] object Dataset {
val curId = new java.util.concurrent.atomic.AtomicLong()
val DATASET_ID_KEY = "__dataset_id"
val COL_POS_KEY = "__col_position"
val DATASET_ID_TAG = TreeNodeTag[Long]("dataset_id")

def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
// Eagerly bind the encoder so we verify that the encoder matches the underlying
@@ -182,6 +189,9 @@ class Dataset[T] private[sql](
@DeveloperApi @Unstable @transient val encoder: Encoder[T])
extends Serializable {

// A globally unique id of this Dataset.
private val id = Dataset.curId.getAndIncrement()

queryExecution.assertAnalyzed()

// Note for Spark contributors: if adding or updating any action in `Dataset`, please make sure
@@ -198,14 +208,18 @@ class Dataset[T] private[sql](
@transient private[sql] val logicalPlan: LogicalPlan = {
// For various commands (like DDL) and queries with side effects, we force query execution
// to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
val plan = queryExecution.analyzed match {
case c: Command =>
LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
case _ =>
queryExecution.analyzed
}
if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN)) {
plan.setTagValue(Dataset.DATASET_ID_TAG, id)
}
plan
}

/**
@@ -1311,11 +1325,29 @@ class Dataset[T] private[sql](
if (sqlContext.conf.supportQuotedRegexColumnName) {
colRegex(colName)
} else {
val expr = resolve(colName)
Column(expr)
Column(addDataFrameIdToCol(resolve(colName)))
}
}

// Attach the dataset id and column position to the column reference, so that we can detect
// ambiguous self-join correctly. See the rule `DetectAmbiguousSelfJoin`.
// This must be called before we return a `Column` that contains `AttributeReference`.

Member:
Also, document that this metadata will be removed in the rule `DetectAmbiguousSelfJoin`.

// Note that the metadata added here is only available in the analyzer, as the analyzer rule
// `DetectAmbiguousSelfJoin` will remove it.
private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
val newExpr = expr transform {
case a: AttributeReference
if sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN) =>
val metadata = new MetadataBuilder()
.withMetadata(a.metadata)
.putLong(Dataset.DATASET_ID_KEY, id)
.putLong(Dataset.COL_POS_KEY, logicalPlan.output.indexWhere(a.semanticEquals))
.build()
a.withMetadata(metadata)
}
newExpr.asInstanceOf[NamedExpression]
}

/**
* Selects column based on the column name specified as a regex and returns it as [[Column]].
* @group untypedrel
Expand All @@ -1329,7 +1361,7 @@ class Dataset[T] private[sql](
case ParserUtils.qualifiedEscapedIdentifier(nameParts, columnNameRegex) =>
Column(UnresolvedRegex(columnNameRegex, Some(nameParts), caseSensitive))
case _ =>
Column(resolve(colName))
Column(addDataFrameIdToCol(resolve(colName)))
}
}
