From 55c0a01c873d9b0445433c0e73187019f3c8689b Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Mon, 4 Mar 2024 16:23:27 +0100 Subject: [PATCH 1/7] Record type widening metadata --- .../spark/sql/delta/ConflictChecker.scala | 23 + .../sql/delta/OptimisticTransaction.scala | 2 +- .../sql/delta/TypeWideningMetadata.scala | 195 +++++++ .../commands/alterDeltaTableCommands.scala | 13 +- .../sql/delta/schema/SchemaMergingUtils.scala | 46 ++ .../DeltaTypeWideningMetadataSuite.scala | 502 ++++++++++++++++++ .../sql/delta/DeltaTypeWideningSuite.scala | 243 ++++++++- 7 files changed, 1014 insertions(+), 10 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala index 56fdb10726c..54d59677619 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala @@ -161,6 +161,9 @@ private[delta] class ConflictChecker( reassignOverlappingRowIds() reassignRowCommitVersions() + // Update the table version in newly added type widening metadata. + updateTypeWideningMetadata() + // Data file checks. checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn() checkForDeletedFilesAgainstCurrentTxnReadFiles() @@ -486,6 +489,26 @@ private[delta] class ConflictChecker( actions = updatedActions) } + /** + * Metadata is recorded in the table schema on type changes. This includes the table version that + * the change was made in, which needs to be updated when there's a conflict. + */ + private def updateTypeWideningMetadata(): Unit = { + if (!TypeWidening.isEnabled(currentTransactionInfo.protocol, currentTransactionInfo.metadata)) { + return + } + val newActions = currentTransactionInfo.actions.map { + case metadata: Metadata => + val updatedSchema = TypeWideningMetadata.updateTypeChangeVersion( + schema = metadata.schema, + fromVersion = winningCommitVersion, + toVersion = winningCommitVersion + 1L) + metadata.copy(schemaString = updatedSchema.json) + case a => a + } + currentTransactionInfo = currentTransactionInfo.copy(actions = newActions) + } + /** * Checks whether the Row IDs assigned by the current transaction overlap with the Row IDs * assigned by the winning transaction. I.e. this function checks whether both the winning and the diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 40b370026e4..0f958677ff5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1963,7 +1963,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite } /** Returns the version that the first attempt will try to commit at. 
*/ - protected def getFirstAttemptVersion: Long = readVersion + 1L + private[delta] def getFirstAttemptVersion: Long = readVersion + 1L /** Returns the conflicting commit information */ protected def getConflictingVersions(previousAttemptVersion: Long): Seq[FileStatus] = { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala new file mode 100644 index 00000000000..82accd624ac --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala @@ -0,0 +1,195 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.schema.SchemaMergingUtils +import org.apache.spark.sql.util.ScalaExtensions._ + +import org.apache.spark.sql.types._ + +/** + * Information corresponding to a single type change. + * @param version The version of the table where the type change was made. + * @param fromType The original type before the type change. + * @param toType The new type after the type change. + * @param fieldPath The path inside nested maps and arrays to the field where the type change was + * made. Each path element is either `key`/`value` for maps or `element` for + * arrays. The path is empty if the type change was applied inside a map or array. + */ +private[delta] case class TypeChange( + version: Long, + fromType: DataType, + toType: DataType, + fieldPath: Seq[String]) { + import TypeChange._ + + /** Serialize this type change to a [[Metadata]] object. */ + def toMetadata: Metadata = { + val builder = new MetadataBuilder() + .putLong(TABLE_VERSION_METADATA_KEY, version) + .putString(FROM_TYPE_METADATA_KEY, fromType.typeName) + .putString(TO_TYPE_METADATA_KEY, toType.typeName) + if (fieldPath.nonEmpty) { + builder.putString(FIELD_PATH_METADATA_KEY, fieldPath.mkString(".")) + } + builder.build() + } +} + +private[delta] object TypeChange { + val TABLE_VERSION_METADATA_KEY: String = "tableVersion" + val FROM_TYPE_METADATA_KEY: String = "fromType" + val TO_TYPE_METADATA_KEY: String = "toType" + val FIELD_PATH_METADATA_KEY: String = "fieldPath" + + /** Deserialize this type change from a [[Metadata]] object. */ + def fromMetadata(metadata: Metadata): TypeChange = { + val fieldPath = if (metadata.contains(FIELD_PATH_METADATA_KEY)) { + metadata.getString(FIELD_PATH_METADATA_KEY).split("\\.").toSeq + } else { + Seq.empty + } + TypeChange( + version = metadata.getLong(TABLE_VERSION_METADATA_KEY), + fromType = DataType.fromDDL(metadata.getString(FROM_TYPE_METADATA_KEY)), + toType = DataType.fromDDL(metadata.getString(TO_TYPE_METADATA_KEY)), + fieldPath + ) + } +} + +/** + * Represents all type change information for a single struct field + * @param typeChanges The type changes that have been applied to the field. 
+ */ +private[delta] case class TypeWideningMetadata(typeChanges: Seq[TypeChange]) { + + import TypeWideningMetadata._ + + /** + * Add the type changes to the metadata of the given field, preserving any pre-existing type + * widening metadata. + */ + def appendToField(field: StructField): StructField = { + if (typeChanges.isEmpty) return field + + val existingTypeChanges = fromField(field).map(_.typeChanges).getOrElse(Seq.empty) + val allTypeChanges = existingTypeChanges ++ typeChanges + + val newMetadata = new MetadataBuilder().withMetadata(field.metadata) + .putMetadataArray(TYPE_CHANGES_METADATA_KEY, allTypeChanges.map(_.toMetadata).toArray) + .build() + field.copy(metadata = newMetadata) + } +} + +private[delta] object TypeWideningMetadata { + val TYPE_CHANGES_METADATA_KEY: String = "delta.typeChanges" + + /** Read the type widening metadata from the given field. */ + def fromField(field: StructField): Option[TypeWideningMetadata] = { + Option.when(field.metadata.contains(TYPE_CHANGES_METADATA_KEY)) { + val typeChanges = field.metadata.getMetadataArray(TYPE_CHANGES_METADATA_KEY) + .map { changeMetadata => + TypeChange.fromMetadata(changeMetadata) + }.toSeq + TypeWideningMetadata(typeChanges) + } + } + + /** + * Computes the type changes from `oldSchema` to `schema` and adds corresponding type change + * metadata to `schema`. + */ + def addTypeWideningMetadata( + txn: OptimisticTransaction, + schema: StructType, + oldSchema: StructType): StructType = { + + if (!TypeWidening.isEnabled(txn.protocol, txn.metadata)) return schema + + if (DataType.equalsIgnoreNullability(schema, oldSchema)) return schema + + SchemaMergingUtils.transformColumns(schema, oldSchema) { + case (_, newField, Some(oldField), _) => + // Record the version the transaction will attempt to use in the type change metadata. If + // there's a conflict with another transaction, the version in the metadata will be updated + // during conflict resolution. See [[ConflictChecker.updateTypeWideningMetadata()]]. + val typeChanges = + collectTypeChanges(oldField.dataType, newField.dataType, txn.getFirstAttemptVersion) + TypeWideningMetadata(typeChanges).appendToField(newField) + case (_, newField, None, _) => + // The field was just added, no need to process. + newField + } + } + + /** + * Recursively collect primitive type changes inside nested maps and arrays between `fromType` and + * `toType`. The `version` is the version of the table where the type change was made. + */ + private def collectTypeChanges(fromType: DataType, toType: DataType, version: Long) + : Seq[TypeChange] = (fromType, toType) match { + case (from: MapType, to: MapType) => + collectTypeChanges(from.keyType, to.keyType, version).map { typeChange => + typeChange.copy(fieldPath = "key" +: typeChange.fieldPath) + } ++ + collectTypeChanges(from.valueType, to.valueType, version).map { typeChange => + typeChange.copy(fieldPath = "value" +: typeChange.fieldPath) + } + case (from: ArrayType, to: ArrayType) => + collectTypeChanges(from.elementType, to.elementType, version).map { typeChange => + typeChange.copy(fieldPath = "element" +: typeChange.fieldPath) + } + case (fromType: AtomicType, toType: AtomicType) if fromType != toType => + Seq(TypeChange( + version, + fromType, + toType, + fieldPath = Seq.empty + )) + case (_: AtomicType, _: AtomicType) => Seq.empty + // Don't recurse inside structs, `collectTypeChanges` should be called directly on each struct + // fields instead to only collect type changes inside these fields. 
+ case (_: StructType, _: StructType) => Seq.empty + } + + /** + * Change the `tableVersion` value in the type change metadata present in `schema`. Used during + * conflict resolution to update the version associated with the transaction is incremented. + */ + def updateTypeChangeVersion(schema: StructType, fromVersion: Long, toVersion: Long): StructType = + SchemaMergingUtils.transformColumns(schema) { + case (_, field, _) => + fromField(field) match { + case Some(typeWideningMetadata) => + val updatedTypeChanges = typeWideningMetadata.typeChanges.map { + case typeChange if typeChange.version == fromVersion => + typeChange.copy(version = toVersion) + case olderTypeChange => olderTypeChange + } + val newMetadata = new MetadataBuilder().withMetadata(field.metadata) + .putMetadataArray( + TYPE_CHANGES_METADATA_KEY, + updatedTypeChanges.map(_.toMetadata).toArray) + .build() + field.copy(metadata = newMetadata) + + case None => field + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index 7e237ab0d96..3f3e353a618 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -614,8 +614,11 @@ case class AlterTableChangeColumnDeltaCommand( val newConfiguration = metadata.configuration ++ StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath) + val newSchemaWithTypeWideningMetadata = + TypeWideningMetadata.addTypeWideningMetadata(txn, schema = newSchema, oldSchema = oldSchema) + val newMetadata = metadata.copy( - schemaString = newSchema.json, + schemaString = newSchemaWithTypeWideningMetadata.json, partitionColumns = newPartitionColumns, configuration = newConfiguration ) @@ -816,7 +819,13 @@ case class AlterTableReplaceColumnsDeltaCommand( SchemaMergingUtils.checkColumnNameDuplication(newSchema, "in replacing columns") SchemaUtils.checkSchemaFieldNames(newSchema, metadata.columnMappingMode) - val newMetadata = metadata.copy(schemaString = newSchema.json) + val newSchemaWithTypeWideningMetadata = TypeWideningMetadata.addTypeWideningMetadata( + txn, + schema = newSchema, + oldSchema = existingSchema + ) + + val newMetadata = metadata.copy(schemaString = newSchemaWithTypeWideningMetadata.json) txn.updateMetadata(newMetadata) txn.commit(Nil, DeltaOperations.ReplaceColumns(columns)) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala index ab65cad2bd0..6aac07492e2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala @@ -333,6 +333,52 @@ object SchemaMergingUtils { transform(Seq.empty, schema) } + /** + * Transform (nested) columns in `schema` by walking down `schema` and `other` simultaneously. + * This allows comparing the two schemas and transforming `schema` based on the comparison. + * Columns or fields present only in `other` are ignored while `None` is passed to the transform + * function for columns or fields missing in `other`. + * @param schema Schema to transform. + * @param other Schema to compare with. + * @param tf Function to apply. 
The function arguments are the full path of the current field to + * transform, the current field in `schema` and, if present, the corresponding field in + * `other`. + */ + def transformColumns( + schema: StructType, + other: StructType)( + tf: (Seq[String], StructField, Option[StructField], Resolver) => StructField): StructType = { + def transform[E <: DataType](path: Seq[String], dt: E, otherDt: E): E = { + val newDt = (dt, otherDt) match { + case (struct: StructType, otherStruct: StructType) => + val otherFields = SchemaMergingUtils.toFieldMap(otherStruct.fields, caseSensitive = true) + StructType(struct.map { field => + val otherField = otherFields.get(field.name) + val newField = tf(path, field, otherField, DELTA_COL_RESOLVER) + otherField match { + case Some(other) => + newField.copy( + dataType = transform(path :+ field.name, field.dataType, other.dataType) + ) + case None => newField + } + }) + case (map: MapType, otherMap: MapType) => + map.copy( + keyType = transform(path :+ "key", map.keyType, otherMap.keyType), + valueType = transform(path :+ "value", map.valueType, otherMap.valueType) + ) + case (array: ArrayType, otherArray: ArrayType) => + array.copy( + elementType = transform(path :+ "element", array.elementType, otherArray.elementType) + ) + case _ => dt + } + newDt.asInstanceOf[E] + } + transform(Seq.empty, schema, other) + } + /** * * Taken from DataType diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala new file mode 100644 index 00000000000..24dc88ed03c --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala @@ -0,0 +1,502 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.types._ + +/** + * Suite covering the[[TypeWideningMetadata]] and [[TypeChange]] classes used to handle the metadata + * recorded by the Type Widening table feature in the table schema. + */ +class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest { + private val testTableName: String = "delta_type_widening_metadata_test" + + /** A dummy transaction to be used by tests covering `addTypeWideningMetadata`. 
*/ + private lazy val txn: OptimisticTransaction = { + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testTableName)) + DeltaLog.forTable(spark, TableIdentifier(testTableName)) + .startTransaction(catalogTableOpt = Some(table)) + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + sql(s"CREATE TABLE $testTableName (a int) USING delta " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')") + } + + override protected def afterAll(): Unit = { + sql(s"DROP TABLE IF EXISTS $testTableName") + super.afterAll() + } + + /** + * Short-hand to build the metadata for a type change to cut down on repetition. + */ + private def typeChangeMetadata( + version: Long, + fromType: String, + toType: String, + path: String = ""): Metadata = { + val builder = new MetadataBuilder() + .putLong("tableVersion", version) + .putString("fromType", fromType) + .putString("toType", toType) + if (path.nonEmpty) { + builder.putString("fieldPath", path) + } + builder.build() + } + + test("toMetadata/fromMetadata with empty path") { + val typeChange = TypeChange(version = 1, IntegerType, LongType, Seq.empty) + assert(typeChange.toMetadata === typeChangeMetadata(version = 1, "integer", "long")) + assert(TypeChange.fromMetadata(typeChange.toMetadata) === typeChange) + } + + test("toMetadata/fromMetadata with non-empty path") { + val typeChange = TypeChange(10, DateType, TimestampNTZType, Seq("key", "element")) + assert(typeChange.toMetadata === + typeChangeMetadata(version = 10, "date", "timestamp_ntz", "key.element")) + assert(TypeChange.fromMetadata(typeChange.toMetadata) === typeChange) + } + + test("fromField with no type widening metadata") { + val field = StructField("a", IntegerType) + assert(TypeWideningMetadata.fromField(field) === None) + } + + test("fromField with empty type widening metadata") { + val field = StructField("a", IntegerType, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array.empty[Metadata]) + .build() + ) + assert(TypeWideningMetadata.fromField(field) === Some(TypeWideningMetadata(Seq.empty))) + val otherField = StructField("a", IntegerType) + // Empty type widening metadata is discarded. 
+ assert(TypeWideningMetadata.fromField(field).get.appendToField(otherField) === + StructField("a", IntegerType)) + } + + test("fromField with single type change") { + val field = StructField("a", IntegerType, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long") + )).build() + ) + assert(TypeWideningMetadata.fromField(field) === + Some(TypeWideningMetadata(Seq(TypeChange(1, IntegerType, LongType, Seq.empty))))) + val otherField = StructField("a", IntegerType) + assert(TypeWideningMetadata.fromField(field).get.appendToField(otherField) === field) + } + + test("fromField with multiple type changes") { + val field = StructField("a", IntegerType, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long"), + typeChangeMetadata(version = 10, "decimal(5,0)", "decimal(10,2)", "element.element") + )).build() + ) + assert(TypeWideningMetadata.fromField(field) === + Some(TypeWideningMetadata(Seq( + TypeChange(1, IntegerType, LongType, Seq.empty), + TypeChange(10, DecimalType(5, 0), DecimalType(10, 2), Seq("element", "element")))))) + val otherField = StructField("a", IntegerType) + assert(TypeWideningMetadata.fromField(field).get.appendToField(otherField) === field) + } + + test("appendToField on field with no type widening metadata") { + val field = StructField("a", IntegerType) + // Adding empty type widening metadata should not change the field. + val emptyMetadata = TypeWideningMetadata(Seq.empty) + assert(emptyMetadata.appendToField(field) === field) + assert(TypeWideningMetadata.fromField(emptyMetadata.appendToField(field)).isEmpty) + + // Adding single type change should add the metadata to the field and not otherwise change it. + val singleMetadata = TypeWideningMetadata(Seq( + TypeChange(1, IntegerType, LongType, Seq.empty))) + assert(singleMetadata.appendToField(field) === field.copy(metadata = + new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long") + )).build() + ) + ) + val singleMetadataFromField = + TypeWideningMetadata.fromField(singleMetadata.appendToField(field)) + assert(singleMetadataFromField.contains(singleMetadata)) + + // Adding multiple type changes should add the metadata to the field and not otherwise change + // it. + val multipleMetadata = TypeWideningMetadata(Seq( + TypeChange(1, IntegerType, LongType, Seq.empty), + TypeChange(6, FloatType, DoubleType, Seq("value")))) + assert(multipleMetadata.appendToField(field) === field.copy(metadata = + new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long"), + typeChangeMetadata(version = 6, "float", "double", "value") + )).build() + ) + ) + + val multipleMetadataFromField = + TypeWideningMetadata.fromField(multipleMetadata.appendToField(field)) + assert(multipleMetadataFromField.contains(multipleMetadata)) + } + + test("appendToField on field with existing type widening metadata") { + val field = StructField("a", IntegerType, + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long") + )).build() + ) + // Adding empty type widening metadata should not change the field. 
+ val emptyMetadata = TypeWideningMetadata(Seq.empty) + assert(emptyMetadata.appendToField(field) === field) + assert(TypeWideningMetadata.fromField(emptyMetadata.appendToField(field)).contains( + TypeWideningMetadata(Seq( + TypeChange(1, IntegerType, LongType, Seq.empty))) + )) + + // Adding single type change should add the metadata to the field and not otherwise change it. + val singleMetadata = TypeWideningMetadata(Seq( + TypeChange(5, DecimalType(18, 0), DecimalType(19, 0), Seq.empty))) + + assert(singleMetadata.appendToField(field) === field.copy( + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long"), + typeChangeMetadata(version = 5, "decimal(18,0)", "decimal(19,0)") + )).build() + )) + val singleMetadataFromField = + TypeWideningMetadata.fromField(singleMetadata.appendToField(field)) + + assert(singleMetadataFromField.contains(TypeWideningMetadata(Seq( + TypeChange(1, IntegerType, LongType, Seq.empty), + TypeChange(5, DecimalType(18, 0), DecimalType(19, 0), Seq.empty))) + )) + + // Adding multiple type changes should add the metadata to the field and not otherwise change + // it. + val multipleMetadata = TypeWideningMetadata(Seq( + TypeChange(5, DecimalType(18, 0), DecimalType(19, 0), Seq.empty), + TypeChange(6, FloatType, DoubleType, Seq("value")))) + + assert(multipleMetadata.appendToField(field) === field.copy( + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long"), + typeChangeMetadata(version = 5, "decimal(18,0)", "decimal(19,0)"), + typeChangeMetadata(version = 6, "float", "double", "value") + )).build() + )) + val multipleMetadataFromField = + TypeWideningMetadata.fromField(multipleMetadata.appendToField(field)) + + assert(multipleMetadataFromField.contains(TypeWideningMetadata(Seq( + TypeChange(1, IntegerType, LongType, Seq.empty), + TypeChange(5, DecimalType(18, 0), DecimalType(19, 0), Seq.empty), + TypeChange(6, FloatType, DoubleType, Seq("value")))) + )) + } + + test("addTypeWideningMetadata with no type changes") { + for { + (oldSchema, newSchema) <- Seq( + ("a short", "a short"), + ("a short", "a short NOT NULL"), + ("a short NOT NULL", "a short"), + ("a short NOT NULL", "a short COMMENT 'a comment'"), + ("a string, b int", "b int, a string"), + ("a struct", "a struct"), + ("a struct", "a struct"), + ("a struct", "a struct"), + ("a map", "m map"), + ("a array", "a array"), + ("a map, int>", "a map, int>"), + ("a array>", "a array>") + ).map { case (oldStr, newStr) => StructType.fromDDL(oldStr) -> StructType.fromDDL(newStr) } + } { + withClue(s"oldSchema = $oldSchema, newSchema = $newSchema") { + val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema) + assert(schema === newSchema) + } + } + } + + test("addTypeWideningMetadata on top-level fields") { + var schema = + StructType.fromDDL("i long, d decimal(15, 4), a array, m map") + val firstOldSchema = + StructType.fromDDL("i short, d decimal(6, 2), a array, m map") + val secondOldSchema = + StructType.fromDDL("i int, d decimal(10, 4), a array, m map") + + schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, firstOldSchema) + + assert(schema("i") === StructField("i", LongType, + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "short", "long") + )).build() + )) + + assert(schema("d") === StructField("d", DecimalType(15, 4), + metadata = new 
MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "decimal(6,2)", "decimal(15,4)") + )).build() + )) + + assert(schema("a") === StructField("a", ArrayType(DoubleType), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "byte", "double", "element") + )).build() + )) + + assert(schema("m") === StructField("m", MapType(ShortType, IntegerType), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "byte", "short", "key") + )).build() + )) + + // Second type change on all fields. + schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema) + + assert(schema("i") === StructField("i", LongType, + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "short", "long"), + typeChangeMetadata(version = 1, "integer", "long") + )).build() + )) + + assert(schema("d") === StructField("d", DecimalType(15, 4), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "decimal(6,2)", "decimal(15,4)"), + typeChangeMetadata(version = 1, "decimal(10,4)", "decimal(15,4)") + )).build() + )) + + assert(schema("a") === StructField("a", ArrayType(DoubleType), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "byte", "double", "element"), + typeChangeMetadata(version = 1, "integer", "double", "element") + )).build() + )) + + assert(schema("m") === StructField("m", MapType(ShortType, IntegerType), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "byte", "short", "key"), + typeChangeMetadata(version = 1, "byte", "integer", "value") + )).build() + )) + } + + test("addTypeWideningMetadata on nested fields") { + var schema = StructType.fromDDL( + "s struct>, m: map, array>>") + val firstOldSchema = StructType.fromDDL( + "s struct>, m: map, array>>") + val secondOldSchema = StructType.fromDDL( + "s struct>, m: map, array>>") + + // First type change on all struct fields. + schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, firstOldSchema) + var struct = schema("s").dataType.asInstanceOf[StructType] + + assert(struct("i") === StructField("i", LongType, + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "short", "long") + )).build() + )) + + assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "byte", "integer", "element.key") + )).build() + )) + + assert(struct("m") === StructField("m", + MapType(MapType(LongType, IntegerType), ArrayType(LongType)), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long", "key.key") + )).build() + )) + + // Second type change on all struct fields. 
+ schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema) + struct = schema("s").dataType.asInstanceOf[StructType] + + assert(struct("i") === StructField("i", LongType, + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "short", "long"), + typeChangeMetadata(version = 1, "integer", "long") + )).build() + )) + + assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "byte", "integer", "element.key"), + typeChangeMetadata(version = 1, "integer", "long", "element.value") + )).build() + )) + + assert(struct("m") === StructField("m", + MapType(MapType(LongType, IntegerType), ArrayType(LongType)), + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long", "key.key"), + typeChangeMetadata(version = 1, "integer", "long", "value.element") + )).build() + )) + } + + test("addTypeWideningMetadata with added and removed fields") { + val newSchema = StructType.fromDDL("a int, b long, d int") + val oldSchema = StructType.fromDDL("a int, b int, c int") + + val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema) + assert(schema("a") === StructField("a", IntegerType)) + assert(schema("d") === StructField("d", IntegerType)) + assert(!schema.contains("c")) + + assert(schema("b") === StructField("b", LongType, + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long") + )).build() + )) + } + + test("addTypeWideningMetadata with different field position") { + val initialSchema = StructType.fromDDL("a short, b int, s struct") + val secondSchema = StructType.fromDDL("b int, a short, s struct") + + val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, initialSchema, secondSchema) + // No type widening metadata is added. + assert(schema("a") === StructField("a", ShortType)) + assert(schema("b") === StructField("b", IntegerType)) + assert(schema("s") === + StructField("s", new StructType() + .add("c", IntegerType) + .add("d", LongType))) + } + + test("updateTypeChangeVersion with no type changes") { + val schema = new StructType().add("a", IntegerType) + assert(TypeWideningMetadata.updateTypeChangeVersion(schema, 1, 4) === schema) + } + + test("updateTypeChangeVersion with field with single type change") { + val schema = new StructType() + .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long") + )) + .build() + ) + + assert(TypeWideningMetadata.updateTypeChangeVersion(schema, 1, 4) === + new StructType() + .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 4, "integer", "long") + )) + .build() + ) + ) + } + + test("updateTypeChangeVersion with field with multiple type changes") { + val schema = new StructType() + .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long"), + typeChangeMetadata(version = 6, "float", "double", "value") + )) + .build() + ) + + // Update matching one of the type changes. 
+ assert(TypeWideningMetadata.updateTypeChangeVersion(schema, 1, 4) === + new StructType() + .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 4, "integer", "long"), + typeChangeMetadata(version = 6, "float", "double", "value") + )) + .build() + ) + ) + + // Update doesn't match any of the recorded type changes. + assert( + TypeWideningMetadata.updateTypeChangeVersion(schema, 3, 4) === schema + ) + } + + test("updateTypeChangeVersion with multiple fields with a type change") { + val schema = new StructType() + .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "integer", "long") + )) + .build()) + .add("b", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 1, "short", "integer", "element") + )) + .build()) + + // Update both type changes. + assert(TypeWideningMetadata.updateTypeChangeVersion(schema, 1, 4) === + new StructType() + .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 4, "integer", "long") + )) + .build()) + .add("b", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + typeChangeMetadata(version = 4, "short", "integer", "element") + )) + .build()) + ) + + // Update doesn't match any of the recorded type changes. + assert( + TypeWideningMetadata.updateTypeChangeVersion(schema, 3, 4) === schema + ) + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index b47158fc3a3..fca8e3e046f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.util.JsonUtils import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Encoder, QueryTest, Row} @@ -38,6 +39,7 @@ class DeltaTypeWideningSuite with DeltaTypeWideningTestMixin with DeltaTypeWideningAlterTableTests with DeltaTypeWideningNestedFieldsTests + with DeltaTypeWideningMetadataTests with DeltaTypeWideningTableFeatureTests /** @@ -249,7 +251,16 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { append(Seq(1, 2).toDF("value").select($"value".cast(ShortType))) assert(readDeltaTable(tempPath).schema === new StructType().add("value", ShortType)) sql(s"ALTER TABLE delta.`$tempPath` REPLACE COLUMNS (value INT)") - assert(readDeltaTable(tempPath).schema === new StructType().add("value", IntegerType)) + assert(readDeltaTable(tempPath).schema === + new StructType() + .add("value", IntegerType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putString("toType", "integer") + .putString("fromType", "short") + .putLong("tableVersion", 1) + .build() + )).build())) checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2))) append(Seq(3, 4).toDF("value")) checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3), Row(4))) @@ -337,9 +348,39 @@ trait 
DeltaTypeWideningNestedFieldsTests { sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a.element TYPE int") assert(readDeltaTable(tempPath).schema === new StructType() - .add("s", new StructType().add("a", ShortType)) - .add("m", MapType(IntegerType, IntegerType)) - .add("a", ArrayType(IntegerType))) + .add("s", new StructType() + .add("a", ShortType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putString("toType", "short") + .putString("fromType", "byte") + .putLong("tableVersion", 2) + .build() + )).build())) + .add("m", MapType(IntegerType, IntegerType), nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putString("toType", "integer") + .putString("fromType", "byte") + .putLong("tableVersion", 3) + .putString("fieldPath", "key") + .build(), + new MetadataBuilder() + .putString("toType", "integer") + .putString("fromType", "short") + .putLong("tableVersion", 4) + .putString("fieldPath", "value") + .build() + )).build()) + .add("a", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putString("toType", "integer") + .putString("fromType", "short") + .putLong("tableVersion", 5) + .putString("fieldPath", "element") + .build() + )).build())) append(Seq((5, 6, 7, 8)) .toDF("a", "b", "c", "d") @@ -357,9 +398,39 @@ trait DeltaTypeWideningNestedFieldsTests { sql(s"ALTER TABLE delta.`$tempPath` REPLACE COLUMNS " + "(s struct, m map, a array)") assert(readDeltaTable(tempPath).schema === new StructType() - .add("s", new StructType().add("a", ShortType)) - .add("m", MapType(IntegerType, IntegerType)) - .add("a", ArrayType(IntegerType))) + .add("s", new StructType() + .add("a", ShortType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putString("toType", "short") + .putString("fromType", "byte") + .putLong("tableVersion", 2) + .build() + )).build())) + .add("m", MapType(IntegerType, IntegerType), nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putString("toType", "integer") + .putString("fromType", "byte") + .putLong("tableVersion", 2) + .putString("fieldPath", "key") + .build(), + new MetadataBuilder() + .putString("toType", "integer") + .putString("fromType", "short") + .putLong("tableVersion", 2) + .putString("fieldPath", "value") + .build() + )).build()) + .add("a", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putString("toType", "integer") + .putString("fromType", "short") + .putLong("tableVersion", 2) + .putString("fieldPath", "element") + .build() + )).build())) append(Seq((5, 6, 7, 8)) .toDF("a", "b", "c", "d") @@ -373,6 +444,164 @@ trait DeltaTypeWideningNestedFieldsTests { } } +/** + * Tests related to recording type change information as metadata in the table schema. For + * lower-level tests, see [[DeltaTypeWideningMetadataSuite]]. 
+ */ +trait DeltaTypeWideningMetadataTests { + self: QueryTest with ParquetTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils => + + def testTypeWideningMetadata(name: String)( + initialSchema: String, + typeChanges: Seq[(String, String)], + expectedJsonSchema: String): Unit = + test(name) { + sql(s"CREATE TABLE delta.`$tempPath` ($initialSchema) USING DELTA") + typeChanges.foreach { case (fieldName, newType) => + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN $fieldName TYPE $newType") + } + + // Parse the schemas as JSON to ignore whitespaces and field order. + val actualSchema = JsonUtils.fromJson[Map[String, Any]](readDeltaTable(tempPath).schema.json) + val expectedSchema = JsonUtils.fromJson[Map[String, Any]](expectedJsonSchema) + assert(actualSchema === expectedSchema, + s"${readDeltaTable(tempPath).schema.prettyJson} did not equal $expectedJsonSchema" + ) + } + + testTypeWideningMetadata("change top-level column type short->int")( + initialSchema = "a short", + typeChanges = Seq("a" -> "int"), + expectedJsonSchema = + """{ + "type": "struct", + "fields": [{ + "name": "a", + "type": "integer", + "nullable": true, + "metadata": { + "delta.typeChanges": [{ + "toType": "integer", + "fromType": "short", + "tableVersion": 1 + }] + } + }]}""".stripMargin) + + testTypeWideningMetadata("change top-level column type twice byte->short->int")( + initialSchema = "a byte", + typeChanges = Seq("a" -> "short", "a" -> "int"), + expectedJsonSchema = + """{ + "type": "struct", + "fields": [{ + "name": "a", + "type": "integer", + "nullable": true, + "metadata": { + "delta.typeChanges": [{ + "toType": "short", + "fromType": "byte", + "tableVersion": 1 + },{ + "toType": "integer", + "fromType": "short", + "tableVersion": 2 + }] + } + }]}""".stripMargin) + + testTypeWideningMetadata("change type in map key and in struct in map value")( + initialSchema = "a map>", + typeChanges = Seq("a.key" -> "int", "a.value.b" -> "short"), + expectedJsonSchema = + """{ + "type": "struct", + "fields": [{ + "name": "a", + "type": { + "type": "map", + "keyType": "integer", + "valueType": { + "type": "struct", + "fields": [{ + "name": "b", + "type": "short", + "nullable": true, + "metadata": { + "delta.typeChanges": [{ + "toType": "short", + "fromType": "byte", + "tableVersion": 2 + }] + } + }] + }, + "valueContainsNull": true + }, + "nullable": true, + "metadata": { + "delta.typeChanges": [{ + "toType": "integer", + "fromType": "byte", + "tableVersion": 1, + "fieldPath": "key" + }] + } + } + ]}""".stripMargin) + + + testTypeWideningMetadata("change type in array and in struct in array")( + initialSchema = "a array, b array>", + typeChanges = Seq("a.element" -> "short", "b.element.c" -> "int"), + expectedJsonSchema = + """{ + "type": "struct", + "fields": [{ + "name": "a", + "type": { + "type": "array", + "elementType": "short", + "containsNull": true + }, + "nullable": true, + "metadata": { + "delta.typeChanges": [{ + "toType": "short", + "fromType": "byte", + "tableVersion": 1, + "fieldPath": "element" + }] + } + }, + { + "name": "b", + "type": { + "type": "array", + "elementType":{ + "type": "struct", + "fields": [{ + "name": "c", + "type": "integer", + "nullable": true, + "metadata": { + "delta.typeChanges": [{ + "toType": "integer", + "fromType": "short", + "tableVersion": 2 + }] + } + }] + }, + "containsNull": true + }, + "nullable": true, + "metadata": { } + } + ]}""".stripMargin) +} + trait DeltaTypeWideningTableFeatureTests { self: QueryTest with ParquetTest with DeltaDMLTestUtils with 
DeltaTypeWideningTestMixin with SharedSparkSession => From e2601a6e049f82f8e7fc68f3284d7b9efcffa54b Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 5 Mar 2024 11:28:10 +0100 Subject: [PATCH 2/7] Drop type widening table feature --- .../spark/sql/delta/ConflictChecker.scala | 4 +- .../sql/delta/DefaultRowCommitVersion.scala | 3 +- .../PreDowngradeTableFeatureCommand.scala | 74 ++++++- .../apache/spark/sql/delta/TableFeature.scala | 16 +- .../apache/spark/sql/delta/TypeWidening.scala | 29 ++- .../sql/delta/TypeWideningMetadata.scala | 63 +++++- .../commands/DeltaReorgTableCommand.scala | 24 ++- .../delta/commands/OptimizeTableCommand.scala | 2 +- .../DeltaTypeWideningMetadataSuite.scala | 59 ++++-- .../sql/delta/DeltaTypeWideningSuite.scala | 186 +++++++++++++++++- 10 files changed, 424 insertions(+), 36 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala index 54d59677619..8f04ba1de0a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala @@ -559,7 +559,9 @@ private[delta] class ConflictChecker( * to handle the row tracking feature being enabled by the winning transaction. */ private def reassignRowCommitVersions(): Unit = { - if (!RowTracking.isSupported(currentTransactionInfo.protocol)) { + if (!RowTracking.isSupported(currentTransactionInfo.protocol) && + // Type widening relies on default row commit versions to be set. + !TypeWidening.isSupported(currentTransactionInfo.protocol)) { return } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala index 0cba4a05c09..e7046ee42fe 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala @@ -24,7 +24,8 @@ object DefaultRowCommitVersion { protocol: Protocol, actions: Iterator[Action], version: Long): Iterator[Action] = { - if (!RowTracking.isSupported(protocol)) { + // Type Widening relies on default row commit versions to be set. 
+ if (!RowTracking.isSupported(protocol) && !TypeWidening.isSupported(protocol)) { return actions } actions.map { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 9e43ac3e6d1..8f4de3d013a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.delta import java.util.concurrent.TimeUnit import org.apache.spark.sql.delta.catalog.DeltaTableV2 -import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand} +import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} @@ -126,3 +126,75 @@ case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2) true } } + +case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) + extends PreDowngradeTableFeatureCommand + with DeltaLogging { + + /** + * Unset the type widening table property to prevent new type changes to be applied to the table, + * then removes traces of the feature: + * - Rewrite files that have columns or fields with a different type than in the current table + * schema. These are all files added or modified after the last type change. + * - Remove the type widening metadata attached to fields in the current table schema. + * + * @return Return true if files were rewritten or metadata was removed. False otherwise. + */ + override def removeFeatureTracesIfNeeded(): Boolean = { + if (TypeWideningTableFeature.validateRemoval(table.initialSnapshot)) return false + + val startTimeNs = System.nanoTime() + val properties = Seq(DeltaConfigs.ENABLE_TYPE_WIDENING.key) + AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark) + val numFilesRewritten = rewriteFilesIfNeeded() + val metadataRemoved = removeMetadataIfNeeded() + + recordDeltaEvent( + table.deltaLog, + opType = "delta.typeWideningFeatureRemovalMetrics", + data = Map( + "downgradeTimeMs" -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs), + "numFilesRewritten" -> numFilesRewritten, + "metadataRemoved" -> metadataRemoved + ) + ) + numFilesRewritten > 0 || metadataRemoved + } + + /** + * Rewrite files that have columns or fields with a different type than in the current table + * schema. These are all files added or modified after the last type change. + * @return Return the number of files rewritten. + */ + private def rewriteFilesIfNeeded(): Long = { + val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot) + if (numFilesToRewrite == 0L) return 0L + + val reorg = DeltaReorgTableCommand( + table.toLogicalRelation, + DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) + )(Nil) + + reorg.run(table.spark) + numFilesToRewrite + } + + /** + * Remove the type widening metadata attached to fields in the current table schema. + * @return Return true if any metadata was removed. False otherwise. 
+ */ + private def removeMetadataIfNeeded(): Boolean = { + if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) { + return false + } + + val txn = table.startTransaction() + val metadata = txn.metadata + val (cleanedSchema, changes) = + TypeWideningMetadata.removeTypeWideningMetadata(metadata.schema) + txn.commit( + metadata.copy(schemaString = cleanedSchema.json) :: Nil, + DeltaOperations.UpdateColumnMetadata("DROP FEATURE", changes)) + true + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 430c6f0e76c..5167efa22a6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -627,7 +627,8 @@ object ManagedCommitTableFeature } object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening-dev") - with FeatureAutomaticallyEnabledByMetadata { + with FeatureAutomaticallyEnabledByMetadata + with RemovableFeature { override def automaticallyUpdateProtocolOfExistingTables: Boolean = true private def isTypeWideningSupportNeededByMetadata(metadata: Metadata): Boolean = @@ -636,6 +637,19 @@ object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening override def metadataRequiresFeatureToBeEnabled( metadata: Metadata, spark: SparkSession): Boolean = isTypeWideningSupportNeededByMetadata(metadata) + + override def validateRemoval(snapshot: Snapshot): Boolean = + !isTypeWideningSupportNeededByMetadata(snapshot.metadata) && + !TypeWideningMetadata.containsTypeWideningMetadata(snapshot.metadata.schema) + + override def actionUsesFeature(action: Action): Boolean = + action match { + case m: Metadata => TypeWideningMetadata.containsTypeWideningMetadata(m.schema) + case _ => false + } + + override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = + TypeWideningPreDowngradeCommand(table) } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index b3dd6b2b1cc..333af5a94d5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -16,9 +16,10 @@ package org.apache.spark.sql.delta -import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils} import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.types._ object TypeWidening { @@ -60,4 +61,30 @@ object TypeWidening { case (ByteType | ShortType, IntegerType) => true case _ => false } + + /** + * Filter the given list of files to only keep files that were written before the latest type + * change and that then contain a column or field with a type that is different from the current + * table schema. 
+ */ + def filterFilesRequiringRewrite(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = + TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema) match { + case Some(latestVersion) => + files.filter(_.defaultRowCommitVersion match { + case Some(version) => version < latestVersion + case None => false + }) + case None => + Seq.empty + } + + + + /** + * Return the number of files that were written before the latest type change and that then + * contain a column or field with a type that is different from the current able schema. + */ + def numFilesRequiringRewrite(snapshot: Snapshot): Long = { + filterFilesRequiringRewrite(snapshot, snapshot.allFiles.collect()).size + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala index 82accd624ac..55a5734ff29 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala @@ -16,7 +16,9 @@ package org.apache.spark.sql.delta -import org.apache.spark.sql.delta.schema.SchemaMergingUtils +import scala.collection.mutable + +import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.util.ScalaExtensions._ import org.apache.spark.sql.types._ @@ -156,12 +158,12 @@ private[delta] object TypeWideningMetadata { typeChange.copy(fieldPath = "element" +: typeChange.fieldPath) } case (fromType: AtomicType, toType: AtomicType) if fromType != toType => - Seq(TypeChange( - version, - fromType, - toType, - fieldPath = Seq.empty - )) + Seq(TypeChange( + version, + fromType, + toType, + fieldPath = Seq.empty + )) case (_: AtomicType, _: AtomicType) => Seq.empty // Don't recurse inside structs, `collectTypeChanges` should be called directly on each struct // fields instead to only collect type changes inside these fields. @@ -192,4 +194,51 @@ private[delta] object TypeWideningMetadata { case None => field } } + + /** + * Remove the type widening metadata from all the fields in the given schema. + * Return the cleaned schema and a list of fields with their path that had type widening metadata. + */ + def removeTypeWideningMetadata(schema: StructType) + : (StructType, Seq[(Seq[String], StructField)]) = { + if (!containsTypeWideningMetadata(schema)) return (schema, Seq.empty) + + val changes = mutable.Buffer.empty[(Seq[String], StructField)] + val newSchema = SchemaMergingUtils.transformColumns(schema) { + case (fieldPath: Seq[String], field: StructField, _) + if field.metadata.contains(TYPE_CHANGES_METADATA_KEY) => + changes.append((fieldPath, field)) + val cleanMetadata = new MetadataBuilder() + .withMetadata(field.metadata) + .remove(TYPE_CHANGES_METADATA_KEY) + .build() + field.copy(metadata = cleanMetadata) + case (_, field: StructField, _) => field + } + newSchema -> changes.toSeq + } + + /** Check whether any struct field in the schema contains type widening metadata. 
*/ + def containsTypeWideningMetadata(schema: StructType): Boolean = + schema.existsRecursively { + case s: StructType => s.exists(_.metadata.contains(TYPE_CHANGES_METADATA_KEY)) + case _ => false + } + + /** Return the version of the latest type change recorded in the schema metadata */ + def getLatestTypeChangeVersion(schema: StructType): Option[Long] = { + val fields = + SchemaUtils.filterRecursively(schema, checkComplexTypes = true) { + _.metadata.contains(TypeWideningMetadata.TYPE_CHANGES_METADATA_KEY) + }.map(_._2) + + val versions = fields.flatMap { field => + field.metadata + .getMetadataArray(TypeWideningMetadata.TYPE_CHANGES_METADATA_KEY) + .map { typeChange => + typeChange.getLong(TypeChange.TABLE_VERSION_METADATA_KEY) + } + } + if (versions.nonEmpty) Some(versions.max) else None + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala index c21efc5ddf0..a71987ed70c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala @@ -16,13 +16,14 @@ package org.apache.spark.sql.delta.commands +import org.apache.spark.sql.delta.{Snapshot, TypeWidening} import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LeafCommand, LogicalPlan, UnaryCommand} object DeltaReorgTableMode extends Enumeration { - val PURGE, UNIFORM_ICEBERG = Value + val PURGE, UNIFORM_ICEBERG, REWRITE_TYPE_WIDENING = Value } case class DeltaReorgTableSpec( @@ -70,11 +71,13 @@ case class DeltaReorgTableCommand( } override def run(sparkSession: SparkSession): Seq[Row] = reorgTableSpec match { - case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) => + case DeltaReorgTableSpec( + DeltaReorgTableMode.PURGE | DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) => optimizeByReorg(sparkSession) case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) => val table = getDeltaTable(target, "REORG") upgradeUniformIcebergCompatVersion(table, sparkSession, icebergCompatVersion) + } protected def reorgOperation: DeltaReorgOperation = reorgTableSpec match { @@ -82,6 +85,8 @@ case class DeltaReorgTableCommand( new DeltaPurgeOperation() case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) => new DeltaUpgradeUniformOperation(icebergCompatVersion) + case DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) => + new DeltaRewriteTypeWideningOperation() } } @@ -93,14 +98,14 @@ sealed trait DeltaReorgOperation { * Collects files that need to be processed by the reorg operation from the list of candidate * files. */ - def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] + def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] } /** * Reorg operation to purge files with soft deleted rows. 
*/ class DeltaPurgeOperation extends DeltaReorgOperation { - override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = + override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = files.filter { file => (file.deletionVector != null && file.numPhysicalRecords.isEmpty) || file.numDeletedRecords > 0L @@ -111,7 +116,7 @@ class DeltaPurgeOperation extends DeltaReorgOperation { * Reorg operation to upgrade the iceberg compatibility version of a table. */ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation { - override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = { + override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = { def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = { if (file.tags == null) return true val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0") @@ -120,3 +125,12 @@ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorg files.filter(shouldRewriteToBeIcebergCompatible) } } + +/** + * Internal reorg operation to rewrite files to conform to the current table schema when dropping + * the type widening table feature. + */ +class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation { + override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = + TypeWidening.filterFilesRequiringRewrite(snapshot, files) +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala index 5e474423716..a894f914236 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala @@ -264,7 +264,7 @@ class OptimizeExecutor( val partitionSchema = txn.metadata.partitionSchema val filesToProcess = optimizeContext.reorg match { - case Some(reorgOperation) => reorgOperation.filterFilesToReorg(candidateFiles) + case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles) case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles) } val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala index 24dc88ed03c..ef1b9835a81 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala @@ -220,7 +220,7 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest )) } - test("addTypeWideningMetadata with no type changes") { + test("addTypeWideningMetadata/removeTypeWideningMetadata with no type changes") { for { (oldSchema, newSchema) <- Seq( ("a short", "a short"), @@ -240,19 +240,21 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest withClue(s"oldSchema = $oldSchema, newSchema = $newSchema") { val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema) assert(schema === newSchema) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === schema -> Seq.empty) } } } - test("addTypeWideningMetadata on top-level fields") { - var schema = + 
test("addTypeWideningMetadata/removeTypeWideningMetadata on top-level fields") { + val schemaWithoutMetadata = StructType.fromDDL("i long, d decimal(15, 4), a array, m map") val firstOldSchema = StructType.fromDDL("i short, d decimal(6, 2), a array, m map") val secondOldSchema = StructType.fromDDL("i int, d decimal(10, 4), a array, m map") - schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, firstOldSchema) + var schema = + TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema) assert(schema("i") === StructField("i", LongType, metadata = new MetadataBuilder() @@ -282,6 +284,13 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest )).build() )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + schemaWithoutMetadata -> Seq( + Seq.empty -> schema("i"), + Seq.empty -> schema("d"), + Seq.empty -> schema("a"), + Seq.empty -> schema("m") + )) // Second type change on all fields. schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema) @@ -316,10 +325,18 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest typeChangeMetadata(version = 1, "byte", "integer", "value") )).build() )) + + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + schemaWithoutMetadata -> Seq( + Seq.empty -> schema("i"), + Seq.empty -> schema("d"), + Seq.empty -> schema("a"), + Seq.empty -> schema("m") + )) } - test("addTypeWideningMetadata on nested fields") { - var schema = StructType.fromDDL( + test("addTypeWideningMetadata/removeTypeWideningMetadata on nested fields") { + val schemaWithoutMetadata = StructType.fromDDL( "s struct>, m: map, array>>") val firstOldSchema = StructType.fromDDL( "s struct>, m: map, array>>") @@ -327,7 +344,8 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest "s struct>, m: map, array>>") // First type change on all struct fields. - schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, firstOldSchema) + var schema = + TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema) var struct = schema("s").dataType.asInstanceOf[StructType] assert(struct("i") === StructField("i", LongType, @@ -352,6 +370,13 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest )).build() )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + schemaWithoutMetadata -> Seq( + Seq("s") -> struct("i"), + Seq("s") -> struct("a"), + Seq("s") -> struct("m") + )) + // Second type change on all struct fields. 
schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema) struct = schema("s").dataType.asInstanceOf[StructType] @@ -380,9 +405,15 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest typeChangeMetadata(version = 1, "integer", "long", "value.element") )).build() )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + schemaWithoutMetadata -> Seq( + Seq("s") -> struct("i"), + Seq("s") -> struct("a"), + Seq("s") -> struct("m") + )) } - test("addTypeWideningMetadata with added and removed fields") { + test("addTypeWideningMetadata/removeTypeWideningMetadata with added and removed fields") { val newSchema = StructType.fromDDL("a int, b long, d int") val oldSchema = StructType.fromDDL("a int, b int, c int") @@ -397,13 +428,16 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest typeChangeMetadata(version = 1, "integer", "long") )).build() )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + newSchema -> Seq(Seq.empty -> schema("b")) + ) } - test("addTypeWideningMetadata with different field position") { - val initialSchema = StructType.fromDDL("a short, b int, s struct") - val secondSchema = StructType.fromDDL("b int, a short, s struct") + test("addTypeWideningMetadata/removeTypeWideningMetadata with different field position") { + val newSchema = StructType.fromDDL("a short, b int, s struct") + val oldSchema = StructType.fromDDL("b int, a short, s struct") - val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, initialSchema, secondSchema) + val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema) // No type widening metadata is added. assert(schema("a") === StructField("a", ShortType)) assert(schema("b") === StructField("b", IntegerType)) @@ -411,6 +445,7 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest StructField("s", new StructType() .add("c", IntegerType) .add("d", LongType))) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === newSchema -> Seq.empty) } test("updateTypeChangeVersion with no type changes") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index fca8e3e046f..c3db22b86ed 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -16,8 +16,15 @@ package org.apache.spark.sql.delta +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.util.JsonUtils +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Encoder, QueryTest, Row} @@ -27,6 +34,7 @@ import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ +import org.apache.spark.util.ManualClock /** * Suite covering the type widening table feature. 
@@ -602,20 +610,98 @@ trait DeltaTypeWideningMetadataTests { ]}""".stripMargin) } -trait DeltaTypeWideningTableFeatureTests { - self: QueryTest with ParquetTest with DeltaDMLTestUtils with DeltaTypeWideningTestMixin - with SharedSparkSession => +/** + * Tests covering adding and removing the type widening table feature. Dropping the table feature + * also includes rewriting data files with the old type and removing type widening metadata. + */ +trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { + self: QueryTest + with ParquetTest + with DeltaDMLTestUtils + with DeltaTypeWideningTestMixin => + + /** Clock used to advance past the retention period when dropping the table feature. */ + var clock: ManualClock = _ + + override protected def beforeEach(): Unit = { + super.beforeEach() + clock = new ManualClock(System.currentTimeMillis()) + // Override the (cached) delta log with one using our manual clock. + DeltaLog.clearCache() + deltaLog = DeltaLog.forTable(spark, new Path(tempPath), clock) + } def isTypeWideningSupported: Boolean = { - val snapshot = DeltaLog.forTable(spark, tempPath).unsafeVolatileSnapshot - TypeWidening.isSupported(snapshot.protocol) + TypeWidening.isSupported(deltaLog.update().protocol) } def isTypeWideningEnabled: Boolean = { - val snapshot = DeltaLog.forTable(spark, tempPath).unsafeVolatileSnapshot + val snapshot = deltaLog.update() TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata) } + /** Expected outcome of dropping the type widening table feature. */ + object ExpectedOutcome extends Enumeration { + val SUCCESS, FAIL_CURRENT_VERSION_USES_FEATURE, FAIL_HISTORICAL_VERSION_USES_FEATURE = Value + } + + /** Helper method to drop the type widening table feature and check for an expected outcome. */ + def dropTableFeature(expectedOutcome: ExpectedOutcome.Value): Unit = { + // Need to directly call ALTER TABLE command to pass our deltaLog with manual clock. + val dropFeature = AlterTableDropFeatureDeltaCommand( + DeltaTableV2(spark, deltaLog.dataPath), + TypeWideningTableFeature.name) + + expectedOutcome match { + case ExpectedOutcome.SUCCESS => + dropFeature.run(spark) + case ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE => + checkError( + exception = intercept[DeltaTableFeatureException] { dropFeature.run(spark) }, + errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD", + parameters = Map( + "feature" -> TypeWideningTableFeature.name, + "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key, + "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString, + "truncateHistoryLogRetentionPeriod" -> + DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString) + ) + case ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE => + checkError( + exception = intercept[DeltaTableFeatureException] { dropFeature.run(spark) }, + errorClass = "DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST", + parameters = Map( + "feature" -> TypeWideningTableFeature.name, + "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key, + "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString, + "truncateHistoryLogRetentionPeriod" -> + DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString) + ) + } + } + + /** + * Use this after dropping the table feature to artificially move the current time to after + * the table retention period. 
+ */ + def advancePastRetentionPeriod(): Unit = { + clock.advance( + deltaLog.deltaRetentionMillis(deltaLog.update().metadata) + + TimeUnit.MINUTES.toMillis(5)) + } + + /** Get the number of AddFile actions committed since the given table version (included). */ + def getNumAddFilesSinceVersion(version: Long): Long = + deltaLog + .getChanges(startVersion = version) + .flatMap { case (_, actions) => actions } + .collect { case a: AddFile => a } + .size + test("enable type widening at table creation then disable it") { sql(s"CREATE TABLE delta.`$tempPath` (a int) USING DELTA " + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')") @@ -711,4 +797,92 @@ trait DeltaTypeWideningTableFeatureTests { enableTypeWidening(tempPath, enabled = false) sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT") } + + test("drop unused table feature on empty table") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version = 0) === 0) + checkAnswer(readDeltaTable(tempPath), Seq.empty) + } + + test("drop unused table feature on table with data") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") + sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + val version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } + + test("drop table feature on table with data added only after type change") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") + sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + val version = deltaLog.update().version + + // We could actually drop the table feature directly here instead of failing by checking that + // there were no files added before the type change. This may be an expensive check for a rare + // scenario so we don't do it. 
+ dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } + + test("drop table feature on table with data added before type change") { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + val version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 1) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } + + test("drop table feature on table with data added before type change and fully rewritten after") { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + sql(s"UPDATE delta.`$tempDir` SET a = a + 10") + assert(getNumAddFilesSinceVersion(version = 0) === 2) + val version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + // The file was already rewritten in UPDATE. + assert(getNumAddFilesSinceVersion(version + 1) === 0) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + checkAnswer(readDeltaTable(tempPath), Seq(Row(11), Row(12), Row(13))) + } + + test("drop table feature on table with data added before type change and partially rewritten " + + "after") { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") + sql(s"INSERT INTO delta.`$tempDir` VALUES (4), (5), (6)") + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + assert(getNumAddFilesSinceVersion(version = 0) === 2) + sql(s"UPDATE delta.`$tempDir` SET a = a + 10 WHERE a < 4") + assert(getNumAddFilesSinceVersion(version = 0) === 3) + val version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + // One file was already rewritten in UPDATE, leaving 1 file to rewrite. 
+ assert(getNumAddFilesSinceVersion(version + 1) === 1) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + checkAnswer(readDeltaTable(tempPath), Seq(Row(11), Row(12), Row(13), Row(4), Row(5), Row(6))) + } } From 4c50a1ace8f94e141390930eb3423a2a594b6fe4 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 5 Mar 2024 17:25:29 +0100 Subject: [PATCH 3/7] Address comments --- .../apache/spark/sql/delta/TypeWidening.scala | 10 +- .../sql/delta/TypeWideningMetadata.scala | 24 +- .../sql/delta/DeltaTypeWideningSuite.scala | 218 ++++++++++++------ 3 files changed, 161 insertions(+), 91 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index 333af5a94d5..fdf3261ca2b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -64,15 +64,19 @@ object TypeWidening { /** * Filter the given list of files to only keep files that were written before the latest type - * change and that then contain a column or field with a type that is different from the current - * table schema. + * change, if any. These older files contain a column or field with a type that is different than + * in the current table schema and must be rewritten when dropping the type widening table feature + * to make the table readable by readers that don't support the feature. */ def filterFilesRequiringRewrite(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema) match { case Some(latestVersion) => files.filter(_.defaultRowCommitVersion match { case Some(version) => version < latestVersion - case None => false + // Files written before the type widening table feature was added to the table don't + // have a defaultRowCommitVersion. That does mean they were written before the latest + // type change. + case None => true }) case None => Seq.empty diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala index 55a5734ff29..87905035301 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala @@ -218,7 +218,7 @@ private[delta] object TypeWideningMetadata { newSchema -> changes.toSeq } - /** Check whether any struct field in the schema contains type widening metadata. */ + /** Recursively checks whether any struct field in the schema contains type widening metadata. 
*/ def containsTypeWideningMetadata(schema: StructType): Boolean = schema.existsRecursively { case s: StructType => s.exists(_.metadata.contains(TYPE_CHANGES_METADATA_KEY)) @@ -227,18 +227,16 @@ private[delta] object TypeWideningMetadata { /** Return the version of the latest type change recorded in the schema metadata */ def getLatestTypeChangeVersion(schema: StructType): Option[Long] = { - val fields = - SchemaUtils.filterRecursively(schema, checkComplexTypes = true) { - _.metadata.contains(TypeWideningMetadata.TYPE_CHANGES_METADATA_KEY) - }.map(_._2) - - val versions = fields.flatMap { field => - field.metadata - .getMetadataArray(TypeWideningMetadata.TYPE_CHANGES_METADATA_KEY) - .map { typeChange => - typeChange.getLong(TypeChange.TABLE_VERSION_METADATA_KEY) - } - } + val allStructFields = SchemaUtils.filterRecursively(schema, checkComplexTypes = true) { + _ => true + }.map(_._2) + + // Collect all type change versions from all struct fields. + val versions = allStructFields + .flatMap(TypeWideningMetadata.fromField) + .flatMap(_.typeChanges) + .map(_.version) + if (versions.nonEmpty) Some(versions.max) else None } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index c3db22b86ed..fb9c63554dd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -18,9 +18,11 @@ package org.apache.spark.sql.delta import java.util.concurrent.TimeUnit + import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand +import org.apache.spark.sql.delta.rowtracking.RowTrackingTestUtils import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.util.JsonUtils import org.apache.hadoop.fs.Path @@ -43,6 +45,7 @@ class DeltaTypeWideningSuite extends QueryTest with ParquetTest with DeltaDMLTestUtils + with RowTrackingTestUtils with DeltaSQLCommandTest with DeltaTypeWideningTestMixin with DeltaTypeWideningAlterTableTests @@ -618,6 +621,7 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { self: QueryTest with ParquetTest with DeltaDMLTestUtils + with RowTrackingTestUtils with DeltaTypeWideningTestMixin => /** Clock used to advance past the retention period when dropping the table feature. */ @@ -805,84 +809,148 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { checkAnswer(readDeltaTable(tempPath), Seq.empty) } - test("drop unused table feature on table with data") { - sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") - sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") - assert(getNumAddFilesSinceVersion(version = 0) === 1) - val version = deltaLog.update().version - dropTableFeature(ExpectedOutcome.SUCCESS) - assert(getNumAddFilesSinceVersion(version + 1) === 0) - checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) - } + // Rewriting the data when dropping the table feature relies on the default row commit version + // being set even when row tracking isn't enabled. 
+ for(rowTrackingEnabled <- BOOLEAN_DOMAIN) { + test(s"drop unused table feature on table with data, rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") + sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + + val version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } - test("drop table feature on table with data added only after type change") { - sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") - sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") - sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") - assert(getNumAddFilesSinceVersion(version = 0) === 1) - val version = deltaLog.update().version - - // We could actually drop the table feature directly here instead of failing by checking that - // there were no files added before the type change. This may be an expensive check for a rare - // scenario so we don't do it. - dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) - assert(getNumAddFilesSinceVersion(version + 1) === 0) - assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) - dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) - advancePastRetentionPeriod() - dropTableFeature(ExpectedOutcome.SUCCESS) - checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) - } + test(s"drop unused table feature on table with data inserted before adding the table feature," + + s"rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") + enableTypeWidening(tempPath) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 1) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + version = deltaLog.update().version + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } - test("drop table feature on table with data added before type change") { - sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") - sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") - sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") - assert(getNumAddFilesSinceVersion(version = 0) === 1) - val version = deltaLog.update().version - dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) - assert(getNumAddFilesSinceVersion(version + 1) === 1) - assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) - dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) - advancePastRetentionPeriod() - dropTableFeature(ExpectedOutcome.SUCCESS) - checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) - } + test(s"drop table feature 
on table with data added only after type change, " + + s"rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") + sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + + // We could actually drop the table feature directly here instead of failing by checking that + // there were no files added before the type change. This may be an expensive check for a rare + // scenario so we don't do it. + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + advancePastRetentionPeriod() + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } - test("drop table feature on table with data added before type change and fully rewritten after") { - sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") - sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") - sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") - sql(s"UPDATE delta.`$tempDir` SET a = a + 10") - assert(getNumAddFilesSinceVersion(version = 0) === 2) - val version = deltaLog.update().version - dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) - // The file was already rewritten in UPDATE. 
- assert(getNumAddFilesSinceVersion(version + 1) === 0) - assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) - dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) - advancePastRetentionPeriod() - dropTableFeature(ExpectedOutcome.SUCCESS) - checkAnswer(readDeltaTable(tempPath), Seq(Row(11), Row(12), Row(13))) - } + test(s"drop table feature on table with data added before type change, " + + s"rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 1) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + version = deltaLog.update().version + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } - test("drop table feature on table with data added before type change and partially rewritten " + - "after") { - sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") - sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") - sql(s"INSERT INTO delta.`$tempDir` VALUES (4), (5), (6)") - sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") - assert(getNumAddFilesSinceVersion(version = 0) === 2) - sql(s"UPDATE delta.`$tempDir` SET a = a + 10 WHERE a < 4") - assert(getNumAddFilesSinceVersion(version = 0) === 3) - val version = deltaLog.update().version - dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) - // One file was already rewritten in UPDATE, leaving 1 file to rewrite. - assert(getNumAddFilesSinceVersion(version + 1) === 1) - assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) - dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) - advancePastRetentionPeriod() - dropTableFeature(ExpectedOutcome.SUCCESS) - checkAnswer(readDeltaTable(tempPath), Seq(Row(11), Row(12), Row(13), Row(4), Row(5), Row(6))) + test(s"drop table feature on table with data added before type change and fully rewritten " + + s"after, rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + sql(s"UPDATE delta.`$tempDir` SET a = a + 10") + assert(getNumAddFilesSinceVersion(version = 0) === 2) + + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + // The file was already rewritten in UPDATE. 
+ assert(getNumAddFilesSinceVersion(version + 1) === 0) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + version = deltaLog.update().version + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(11), Row(12), Row(13))) + } + + test(s"drop table feature on table with data added before type change and partially " + + s"rewritten after, rowTrackingEnabled=$rowTrackingEnabled") { + withRowTrackingEnabled(rowTrackingEnabled) { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") + sql(s"INSERT INTO delta.`$tempDir` VALUES (4), (5), (6)") + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + assert(getNumAddFilesSinceVersion(version = 0) === 2) + sql(s"UPDATE delta.`$tempDir` SET a = a + 10 WHERE a < 4") + assert(getNumAddFilesSinceVersion(version = 0) === 3) + + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + // One file was already rewritten in UPDATE, leaving 1 file to rewrite. + assert(getNumAddFilesSinceVersion(version + 1) === 1) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + version = deltaLog.update().version + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer( + readDeltaTable(tempPath), + Seq(Row(11), Row(12), Row(13), Row(4), Row(5), Row(6))) + } + } } } From ecc188a884461a604a1acb9e2a6858f4b9a5679e Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Wed, 6 Mar 2024 12:16:42 +0100 Subject: [PATCH 4/7] Fix reorg command + force writing a single file in tests --- .../PreDowngradeTableFeatureCommand.scala | 16 +++++++++++++- .../sql/delta/DeltaTypeWideningSuite.scala | 21 ++++++++++++------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 8f4de3d013a..373bbfcc5b0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta import java.util.concurrent.TimeUnit +import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec} import org.apache.spark.sql.delta.metering.DeltaLogging @@ -170,8 +171,21 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot) if (numFilesToRewrite == 0L) return 0L + // Get the table Id and catalog from the delta table to build a ResolvedTable plan for the reorg + // command. 
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val tableId = table.spark + .sessionState + .sqlParser + .parseTableIdentifier(table.name).nameParts.asIdentifier + val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog + val reorg = DeltaReorgTableCommand( - table.toLogicalRelation, + ResolvedTable.create( + catalog, + tableId, + table + ), DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) )(Nil) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index fb9c63554dd..292e3821586 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Encoder, QueryTest, Row} import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.datasources.parquet.ParquetTest -import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -624,6 +624,8 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { with RowTrackingTestUtils with DeltaTypeWideningTestMixin => + import testImplicits._ + /** Clock used to advance past the retention period when dropping the table feature. */ var clock: ManualClock = _ @@ -698,6 +700,9 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { TimeUnit.MINUTES.toMillis(5)) } + def addSingleFile[T: Encoder](values: Seq[T], dataType: DataType): Unit = + append(values.toDF("a").select(col("a").cast(dataType)).repartition(1)) + /** Get the number of AddFile actions committed since the given table version (included). 
*/ def getNumAddFilesSinceVersion(version: Long): Long = deltaLog @@ -814,7 +819,7 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { for(rowTrackingEnabled <- BOOLEAN_DOMAIN) { test(s"drop unused table feature on table with data, rowTrackingEnabled=$rowTrackingEnabled") { sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") - sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") + addSingleFile(Seq(1, 2, 3), ByteType) assert(getNumAddFilesSinceVersion(version = 0) === 1) val version = deltaLog.update().version @@ -827,7 +832,7 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { s"rowTrackingEnabled=$rowTrackingEnabled") { sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA " + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") - sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") + addSingleFile(Seq(1, 2, 3), ByteType) enableTypeWidening(tempPath) sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") assert(getNumAddFilesSinceVersion(version = 0) === 1) @@ -852,7 +857,7 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { s"rowTrackingEnabled=$rowTrackingEnabled") { sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") - sql(s"INSERT INTO delta.`$tempPath` VALUES (1), (2), (3)") + addSingleFile(Seq(1, 2, 3), IntegerType) assert(getNumAddFilesSinceVersion(version = 0) === 1) // We could actually drop the table feature directly here instead of failing by checking that @@ -877,7 +882,7 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { test(s"drop table feature on table with data added before type change, " + s"rowTrackingEnabled=$rowTrackingEnabled") { sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") - sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") + addSingleFile(Seq(1, 2, 3), ByteType) sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") assert(getNumAddFilesSinceVersion(version = 0) === 1) @@ -900,7 +905,7 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { test(s"drop table feature on table with data added before type change and fully rewritten " + s"after, rowTrackingEnabled=$rowTrackingEnabled") { sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") - sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") + addSingleFile(Seq(1, 2, 3), ByteType) sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") sql(s"UPDATE delta.`$tempDir` SET a = a + 10") assert(getNumAddFilesSinceVersion(version = 0) === 2) @@ -926,8 +931,8 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { s"rewritten after, rowTrackingEnabled=$rowTrackingEnabled") { withRowTrackingEnabled(rowTrackingEnabled) { sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") - sql(s"INSERT INTO delta.`$tempDir` VALUES (1), (2), (3)") - sql(s"INSERT INTO delta.`$tempDir` VALUES (4), (5), (6)") + addSingleFile(Seq(1, 2, 3), ByteType) + addSingleFile(Seq(4, 5, 6), ByteType) sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") assert(getNumAddFilesSinceVersion(version = 0) === 2) sql(s"UPDATE delta.`$tempDir` SET a = a + 10 WHERE a < 4") From 1efa91f0dd4bf016fdc785b296d8c0aaaa4fc911 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Thu, 14 Mar 2024 16:17:09 +0100 Subject: [PATCH 5/7] Formatting --- .../spark/sql/delta/PreDowngradeTableFeatureCommand.scala | 3 ++- .../main/scala/org/apache/spark/sql/delta/TypeWidening.scala | 1 - 
.../spark/sql/delta/commands/DeltaReorgTableCommand.scala | 4 ++-- .../org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala | 1 - 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 373bbfcc5b0..7cd3321512e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -17,12 +17,13 @@ package org.apache.spark.sql.delta import java.util.concurrent.TimeUnit -import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} +import org.apache.spark.sql.catalyst.analysis.ResolvedTable + /** * A base class for implementing a preparation command for removing table features. * Must implement a run method. Note, the run method must be implemented in a way that when diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index fdf3261ca2b..2efbdf11e47 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -83,7 +83,6 @@ object TypeWidening { } - /** * Return the number of files that were written before the latest type change and that then * contain a column or field with a type that is different from the current able schema. 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala index a71987ed70c..77113c3906f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala @@ -85,8 +85,8 @@ case class DeltaReorgTableCommand( new DeltaPurgeOperation() case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) => new DeltaUpgradeUniformOperation(icebergCompatVersion) - case DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) => - new DeltaRewriteTypeWideningOperation() + case DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) => + new DeltaRewriteTypeWideningOperation() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index 292e3821586..9fb52146ee2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.delta import java.util.concurrent.TimeUnit - import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand From 2c77a57fdb2fa1d4d1ac85172f04a61b43a18eab Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Thu, 14 Mar 2024 17:18:54 +0100 Subject: [PATCH 6/7] nit: formatting --- .../apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala index 77113c3906f..f78ef60ae29 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala @@ -77,7 +77,6 @@ case class DeltaReorgTableCommand( case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) => val table = getDeltaTable(target, "REORG") upgradeUniformIcebergCompatVersion(table, sparkSession, icebergCompatVersion) - } protected def reorgOperation: DeltaReorgOperation = reorgTableSpec match { From dcca709ae7551623464914a82432395c533e6b46 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Fri, 15 Mar 2024 10:12:00 +0100 Subject: [PATCH 7/7] nit: formatting --- .../spark/sql/delta/PreDowngradeTableFeatureCommand.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 7cd3321512e..97c7ab60e9d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -137,7 +137,7 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) * Unset the type widening table property to prevent new type changes to be applied to the table, * then removes traces of the feature: * - Rewrite files that have columns or fields with a different type than in the 
current table
-   * schema. These are all files added or modified after the last type change.
+   * schema. These are all files that were not added or modified after the last type change.
    * - Remove the type widening metadata attached to fields in the current table schema.
    *
    * @return Return true if files were rewritten or metadata was removed. False otherwise.
@@ -165,7 +165,7 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
 
   /**
    * Rewrite files that have columns or fields with a different type than in the current table
-   * schema. These are all files added or modified after the last type change.
+   * schema. These are all files that were not added or modified after the last type change.
    * @return Return the number of files rewritten.
    */
   private def rewriteFilesIfNeeded(): Long = {
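
For context, the user-facing flow covered by this patch series can be sketched as follows. This is a minimal, illustrative Scala sketch and not part of the patches themselves: the table path is a placeholder, and the property key 'delta.enableTypeWidening' and feature name 'typeWidening' are assumed to correspond to DeltaConfigs.ENABLE_TYPE_WIDENING.key and TypeWideningTableFeature.name used in the tests above.

// Minimal sketch (not part of the patches): widen a column type, then drop the table feature,
// mirroring DeltaTypeWideningTableFeatureTests. Path, property key and feature name are
// assumptions taken from the tests above.
import org.apache.spark.sql.SparkSession

object TypeWideningDropFeatureSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("type-widening-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
    val path = "/tmp/type_widening_demo" // placeholder table location

    // Create a table with type widening enabled and write a file using the narrow type.
    spark.sql(s"CREATE TABLE delta.`$path` (a byte) USING DELTA " +
      "TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
    spark.sql(s"INSERT INTO delta.`$path` VALUES (1), (2), (3)")

    // Widen the column type: the table version and from/to types are recorded as metadata on
    // the field, which filterFilesRequiringRewrite later uses to find files written with the
    // old type.
    spark.sql(s"ALTER TABLE delta.`$path` CHANGE COLUMN a TYPE int")

    // Dropping the feature rewrites files written before the type change and removes the type
    // widening metadata from the schema; it then fails with
    // DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD (and later
    // DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST) until the log retention period has
    // elapsed, as exercised by dropTableFeature(...) in the tests above.
    spark.sql(s"ALTER TABLE delta.`$path` DROP FEATURE typeWidening")
  }
}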