[SC-83988][DELTA] Upgrade Spark to 3.2.0 in OSS Delta
This PR upgrades Spark to 3.2.0 for OSS Delta. The major change we need to make is implementing `withNewChildrenInternal` (TreeNode) or `withNewChildInternal` (LeafNode) for Delta's logical nodes.

Closes #618 #765

GitOrigin-RevId: 6e05659bfffd563a19b701fcb2dc0b58c886d5f4
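For context on that change: Spark 3.2's Catalyst tree-transformation framework asks each node to rebuild itself when its children are replaced, via `withNewChildInternal` for single-child nodes or `withNewChildrenInternal` for nodes with several children. Below is a minimal sketch of the pattern this diff applies to `DeltaDelete`, `DeltaUpdateTable`, and `DeltaMergeInto`; the class names `ExampleUnaryCommand` and `ExampleBinaryCommand` are hypothetical and used only for illustration.

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// Hypothetical single-child node (illustration only), mirroring the DeltaDelete /
// DeltaUpdateTable changes: rebuild the node as a copy that points at the new child.
case class ExampleUnaryCommand(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = Seq.empty

  override protected def withNewChildInternal(newChild: LogicalPlan): ExampleUnaryCommand =
    copy(child = newChild)
}

// Hypothetical two-child node (illustration only), mirroring DeltaMergeInto:
// declare the children explicitly and reassemble them by position.
case class ExampleBinaryCommand(target: LogicalPlan, source: LogicalPlan) extends LogicalPlan {
  override def children: Seq[LogicalPlan] = Seq(target, source)
  override def output: Seq[Attribute] = Seq.empty

  override protected def withNewChildrenInternal(
      newChildren: IndexedSeq[LogicalPlan]): ExampleBinaryCommand =
    copy(target = newChildren(0), source = newChildren(1))
}

The recurring rule throughout the diff is the same: return a `copy` of the node wired to the new child or children, leaving every other field untouched.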
zsxwing authored and mengtong-db committed Nov 5, 2021
1 parent 5ad7047 commit 5571d95
Showing 49 changed files with 307 additions and 297 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/test.yaml
@@ -17,7 +17,11 @@ jobs:
~/.sbt
~/.ivy2
~/.cache/coursier
key: delta-sbt-cache
# Change the key if dependencies are changed. For each key, GitHub Actions will cache the
# the above directories when we use the key for the first time. After that, each run will
# just use the cache. The cache is immutable so we need to use a new key when trying to
# cache new stuff.
key: delta-sbt-cache-spark3.2
- name: Install Job dependencies
shell: bash -l {0}
run: |
@@ -33,7 +37,7 @@ jobs:
pyenv install 3.7.4
pyenv global system 3.7.4
pipenv --python 3.7 install
pipenv run pip install pyspark==3.1.1
pipenv run pip install pyspark==3.2.0
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
pipenv run pip install importlib_metadata==3.10.0
- name: Run Scala/Java and Python tests
2 changes: 1 addition & 1 deletion Dockerfile
@@ -2,7 +2,7 @@ FROM python:3.7.3-stretch

RUN apt-get update && apt-get -y install openjdk-8-jdk

RUN pip install pyspark==3.1.1
RUN pip install pyspark==3.2.0

COPY . /usr/src/delta

2 changes: 1 addition & 1 deletion build.sbt
@@ -16,7 +16,7 @@

import java.nio.file.Files

val sparkVersion = "3.1.1"
val sparkVersion = "3.2.0"
scalaVersion := "2.12.14"

lazy val commonSettings = Seq(
@@ -217,7 +217,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
private def createUnresolvedTable(
tableName: Seq[String],
commandName: String): UnresolvedTable = {
UnresolvedTable(tableName, commandName)
UnresolvedTable(tableName, commandName, relationTypeMismatchHint = None)
}

// Build the text of the CHECK constraint expression. The user-specified whitespace is in the
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils}
import org.apache.spark.sql.delta.commands.VacuumCommand
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.types.StringType

/**
@@ -35,7 +35,7 @@ case class VacuumTableCommand(
path: Option[String],
table: Option[TableIdentifier],
horizonHours: Option[Double],
dryRun: Boolean) extends RunnableCommand {
dryRun: Boolean) extends LeafRunnableCommand {

override val output: Seq[Attribute] =
Seq(AttributeReference("path", StringType, nullable = true)())
@@ -26,5 +26,6 @@ case class DeltaDelete(
extends UnaryNode {
override def output: Seq[Attribute] = Seq.empty

// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
override protected def withNewChildInternal(newChild: LogicalPlan): DeltaDelete =
copy(child = newChild)
}
@@ -38,7 +38,8 @@ case class DeltaUpdateTable(

override def output: Seq[Attribute] = Seq.empty

// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
override protected def withNewChildInternal(newChild: LogicalPlan): DeltaUpdateTable =
copy(child = newChild)
}

object DeltaUpdateTable {
@@ -16,22 +16,26 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}

import org.apache.spark.sql.connector.catalog.TableChange

/**
* The logical plan of the ALTER TABLE ... ADD CONSTRAINT command.
*/
case class AlterTableAddConstraint(
table: LogicalPlan, constraintName: String, expr: String) extends Command {
// TODO: extend UnaryCommand when new Spark version released, now fails on OSS Delta build
override def children: Seq[LogicalPlan] = Seq(table)
// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
table: LogicalPlan, constraintName: String, expr: String) extends AlterTableCommand {
override def changes: Seq[TableChange] = Seq(AddConstraint(constraintName, expr))

protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
}

/**
* The logical plan of the ALTER TABLE ... DROP CONSTRAINT command.
*/
case class AlterTableDropConstraint(
table: LogicalPlan, constraintName: String) extends Command {
// TODO: extend UnaryCommand when new Spark version released, now fails on OSS Delta build
override def children: Seq[LogicalPlan] = Seq(table)
// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
table: LogicalPlan, constraintName: String) extends AlterTableCommand {
override def changes: Seq[TableChange] = Seq(DropConstraint(constraintName))

protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
}
@@ -68,7 +68,8 @@ case class DeltaMergeAction(
override def toString: String = s"$targetColString = $expr"
private lazy val targetColString: String = targetColNameParts.mkString("`", "`.`", "`")

// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
override protected def withNewChildInternal(newChild: Expression): DeltaMergeAction =
copy(expr = newChild)
}


@@ -165,7 +166,14 @@ case class DeltaMergeIntoUpdateClause(condition: Option[Expression], actions: Se
def this(cond: Option[Expression], cols: Seq[UnresolvedAttribute], exprs: Seq[Expression]) =
this(cond, DeltaMergeIntoClause.toActions(cols, exprs))

// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): DeltaMergeIntoUpdateClause = {
if (condition.isDefined) {
copy(condition = Some(newChildren.head), actions = newChildren.tail)
} else {
copy(condition = None, actions = newChildren)
}
}
}

/** Represents the clause WHEN MATCHED THEN DELETE in MERGE. See [[DeltaMergeInto]]. */
@@ -175,7 +183,9 @@ case class DeltaMergeIntoDeleteClause(condition: Option[Expression])
children
override def actions: Seq[Expression] = Seq.empty

// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): DeltaMergeIntoDeleteClause =
copy(condition = if (condition.isDefined) Some(newChildren.head) else None)
}

/** Represents the clause WHEN NOT MATCHED THEN INSERT in MERGE. See [[DeltaMergeInto]]. */
@@ -185,7 +195,13 @@ case class DeltaMergeIntoInsertClause(condition: Option[Expression], actions: Se
def this(cond: Option[Expression], cols: Seq[UnresolvedAttribute], exprs: Seq[Expression]) =
this(cond, DeltaMergeIntoClause.toActions(cols, exprs))

// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): DeltaMergeIntoInsertClause =
if (condition.isDefined) {
copy(condition = Some(newChildren.head), actions = newChildren.tail)
} else {
copy(condition = None, actions = newChildren)
}
}

/**
@@ -237,7 +253,9 @@ case class DeltaMergeInto(
// TODO: extend BinaryCommand once the new Spark version is released
override def children: Seq[LogicalPlan] = Seq(target, source)
override def output: Seq[Attribute] = Seq.empty
// TODO: remove when the new Spark version is releases that has the withNewChildInternal method
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): DeltaMergeInto =
copy(target = newChildren(0), source = newChildren(1))
}

object DeltaMergeInto {
25 changes: 6 additions & 19 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -135,17 +135,21 @@ class DeltaAnalysis(session: SparkSession)
DeltaMergeIntoUpdateClause(
update.condition,
DeltaMergeIntoClause.toActions(update.assignments))
case update: UpdateStarAction =>
DeltaMergeIntoUpdateClause(update.condition, DeltaMergeIntoClause.toActions(Nil))
case delete: DeleteAction =>
DeltaMergeIntoDeleteClause(delete.condition)
case insert =>
case other =>
throw new AnalysisException(
"Insert clauses cannot be part of the WHEN MATCHED clause in MERGE INTO.")
s"${other.prettyName} clauses cannot be part of the WHEN MATCHED clause in MERGE INTO.")
}
val notMatchedActions = notMatched.map {
case insert: InsertAction =>
DeltaMergeIntoInsertClause(
insert.condition,
DeltaMergeIntoClause.toActions(insert.assignments))
case insert: InsertStarAction =>
DeltaMergeIntoInsertClause(insert.condition, DeltaMergeIntoClause.toActions(Nil))
case other =>
throw new AnalysisException(s"${other.prettyName} clauses cannot be part of the " +
s"WHEN NOT MATCHED clause in MERGE INTO.")
@@ -166,23 +170,6 @@
} else deltaMerge
d.copy(target = stripTempViewForMergeWrapper(d.target))

// TODO: remove the 2 cases below after OSS 3.2 is released.
case AlterTableAddConstraint(t: ResolvedTable, constraintName, expr)
if t.table.isInstanceOf[DeltaTableV2] =>
CatalogV2Util.createAlterTable(
t.catalog.name +: t.identifier.asMultipartIdentifier,
t.catalog,
t.identifier.asMultipartIdentifier,
Seq(AddConstraint(constraintName, expr)))

case AlterTableDropConstraint(t: ResolvedTable, constraintName)
if t.table.isInstanceOf[DeltaTableV2] =>
CatalogV2Util.createAlterTable(
t.catalog.name +: t.identifier.asMultipartIdentifier,
t.catalog,
t.identifier.asMultipartIdentifier,
Seq(DropConstraint(constraintName)))

}


@@ -85,7 +85,7 @@ object DeltaTableUtils extends PredicateHelper
*/
def isDeltaTable(spark: SparkSession, tableName: TableIdentifier): Boolean = {
val catalog = spark.sessionState.catalog
val tableIsNotTemporaryTable = !catalog.isTemporaryTable(tableName)
val tableIsNotTemporaryTable = !catalog.isTempView(tableName)
val tableExists =
(tableName.database.isEmpty || catalog.databaseExists(tableName.database.get)) &&
catalog.tableExists(tableName)
@@ -77,7 +77,7 @@ object DeltaTableIdentifier extends Logging {
*/
def isDeltaPath(spark: SparkSession, identifier: TableIdentifier): Boolean = {
val catalog = spark.sessionState.catalog
def tableIsTemporaryTable = catalog.isTemporaryTable(identifier)
def tableIsTemporaryTable = catalog.isTempView(identifier)
def tableExists: Boolean = {
try {
catalog.databaseExists(identifier.database.get) && catalog.tableExists(identifier)
@@ -83,7 +83,7 @@ case class DeltaUnsupportedOperationsCheck(spark: SparkSession)
recordDeltaEvent(null, "delta.unsupported.dropPartition")
fail(operation = "ALTER TABLE DROP PARTITION", a.tableName)

case a: AlterTableRecoverPartitionsCommand =>
case a: RepairTableCommand =>
recordDeltaEvent(null, "delta.unsupported.recoverPartitions")
fail(operation = "ALTER TABLE RECOVER PARTITIONS", a.tableName)

