diff --git a/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 84525c8d7c6..de221bb1b73 100644
--- a/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -21,8 +21,6 @@ import java.net.URI
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -31,6 +29,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
 
@@ -120,6 +119,8 @@ object VacuumCommand extends VacuumCommandImpl {
         new SerializableConfiguration(sessionHadoopConf))
       val basePath = fs.makeQualified(path).toString
       var isBloomFiltered = false
+      val parallelDeleteEnabled =
+        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)
 
       val validFiles = snapshot.state
         .mapPartitions { actions =>
@@ -224,7 +225,13 @@ object VacuumCommand extends VacuumCommandImpl {
       }
 
       logInfo(s"Deleting untracked files and empty directories in $path")
-      val filesDeleted = delete(diff, fs)
+      val toDelete = if (parallelDeleteEnabled) diff else diff.coalesce(1)
+      val filesDeleted = toDelete.mapPartitions { files =>
+        val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
+        val filesDeletedPerPartition =
+          files.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+        Iterator(filesDeletedPerPartition)
+      }.reduce(_ + _)
 
       val stats = DeltaVacuumStats(
         isDryRun = false,
@@ -267,14 +274,6 @@ trait VacuumCommandImpl extends DeltaCommand {
     DeltaFileOperations.getAllSubDirectories(base, file)._1
   }
 
-  /**
-   * Attempts to delete the list of candidate files. Returns the number of files deleted.
-   */
-  protected def delete(diff: Dataset[String], fs: FileSystem): Long = {
-    val fileResultSet = diff.toLocalIterator().asScala
-    fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
-  }
-
   protected def stringToPath(path: String): Path = new Path(new URI(path))
 
   protected def pathToString(path: Path): String = path.toUri.toString
diff --git a/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 9645d92e9f4..0589f8482e6 100644
--- a/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -222,6 +222,14 @@ object DeltaSQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_VACUUM_PARALLEL_DELETE_ENABLED =
+    buildConf("vacuum.parallelDelete.enabled")
+      .doc("Enables parallelizing the deletion of files during a vacuum command. Enabling " +
+        "may result in hitting rate limits on some storage backends. When enabled, parallelization " +
+        "is controlled by the default number of shuffle partitions.")
+      .booleanConf
+      .createWithDefault(false)
+
   val DELTA_CHECKPOINT_PART_SIZE =
     buildConf("checkpoint.partSize")
       .internal()
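
For context, a minimal usage sketch (not part of the patch) showing how the new flag could be exercised. It assumes the usual `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` applies to conf keys; the table path is hypothetical:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("vacuum-parallel-delete-example")
  .getOrCreate()

// Opt in to parallel deletion; it defaults to false so vacuum does not
// overwhelm storage backends that enforce request rate limits.
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")

// Deletes are then spread across the default number of shuffle partitions.
DeltaTable.forPath(spark, "/tmp/delta/events").vacuum()
```

With the flag left off, the patch coalesces the candidate files to a single partition, so deletion still happens one file at a time as before, just inside an executor task instead of iterating the Dataset on the driver.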