From 1f450aae6942cd03eff114a227377244ff227ce4 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Wed, 25 May 2022 08:33:36 -0700 Subject: [PATCH] Delta Lake - CDF - UPDATE command See the project plan at https://github.com/delta-io/delta/issues/1105. This PR adds CDF to the UPDATE command, during which we generate both preimage and postimage CDF data. This PR also adds UpdateCDCSuite which adds basic tests for these CDF changes. As a high-level overview of how this CDF-update operation is performed, when we find a row that satisfies the update condition, we `explode` an array containing the pre-image, post-image, and main-table updated rows. The pre-image and post-image rows are appropriately typed with the corresponding CDF_TYPE, and the main-table updated row has CDF_TYPE `null`. Thus, the first two rows will be written to the cdf parquet file, with the latter is written to standard main-table data parquet file. Closes delta-io/delta#1146 GitOrigin-RevId: 47413c5345bb97c0e1303a7f4d4d06b89c35ab7a --- .../sql/delta/commands/UpdateCommand.scala | 142 +++++++++++++++--- .../spark/sql/delta/UpdateSuiteBase.scala | 5 +- .../spark/sql/delta/cdc/UpdateCDCSuite.scala | 122 +++++++++++++++ 3 files changed, 243 insertions(+), 26 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/cdc/UpdateCDCSuite.scala diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala index 65c2b3a2e02..c88c6f4d68d 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala @@ -16,14 +16,16 @@ package org.apache.spark.sql.delta.commands -import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaTableUtils, OptimisticTransaction} -import org.apache.spark.sql.delta.actions.{AddFile, FileAction} +// scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaOperations, DeltaTableUtils, OptimisticTransaction} +import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction} +import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_TYPE_COLUMN_NAME, CDC_TYPE_NOT_CDC, CDC_TYPE_UPDATE_POSTIMAGE, CDC_TYPE_UPDATE_PREIMAGE} import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex} -import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext -import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -31,7 +33,7 @@ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric -import org.apache.spark.sql.functions.{input_file_name, udf} +import org.apache.spark.sql.functions.{array, col, explode, input_file_name, lit, struct, typedLit, udf} /** * Performs an Update using `updateExpression` on the rows that match `condition` @@ -60,7 +62,9 @@ case class UpdateCommand( "numCopiedRows" -> createMetric(sc, "number of rows copied."), "executionTimeMs" -> createMetric(sc, "time taken to execute the entire operation"), "scanTimeMs" -> createMetric(sc, "time taken to scan the files for matches"), - "rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files") + "rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files"), + "numAddedChangeFiles" -> createMetric(sc, "number of change data capture files generated"), + "changeFileBytes" -> createMetric(sc, "total size of change data capture files generated") ) final override def run(sparkSession: SparkSession): Seq[Row] = { @@ -83,6 +87,8 @@ case class UpdateCommand( var numTouchedFiles: Long = 0 var numRewrittenFiles: Long = 0 + var numAddedChangeFiles: Long = 0 + var changeFileBytes: Long = 0 var scanTimeMs: Long = 0 var rewriteTimeMs: Long = 0 @@ -136,7 +142,7 @@ case class UpdateCommand( numTouchedFiles = filesToRewrite.length - val newAddActions = if (filesToRewrite.isEmpty) { + val newActions = if (filesToRewrite.isEmpty) { // Do nothing if no row qualifies the UPDATE condition Nil } else { @@ -148,7 +154,11 @@ case class UpdateCommand( } rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs - numRewrittenFiles = newAddActions.size + + val (changeActions, addActions) = newActions.partition(_.isInstanceOf[AddCDCFile]) + numRewrittenFiles = addActions.size + numAddedChangeFiles = changeActions.size + changeFileBytes = changeActions.collect { case f: AddCDCFile => f.size }.sum val totalActions = if (filesToRewrite.isEmpty) { // Do nothing if no row qualifies the UPDATE condition @@ -159,11 +169,13 @@ case class UpdateCommand( val operationTimestamp = System.currentTimeMillis() val deleteActions = filesToRewrite.map(_.removeWithTimestamp(operationTimestamp)) - deleteActions ++ newAddActions + deleteActions ++ newActions } if (totalActions.nonEmpty) { metrics("numAddedFiles").set(numRewrittenFiles) + metrics("numAddedChangeFiles").set(numAddedChangeFiles) + metrics("changeFileBytes").set(changeFileBytes) metrics("numRemovedFiles").set(numTouchedFiles) metrics("executionTimeMs").set((System.nanoTime() - startTime) / 1000 / 1000) metrics("scanTimeMs").set(scanTimeMs) @@ -191,15 +203,20 @@ case class UpdateCommand( numFilesTotal, numTouchedFiles, numRewrittenFiles, - numAddedChangeFiles = 0, - changeFileBytes = 0, + numAddedChangeFiles, + changeFileBytes, scanTimeMs, rewriteTimeMs) ) } /** - * Scan all the affected files and write out the updated files + * Scan all the affected files and write out the updated files. + * + * When CDF is enabled, includes the generation of CDC preimage and postimage columns for + * changed rows. + * + * @return the list of [[AddFile]]s and [[AddCDCFile]]s that have been written. */ private def rewriteFiles( spark: SparkSession, @@ -213,28 +230,103 @@ case class UpdateCommand( spark, txn, "update", rootPath, inputLeafFiles, nameToAddFileMap) val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location) val targetDf = Dataset.ofRows(spark, newTarget) - val updatedDataFrame = { - val updatedColumns = buildUpdatedColumns(condition) - targetDf.select(updatedColumns: _*) - } + + val updatedDataFrame = UpdateCommand.withUpdatedColumns( + target, + updateExpressions, + condition, + targetDf.withColumn(UpdateCommand.CONDITION_COLUMN_NAME, new Column(condition)), + UpdateCommand.shouldOutputCdc(txn)) txn.writeFiles(updatedDataFrame) } +} + +object UpdateCommand { + val FILE_NAME_COLUMN = "_input_file_name_" + val CONDITION_COLUMN_NAME = "__condition__" + + /** + * Whether or not CDC is enabled on this table and, thus, if we should output CDC data during this + * UPDATE operation. + */ + def shouldOutputCdc(txn: OptimisticTransaction): Boolean = { + DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(txn.metadata) + } /** * Build the new columns. If the condition matches, generate the new value using - * the corresponding UPDATE EXPRESSION; otherwise, keep the original column value + * the corresponding UPDATE EXPRESSION; otherwise, keep the original column value. + * + * When CDC is enabled, includes the generation of CDC pre-image and post-image columns for + * changed rows. + * + * @param target target we are updating into + * @param updateExpressions the update transformation to perform on the input DataFrame + * @param dfWithEvaluatedCondition source DataFrame on which we will apply the update expressions + * with an additional column CONDITION_COLUMN_NAME which is the + * true/false value of if the update condition is satisfied + * @param condition update condition + * @param shouldOutputCdc if we should output CDC data during this UPDATE operation. + * @return the updated DataFrame, with extra CDC columns if CDC is enabled */ - private def buildUpdatedColumns(condition: Expression): Seq[Column] = { - updateExpressions.zip(target.output).map { case (update, original) => - val updated = If(condition, update, original) - new Column(Alias(updated, original.name)()) + def withUpdatedColumns( + target: LogicalPlan, + updateExpressions: Seq[Expression], + condition: Expression, + dfWithEvaluatedCondition: DataFrame, + shouldOutputCdc: Boolean): DataFrame = { + val resultDf = if (shouldOutputCdc) { + val namedUpdateCols = updateExpressions.zip(target.output).map { + case (expr, targetCol) => new Column(expr).as(targetCol.name) + } + + // Build an array of output rows to be unpacked later. If the condition is matched, we + // generate CDC pre and postimages in addition to the final output row; if the condition + // isn't matched, we just generate a rewritten no-op row without any CDC events. + val preimageCols = target.output.map(new Column(_)) :+ + lit(CDC_TYPE_UPDATE_PREIMAGE).as(CDC_TYPE_COLUMN_NAME) + val postimageCols = namedUpdateCols :+ + lit(CDC_TYPE_UPDATE_POSTIMAGE).as(CDC_TYPE_COLUMN_NAME) + val updatedDataCols = namedUpdateCols :+ + typedLit[String](CDC_TYPE_NOT_CDC).as(CDC_TYPE_COLUMN_NAME) + val noopRewriteCols = target.output.map(new Column(_)) :+ + typedLit[String](CDC_TYPE_NOT_CDC).as(CDC_TYPE_COLUMN_NAME) + val packedUpdates = array( + struct(preimageCols: _*), + struct(postimageCols: _*), + struct(updatedDataCols: _*) + ).expr + + val packedData = if (condition == Literal.TrueLiteral) { + packedUpdates + } else { + If( + UnresolvedAttribute(CONDITION_COLUMN_NAME), + packedUpdates, // if it should be updated, then use `packagedUpdates` + array(struct(noopRewriteCols: _*)).expr) // else, this is a noop rewrite + } + + // Explode the packed array, and project back out the final data columns. + val finalColNames = target.output.map(_.name) :+ CDC_TYPE_COLUMN_NAME + dfWithEvaluatedCondition + .select(explode(new Column(packedData)).as("packedData")) + .select(finalColNames.map { n => col(s"packedData.`$n`").as(s"$n") }: _*) + } else { + val finalCols = updateExpressions.zip(target.output).map { case (update, original) => + val updated = if (condition == Literal.TrueLiteral) { + update + } else { + If(UnresolvedAttribute(CONDITION_COLUMN_NAME), update, original) + } + new Column(Alias(updated, original.name)()) + } + + dfWithEvaluatedCondition.select(finalCols: _*) } - } -} -object UpdateCommand { - val FILE_NAME_COLUMN = "_input_file_name_" + resultDf.drop(CONDITION_COLUMN_NAME) + } } /** diff --git a/core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala b/core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala index 7eb337dd4fe..7a67393c778 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala @@ -92,7 +92,10 @@ abstract class UpdateSuiteBase tableName: Option[String] = None): Unit = { executeUpdate(tableName.getOrElse(s"delta.`$tempPath`"), setClauses, where = condition.orNull) checkAnswer( - tableName.map(readDeltaTable(_)).getOrElse(readDeltaTableByPath(tempPath)), + tableName + .map(readDeltaTable(_)) + .getOrElse(readDeltaTableByPath(tempPath)) + .select("key", "value"), expectedResults) } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/cdc/UpdateCDCSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/cdc/UpdateCDCSuite.scala new file mode 100644 index 00000000000..1d386e81696 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/delta/cdc/UpdateCDCSuite.scala @@ -0,0 +1,122 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.cdc + +// scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.commands.cdc.CDCReader + +import org.apache.spark.SparkConf +import org.apache.spark.sql.Row + +class UpdateCDCSuite extends UpdateSQLSuite with DeltaColumnMappingTestUtils { + import testImplicits._ + + override protected def sparkConf: SparkConf = super.sparkConf + .set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true") + + test("CDC for unconditional update") { + append(Seq((1, 1), (2, 2), (3, 3), (4, 4)).toDF("key", "value")) + + checkUpdate( + condition = None, + setClauses = "value = -1", + expectedResults = Row(1, -1) :: Row(2, -1) :: Row(3, -1) :: Row(4, -1) :: Nil) + + checkAnswer( + CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempPath), 1, 1, spark) + .drop(CDCReader.CDC_COMMIT_TIMESTAMP), + Row(1, 1, "update_preimage", 1) :: Row(1, -1, "update_postimage", 1) :: + Row(2, 2, "update_preimage", 1) :: Row(2, -1, "update_postimage", 1) :: + Row(3, 3, "update_preimage", 1) :: Row(3, -1, "update_postimage", 1) :: + Row(4, 4, "update_preimage", 1) :: Row(4, -1, "update_postimage", 1) :: Nil) + } + + test("CDC for conditional update on all rows") { + append(Seq((1, 1), (2, 2), (3, 3), (4, 4)).toDF("key", "value")) + + checkUpdate( + condition = Some("key < 10"), + setClauses = "value = -1", + expectedResults = Row(1, -1) :: Row(2, -1) :: Row(3, -1) :: Row(4, -1) :: Nil) + + checkAnswer( + CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempPath), 1, 1, spark) + .drop(CDCReader.CDC_COMMIT_TIMESTAMP), + Row(1, 1, "update_preimage", 1) :: Row(1, -1, "update_postimage", 1) :: + Row(2, 2, "update_preimage", 1) :: Row(2, -1, "update_postimage", 1) :: + Row(3, 3, "update_preimage", 1) :: Row(3, -1, "update_postimage", 1) :: + Row(4, 4, "update_preimage", 1) :: Row(4, -1, "update_postimage", 1) :: Nil) + } + + test("CDC for point update") { + append(Seq((1, 1), (2, 2), (3, 3), (4, 4)).toDF("key", "value")) + + checkUpdate( + condition = Some("key = 1"), + setClauses = "value = -1", + expectedResults = Row(1, -1) :: Row(2, 2) :: Row(3, 3) :: Row(4, 4) :: Nil) + + checkAnswer( + CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempPath), 1, 1, spark) + .drop(CDCReader.CDC_COMMIT_TIMESTAMP), + Row(1, 1, "update_preimage", 1) :: Row(1, -1, "update_postimage", 1) :: Nil) + } + + test("CDC for partition-optimized update") { + append( + Seq((1, 1, 1), (2, 2, 0), (3, 3, 1), (4, 4, 0)).toDF("key", "value", "part"), + partitionBy = Seq("part")) + + checkUpdate( + condition = Some("part = 1"), + setClauses = "value = -1", + expectedResults = Row(1, -1) :: Row(2, 2) :: Row(3, -1) :: Row(4, 4) :: Nil) + + checkAnswer( + CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempPath), 1, 1, spark) + .drop(CDCReader.CDC_COMMIT_TIMESTAMP), + Row(1, 1, 1, "update_preimage", 1) :: Row(1, -1, 1, "update_postimage", 1) :: + Row(3, 3, 1, "update_preimage", 1) :: Row(3, -1, 1, "update_postimage", 1) :: Nil) + } + + + test("update a partitioned CDC enabled table to set the partition column to null") { + val tableName = "part_table_test" + withTable(tableName) { + Seq((0, 0, 0), (1, 1, 1), (2, 2, 2)) + .toDF("key", "partition_column", "value") + .write + .partitionBy("partition_column") + .format("delta") + .saveAsTable(tableName) + sql(s"INSERT INTO $tableName VALUES (4, 4, 4)") + sql(s"UPDATE $tableName SET partition_column = null WHERE partition_column = 4") + checkAnswer( + CDCReader.changesToBatchDF( + DeltaLog.forTable( + spark, + spark.sessionState.sqlParser.parseTableIdentifier(tableName) + ), 1, 3, spark) + .drop(CDCReader.CDC_COMMIT_TIMESTAMP), + Row(4, 4, 4, "insert", 1) :: + Row(4, 4, 4, "update_preimage", 2) :: + Row(4, null, 4, "update_postimage", 2) :: Nil) + } + } +} +