Added config option to enable parallel deletes for vacuum command #522

VacuumCommand.scala

@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
 
@@ -43,7 +44,7 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
  * will be ignored. Then we take a diff of the files and delete directories that were already empty,
  * and all files that are within the table that are no longer tracked.
  */
-object VacuumCommand extends VacuumCommandImpl {
+object VacuumCommand extends VacuumCommandImpl with Serializable {
 
   /**
    * Additional check on retention duration to prevent people from shooting themselves in the foot.
@@ -120,6 +121,8 @@ object VacuumCommand extends VacuumCommandImpl {
         new SerializableConfiguration(sessionHadoopConf))
       val basePath = fs.makeQualified(path).toString
       var isBloomFiltered = false
+      val parallelDeleteEnabled =
+        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)
 
       val validFiles = snapshot.state
         .mapPartitions { actions =>
@@ -224,7 +227,7 @@
       }
       logInfo(s"Deleting untracked files and empty directories in $path")
 
-      val filesDeleted = delete(diff, fs)
+      val filesDeleted = delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled)
 
       val stats = DeltaVacuumStats(
         isDryRun = false,
@@ -270,9 +273,25 @@ trait VacuumCommandImpl extends DeltaCommand {
   /**
    * Attempts to delete the list of candidate files. Returns the number of files deleted.
    */
-  protected def delete(diff: Dataset[String], fs: FileSystem): Long = {
-    val fileResultSet = diff.toLocalIterator().asScala
-    fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+  protected def delete(
+      diff: Dataset[String],
+      spark: SparkSession,
+      basePath: String,
+      hadoopConf: Broadcast[SerializableConfiguration],
+      parallel: Boolean): Long = {
+    import spark.implicits._
+    if (parallel) {
+      diff.mapPartitions { files =>
+        val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
+        val filesDeletedPerPartition =
+          files.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+        Iterator(filesDeletedPerPartition)
+      }.reduce(_ + _)
+    } else {
+      val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
+      val fileResultSet = diff.toLocalIterator().asScala
+      fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
+    }
   }
 
   protected def stringToPath(path: String): Path = new Path(new URI(path))
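For readers skimming the diff, the following is a minimal, self-contained sketch of the same partition-level delete pattern, runnable outside the Delta codebase. The scratch directory, the demo file list, and the use of a fresh Hadoop Configuration per task (instead of the broadcast SerializableConfiguration above) are assumptions made purely for illustration.

// Hypothetical, standalone sketch of the parallel-delete pattern; not Delta code.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object ParallelDeleteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("parallel-delete-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val basePath = "/tmp/parallel-delete-demo"  // assumed scratch directory
    val driverFs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
    val files = (1 to 10).map(i => s"$basePath/file$i.txt")
    files.foreach(f => driverFs.create(new Path(f), true).close())  // create files to delete

    // Each partition builds its own FileSystem handle (FileSystem is not serializable),
    // deletes its slice of the paths, and emits a per-partition count; the counts are
    // then summed with reduce, mirroring the parallel branch of delete() above.
    val deleted = files.toDS().repartition(4).mapPartitions { paths =>
      val fs = new Path(basePath).getFileSystem(new Configuration())
      Iterator(paths.count(p => fs.delete(new Path(p), false)))
    }.reduce(_ + _)

    println(s"Deleted $deleted of ${files.length} files")
    spark.stop()
  }
}

Rebuilding the FileSystem inside each task is what makes the parallel branch work; the production code additionally broadcasts the session's Hadoop configuration so that credentials and filesystem settings reach the executors.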
DeltaSQLConf.scala

@@ -230,6 +230,14 @@ object DeltaSQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_VACUUM_PARALLEL_DELETE_ENABLED =
+    buildConf("vacuum.parallelDelete.enabled")
+      .doc("Enables parallelizing the deletion of files during a vacuum command. Enabling " +
+        "this may result in hitting rate limits on some storage backends. When enabled, " +
+        "parallelization is controlled by the default number of shuffle partitions.")
+      .booleanConf
+      .createWithDefault(false)
+
   val DELTA_SCHEMA_AUTO_MIGRATE =
     buildConf("schema.autoMerge.enabled")
       .doc("If true, enables schema merging on appends and on overwrites.")
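For context, here is a hedged sketch of how a session might opt in to the new flag before vacuuming a table. The table path and the use of the DeltaTable Scala API are illustrative assumptions; the fully-qualified config key matches the one exercised in the test below.

// Hypothetical usage sketch; the table path is a placeholder.
import io.delta.tables.DeltaTable

// Opt in to parallel deletes for this session (the default is false).
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
// The degree of parallelism follows the default number of shuffle partitions.
spark.conf.set("spark.sql.shuffle.partitions", "32")

// Vacuum with the default retention period, deleting untracked files in parallel.
DeltaTable.forPath(spark, "/data/events").vacuum()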
src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala

@@ -242,6 +242,26 @@ trait DeltaVacuumSuiteBase extends QueryTest
     }
   }
 
+  test("parallel file delete") {
+    withEnvironment { (tempDir, clock) =>
+      val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath, clock)
+      withSQLConf("spark.databricks.delta.vacuum.parallelDelete.enabled" -> "true") {
+        gcTest(deltaLog, clock)(
+          CreateFile("file1.txt", commitToActionLog = true),
+          CreateFile("file2.txt", commitToActionLog = true),
+          LogicallyDeleteFile("file1.txt"),
+          CheckFiles(Seq("file1.txt", "file2.txt")),
+          AdvanceClock(defaultTombstoneInterval + 1000),
+          GC(dryRun = false, Seq(tempDir)),
+          CheckFiles(Seq("file1.txt"), exist = false),
+          CheckFiles(Seq("file2.txt")),
+          GC(dryRun = false, Seq(tempDir)), // shouldn't throw an error with no files to delete
+          CheckFiles(Seq("file2.txt"))
+        )
+      }
+    }
+  }
+
   test("retention duration must be greater than 0") {
     withEnvironment { (tempDir, clock) =>
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath, clock)