From 7103115962ab795272d9a259b0c069c277777939 Mon Sep 17 00:00:00 2001
From: Scott Sandre
Date: Thu, 12 May 2022 15:04:28 -0700
Subject: [PATCH] Change Data Feed - PR 2 - DELETE command

See the project plan at https://github.com/delta-io/delta/issues/1105.

This PR adds CDF write functionality to the DELETE command, as well as a test suite
`DeleteCDCSuite`.

At a high level, when the DELETE command finds that it needs to delete some rows in a file (but
not the entire file), we no longer just create a new DataFrame containing the non-deleted rows
(so that, in the new table version, the removed rows are only logically deleted); instead, we
partition the DataFrame into CDF-deleted rows and non-deleted rows. When writing out the parquet
files, we write the CDF-deleted rows into their own CDF parquet file, and we write the non-deleted
rows into a standard main-table parquet file (same as usual). We also add an extra `AddCDCFile`
action to the transaction log. A short read-back sketch using `CDCReader.changesToBatchDF` follows
the diff at the end of this patch.

Closes delta-io/delta#1125.

GitOrigin-RevId: 7934de886589bf3d70ce81dcf9d7de598e35fb2e
---
 .../spark/sql/delta/DeltaOperations.scala     |   2 +
 .../sql/delta/OptimisticTransaction.scala     |  30 +++-
 .../sql/delta/commands/DeleteCommand.scala    |  89 +++++++++---
 .../sql/delta/files/TransactionalWrite.scala  |  37 ++++-
 .../spark/sql/delta/schema/SchemaUtils.scala  |  12 ++
 .../spark/sql/delta/cdc/DeleteCDCSuite.scala  | 128 ++++++++++++++++++
 .../sql/delta/schema/SchemaUtilsSuite.scala   |  10 ++
 7 files changed, 272 insertions(+), 36 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/cdc/DeleteCDCSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index 6baabe414d7..f180ec5ca4e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -397,6 +397,7 @@ private[delta] object DeltaOperationMetrics {
   val DELETE = Set(
     "numAddedFiles", // number of files added
     "numRemovedFiles", // number of files removed
+    "numAddedChangeFiles", // number of CDC files
     "numDeletedRows", // number of rows removed
     "numCopiedRows", // number of rows copied in the process of deleting files
     "executionTimeMs", // time taken to execute the entire operation
@@ -410,6 +411,7 @@ private[delta] object DeltaOperationMetrics {
    */
   val DELETE_PARTITIONS = Set(
     "numRemovedFiles", // number of files removed
+    "numAddedChangeFiles", // number of CDC files generated - generally 0 in this case
     "executionTimeMs", // time taken to execute the entire operation
     "scanTimeMs", // time taken to scan the files for matches
     "rewriteTimeMs" // time taken to rewrite the matched files
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index cd9652f9db5..260aa3a7c7b 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -910,14 +910,30 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     }
     // Post stats
+    // Here, we efficiently calculate various stats (number of each different action, number of
+    // bytes per action, etc.) by iterating over all actions, case matching by type, and updating
+    // variables. This is more efficient than a functional approach.
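+    // (For reference, the replaced functional approach materialized an intermediate collection
+    // per statistic, e.g. `actions.collect { case a: AddFile => a }` plus a second `collect`
+    // for `RemoveFile`, so `actions` was traversed once per stat; the single pass below avoids
+    // that.)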
var numAbsolutePaths = 0 val distinctPartitions = new mutable.HashSet[Map[String, String]] - val adds = actions.collect { + var bytesNew: Long = 0L + var numAdd: Int = 0 + var numRemove: Int = 0 + var numCdcFiles: Int = 0 + var cdcBytesNew: Long = 0L + actions.foreach { case a: AddFile => + numAdd += 1 if (a.pathAsUri.isAbsolute) numAbsolutePaths += 1 distinctPartitions += a.partitionValues - a + if (a.dataChange) bytesNew += a.size + case r: RemoveFile => + numRemove += 1 + case c: AddCDCFile => + numCdcFiles += 1 + cdcBytesNew += c.size + case _ => } + val needsCheckpoint = shouldCheckpoint(attemptVersion) val stats = CommitStats( startVersion = snapshot.version, @@ -928,13 +944,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite fsWriteDurationMs = NANOSECONDS.toMillis(commitEndNano - fsWriteStartNano), stateReconstructionDurationMs = NANOSECONDS.toMillis(postCommitReconstructionTime - commitEndNano), - numAdd = adds.size, - numRemove = actions.collect { case r: RemoveFile => r }.size, - bytesNew = adds.filter(_.dataChange).map(_.size).sum, + numAdd = numAdd, + numRemove = numRemove, + bytesNew = bytesNew, numFilesTotal = postCommitSnapshot.numOfFiles, sizeInBytesTotal = postCommitSnapshot.sizeInBytes, - numCdcFiles = 0, - cdcBytesNew = 0, + numCdcFiles = numCdcFiles, + cdcBytesNew = cdcBytesNew, protocol = postCommitSnapshot.protocol, commitSizeBytes = jsonActions.map(_.size).sum, checkpointSizeBytes = postCommitSnapshot.checkpointSizeInBytes(), diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala index b6d4b2d61cd..5fd43ad1aab 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala @@ -17,22 +17,22 @@ package org.apache.spark.sql.delta.commands import org.apache.spark.sql.delta._ -import org.apache.spark.sql.delta.actions.Action +import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, FileAction} import org.apache.spark.sql.delta.commands.MergeIntoCommand.totalBytesAndDistinctPartitionValues import org.apache.spark.sql.delta.files.TahoeBatchFileIndex import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.SparkContext -import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases -import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, InputFileName, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, If, InputFileName, Literal, Not} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{DeltaDelete, LogicalPlan} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric -import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.functions.{lit, typedLit, udf} trait DeleteCommandMetrics { self: LeafRunnableCommand => @transient private lazy val sc: SparkContext = SparkContext.getOrCreate() @@ -53,7 +53,10 @@ trait DeleteCommandMetrics { self: LeafRunnableCommand => "numBytesRemoved" -> createMetric(sc, "number of bytes removed"), "executionTimeMs" 
-> createMetric(sc, "time taken to execute the entire operation"), "scanTimeMs" -> createMetric(sc, "time taken to scan the files for matches"), - "rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files") + "rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files"), + "numAddedChangeFiles" -> createMetric(sc, "number of change data capture files generated"), + "changeFileBytes" -> createMetric(sc, "total size of change data capture files generated"), + "numTouchedRows" -> createMetric(sc, "number of rows touched") ) } @@ -100,9 +103,11 @@ case class DeleteCommand( var numRemovedFiles: Long = 0 var numAddedFiles: Long = 0 + var numAddedChangeFiles: Long = 0 var scanTimeMs: Long = 0 var rewriteTimeMs: Long = 0 var numBytesAdded: Long = 0 + var changeFileBytes: Long = 0 var numBytesRemoved: Long = 0 var numFilesBeforeSkipping: Long = 0 var numBytesBeforeSkipping: Long = 0 @@ -219,20 +224,12 @@ case class DeleteCommand( // Keep everything from the resolved target except a new TahoeFileIndex // that only involves the affected files instead of all files. val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location) - val copiedRowCount = metrics("numCopiedRows") - val copiedRowUdf = udf { () => - copiedRowCount += 1 - true - }.asNondeterministic() val targetDF = Dataset.ofRows(sparkSession, newTarget) val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral)) - val updatedDF = targetDF.filter(new Column(filterCond)) - .filter(copiedRowUdf()) - - val rewrittenFiles = withStatusCode( - "DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") { - txn.writeFiles(updatedDF) - } + val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length) + val (changeFiles, rewrittenFiles) = rewrittenActions + .partition(_.isInstanceOf[AddCDCFile]) + numAddedFiles = rewrittenFiles.size val removedFiles = filesToRewrite.map(f => getTouchedFile(deltaLog.dataPath, f, nameToAddFileMap)) val (removedBytes, removedPartitions) = @@ -245,14 +242,15 @@ case class DeleteCommand( numPartitionsRemovedFrom = Some(removedPartitions) numPartitionsAddedTo = Some(rewrittenPartitions) } - numAddedFiles = rewrittenFiles.size + numAddedChangeFiles = changeFiles.size + changeFileBytes = changeFiles.collect { case f: AddCDCFile => f.size }.sum rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs numDeletedRows = Some(metrics("numDeletedRows").value) - numCopiedRows = Some(metrics("numCopiedRows").value) + numCopiedRows = Some(metrics("numTouchedRows").value - metrics("numDeletedRows").value) val operationTimestamp = System.currentTimeMillis() removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ - rewrittenFiles + rewrittenActions } } } @@ -262,6 +260,8 @@ case class DeleteCommand( metrics("executionTimeMs").set(executionTimeMs) metrics("scanTimeMs").set(scanTimeMs) metrics("rewriteTimeMs").set(rewriteTimeMs) + metrics("numAddedChangeFiles").set(numAddedChangeFiles) + metrics("changeFileBytes").set(changeFileBytes) metrics("numBytesAdded").set(numBytesAdded) metrics("numBytesRemoved").set(numBytesRemoved) metrics("numFilesBeforeSkipping").set(numFilesBeforeSkipping) @@ -287,7 +287,7 @@ case class DeleteCommand( numAddedFiles, numRemovedFiles, numAddedFiles, - numAddedChangeFiles = 0, + numAddedChangeFiles = numAddedChangeFiles, numFilesBeforeSkipping, numBytesBeforeSkipping, numFilesAfterSkipping, @@ -299,13 +299,58 @@ case class DeleteCommand( numDeletedRows, numBytesAdded, 
numBytesRemoved, - changeFileBytes = 0, + changeFileBytes = changeFileBytes, scanTimeMs, rewriteTimeMs) ) deleteActions } + + /** + * Returns the list of [[AddFile]]s and [[AddCDCFile]]s that have been re-written. + */ + private def rewriteFiles( + txn: OptimisticTransaction, + baseData: DataFrame, + filterCondition: Expression, + numFilesToRewrite: Long): Seq[FileAction] = { + val shouldWriteCdc = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(txn.metadata) + + // number of total rows that we have seen / are either copying or deleting (sum of both). + val numTouchedRows = metrics("numTouchedRows") + val numTouchedRowsUdf = udf { () => + numTouchedRows += 1 + true + }.asNondeterministic() + + withStatusCode( + "DELTA", s"Rewriting $numFilesToRewrite files for DELETE operation") { + val dfToWrite = if (shouldWriteCdc) { + import org.apache.spark.sql.delta.commands.cdc.CDCReader._ + // The logic here ends up being surprisingly elegant, with all source rows ending up in + // the output. Recall that we flipped the user-provided delete condition earlier, before the + // call to `rewriteFiles`. All rows which match this latest `filterCondition` are retained + // as table data, while all rows which don't match are removed from the rewritten table data + // but do get included in the output as CDC events. + baseData + .filter(numTouchedRowsUdf()) + .withColumn( + CDC_TYPE_COLUMN_NAME, + new Column( + If(filterCondition, typedLit[String](CDC_TYPE_NOT_CDC).expr, + lit(CDC_TYPE_DELETE).expr) + ) + ) + } else { + baseData + .filter(numTouchedRowsUdf()) + .filter(new Column(filterCondition)) + } + + txn.writeFiles(dfToWrite) + } + } } object DeleteCommand { diff --git a/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala b/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala index ced53dd49e3..0f794c5d2dd 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions._ +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInvariantCheckerExec} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema._ @@ -31,13 +32,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.fs.Path import org.apache.spark.SparkException -import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker} -import org.apache.spark.sql.functions.to_json -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.functions.{col, to_json} +import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.util.SerializableConfiguration /** @@ -215,18 +216,40 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl writeFiles(data, Nil) } + /** + * Returns a tuple of (data, partition schema). 
For CDC writes, a `__is_cdc` column is added to + * the data and `__is_cdc=true/false` is added to the front of the partition schema. + */ + protected def performCDCPartition(inputData: Dataset[_]): (DataFrame, StructType) = { + // If this is a CDC write, we need to generate the CDC_PARTITION_COL in order to properly + // dispatch rows between the main table and CDC event records. This is a virtual partition + // and will be stripped out later in [[DelayedCommitProtocolEdge]]. + // Note that the ordering of the partition schema is relevant - CDC_PARTITION_COL must + // come first in order to ensure CDC data lands in the right place. + if (CDCReader.isCDCEnabledOnTable(metadata) && + inputData.schema.fieldNames.contains(CDCReader.CDC_TYPE_COLUMN_NAME)) { + val augmentedData = inputData.withColumn( + CDCReader.CDC_PARTITION_COL, col(CDCReader.CDC_TYPE_COLUMN_NAME).isNotNull) + val partitionSchema = StructType( + StructField(CDCReader.CDC_PARTITION_COL, StringType) +: metadata.physicalPartitionSchema) + (augmentedData, partitionSchema) + } else { + (inputData.toDF(), metadata.physicalPartitionSchema) + } + } + /** * Writes out the dataframe after performing schema validation. Returns a list of * actions to append these files to the reservoir. */ def writeFiles( - data: Dataset[_], + inputData: Dataset[_], writeOptions: Option[DeltaOptions], additionalConstraints: Seq[Constraint]): Seq[FileAction] = { hasWritten = true - val spark = data.sparkSession - val partitionSchema = metadata.physicalPartitionSchema + val spark = inputData.sparkSession + val (data, partitionSchema) = performCDCPartition(inputData) val outputPath = deltaLog.dataPath val (queryExecution, output, generatedColumnConstraints, _) = @@ -333,6 +356,6 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl _.recordedStats(new Path(new URI(a.path)).getName)).getOrElse(a.stats)) } - resultFiles.toSeq + resultFiles.toSeq ++ committer.changeFiles } } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index 58b042ad267..78b766b13d9 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -24,6 +24,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, GeneratedColumn, NoMapping} import org.apache.spark.sql.delta.actions.Protocol +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.schema.SchemaMergingUtils._ import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -190,6 +191,14 @@ object SchemaUtils { if (dataFields.subsetOf(tableFields)) { data.toDF() } else { + // Allow the same shortcut logic (as the above `if` stmt) if the only extra fields are CDC + // metadata fields. + val nonCdcFields = dataFields.filterNot { f => + f == CDCReader.CDC_PARTITION_COL || f == CDCReader.CDC_TYPE_COLUMN_NAME + } + if (nonCdcFields.subsetOf(tableFields)) { + return data.toDF() + } // Check that nested columns don't need renaming. 
We can't handle that right now val topLevelDataFields = dataFields.map(UnresolvedAttribute.parseAttributeName(_).head) if (topLevelDataFields.subsetOf(tableFields)) { @@ -203,6 +212,9 @@ object SchemaUtils { val aliasExpressions = dataSchema.map { field => val originalCase: String = baseFields.get(field.name) match { case Some(original) => original.name + // This is a virtual partition column used for doing CDC writes. It's not actually + // in the table schema. + case None if field.name == CDCReader.CDC_TYPE_COLUMN_NAME => field.name case None => throw DeltaErrors.cannotResolveColumn(field, baseSchema) } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/cdc/DeleteCDCSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/cdc/DeleteCDCSuite.scala new file mode 100644 index 00000000000..b497beb5b11 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/delta/cdc/DeleteCDCSuite.scala @@ -0,0 +1,128 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.cdc + +// scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.commands.cdc.CDCReader +import org.apache.spark.sql.delta.commands.cdc.CDCReader._ + +import org.apache.spark.SparkConf +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.functions.lit + +class DeleteCDCSuite extends DeleteSQLSuite { + import testImplicits._ + + override protected def sparkConf: SparkConf = super.sparkConf + .set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true") + + protected def testCDCDelete(name: String)( + initialData: => Dataset[_], + partitionColumns: Seq[String] = Seq.empty, + deleteCondition: String, + expectedData: => Dataset[_], + expectedChangeData: => Dataset[_] + ): Unit = { + test(s"CDC - $name") { + withSQLConf( + (DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")) { + withTempDir { dir => + val path = dir.getAbsolutePath + initialData.write.format("delta").partitionBy(partitionColumns: _*) + .save(path) + if (deleteCondition.nonEmpty) { + sql(s"DELETE FROM delta.`$path` WHERE $deleteCondition") + } else { + sql(s"DELETE FROM delta.`$path`") + } + + val log = DeltaLog.forTable(spark, dir) + + checkAnswer( + spark.read.format("delta").load(path), + expectedData.toDF()) + // The timestamp is nondeterministic so we drop it when comparing results. 
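+        // `changesToBatchDF(log, 1, 1, spark)` returns the change rows committed in version 1
+        // (the DELETE), with the CDC metadata columns `_change_type`, `_commit_version` and
+        // `_commit_timestamp` alongside the table's own columns.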
+ checkAnswer( + CDCReader.changesToBatchDF(log, 1, 1, spark).drop(CDC_COMMIT_TIMESTAMP), + expectedChangeData.toDF()) + } + } + } + } + + testCDCDelete("unconditional")( + initialData = spark.range(10), + deleteCondition = "", + expectedData = spark.range(0), + expectedChangeData = spark.range(10) + .withColumn(CDC_TYPE_COLUMN_NAME, lit("delete")) + .withColumn(CDC_COMMIT_VERSION, lit(1)) + ) + + testCDCDelete("conditional covering all rows")( + initialData = spark.range(10), + deleteCondition = "id < 100", + expectedData = spark.range(0), + expectedChangeData = spark.range(10) + .withColumn(CDC_TYPE_COLUMN_NAME, lit("delete")) + .withColumn(CDC_COMMIT_VERSION, lit(1)) + ) + + testCDCDelete("two random rows")( + initialData = spark.range(10), + deleteCondition = "id = 2 OR id = 8", + expectedData = Seq(0, 1, 3, 4, 5, 6, 7, 9).toDF(), + expectedChangeData = Seq(2, 8).toDF() + .withColumn(CDC_TYPE_COLUMN_NAME, lit("delete")) + .withColumn(CDC_COMMIT_VERSION, lit(1)) + ) + + testCDCDelete("delete unconditionally - partitioned table")( + initialData = spark.range(100).selectExpr("id % 10 as part", "id"), + partitionColumns = Seq("part"), + deleteCondition = "", + expectedData = Seq.empty[(Long, Long)].toDF("part", "id"), + expectedChangeData = + spark.range(100) + .selectExpr("id % 10 as part", "id", "'delete' as _change_type", "1 as _commit_version") + ) + + testCDCDelete("delete all rows by condition - partitioned table")( + initialData = spark.range(100).selectExpr("id % 10 as part", "id"), + partitionColumns = Seq("part"), + deleteCondition = "id < 1000", + expectedData = Seq.empty[(Long, Long)].toDF("part", "id"), + expectedChangeData = + spark.range(100) + .selectExpr("id % 10 as part", "id", "'delete' as _change_type", "1 as _commit_version") + ) + + + testCDCDelete("partition-optimized delete")( + initialData = spark.range(100).selectExpr("id % 10 as part", "id"), + partitionColumns = Seq("part"), + deleteCondition = "part = 3", + expectedData = + spark.range(100).selectExpr("id % 10 as part", "id").where("part != 3"), + expectedChangeData = + Range(0, 10).map(x => x * 10 + 3).toDF("id") + .selectExpr("3 as part", "id", "'delete' as _change_type", "1 as _commit_version")) + +} + diff --git a/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala index d161fd2cb1b..67c1a769c2c 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.delta.schema import java.util.Locale import java.util.regex.Pattern +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.schema.SchemaMergingUtils._ import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -1002,6 +1003,15 @@ class SchemaUtilsSuite extends QueryTest assert(normalizeColumnNames(schema, df).schema.fieldNames === Seq("a", "b")) } + test("can normalize CDC type column") { + val df = Seq((1, 2, 3, 4)).toDF("Abc", "def", "gHi", CDCReader.CDC_TYPE_COLUMN_NAME) + val schema = new StructType() + .add("abc", IntegerType) + .add("Def", IntegerType) + .add("ghi", IntegerType) + assert(normalizeColumnNames(schema, df).schema.fieldNames === + schema.fieldNames :+ CDCReader.CDC_TYPE_COLUMN_NAME) + } //////////////////////////// // mergeSchemas
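
The following is a minimal, illustration-only sketch of the row-dispatch idea described above: it
mirrors the tagging done in `DeleteCommand.rewriteFiles` and the virtual partition column derived
in `TransactionalWrite.performCDCPartition`, but runs on a plain in-memory DataFrame outside of
any Delta transaction. The `spark` session and the example predicate are assumptions; the column
constants come from `CDCReader`, as in the patch.

    import org.apache.spark.sql.delta.commands.cdc.CDCReader._
    import org.apache.spark.sql.functions.{col, lit, typedLit, when}

    val base = spark.range(10).toDF("id")
    // The flipped DELETE condition: rows satisfying it stay in the main table.
    val keep = !(col("id") === 2 || col("id") === 8)

    // Step 1 (as in DeleteCommand.rewriteFiles): keep every source row, tagging survivors with
    // CDC_TYPE_NOT_CDC and deleted rows with CDC_TYPE_DELETE.
    val tagged = base.withColumn(
      CDC_TYPE_COLUMN_NAME,
      when(keep, typedLit[String](CDC_TYPE_NOT_CDC)).otherwise(lit(CDC_TYPE_DELETE)))

    // Step 2 (as in TransactionalWrite.performCDCPartition): derive the virtual partition column
    // that routes each row either to a main-table file or to a CDC file at write time.
    val dispatched = tagged.withColumn(CDC_PARTITION_COL, col(CDC_TYPE_COLUMN_NAME).isNotNull)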
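
For completeness, a hedged end-to-end sketch of the behaviour this patch enables, mirroring the
APIs used in `DeleteCDCSuite`. The `spark` session and the example path are assumptions, not part
of the patch; the CDF read goes through `CDCReader.changesToBatchDF`, exactly as the test suite
does.

    import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog}
    import org.apache.spark.sql.delta.commands.cdc.CDCReader
    import org.apache.spark.sql.delta.commands.cdc.CDCReader.CDC_COMMIT_TIMESTAMP

    // Make newly created tables CDF-enabled by default, as DeleteCDCSuite does.
    spark.conf.set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")

    val path = "/tmp/cdf_delete_example"  // hypothetical location
    spark.range(10).write.format("delta").save(path)                 // version 0
    spark.sql(s"DELETE FROM delta.`$path` WHERE id = 2 OR id = 8")   // version 1, adds an AddCDCFile

    // The two deleted ids come back as change rows with _change_type = "delete" and
    // _commit_version = 1; the commit timestamp is dropped because it is nondeterministic.
    val log = DeltaLog.forTable(spark, path)
    CDCReader.changesToBatchDF(log, 1, 1, spark)
      .drop(CDC_COMMIT_TIMESTAMP)
      .show()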