Skip to content

Commit

Permalink
Added config option to enable parallel deletes for vacuum command
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Binford <adamq43@gmail.com>
  • Loading branch information
Kimahriman committed Sep 16, 2020
1 parent 3ed57d0 commit acdf936
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import java.net.URI
import java.util.Date
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand All @@ -31,6 +29,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}

Expand Down Expand Up @@ -120,6 +119,8 @@ object VacuumCommand extends VacuumCommandImpl {
new SerializableConfiguration(sessionHadoopConf))
val basePath = fs.makeQualified(path).toString
var isBloomFiltered = false
val parallelDeleteEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)

val validFiles = snapshot.state
.mapPartitions { actions =>
Expand Down Expand Up @@ -224,7 +225,13 @@ object VacuumCommand extends VacuumCommandImpl {
}
logInfo(s"Deleting untracked files and empty directories in $path")

val filesDeleted = delete(diff, fs)
val toDelete = if (parallelDeleteEnabled) diff else diff.coalesce(1)
val filesDeleted = toDelete.mapPartitions { files =>
val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
val filesDeletedPerPartition =
files.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
Iterator(filesDeletedPerPartition)
}.reduce(_ + _)

val stats = DeltaVacuumStats(
isDryRun = false,
Expand Down Expand Up @@ -267,14 +274,6 @@ trait VacuumCommandImpl extends DeltaCommand {
DeltaFileOperations.getAllSubDirectories(base, file)._1
}

/**
 * Tries to delete each candidate file in `diff`, one at a time on the driver.
 * Returns the number of files that were actually deleted.
 */
protected def delete(diff: Dataset[String], fs: FileSystem): Long = {
  val candidates = diff.toLocalIterator().asScala
  candidates.count(candidate => tryDeleteNonRecursive(fs, stringToPath(candidate)))
}

protected def stringToPath(path: String): Path = new Path(new URI(path))

protected def pathToString(path: Path): String = path.toUri.toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ object DeltaSQLConf {
.booleanConf
.createWithDefault(true)

/**
 * When enabled, the vacuum command deletes files in parallel across Spark tasks
 * (one task per shuffle partition) instead of sequentially on the driver.
 * Disabled by default because some storage backends rate-limit bulk delete
 * requests issued concurrently.
 */
val DELTA_VACUUM_PARALLEL_DELETE_ENABLED =
  buildConf("vacuum.parallelDelete.enabled")
    // Fixed grammar in the user-facing doc string: "may result hitting" -> "may result in hitting".
    .doc("Enables parallelizing the deletion of files during a vacuum command. Enabling " +
      "may result in hitting rate limits on some storage backends. When enabled, " +
      "parallelization is controlled by the default number of shuffle partitions.")
    .booleanConf
    .createWithDefault(false)

val DELTA_CHECKPOINT_PART_SIZE =
buildConf("checkpoint.partSize")
.internal()
Expand Down

0 comments on commit acdf936

Please sign in to comment.