Commit

[DELTA-OSS-EXTERNAL] Added config option to enable parallel deletes for vacuum command

Resolves #395

#416 hasn't been updated in over four months, and it would be a very useful feature for us to have, so I took my own stab at it.

- A new config value, `vacuum.parallelDelete.enabled`, is added and defaults to false.
- I updated the default behavior to coalesce to 1 instead of iterating on the driver, so that you can see Spark doing something in the UI/console instead of it just sitting there. I'm not sure if there's a reason this would cause issues, so I'm happy to revert this if you think it should be reverted.
- If `vacuum.parallelDelete.enabled` is set to true, it maintains the existing partitions from the `diff` calculation. Because this is the result of a `join`, your partitions are based on your `spark.sql.shuffle.partitions`, so your parallelism will be min(number of executors, shuffle partitions), and you can tweak your shuffle partitions if you want more or less parallelism (see the usage sketch below).
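
A minimal usage sketch (not part of this commit): it assumes an existing `SparkSession` named `spark`, the Delta Lake `DeltaTable` Scala API, and a placeholder table path, and it uses the fully prefixed form of the key, matching the key the test suite below sets:

```scala
import io.delta.tables.DeltaTable

// Enable parallel deletes for vacuum. The "spark.databricks.delta." prefix gives the same
// fully qualified key that DeltaVacuumSuite uses below.
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")

// With parallel delete enabled, delete parallelism follows the shuffle partitions of the
// `diff` join, so tune this to get more or less parallelism.
spark.conf.set("spark.sql.shuffle.partitions", "64")

// Placeholder path: vacuum(168) keeps the last 7 days of files and deletes the rest.
DeltaTable.forPath(spark, "/tmp/example/delta-table").vacuum(168)
```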

I removed the delete static method because the number of parameters that had to be passed to it made it seem like too much. Happy to move that back if that's not preferred.

Also happy to make any updates to the name or description of the new config.

Closes #522

Signed-off-by: Jose Torres <joseph.torres@databricks.com>

Author: Jose Torres <joseph.torres@databricks.com>
Author: Adam Binford <adamq43@gmail.com>

#12941 is resolved by jose-torres/ee2ucyf3.

GitOrigin-RevId: a73aa60a4820c4d6a37f0b21a0db31d72a09cfa5
Kimahriman authored and brkyvz committed Oct 15, 2020
1 parent be42046 commit 40182f3
Showing 3 changed files with 52 additions and 5 deletions.
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.{FileSystem, Path}

+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}

@@ -43,7 +44,7 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
 * will be ignored. Then we take a diff of the files and delete directories that were already empty,
 * and all files that are within the table that are no longer tracked.
 */
-object VacuumCommand extends VacuumCommandImpl {
+object VacuumCommand extends VacuumCommandImpl with Serializable {

  /**
   * Additional check on retention duration to prevent people from shooting themselves in the foot.
@@ -120,6 +121,8 @@ object VacuumCommand extends VacuumCommandImpl {
        new SerializableConfiguration(sessionHadoopConf))
      val basePath = fs.makeQualified(path).toString
      var isBloomFiltered = false
+      val parallelDeleteEnabled =
+        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)

      val validFiles = snapshot.state
        .mapPartitions { actions =>
@@ -224,7 +227,7 @@
      }
      logInfo(s"Deleting untracked files and empty directories in $path")

-      val filesDeleted = delete(diff, fs)
+      val filesDeleted = delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled)

      val stats = DeltaVacuumStats(
        isDryRun = false,
@@ -270,9 +273,25 @@ trait VacuumCommandImpl extends DeltaCommand {
  /**
   * Attempts to delete the list of candidate files. Returns the number of files deleted.
   */
-  protected def delete(diff: Dataset[String], fs: FileSystem): Long = {
-    val fileResultSet = diff.toLocalIterator().asScala
-    fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+  protected def delete(
+      diff: Dataset[String],
+      spark: SparkSession,
+      basePath: String,
+      hadoopConf: Broadcast[SerializableConfiguration],
+      parallel: Boolean): Long = {
+    import spark.implicits._
+    if (parallel) {
+      diff.mapPartitions { files =>
+        val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
+        val filesDeletedPerPartition =
+          files.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+        Iterator(filesDeletedPerPartition)
+      }.reduce(_ + _)
+    } else {
+      val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
+      val fileResultSet = diff.toLocalIterator().asScala
+      fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+    }
  }

  protected def stringToPath(path: String): Path = new Path(new URI(path))
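
As an aside (not part of this commit), here is a minimal, self-contained sketch of the same `mapPartitions` + `reduce` counting pattern the new `delete` method uses when `parallel` is true, with throwaway local temp files standing in for the vacuum candidates:

```scala
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession

// Each partition deletes its own files and emits a local count; reduce sums the counts.
object ParallelDeleteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("parallel-delete-sketch")
      .getOrCreate()
    import spark.implicits._

    // Throwaway temp files standing in for the paths in `diff`.
    val paths = (1 to 6).map(i => Files.createTempFile(s"vacuum-demo-$i-", ".txt").toString)
    val candidates = paths.toDS()

    val filesDeleted = candidates.mapPartitions { files =>
      // Runs in executor tasks; the real command calls tryDeleteNonRecursive(fs, path) here.
      val deletedInPartition = files.count(p => Files.deleteIfExists(Paths.get(p)))
      Iterator(deletedInPartition.toLong)
    }.reduce(_ + _)

    println(s"Deleted $filesDeleted files")
    spark.stop()
  }
}
```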
@@ -230,6 +230,14 @@ object DeltaSQLConf {
      .booleanConf
      .createWithDefault(true)

+  val DELTA_VACUUM_PARALLEL_DELETE_ENABLED =
+    buildConf("vacuum.parallelDelete.enabled")
+      .doc("Enables parallelizing the deletion of files during a vacuum command. Enabling " +
+        "may result in hitting rate limits on some storage backends. When enabled, " +
+        "parallelization is controlled by the default number of shuffle partitions.")
+      .booleanConf
+      .createWithDefault(false)
+
  val DELTA_SCHEMA_AUTO_MIGRATE =
    buildConf("schema.autoMerge.enabled")
      .doc("If true, enables schema merging on appends and on overwrites.")
src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala (20 additions, 0 deletions)
@@ -242,6 +242,26 @@ trait DeltaVacuumSuiteBase extends QueryTest
    }
  }

+  test("parallel file delete") {
+    withEnvironment { (tempDir, clock) =>
+      val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath, clock)
+      withSQLConf("spark.databricks.delta.vacuum.parallelDelete.enabled" -> "true") {
+        gcTest(deltaLog, clock)(
+          CreateFile("file1.txt", commitToActionLog = true),
+          CreateFile("file2.txt", commitToActionLog = true),
+          LogicallyDeleteFile("file1.txt"),
+          CheckFiles(Seq("file1.txt", "file2.txt")),
+          AdvanceClock(defaultTombstoneInterval + 1000),
+          GC(dryRun = false, Seq(tempDir)),
+          CheckFiles(Seq("file1.txt"), exist = false),
+          CheckFiles(Seq("file2.txt")),
+          GC(dryRun = false, Seq(tempDir)), // shouldn't throw an error with no files to delete
+          CheckFiles(Seq("file2.txt"))
+        )
+      }
+    }
+  }
+
  test("retention duration must be greater than 0") {
    withEnvironment { (tempDir, clock) =>
      val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath, clock)
