diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 9a5ac6f287be..da7a51854cc1 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -289,10 +289,7 @@ public Option<MapStatus> stop(boolean success) {
       try {
         for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
-          File file = writer.revertPartialWritesAndClose();
-          if (!file.delete()) {
-            logger.error("Error while deleting file {}", file.getAbsolutePath());
-          }
+          writer.closeAndDelete();
         }
       } finally {
         partitionWriters = null;
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 417060949739..3bdae2fe74fa 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
+import java.nio.file.Files
 import java.util.zip.Checksum
 
 import org.apache.spark.errors.SparkCoreErrors
@@ -118,6 +119,11 @@ private[spark] class DiskBlockObjectWriter(
    */
  private var numRecordsWritten = 0
 
+  /**
+   * Keep track of the number of written records that have been committed.
+   */
+  private var numRecordsCommitted = 0L
+
   /**
    * Set the checksum that the checksumOutputStream should use
    */
@@ -223,6 +229,7 @@ private[spark] class DiskBlockObjectWriter(
       // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsCommitted += numRecordsWritten
       numRecordsWritten = 0
       fileSegment
     } else {
@@ -272,6 +279,25 @@ private[spark] class DiskBlockObjectWriter(
     file
   }
 
+  /**
+   * Reverts write metrics and deletes the file held by the current `DiskBlockObjectWriter`.
+   * Callers should invoke this function when a runtime exception is thrown during the write
+   * process and the file is no longer needed.
+   */
+  def closeAndDelete(): Unit = {
+    Utils.tryWithSafeFinally {
+      if (initialized) {
+        writeMetrics.decBytesWritten(reportedPosition)
+        writeMetrics.decRecordsWritten(numRecordsCommitted + numRecordsWritten)
+        closeResources()
+      }
+    } {
+      if (!Files.deleteIfExists(file.toPath)) {
+        logWarning(s"Error deleting $file")
+      }
+    }
+  }
+
   /**
    * Writes a key-value pair.
    */
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 731131b688ca..f24c44b2f84e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -250,12 +250,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.closeAndDelete()
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index eda408afa7ce..284e70e2b05d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -321,12 +321,7 @@ private[spark] class ExternalSorter[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.closeAndDelete()
       }
     }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index cea55012c1de..b1f9032c247a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -184,4 +184,23 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     writer.close()
     assert(segment.length === 0)
   }
+
+  test("calling closeAndDelete() on a partial write file") {
+    val (writer, file, writeMetrics) = createWriter()
+
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
+    val firstSegment = writer.commitAndGet()
+    assert(firstSegment.length === file.length())
+    assert(writeMetrics.bytesWritten === file.length())
+
+    for (i <- 1 to 500) {
+      writer.write(i, i)
+    }
+    writer.closeAndDelete()
+    assert(!file.exists())
+    assert(writeMetrics.bytesWritten == 0)
+    assert(writeMetrics.recordsWritten == 0)
+  }
 }