Add REORG operation details in commit history
## Description
The `REORG` command was added in delta-io#1732, but the operation is recorded as `OPTIMIZE` in the table history. This PR changes the recorded operation name to `REORG`.

Updated existing unit tests to verify the Delta table history.

Closes delta-io#1753
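
The effect can be spot-checked from the table history. A minimal sketch, assuming a running `spark` session and an illustrative table path that has already been reorganized:

```scala
import io.delta.tables.DeltaTable

// After REORG TABLE ... APPLY (PURGE), the latest history entry should now
// report operation "REORG" rather than "OPTIMIZE".
val lastOp = DeltaTable.forPath(spark, "/tmp/delta/tbl") // illustrative path
  .history(1)
  .select("operation")
  .first()
  .getString(0)
assert(lastOp == "REORG")
```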

Signed-off-by: Venki Korukanti <venki.korukanti@databricks.com>
GitOrigin-RevId: d5d27e6b5710659d3766eed0b0c32ec5b716d644
vkorukanti committed May 16, 2023
1 parent 9e71440 commit 9391568
Showing 3 changed files with 63 additions and 2 deletions.
```diff
@@ -425,6 +425,8 @@ object DeltaOperations {
   sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression])
     extends OperationWithPredicates(name, predicates)
 
+  /** operation name for REORG command */
+  val REORG_OPERATION_NAME = "REORG"
   /** operation name for OPTIMIZE command */
   val OPTIMIZE_OPERATION_NAME = "OPTIMIZE"
   /** parameter key to indicate which columns to z-order by */
@@ -483,6 +485,17 @@ object DeltaOperations {
     override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END
   }
 
+  /** Recorded when running REORG on the table. */
+  case class Reorg(
+      predicate: Seq[Expression],
+      applyPurge: Boolean = true) extends OptimizeOrReorg(REORG_OPERATION_NAME, predicate) {
+    override val parameters: Map[String, Any] = super.parameters ++ Map(
+      "applyPurge" -> applyPurge
+    )
+
+    override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE
+  }
+
 
   private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
     Map(
```
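
For illustration, here is a minimal sketch of what the new case class reports, using only identifiers from the diff above; the `predicate` entry comes from the inherited `OperationWithPredicates`, as the updated tests below also confirm:

```scala
// Sketch: Reorg reports the REORG operation name plus an "applyPurge"
// parameter on top of the inherited "predicate" parameter.
val op = DeltaOperations.Reorg(predicate = Seq.empty)
assert(op.name == "REORG")                    // REORG_OPERATION_NAME
assert(op.parameters("applyPurge") == true)   // added by the override above
assert(op.parameters.contains("predicate"))   // from OperationWithPredicates
```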
```diff
@@ -219,9 +219,8 @@ class OptimizeExecutor(
         val removedFiles = updates.collect { case r: RemoveFile => r }
         val removedDVs = filesToProcess.filter(_.deletionVector != null).map(_.deletionVector).toSeq
         if (addedFiles.size > 0) {
-          val operation = DeltaOperations.Optimize(partitionPredicate, zOrderByColumns)
           val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
-          commitAndRetry(txn, operation, updates, metrics) { newTxn =>
+          commitAndRetry(txn, getOperation(), updates, metrics) { newTxn =>
             val newPartitionSchema = newTxn.metadata.partitionSchema
             val candidateSetOld = candidateFiles.map(_.path).toSet
             val candidateSetNew = newTxn.filterFiles(partitionPredicate).map(_.path).toSet
@@ -430,6 +429,15 @@ class OptimizeExecutor(
     }
   }
 
+  /** Create the appropriate [[Operation]] object for txn commit history */
+  private def getOperation(): Operation = {
+    if (optimizeContext.isPurge) {
+      DeltaOperations.Reorg(partitionPredicate)
+    } else {
+      DeltaOperations.Optimize(partitionPredicate, zOrderByColumns)
+    }
+  }
+
   /** Create a map of SQL metrics for adding to the commit history. */
   private def createMetrics(
       sparkContext: SparkContext,
```
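
For context, `optimizeContext.isPurge` distinguishes the purge path from a regular compaction, so the same executor now records different operation names. A hedged sketch of the two SQL entry points (the table path is illustrative and a running `spark` session is assumed):

```scala
// REORG ... APPLY (PURGE) rewrites files carrying deletion vectors; after
// this change the resulting commit is recorded as "REORG".
spark.sql("REORG TABLE delta.`/tmp/delta/tbl` APPLY (PURGE)")

// Plain compaction is still recorded as "OPTIMIZE".
spark.sql("OPTIMIZE delta.`/tmp/delta/tbl`")
```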
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.optimize
 import org.apache.spark.sql.delta.DeletionVectorsTestUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import io.delta.tables.DeltaTable
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.functions.col
@@ -54,6 +55,13 @@ class DeltaReorgSuite extends QueryTest
       checkAnswer(
         sql(s"SELECT * FROM delta.`$path`"),
         (1 to 98).toDF())
+
+      // Verify commit history and operation metrics
+      checkOpHistory(
+        tablePath = path,
+        expOpParams = Map("applyPurge" -> "true", "predicate" -> "[]"),
+        numFilesRemoved = 2,
+        numFilesAdded = 1)
     }
   }
@@ -74,6 +82,13 @@ class DeltaReorgSuite extends QueryTest
         sql(s"SELECT * FROM delta.`$path`"),
         (1 to 98).toDF())
 
+      // Verify commit history and operation metrics
+      checkOpHistory(
+        tablePath = path,
+        expOpParams = Map("applyPurge" -> "true", "predicate" -> "[]"),
+        numFilesRemoved = 2,
+        numFilesAdded = 1)
+
       // Second purge is a noop
       val versionBefore = log.update().version
       executePurge(path)
@@ -108,6 +123,31 @@ class DeltaReorgSuite extends QueryTest
       val (addFiles2, _) = getFileActionsInLastVersion(log)
       assert(addFiles2.size === 2)
       assert(addFiles2.forall(_.deletionVector === null))
+
+      // Verify commit history and operation metrics
+      checkOpHistory(
+        tablePath = path.toString,
+        expOpParams = Map("applyPurge" -> "true", "predicate" -> "[\"'part IN (0,2)\"]"),
+        numFilesRemoved = 2,
+        numFilesAdded = 2)
     }
   }
+
+  private def checkOpHistory(
+      tablePath: String,
+      expOpParams: Map[String, String],
+      numFilesRemoved: Long,
+      numFilesAdded: Long): Unit = {
+    val (opName, opParams, opMetrics) = DeltaTable.forPath(tablePath)
+      .history(1)
+      .select("operation", "operationParameters", "operationMetrics")
+      .as[(String, Map[String, String], Map[String, String])]
+      .head()
+    assert(opName === "REORG")
+    assert(opParams === expOpParams)
+    assert(opMetrics("numAddedFiles").toLong === numFilesAdded)
+    assert(opMetrics("numRemovedFiles").toLong === numFilesRemoved)
+    // Because each deleted file has a DV associated with it, which gets rewritten as part of PURGE
+    assert(opMetrics("numDeletionVectorsRemoved").toLong === numFilesRemoved)
+  }
 }
```
