BypassMergeSortShuffleWriter.java
@@ -289,10 +289,7 @@ public Option<MapStatus> stop(boolean success) {
     try {
       for (DiskBlockObjectWriter writer : partitionWriters) {
         // This method explicitly does _not_ throw exceptions:
-        File file = writer.revertPartialWritesAndClose();
-        if (!file.delete()) {
-          logger.error("Error while deleting file {}", file.getAbsolutePath());
-        }
+        writer.closeAndDelete();
       }
     } finally {
       partitionWriters = null;
DiskBlockObjectWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
+import java.nio.file.Files
 import java.util.zip.Checksum
 
 import org.apache.spark.errors.SparkCoreErrors
@@ -118,6 +119,11 @@ private[spark] class DiskBlockObjectWriter(
    */
   private var numRecordsWritten = 0
 
+  /**
+   * Keeps track of the number of written records that have been committed.
+   */
+  private var numRecordsCommitted = 0L
+
   /**
    * Set the checksum that the checksumOutputStream should use
    */
@@ -223,6 +229,7 @@ private[spark] class DiskBlockObjectWriter(
       // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsCommitted += numRecordsWritten
       numRecordsWritten = 0
       fileSegment
     } else {
@@ -272,6 +279,25 @@
     file
   }
 
+  /**
+   * Reverts write metrics and deletes the file held by the current `DiskBlockObjectWriter`.
+   * Callers should invoke this method when a runtime exception occurs during the file
+   * writing process and the file is no longer needed.
+   */
+  def closeAndDelete(): Unit = {
+    Utils.tryWithSafeFinally {
+      if (initialized) {
+        writeMetrics.decBytesWritten(reportedPosition)
+        writeMetrics.decRecordsWritten(numRecordsCommitted + numRecordsWritten)
+        closeResources()
+      }
+    } {
+      if (!Files.deleteIfExists(file.toPath)) {
+        logWarning(s"Error deleting $file")
+      }
+    }
+  }
+
   /**
    * Writes a key-value pair.
    */
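The call-site hunks that follow show how ExternalAppendOnlyMap and ExternalSorter switch to the new method. As a hedged summary of the intended caller pattern, here is a minimal Scala sketch; the helper name writeAllOrCleanUp and the parameters are illustrative only and are not part of this PR.

  import org.apache.spark.storage.DiskBlockObjectWriter

  // Illustrative error-handling pattern around a DiskBlockObjectWriter.
  def writeAllOrCleanUp(writer: DiskBlockObjectWriter, records: Iterator[(Any, Any)]): Unit = {
    var success = false
    try {
      records.foreach { case (k, v) => writer.write(k, v) }
      writer.commitAndGet()
      success = true
    } finally {
      if (!success) {
        // Previously: writer.revertPartialWritesAndClose(), then a manual
        // file.exists()/file.delete() check with a logWarning on failure.
        // Now: a single call reverts bytesWritten/recordsWritten (including
        // previously committed segments) and deletes the backing file.
        writer.closeAndDelete()
      }
    }
  }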
ExternalAppendOnlyMap.scala
@@ -250,12 +250,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.closeAndDelete()
       }
     }
 
ExternalSorter.scala
@@ -321,12 +321,7 @@ private[spark] class ExternalSorter[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.closeAndDelete()
      }
    }
 
DiskBlockObjectWriterSuite.scala
@@ -184,4 +184,23 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     writer.close()
     assert(segment.length === 0)
   }
+
+  test("calling closeAndDelete() on a partial write file") {
+    val (writer, file, writeMetrics) = createWriter()
+
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
+    val firstSegment = writer.commitAndGet()
+    assert(firstSegment.length === file.length())
+    assert(writeMetrics.bytesWritten === file.length())
+
+    for (i <- 1 to 500) {
+      writer.write(i, i)
+    }
+    writer.closeAndDelete()
+    assert(!file.exists())
+    assert(writeMetrics.bytesWritten == 0)
+    assert(writeMetrics.recordsWritten == 0)
+  }
 }