Support dropping columns as a metadata-only operation #1076

Closed · wants to merge 6 commits
16 changes: 16 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -126,12 +126,28 @@
"message" : [ "Writing data with column mapping mode is not supported." ],
"sqlState" : "0A000"
},
"UNSUPPORTED_DROP_COLUMN" : {
"message" : [ "DROP COLUMN is not supported for your Delta table. %s" ],
"sqlState" : "0A000"
},
"UNSUPPORTED_DROP_PARTITION_COLUMN" : {
"message" : [ "Dropping partition columns (%s) is not allowed." ],
"sqlState" : "0A000"
},
"UNSUPPORTED_INVALID_CHARACTERS_IN_COLUMN_NAME" : {
"message" : [ "Found invalid character(s) among ' ,;{}()\\n\\t=' in the column names of your schema. %s" ],
"sqlState" : "0A000"
},
"UNSUPPORTED_MANIFEST_GENERATION_WITH_COLUMN_MAPPING" : {
"message" : [ "Manifest generation is not supported for tables that leverage column mapping, as external readers cannot read these Delta tables. See Databricks documentation for more details." ],
"sqlState" : "0A000"
},
"UNSUPPORTED_NESTED_COLUMN_IN_BLOOM_FILTER" : {
"message" : [ "Creating a bloom filer index on a nested column is currently unsupported: %s" ],
"sqlState" : "0A000"
},
"UNSUPPORTED_RENAME_COLUMN" : {
"message" : [ "Column rename is not supported for your Delta table. %s" ],
"sqlState" : "0A000"
}
}
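
For reference, a minimal sketch of how these error-class entries are consumed: a message template is looked up by class name and its %s slots are filled from the messageParameters array handed to DeltaAnalysisException. The stand-in below is illustrative only (the real lookup lives elsewhere in the codebase); its templates mirror two entries from the JSON above.

// Illustrative stand-in, not the actual Delta lookup code.
object ErrorClassSketch {
  private val templates = Map(
    "UNSUPPORTED_DROP_COLUMN" ->
      "DROP COLUMN is not supported for your Delta table. %s",
    "UNSUPPORTED_DROP_PARTITION_COLUMN" ->
      "Dropping partition columns (%s) is not allowed."
  )

  // Fill the template's %s slots from the parameters array.
  def format(errorClass: String, messageParameters: Array[String]): String =
    templates(errorClass).format(messageParameters: _*)
}

// ErrorClassSketch.format("UNSUPPORTED_DROP_PARTITION_COLUMN", Array("p1,p2"))
// => "Dropping partition columns (p1,p2) is not allowed."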
66 changes: 30 additions & 36 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -1325,32 +1325,32 @@ object DeltaErrors
s"$oldProtocol"))
}

def columnRenameNotSupported(spark: SparkSession, protocol: Protocol): Throwable = {
// scalastyle:off line.size.limit
val adviceMsg = if (!DeltaColumnMapping.satisfyColumnMappingProtocol(protocol)) {
s"""
|Please upgrade your Delta table to reader version 2 and writer version 5 (Refer to table versioning at ${generateDocsLink(spark.sparkContext.getConf, "/versioning.html")})
| and change the column mapping mode to name mapping. You can use the following command:
|
| ALTER TABLE <table_name> SET TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.minReaderVersion' = '2',
| 'delta.minWriterVersion' = '5')
|
""".stripMargin
} else {
s"""
|Please change the column mapping mode to name mapping mode. You can use the following command:
|
| ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""".stripMargin
}
private def columnMappingAdviceMessage: String = {
s"""
|Please upgrade your Delta table to reader version 2 and writer version 5
| and change the column mapping mode to 'name' mapping. You can use the following command:
|
| ALTER TABLE <table_name> SET TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.minReaderVersion' = '2',
| 'delta.minWriterVersion' = '5')
|
""".stripMargin
}

new AnalysisException(
s"""
|Column rename is not supported for your Delta table. $adviceMsg
|""".stripMargin)
// scalastyle:on line.size.limit
def columnRenameNotSupported: Throwable = {
val adviceMsg = columnMappingAdviceMessage
new DeltaAnalysisException("UNSUPPORTED_RENAME_COLUMN", Array(adviceMsg))
}

def dropColumnNotSupported(suggestUpgrade: Boolean): Throwable = {
val adviceMsg = if (suggestUpgrade) columnMappingAdviceMessage else ""
new DeltaAnalysisException("UNSUPPORTED_DROP_COLUMN", Array(adviceMsg))
}

def dropPartitionColumnNotSupported(droppingPartCols: Seq[String]): Throwable = {
new DeltaAnalysisException("UNSUPPORTED_DROP_PARTITION_COLUMN",
Array(droppingPartCols.mkString(",")))
}

def schemaChangeDuringMappingModeChangeNotSupported(
@@ -1362,17 +1362,11 @@ object DeltaErrors
formatSchema(oldSchema),
formatSchema(newSchema)))

def foundInvalidCharsInColumnNames(cause: Throwable): Throwable = {
// scalastyle:off line.size.limit
var adviceMsg = "Please use alias to rename it."

new AnalysisException(
s"""
|Found invalid character(s) among " ,;{}()\\n\\t=" in the column names of your
|schema. $adviceMsg
|""".stripMargin, cause = Some(cause))
// scalastyle:on line.size.limit
}
def foundInvalidCharsInColumnNames(cause: Throwable): Throwable =
new DeltaAnalysisException(
errorClass = "UNSUPPORTED_INVALID_CHARACTERS_IN_COLUMN_NAME",
Collaborator: Is it better to include the column name here to make it easy for the user to find the column?

Contributor Author: The column name shows up in the child exception, so I think that would be OK. It's a bit involved to surface it to the top level.

messageParameters = Array(columnMappingAdviceMessage),
cause = Some(cause))

def foundViolatingConstraintsForColumnChange(
operation: String,
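A hedged repro of when the rewritten foundInvalidCharsInColumnNames fires (path and column name are invented): writing a Delta table whose schema contains a space in a column name, without column mapping enabled, should now surface the column-mapping upgrade advice instead of the old "use an alias" hint.

// Hypothetical repro: "event id" contains a space, one of the characters
// listed in UNSUPPORTED_INVALID_CHARACTERS_IN_COLUMN_NAME.
spark.range(1)
  .withColumnRenamed("id", "event id")
  .write.format("delta").save("/tmp/invalid-chars-demo")
// Expected (assumption): DeltaAnalysisException carrying the column-mapping
// advice; the offending column name appears in the underlying cause, per the
// review thread above.
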
@@ -248,6 +248,15 @@ object DeltaOperations {
) ++ colPosition.map("position" -> _.toString)
}))
}

/** Recorded when columns are dropped. */
case class DropColumns(
colsToDrop: Seq[Seq[String]]) extends Operation("DROP COLUMNS") {

override val parameters: Map[String, Any] = Map(
"columns" -> JsonUtils.toJson(colsToDrop.map(UnresolvedAttribute(_).name)))
}

/** Recorded when columns are changed. */
case class ChangeColumn(
columnPath: Seq[String],
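As a usage note on the DropColumns operation above: nested column paths are flattened to dotted names via UnresolvedAttribute before being serialized into the commit's operation parameters. A small sketch with invented column names:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// One top-level column and one nested column being dropped.
val colsToDrop = Seq(Seq("eventType"), Seq("payload", "debugInfo"))
val names = colsToDrop.map(UnresolvedAttribute(_).name)
// names == Seq("eventType", "payload.debugInfo"); JsonUtils.toJson(names)
// yields ["eventType","payload.debugInfo"] as the "columns" parameter.
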
@@ -27,8 +27,7 @@ import scala.collection.mutable
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaErrors, DeltaTableUtils}
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
import org.apache.spark.sql.delta.DeltaTableIdentifier.gluePermissionError
import org.apache.spark.sql.delta.commands.{AlterTableAddColumnsDeltaCommand, AlterTableChangeColumnDeltaCommand, AlterTableSetLocationDeltaCommand, AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, CreateDeltaTableCommand, TableCreationModes}
import org.apache.spark.sql.delta.commands.{AlterTableAddConstraintDeltaCommand, AlterTableDropConstraintDeltaCommand, WriteIntoDelta}
import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.hadoop.fs.Path
@@ -459,6 +458,10 @@ class DeltaCatalog extends DelegatingCatalogExtension
Option(col.position()).map(UnresolvedFieldPosition))
}).run(spark)

case (t, deleteColumns) if t == classOf[DeleteColumn] =>
AlterTableDropColumnsDeltaCommand(
table, deleteColumns.asInstanceOf[Seq[DeleteColumn]].map(_.fieldNames().toSeq)).run(spark)

case (t, newProperties) if t == classOf[SetProperty] =>
AlterTableSetPropertiesDeltaCommand(
table,
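For context on the new DeleteColumn case above: Spark's DSv2 ALTER TABLE path delivers each dropped column as a TableChange.DeleteColumn whose fieldNames() carries the (possibly nested) path. A hedged sketch, assuming the single-argument deleteColumn factory (overloads vary across Spark versions):

import org.apache.spark.sql.connector.catalog.TableChange

// Roughly what "ALTER TABLE t DROP COLUMN payload.debugInfo" produces.
val change = TableChange.deleteColumn(Array("payload", "debugInfo"))
// DeltaCatalog.alterTable groups changes by class and then runs:
//   AlterTableDropColumnsDeltaCommand(table, Seq(Seq("payload", "debugInfo")))
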
@@ -19,7 +19,6 @@ package org.apache.spark.sql.delta.commands
// scalastyle:off import.ordering.noEmptyLine
import java.util.Locale

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta._
@@ -28,14 +27,13 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformColumnsStructs
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, IsNull, IsUnknown, Not, Or}
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, QualifiedColType}
import org.apache.spark.sql.catalyst.expressions.{IsNull, IsUnknown, Not, Or}
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, QualifiedColType}
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition, First}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
@@ -55,6 +53,37 @@ trait AlterDeltaTableCommand extends DeltaCommand {
}
txn
}

/**
* Check if the column to change has any dependent expressions:
* - generated column expressions
* - check constraints
*/
protected def checkDependentExpressions(
sparkSession: SparkSession,
columnParts: Seq[String],
newMetadata: actions.Metadata,
protocol: Protocol,
operationName: String): Unit = {
if (!sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS)) {
return
}
// check if the column to change is referenced by check constraints
val dependentConstraints =
Constraints.findDependentConstraints(sparkSession, columnParts, newMetadata)
if (dependentConstraints.nonEmpty) {
throw DeltaErrors.foundViolatingConstraintsForColumnChange(
operationName, UnresolvedAttribute(columnParts).name, dependentConstraints)
}
// check if the column to change is referenced by any generated columns
val dependentGenCols = SchemaUtils.findDependentGeneratedColumns(
sparkSession, columnParts, protocol, newMetadata.schema)
if (dependentGenCols.nonEmpty) {
throw DeltaErrors.foundViolatingGeneratedColumnsForColumnChange(
operationName, UnresolvedAttribute(columnParts).name, dependentGenCols.toList)
}
}
}
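
A hedged illustration of what checkDependentExpressions guards (table, constraint, and column names are invented): both the drop and rename paths refuse to touch a column that a CHECK constraint or a generated column still references.

// Assumed setup: a Delta table `events` with an integer column `clickCount`.
spark.sql("ALTER TABLE events ADD CONSTRAINT positiveCount CHECK (clickCount > 0)")

// Dropping (or renaming) clickCount should now fail, because the constraint
// expression still references it:
spark.sql("ALTER TABLE events DROP COLUMNS (clickCount)")
// Expected: foundViolatingConstraintsForColumnChange("drop", "clickCount", ...)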

/**
@@ -235,6 +264,60 @@ case class AlterTableAddColumnsDeltaCommand(
}
}

/**
* A command that drops columns from a Delta table.
* The syntax of using this command in SQL is:
* {{{
* ALTER TABLE table_identifier
* DROP COLUMN(S) (col_name_1, col_name_2, ...);
* }}}
*/
case class AlterTableDropColumnsDeltaCommand(
table: DeltaTableV2,
columnsToDrop: Seq[Seq[String]])
extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData {

override def run(sparkSession: SparkSession): Seq[Row] = {
if (!sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_ALTER_TABLE_DROP_COLUMN_ENABLED)) {
// this feature is still behind the flag and not ready for release.
throw DeltaErrors.dropColumnNotSupported(suggestUpgrade = false)
}
val deltaLog = table.deltaLog
recordDeltaOperation(deltaLog, "delta.ddl.alter.dropColumns") {
val txn = startTransaction()
val metadata = txn.metadata
if (txn.metadata.columnMappingMode == NoMapping) {
throw DeltaErrors.dropColumnNotSupported(suggestUpgrade = true)
}
val newSchema = columnsToDrop.foldLeft(metadata.schema) { case (schema, columnPath) =>
val (parentPosition, _) =
SchemaUtils.findColumnPosition(
columnPath, schema, sparkSession.sessionState.conf.resolver)
SchemaUtils.dropColumn(schema, parentPosition)._1
}

// fail if any of the dropped columns is a partition column
val droppedColumnSet = columnsToDrop.map(UnresolvedAttribute(_).name).toSet
val droppingPartitionCols = metadata.partitionColumns.filter(droppedColumnSet.contains(_))
if (droppingPartitionCols.nonEmpty) {
throw DeltaErrors.dropPartitionColumnNotSupported(droppingPartitionCols)
}

val newMetadata = metadata.copy(schemaString = newSchema.json)

columnsToDrop.foreach { columnParts =>
checkDependentExpressions(sparkSession, columnParts, newMetadata, txn.protocol, "drop")
}

txn.updateMetadata(newMetadata)
txn.commit(Nil, DeltaOperations.DropColumns(columnsToDrop))

Seq.empty[Row]
}
}
}
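
Putting the command together, a hedged end-to-end sketch. The SQL conf key behind DELTA_ALTER_TABLE_DROP_COLUMN_ENABLED is not shown in this diff, so the string below is an assumption, as are the table and column names.

// Assumed conf key for the feature flag guarding this command.
spark.conf.set("spark.databricks.delta.alterTable.dropColumn.enabled", "true")

// DROP COLUMN requires 'name' column mapping; otherwise
// dropColumnNotSupported(suggestUpgrade = true) is thrown.
spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'delta.columnMapping.mode' = 'name',
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5')
""")

// Metadata-only: commits a new schema to the Delta log without rewriting
// any data files.
spark.sql("ALTER TABLE events DROP COLUMNS (eventType, sessionId)")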

/**
* A command to change a column of a Delta table. Supports changing the comment of a column
* and reordering columns.
@@ -308,41 +391,10 @@ case class AlterTableChangeColumnDeltaCommand(
val newMetadata = metadata.copy(
schemaString = newSchema.json, partitionColumns = newPartitionColumns)

// need to validate the changes if there is a column rename
if (newColumn.name != columnName &&
sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS)) {
val columnParts = columnPath :+ columnName
// if renaming column, need to check if the column is referenced by check constraints
val dependentConstraints = newMetadata.configuration.filter {
case (key, constraint) if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
SchemaUtils.containsDependentExpression(sparkSession, columnParts, constraint, resolver)
case _ => false
}

if (dependentConstraints.nonEmpty) {
throw DeltaErrors.foundViolatingConstraintsForColumnChange(
"rename", UnresolvedAttribute(columnParts).name, dependentConstraints)
}

// if renaming column, need to check if the change affects any generated columns
if (GeneratedColumn.satisfyGeneratedColumnProtocol(txn.protocol) &&
GeneratedColumn.hasGeneratedColumns(newMetadata.schema)) {

val dependentGenCols = ArrayBuffer[StructField]()
SchemaMergingUtils.transformColumns(newMetadata.schema) { (_, field, _) =>
GeneratedColumn.getGenerationExpressionStr(field.metadata).foreach { exprStr =>
val needsToChangeExpr = SchemaUtils.containsDependentExpression(
sparkSession, columnParts, exprStr, resolver)
if (needsToChangeExpr) dependentGenCols += field
}
field
}
if (dependentGenCols.nonEmpty) {
throw DeltaErrors.foundViolatingGeneratedColumnsForColumnChange(
"rename", UnresolvedAttribute(columnParts).name, dependentGenCols.toList)
}
}
if (newColumn.name != columnName) {
// need to validate the changes if the column is renamed
checkDependentExpressions(
sparkSession, columnPath :+ columnName, newMetadata, txn.protocol, "rename")
}

txn.updateMetadata(newMetadata)
@@ -432,7 +484,7 @@ case class AlterTableChangeColumnDeltaCommand(
}

if (SchemaUtils.canChangeDataType(originalField.dataType, newColumn.dataType, resolver,
columnPath :+ originalField.name).nonEmpty) {
txn.metadata.columnMappingMode, columnPath :+ originalField.name).nonEmpty) {
throw DeltaErrors.alterTableChangeColumnException(
s"'${UnresolvedAttribute(columnPath :+ originalField.name).name}' with type " +
s"'${originalField.dataType}" +
@@ -444,7 +496,7 @@

if (columnName != newColumn.name) {
if (txn.metadata.columnMappingMode == NoMapping) {
throw DeltaErrors.columnRenameNotSupported(spark, txn.protocol)
throw DeltaErrors.columnRenameNotSupported
}
}

@@ -485,7 +537,8 @@ case class AlterTableReplaceColumnsDeltaCommand(
val resolver = sparkSession.sessionState.conf.resolver
val changingSchema = StructType(columns)

SchemaUtils.canChangeDataType(existingSchema, changingSchema, resolver).foreach { operation =>
SchemaUtils.canChangeDataType(existingSchema, changingSchema, resolver,
txn.metadata.columnMappingMode).foreach { operation =>
throw DeltaErrors.alterTableReplaceColumnsException(
existingSchema, changingSchema, operation)
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.constraints
import java.util.Locale

import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.schema.SchemaUtils

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -86,4 +87,19 @@ object Constraints {
def checkConstraintPropertyName(constraintName: String): String = {
"delta.constraints." + constraintName.toLowerCase(Locale.ROOT)
}

/**
* Find all the check constraints that reference the given column name.
*/
def findDependentConstraints(
sparkSession: SparkSession,
columnName: Seq[String],
metadata: Metadata): Map[String, String] = {
metadata.configuration.filter {
case (key, constraint) if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
SchemaUtils.containsDependentExpression(
sparkSession, columnName, constraint, sparkSession.sessionState.conf.resolver)
case _ => false
}
}
}
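
A brief sketch of the metadata shape findDependentConstraints scans (entries invented): CHECK constraints live in the table configuration under the delta.constraints. prefix, keyed by lower-cased constraint name per checkConstraintPropertyName above, with the expression text as the value.

// Hypothetical table configuration:
val configuration = Map(
  "delta.constraints.positivecount" -> "clickCount > 0",
  "delta.constraints.validtype" -> "eventType IN ('click', 'view')",
  "delta.appendOnly" -> "false" // not a constraint; ignored by the filter
)
// findDependentConstraints(spark, Seq("clickCount"), metadata) keeps only
// entries whose expression references the column:
//   Map("delta.constraints.positivecount" -> "clickCount > 0")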