Change Data Feed - PR 2 - DELETE command
See the project plan at #1105.

This PR adds CDF write functionality to the DELETE command, along with a test suite, `DeleteCDCSuite`. At a high level, when the DELETE command determines that it needs to delete some rows in a file (but not the entire file), then instead of just creating a new DataFrame that contains only the non-deleted rows (so that, in the new Delta version, the removed rows are logically deleted), we partition the DataFrame into CDF-deleted rows and non-deleted rows.

Then, when writing out the parquet files, we write the CDF-deleted rows into their own CDF parquet file, and we write the non-deleted rows into a standard main-table parquet file (same as usual). We also add an extra `AddCDCFile` action to the transaction log.
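
As an illustration only (this usage sketch is not part of the diff; the table name, schema, and predicate are made up), the new path is exercised whenever the change data feed is enabled on a table and a DELETE rewrites files rather than removing them outright. A minimal sketch, assuming a Spark session with the Delta Lake SQL extensions configured:

// Enable CDF via the table property read by DeltaConfigs.CHANGE_DATA_FEED.
spark.sql("""
  CREATE TABLE events (id BIGINT, category STRING) USING delta
  TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
spark.range(0, 1000).selectExpr("id", "'a' AS category")
  .write.format("delta").mode("append").saveAsTable("events")

// This deletes only some rows from each file, so the files are rewritten: retained
// rows land in a normal data file, deleted rows in a separate CDC parquet file, and
// the commit carries an AddCDCFile action alongside the usual AddFile/RemoveFile.
spark.sql("DELETE FROM events WHERE id % 10 = 0")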

Closes #1125.

GitOrigin-RevId: 7934de886589bf3d70ce81dcf9d7de598e35fb2e
scottsand-db committed May 12, 2022
1 parent 8e06233 commit 7103115
Showing 7 changed files with 272 additions and 36 deletions.
@@ -397,6 +397,7 @@ private[delta] object DeltaOperationMetrics {
val DELETE = Set(
"numAddedFiles", // number of files added
"numRemovedFiles", // number of files removed
"numAddedChangeFiles", // number of CDC files
"numDeletedRows", // number of rows removed
"numCopiedRows", // number of rows copied in the process of deleting files
"executionTimeMs", // time taken to execute the entire operation
@@ -410,6 +411,7 @@ private[delta] object DeltaOperationMetrics {
*/
val DELETE_PARTITIONS = Set(
"numRemovedFiles", // number of files removed
"numAddedChangeFiles", // number of CDC files generated - generally 0 in this case
"executionTimeMs", // time taken to execute the entire operation
"scanTimeMs", // time taken to scan the files for matches
"rewriteTimeMs" // time taken to rewrite the matched files
@@ -910,14 +910,30 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}

// Post stats
// Here, we efficiently calculate various stats (number of each different action, number of
// bytes per action, etc.) by iterating over all actions, case matching by type, and updating
// variables. This is more efficient than a functional approach.
var numAbsolutePaths = 0
val distinctPartitions = new mutable.HashSet[Map[String, String]]
val adds = actions.collect {
var bytesNew: Long = 0L
var numAdd: Int = 0
var numRemove: Int = 0
var numCdcFiles: Int = 0
var cdcBytesNew: Long = 0L
actions.foreach {
case a: AddFile =>
numAdd += 1
if (a.pathAsUri.isAbsolute) numAbsolutePaths += 1
distinctPartitions += a.partitionValues
a
if (a.dataChange) bytesNew += a.size
case r: RemoveFile =>
numRemove += 1
case c: AddCDCFile =>
numCdcFiles += 1
cdcBytesNew += c.size
case _ =>
}

val needsCheckpoint = shouldCheckpoint(attemptVersion)
val stats = CommitStats(
startVersion = snapshot.version,
@@ -928,13 +944,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
fsWriteDurationMs = NANOSECONDS.toMillis(commitEndNano - fsWriteStartNano),
stateReconstructionDurationMs =
NANOSECONDS.toMillis(postCommitReconstructionTime - commitEndNano),
numAdd = adds.size,
numRemove = actions.collect { case r: RemoveFile => r }.size,
bytesNew = adds.filter(_.dataChange).map(_.size).sum,
numAdd = numAdd,
numRemove = numRemove,
bytesNew = bytesNew,
numFilesTotal = postCommitSnapshot.numOfFiles,
sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
numCdcFiles = 0,
cdcBytesNew = 0,
numCdcFiles = numCdcFiles,
cdcBytesNew = cdcBytesNew,
protocol = postCommitSnapshot.protocol,
commitSizeBytes = jsonActions.map(_.size).sum,
checkpointSizeBytes = postCommitSnapshot.checkpointSizeInBytes(),
@@ -17,22 +17,22 @@
package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, FileAction}
import org.apache.spark.sql.delta.commands.MergeIntoCommand.totalBytesAndDistinctPartitionValues
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, InputFileName, Literal, Not}
import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, If, InputFileName, Literal, Not}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{DeltaDelete, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.{lit, typedLit, udf}

trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@transient private lazy val sc: SparkContext = SparkContext.getOrCreate()
@@ -53,7 +53,10 @@ trait DeleteCommandMetrics { self: LeafRunnableCommand =>
"numBytesRemoved" -> createMetric(sc, "number of bytes removed"),
"executionTimeMs" -> createMetric(sc, "time taken to execute the entire operation"),
"scanTimeMs" -> createMetric(sc, "time taken to scan the files for matches"),
"rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files")
"rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files"),
"numAddedChangeFiles" -> createMetric(sc, "number of change data capture files generated"),
"changeFileBytes" -> createMetric(sc, "total size of change data capture files generated"),
"numTouchedRows" -> createMetric(sc, "number of rows touched")
)
}

@@ -100,9 +103,11 @@ case class DeleteCommand(

var numRemovedFiles: Long = 0
var numAddedFiles: Long = 0
var numAddedChangeFiles: Long = 0
var scanTimeMs: Long = 0
var rewriteTimeMs: Long = 0
var numBytesAdded: Long = 0
var changeFileBytes: Long = 0
var numBytesRemoved: Long = 0
var numFilesBeforeSkipping: Long = 0
var numBytesBeforeSkipping: Long = 0
@@ -219,20 +224,12 @@ case class DeleteCommand(
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
val copiedRowCount = metrics("numCopiedRows")
val copiedRowUdf = udf { () =>
copiedRowCount += 1
true
}.asNondeterministic()
val targetDF = Dataset.ofRows(sparkSession, newTarget)
val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral))
val updatedDF = targetDF.filter(new Column(filterCond))
.filter(copiedRowUdf())

val rewrittenFiles = withStatusCode(
"DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") {
txn.writeFiles(updatedDF)
}
val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length)
val (changeFiles, rewrittenFiles) = rewrittenActions
.partition(_.isInstanceOf[AddCDCFile])
numAddedFiles = rewrittenFiles.size
val removedFiles = filesToRewrite.map(f =>
getTouchedFile(deltaLog.dataPath, f, nameToAddFileMap))
val (removedBytes, removedPartitions) =
@@ -245,14 +242,15 @@ case class DeleteCommand(
numPartitionsRemovedFrom = Some(removedPartitions)
numPartitionsAddedTo = Some(rewrittenPartitions)
}
numAddedFiles = rewrittenFiles.size
numAddedChangeFiles = changeFiles.size
changeFileBytes = changeFiles.collect { case f: AddCDCFile => f.size }.sum
rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
numDeletedRows = Some(metrics("numDeletedRows").value)
numCopiedRows = Some(metrics("numCopiedRows").value)
numCopiedRows = Some(metrics("numTouchedRows").value - metrics("numDeletedRows").value)

val operationTimestamp = System.currentTimeMillis()
removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++
rewrittenFiles
rewrittenActions
}
}
}
@@ -262,6 +260,8 @@ case class DeleteCommand(
metrics("executionTimeMs").set(executionTimeMs)
metrics("scanTimeMs").set(scanTimeMs)
metrics("rewriteTimeMs").set(rewriteTimeMs)
metrics("numAddedChangeFiles").set(numAddedChangeFiles)
metrics("changeFileBytes").set(changeFileBytes)
metrics("numBytesAdded").set(numBytesAdded)
metrics("numBytesRemoved").set(numBytesRemoved)
metrics("numFilesBeforeSkipping").set(numFilesBeforeSkipping)
@@ -287,7 +287,7 @@ case class DeleteCommand(
numAddedFiles,
numRemovedFiles,
numAddedFiles,
numAddedChangeFiles = 0,
numAddedChangeFiles = numAddedChangeFiles,
numFilesBeforeSkipping,
numBytesBeforeSkipping,
numFilesAfterSkipping,
@@ -299,13 +299,58 @@ case class DeleteCommand(
numDeletedRows,
numBytesAdded,
numBytesRemoved,
changeFileBytes = 0,
changeFileBytes = changeFileBytes,
scanTimeMs,
rewriteTimeMs)
)

deleteActions
}

/**
* Returns the list of [[AddFile]]s and [[AddCDCFile]]s that have been re-written.
*/
private def rewriteFiles(
txn: OptimisticTransaction,
baseData: DataFrame,
filterCondition: Expression,
numFilesToRewrite: Long): Seq[FileAction] = {
val shouldWriteCdc = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(txn.metadata)

// number of total rows that we have seen / are either copying or deleting (sum of both).
val numTouchedRows = metrics("numTouchedRows")
val numTouchedRowsUdf = udf { () =>
numTouchedRows += 1
true
}.asNondeterministic()

withStatusCode(
"DELTA", s"Rewriting $numFilesToRewrite files for DELETE operation") {
val dfToWrite = if (shouldWriteCdc) {
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
// The logic here ends up being surprisingly elegant, with all source rows ending up in
// the output. Recall that we flipped the user-provided delete condition earlier, before the
// call to `rewriteFiles`. All rows which match this latest `filterCondition` are retained
// as table data, while all rows which don't match are removed from the rewritten table data
// but do get included in the output as CDC events.
baseData
.filter(numTouchedRowsUdf())
.withColumn(
CDC_TYPE_COLUMN_NAME,
new Column(
If(filterCondition, typedLit[String](CDC_TYPE_NOT_CDC).expr,
lit(CDC_TYPE_DELETE).expr)
)
)
} else {
baseData
.filter(numTouchedRowsUdf())
.filter(new Column(filterCondition))
}

txn.writeFiles(dfToWrite)
}
}
}

object DeleteCommand {
@@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInvariantCheckerExec}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema._
@@ -31,13 +32,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
import org.apache.spark.sql.functions.to_json
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.functions.{col, to_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

/**
@@ -215,18 +216,40 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
writeFiles(data, Nil)
}

/**
* Returns a tuple of (data, partition schema). For CDC writes, a `__is_cdc` column is added to
* the data and `__is_cdc=true/false` is added to the front of the partition schema.
*/
protected def performCDCPartition(inputData: Dataset[_]): (DataFrame, StructType) = {
// If this is a CDC write, we need to generate the CDC_PARTITION_COL in order to properly
// dispatch rows between the main table and CDC event records. This is a virtual partition
// and will be stripped out later in [[DelayedCommitProtocolEdge]].
// Note that the ordering of the partition schema is relevant - CDC_PARTITION_COL must
// come first in order to ensure CDC data lands in the right place.
if (CDCReader.isCDCEnabledOnTable(metadata) &&
inputData.schema.fieldNames.contains(CDCReader.CDC_TYPE_COLUMN_NAME)) {
val augmentedData = inputData.withColumn(
CDCReader.CDC_PARTITION_COL, col(CDCReader.CDC_TYPE_COLUMN_NAME).isNotNull)
val partitionSchema = StructType(
StructField(CDCReader.CDC_PARTITION_COL, StringType) +: metadata.physicalPartitionSchema)
(augmentedData, partitionSchema)
} else {
(inputData.toDF(), metadata.physicalPartitionSchema)
}
}

/**
* Writes out the dataframe after performing schema validation. Returns a list of
* actions to append these files to the reservoir.
*/
def writeFiles(
data: Dataset[_],
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
hasWritten = true

val spark = data.sparkSession
val partitionSchema = metadata.physicalPartitionSchema
val spark = inputData.sparkSession
val (data, partitionSchema) = performCDCPartition(inputData)
val outputPath = deltaLog.dataPath

val (queryExecution, output, generatedColumnConstraints, _) =
@@ -333,6 +356,6 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
_.recordedStats(new Path(new URI(a.path)).getName)).getOrElse(a.stats))
}

resultFiles.toSeq
resultFiles.toSeq ++ committer.changeFiles
}
}
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, GeneratedColumn, NoMapping}
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.schema.SchemaMergingUtils._
import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand Down Expand Up @@ -190,6 +191,14 @@ object SchemaUtils {
if (dataFields.subsetOf(tableFields)) {
data.toDF()
} else {
// Allow the same shortcut logic (as the above `if` stmt) if the only extra fields are CDC
// metadata fields.
val nonCdcFields = dataFields.filterNot { f =>
f == CDCReader.CDC_PARTITION_COL || f == CDCReader.CDC_TYPE_COLUMN_NAME
}
if (nonCdcFields.subsetOf(tableFields)) {
return data.toDF()
}
// Check that nested columns don't need renaming. We can't handle that right now
val topLevelDataFields = dataFields.map(UnresolvedAttribute.parseAttributeName(_).head)
if (topLevelDataFields.subsetOf(tableFields)) {
@@ -203,6 +212,9 @@
val aliasExpressions = dataSchema.map { field =>
val originalCase: String = baseFields.get(field.name) match {
case Some(original) => original.name
// This is a virtual partition column used for doing CDC writes. It's not actually
// in the table schema.
case None if field.name == CDCReader.CDC_TYPE_COLUMN_NAME => field.name
case None =>
throw DeltaErrors.cannotResolveColumn(field, baseSchema)
}