Delta Lake - CDF - UPDATE command
See the project plan at #1105.

This PR adds CDF to the UPDATE command, during which we generate both preimage and postimage CDF data.

This PR also adds UpdateCDCSuite, which contains basic tests for these CDF changes.

As a high-level overview of how this CDF-enabled UPDATE operation is performed: when we find a row that satisfies the update condition, we `explode` an array containing the pre-image row, the post-image row, and the updated main-table row.

The pre-image and post-image rows are tagged with the corresponding CDF_TYPE, while the updated main-table row has CDF_TYPE `null`. As a result, the first two rows are written to the CDF parquet files, while the last one is written to the standard main-table data parquet files.
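
For illustration only, here is a minimal, self-contained Spark sketch of this pack-and-explode approach on a toy DataFrame. The `CdfUpdateSketch` wrapper, the toy data, and the `_change_type` column name are made up for this example; the change-type string values mirror those asserted in UpdateCDCSuite below, and the real logic lives in `UpdateCommand.withUpdatedColumns` using the constants defined in `CDCReader`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, explode, lit, struct, when}

// Hypothetical standalone sketch of the pack-and-explode trick used by the CDF UPDATE path.
object CdfUpdateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cdf-update-sketch").getOrCreate()
    import spark.implicits._

    // Toy "target table" and an UPDATE equivalent to: SET value = -1 WHERE key = 1
    val target = Seq((1, 1), (2, 2), (3, 3)).toDF("key", "value")
    val condition = col("key") === 1
    val updatedValue = lit(-1).as("value")

    // For a matched row, pack pre-image, post-image, and the rewritten main-table row
    // into one array; for an unmatched row, pack only a no-op rewrite of the original.
    val packed = when(
      condition,
      array(
        struct(col("key"), col("value"), lit("update_preimage").as("_change_type")),
        struct(col("key"), updatedValue, lit("update_postimage").as("_change_type")),
        struct(col("key"), updatedValue, lit(null).cast("string").as("_change_type"))))
      .otherwise(
        array(struct(col("key"), col("value"), lit(null).cast("string").as("_change_type"))))

    // Explode the packed array back into one output row per struct. Rows with a
    // non-null _change_type are the ones the real command routes to CDC files;
    // the rows with a null _change_type become the rewritten main-table data.
    target
      .select(explode(packed).as("packedData"))
      .select("packedData.key", "packedData.value", "packedData._change_type")
      .show()

    spark.stop()
  }
}
```

The appeal of exploding a single packed array is that one pass over the scanned files can produce all three kinds of output rows, instead of scanning the matched files separately for pre-images and post-images.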

Closes #1146

GitOrigin-RevId: 47413c5345bb97c0e1303a7f4d4d06b89c35ab7a
scottsand-db committed May 25, 2022
1 parent ae9c3f1 commit 47ca27f
Showing 3 changed files with 243 additions and 26 deletions.
UpdateCommand.scala
@@ -16,22 +16,24 @@

package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaTableUtils, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaOperations, DeltaTableUtils, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}
import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_TYPE_COLUMN_NAME, CDC_TYPE_NOT_CDC, CDC_TYPE_UPDATE_POSTIMAGE, CDC_TYPE_UPDATE_PREIMAGE}
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.functions.{input_file_name, udf}
import org.apache.spark.sql.functions.{array, col, explode, input_file_name, lit, struct, typedLit, udf}

/**
* Performs an Update using `updateExpression` on the rows that match `condition`
@@ -60,7 +62,9 @@ case class UpdateCommand(
"numCopiedRows" -> createMetric(sc, "number of rows copied."),
"executionTimeMs" -> createMetric(sc, "time taken to execute the entire operation"),
"scanTimeMs" -> createMetric(sc, "time taken to scan the files for matches"),
"rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files")
"rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files"),
"numAddedChangeFiles" -> createMetric(sc, "number of change data capture files generated"),
"changeFileBytes" -> createMetric(sc, "total size of change data capture files generated")
)

final override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -83,6 +87,8 @@ case class UpdateCommand(

var numTouchedFiles: Long = 0
var numRewrittenFiles: Long = 0
var numAddedChangeFiles: Long = 0
var changeFileBytes: Long = 0
var scanTimeMs: Long = 0
var rewriteTimeMs: Long = 0

@@ -136,7 +142,7 @@

numTouchedFiles = filesToRewrite.length

val newAddActions = if (filesToRewrite.isEmpty) {
val newActions = if (filesToRewrite.isEmpty) {
// Do nothing if no row qualifies the UPDATE condition
Nil
} else {
@@ -148,7 +154,11 @@
}

rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
numRewrittenFiles = newAddActions.size

val (changeActions, addActions) = newActions.partition(_.isInstanceOf[AddCDCFile])
numRewrittenFiles = addActions.size
numAddedChangeFiles = changeActions.size
changeFileBytes = changeActions.collect { case f: AddCDCFile => f.size }.sum

val totalActions = if (filesToRewrite.isEmpty) {
// Do nothing if no row qualifies the UPDATE condition
@@ -159,11 +169,13 @@
val operationTimestamp = System.currentTimeMillis()
val deleteActions = filesToRewrite.map(_.removeWithTimestamp(operationTimestamp))

deleteActions ++ newAddActions
deleteActions ++ newActions
}

if (totalActions.nonEmpty) {
metrics("numAddedFiles").set(numRewrittenFiles)
metrics("numAddedChangeFiles").set(numAddedChangeFiles)
metrics("changeFileBytes").set(changeFileBytes)
metrics("numRemovedFiles").set(numTouchedFiles)
metrics("executionTimeMs").set((System.nanoTime() - startTime) / 1000 / 1000)
metrics("scanTimeMs").set(scanTimeMs)
@@ -191,15 +203,20 @@ case class UpdateCommand(
numFilesTotal,
numTouchedFiles,
numRewrittenFiles,
numAddedChangeFiles = 0,
changeFileBytes = 0,
numAddedChangeFiles,
changeFileBytes,
scanTimeMs,
rewriteTimeMs)
)
}

/**
* Scan all the affected files and write out the updated files
* Scan all the affected files and write out the updated files.
*
* When CDF is enabled, includes the generation of CDC preimage and postimage columns for
* changed rows.
*
* @return the list of [[AddFile]]s and [[AddCDCFile]]s that have been written.
*/
private def rewriteFiles(
spark: SparkSession,
@@ -213,28 +230,103 @@
spark, txn, "update", rootPath, inputLeafFiles, nameToAddFileMap)
val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
val targetDf = Dataset.ofRows(spark, newTarget)
val updatedDataFrame = {
val updatedColumns = buildUpdatedColumns(condition)
targetDf.select(updatedColumns: _*)
}

val updatedDataFrame = UpdateCommand.withUpdatedColumns(
target,
updateExpressions,
condition,
targetDf.withColumn(UpdateCommand.CONDITION_COLUMN_NAME, new Column(condition)),
UpdateCommand.shouldOutputCdc(txn))

txn.writeFiles(updatedDataFrame)
}
}

object UpdateCommand {
val FILE_NAME_COLUMN = "_input_file_name_"
val CONDITION_COLUMN_NAME = "__condition__"

/**
* Whether CDC is enabled on this table and, thus, whether we should output CDC data during this
* UPDATE operation.
*/
def shouldOutputCdc(txn: OptimisticTransaction): Boolean = {
DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(txn.metadata)
}

/**
* Build the new columns. If the condition matches, generate the new value using
* the corresponding UPDATE EXPRESSION; otherwise, keep the original column value
* the corresponding UPDATE EXPRESSION; otherwise, keep the original column value.
*
* When CDC is enabled, includes the generation of CDC pre-image and post-image columns for
* changed rows.
*
* @param target target we are updating into
* @param updateExpressions the update transformation to perform on the input DataFrame
* @param dfWithEvaluatedCondition source DataFrame on which we will apply the update expressions
* with an additional column CONDITION_COLUMN_NAME which is the
true/false value of whether the update condition is satisfied
* @param condition update condition
* @param shouldOutputCdc if we should output CDC data during this UPDATE operation.
* @return the updated DataFrame, with extra CDC columns if CDC is enabled
*/
private def buildUpdatedColumns(condition: Expression): Seq[Column] = {
updateExpressions.zip(target.output).map { case (update, original) =>
val updated = If(condition, update, original)
new Column(Alias(updated, original.name)())
def withUpdatedColumns(
target: LogicalPlan,
updateExpressions: Seq[Expression],
condition: Expression,
dfWithEvaluatedCondition: DataFrame,
shouldOutputCdc: Boolean): DataFrame = {
val resultDf = if (shouldOutputCdc) {
val namedUpdateCols = updateExpressions.zip(target.output).map {
case (expr, targetCol) => new Column(expr).as(targetCol.name)
}

// Build an array of output rows to be unpacked later. If the condition is matched, we
// generate CDC pre and postimages in addition to the final output row; if the condition
// isn't matched, we just generate a rewritten no-op row without any CDC events.
val preimageCols = target.output.map(new Column(_)) :+
lit(CDC_TYPE_UPDATE_PREIMAGE).as(CDC_TYPE_COLUMN_NAME)
val postimageCols = namedUpdateCols :+
lit(CDC_TYPE_UPDATE_POSTIMAGE).as(CDC_TYPE_COLUMN_NAME)
val updatedDataCols = namedUpdateCols :+
typedLit[String](CDC_TYPE_NOT_CDC).as(CDC_TYPE_COLUMN_NAME)
val noopRewriteCols = target.output.map(new Column(_)) :+
typedLit[String](CDC_TYPE_NOT_CDC).as(CDC_TYPE_COLUMN_NAME)
val packedUpdates = array(
struct(preimageCols: _*),
struct(postimageCols: _*),
struct(updatedDataCols: _*)
).expr

val packedData = if (condition == Literal.TrueLiteral) {
packedUpdates
} else {
If(
UnresolvedAttribute(CONDITION_COLUMN_NAME),
packedUpdates, // if it should be updated, then use `packedUpdates`
array(struct(noopRewriteCols: _*)).expr) // else, this is a noop rewrite
}

// Explode the packed array, and project back out the final data columns.
val finalColNames = target.output.map(_.name) :+ CDC_TYPE_COLUMN_NAME
dfWithEvaluatedCondition
.select(explode(new Column(packedData)).as("packedData"))
.select(finalColNames.map { n => col(s"packedData.`$n`").as(s"$n") }: _*)
} else {
val finalCols = updateExpressions.zip(target.output).map { case (update, original) =>
val updated = if (condition == Literal.TrueLiteral) {
update
} else {
If(UnresolvedAttribute(CONDITION_COLUMN_NAME), update, original)
}
new Column(Alias(updated, original.name)())
}

dfWithEvaluatedCondition.select(finalCols: _*)
}
}
}

object UpdateCommand {
val FILE_NAME_COLUMN = "_input_file_name_"
resultDf.drop(CONDITION_COLUMN_NAME)
}
}

/**
UpdateSuiteBase.scala
@@ -92,7 +92,10 @@ abstract class UpdateSuiteBase
tableName: Option[String] = None): Unit = {
executeUpdate(tableName.getOrElse(s"delta.`$tempPath`"), setClauses, where = condition.orNull)
checkAnswer(
tableName.map(readDeltaTable(_)).getOrElse(readDeltaTableByPath(tempPath)),
tableName
.map(readDeltaTable(_))
.getOrElse(readDeltaTableByPath(tempPath))
.select("key", "value"),
expectedResults)
}

UpdateCDCSuite.scala (new file)
@@ -0,0 +1,122 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.cdc

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.commands.cdc.CDCReader

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

class UpdateCDCSuite extends UpdateSQLSuite with DeltaColumnMappingTestUtils {
import testImplicits._

override protected def sparkConf: SparkConf = super.sparkConf
.set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")

test("CDC for unconditional update") {
append(Seq((1, 1), (2, 2), (3, 3), (4, 4)).toDF("key", "value"))

checkUpdate(
condition = None,
setClauses = "value = -1",
expectedResults = Row(1, -1) :: Row(2, -1) :: Row(3, -1) :: Row(4, -1) :: Nil)

checkAnswer(
CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempPath), 1, 1, spark)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP),
Row(1, 1, "update_preimage", 1) :: Row(1, -1, "update_postimage", 1) ::
Row(2, 2, "update_preimage", 1) :: Row(2, -1, "update_postimage", 1) ::
Row(3, 3, "update_preimage", 1) :: Row(3, -1, "update_postimage", 1) ::
Row(4, 4, "update_preimage", 1) :: Row(4, -1, "update_postimage", 1) :: Nil)
}

test("CDC for conditional update on all rows") {
append(Seq((1, 1), (2, 2), (3, 3), (4, 4)).toDF("key", "value"))

checkUpdate(
condition = Some("key < 10"),
setClauses = "value = -1",
expectedResults = Row(1, -1) :: Row(2, -1) :: Row(3, -1) :: Row(4, -1) :: Nil)

checkAnswer(
CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempPath), 1, 1, spark)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP),
Row(1, 1, "update_preimage", 1) :: Row(1, -1, "update_postimage", 1) ::
Row(2, 2, "update_preimage", 1) :: Row(2, -1, "update_postimage", 1) ::
Row(3, 3, "update_preimage", 1) :: Row(3, -1, "update_postimage", 1) ::
Row(4, 4, "update_preimage", 1) :: Row(4, -1, "update_postimage", 1) :: Nil)
}

test("CDC for point update") {
append(Seq((1, 1), (2, 2), (3, 3), (4, 4)).toDF("key", "value"))

checkUpdate(
condition = Some("key = 1"),
setClauses = "value = -1",
expectedResults = Row(1, -1) :: Row(2, 2) :: Row(3, 3) :: Row(4, 4) :: Nil)

checkAnswer(
CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempPath), 1, 1, spark)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP),
Row(1, 1, "update_preimage", 1) :: Row(1, -1, "update_postimage", 1) :: Nil)
}

test("CDC for partition-optimized update") {
append(
Seq((1, 1, 1), (2, 2, 0), (3, 3, 1), (4, 4, 0)).toDF("key", "value", "part"),
partitionBy = Seq("part"))

checkUpdate(
condition = Some("part = 1"),
setClauses = "value = -1",
expectedResults = Row(1, -1) :: Row(2, 2) :: Row(3, -1) :: Row(4, 4) :: Nil)

checkAnswer(
CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempPath), 1, 1, spark)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP),
Row(1, 1, 1, "update_preimage", 1) :: Row(1, -1, 1, "update_postimage", 1) ::
Row(3, 3, 1, "update_preimage", 1) :: Row(3, -1, 1, "update_postimage", 1) :: Nil)
}


test("update a partitioned CDC enabled table to set the partition column to null") {
val tableName = "part_table_test"
withTable(tableName) {
Seq((0, 0, 0), (1, 1, 1), (2, 2, 2))
.toDF("key", "partition_column", "value")
.write
.partitionBy("partition_column")
.format("delta")
.saveAsTable(tableName)
sql(s"INSERT INTO $tableName VALUES (4, 4, 4)")
sql(s"UPDATE $tableName SET partition_column = null WHERE partition_column = 4")
checkAnswer(
CDCReader.changesToBatchDF(
DeltaLog.forTable(
spark,
spark.sessionState.sqlParser.parseTableIdentifier(tableName)
), 1, 3, spark)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP),
Row(4, 4, 4, "insert", 1) ::
Row(4, 4, 4, "update_preimage", 2) ::
Row(4, null, 4, "update_postimage", 2) :: Nil)
}
}
}
