From 9f5afd4f9e8528da06907a6a2489a754b9cb409e Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Wed, 4 Aug 2021 11:27:21 +0800
Subject: [PATCH 1/8] add a new method to avoid file truncation

---
 .../sort/BypassMergeSortShuffleWriter.java    |  5 +----
 .../spark/storage/DiskBlockObjectWriter.scala | 21 +++++++++++++++++++
 .../collection/ExternalAppendOnlyMap.scala    |  7 +------
 .../util/collection/ExternalSorter.scala      |  7 +------
 4 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 9a5ac6f287be..22448988eb6f 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -289,10 +289,7 @@ public Option<MapStatus> stop(boolean success) {
       try {
         for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
-          File file = writer.revertPartialWritesAndClose();
-          if (!file.delete()) {
-            logger.error("Error while deleting file {}", file.getAbsolutePath());
-          }
+          writer.deleteHeldFile();
         }
       } finally {
         partitionWriters = null;
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index f5d8c0219dc8..eb3fad634c80 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -262,6 +262,27 @@ private[spark] class DiskBlockObjectWriter(
     file
   }
 
+  /**
+   * Reverts write metrics that haven't been committed yet and deletes the file held
+   * by the current `DiskBlockObjectWriter`. Callers should invoke this function when there
+   * are runtime exceptions in the file writing process and the file is no longer needed.
+   */
+  def deleteHeldFile(): Unit = {
+    Utils.tryWithSafeFinally {
+      if (initialized) {
+        writeMetrics.decBytesWritten(reportedPosition - committedPosition)
+        writeMetrics.decRecordsWritten(numRecordsWritten)
+        closeResources()
+      }
+    } {
+      if (file.exists()) {
+        if (!file.delete()) {
+          logWarning(s"Error deleting $file")
+        }
+      }
+    }
+  }
+
   /**
    * Writes a key-value pair.
   */
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 731131b688ca..c59ea809c53f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -250,12 +250,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.deleteHeldFile()
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index eda408afa7ce..2be512a23355 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -321,12 +321,7 @@ private[spark] class ExternalSorter[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.deleteHeldFile()
       }
     }
 

From 25828f2489447817644f18fc0d05a2827512b452 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Wed, 4 Aug 2021 17:19:48 +0800
Subject: [PATCH 2/8] fix mridulm comments

---
 .../shuffle/sort/BypassMergeSortShuffleWriter.java    |  2 +-
 .../apache/spark/storage/DiskBlockObjectWriter.scala  | 10 +++++-----
 .../spark/util/collection/ExternalAppendOnlyMap.scala |  2 +-
 .../apache/spark/util/collection/ExternalSorter.scala |  2 +-
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 22448988eb6f..da7a51854cc1 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -289,7 +289,7 @@ public Option<MapStatus> stop(boolean success) {
       try {
         for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
-          writer.deleteHeldFile();
+          writer.closeAndDelete();
         }
       } finally {
         partitionWriters = null;
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index eb3fad634c80..8daaf133981d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
+import java.nio.file.Files
 import java.util.zip.Checksum
 
 import org.apache.spark.internal.Logging
@@ -28,6 +29,7 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.PairsWriter
 
+
 /**
  * A class for writing JVM objects directly to a file on disk. This class allows data to be appended
  * to an existing block. For efficiency, it retains the underlying file channel across
@@ -267,7 +269,7 @@ private[spark] class DiskBlockObjectWriter(
    * by the current `DiskBlockObjectWriter`. Callers should invoke this function when there
    * are runtime exceptions in the file writing process and the file is no longer needed.
    */
-  def deleteHeldFile(): Unit = {
+  def closeAndDelete(): Unit = {
     Utils.tryWithSafeFinally {
       if (initialized) {
         writeMetrics.decBytesWritten(reportedPosition - committedPosition)
@@ -275,10 +277,8 @@ private[spark] class DiskBlockObjectWriter(
         closeResources()
       }
     } {
-      if (file.exists()) {
-        if (!file.delete()) {
-          logWarning(s"Error deleting $file")
-        }
+      if (!Files.deleteIfExists(file.toPath)) {
+        logWarning(s"Error deleting $file")
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index c59ea809c53f..f24c44b2f84e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -250,7 +250,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.deleteHeldFile()
+        writer.closeAndDelete()
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 2be512a23355..284e70e2b05d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -321,7 +321,7 @@ private[spark] class ExternalSorter[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.deleteHeldFile()
+        writer.closeAndDelete()
       }
     }
 

From 4b3b72b5a38b344d0cf4a8c942320fcbf58cf39c Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 17 Dec 2021 17:15:51 +0800
Subject: [PATCH 3/8] Add a test case for closeAndDelete

---
 .../storage/DiskBlockObjectWriterSuite.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index cea55012c1de..c1f5d7ba7f51 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -184,4 +184,20 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     writer.close()
     assert(segment.length === 0)
   }
+
+  test("calling closeAndDelete() on a partial write file") {
+    val (writer, file, writeMetrics) = createWriter()
+
+    writer.write(Long.box(20), Long.box(30))
+    val firstSegment = writer.commitAndGet()
+    assert(firstSegment.length === file.length())
+    assert(writeMetrics.bytesWritten === file.length())
+
+    writer.write(Long.box(40), Long.box(50))
+
+    writer.closeAndDelete()
+    assert(!file.exists())
+    assert(writeMetrics.bytesWritten === firstSegment.length)
+    assert(writeMetrics.recordsWritten == 1)
+  }
 }

From 9db7115fc980c80ee517f46e6844b39d76c93559 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 17 Dec 2021 19:46:43 +0800
Subject: [PATCH 4/8] record and revert metrics to 0 after closeAndDelete

---
 .../apache/spark/storage/DiskBlockObjectWriter.scala | 10 ++++++++--
 .../spark/storage/DiskBlockObjectWriterSuite.scala   |  6 ++++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index f170520854d2..ad75ba97863a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -120,6 +120,11 @@ private[spark] class DiskBlockObjectWriter(
    */
   private var numRecordsWritten = 0
 
+  /**
+   * Keep track of the total number of records written.
+   */
+  private var numRecordsTotalWritten = 0L
+
   /**
    * Set the checksum that the checksumOutputStream should use
    */
@@ -282,8 +287,8 @@ private[spark] class DiskBlockObjectWriter(
   def closeAndDelete(): Unit = {
     Utils.tryWithSafeFinally {
       if (initialized) {
-        writeMetrics.decBytesWritten(reportedPosition - committedPosition)
-        writeMetrics.decRecordsWritten(numRecordsWritten)
+        writeMetrics.decBytesWritten(reportedPosition)
+        writeMetrics.decRecordsWritten(numRecordsTotalWritten)
         closeResources()
       }
     }
@@ -321,6 +326,7 @@ private[spark] class DiskBlockObjectWriter(
    */
   def recordWritten(): Unit = {
     numRecordsWritten += 1
+    numRecordsTotalWritten += 1
     writeMetrics.incRecordsWritten(1)
 
     if (numRecordsWritten % 16384 == 0) {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index c1f5d7ba7f51..91af956202ca 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -189,15 +189,17 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     val (writer, file, writeMetrics) = createWriter()
 
     writer.write(Long.box(20), Long.box(30))
+    writer.write(Long.box(10), Long.box(50))
     val firstSegment = writer.commitAndGet()
     assert(firstSegment.length === file.length())
     assert(writeMetrics.bytesWritten === file.length())
 
     writer.write(Long.box(40), Long.box(50))
+    writer.write(Long.box(30), Long.box(30))
 
     writer.closeAndDelete()
     assert(!file.exists())
-    assert(writeMetrics.bytesWritten === firstSegment.length)
-    assert(writeMetrics.recordsWritten == 1)
+    assert(writeMetrics.bytesWritten == 0)
+    assert(writeMetrics.recordsWritten == 0)
   }
 }

From a1bad58687e94f3263a32b487f723490c599fa16 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 17 Dec 2021 23:55:49 +0800
Subject: [PATCH 5/8] refactor

---
 .../org/apache/spark/storage/DiskBlockObjectWriter.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index ad75ba97863a..df75183ceaa0 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -121,9 +121,9 @@ private[spark] class DiskBlockObjectWriter(
   private var numRecordsWritten = 0
 
   /**
-   * Keep track of the total number of records written.
+   * Keep track of the number of written records committed.
    */
-  private var numRecordsTotalWritten = 0L
+  private var numRecordsCommitted = 0L
 
   /**
    * Set the checksum that the checksumOutputStream should use
@@ -230,6 +230,7 @@ private[spark] class DiskBlockObjectWriter(
       // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsCommitted += numRecordsWritten
       numRecordsWritten = 0
       fileSegment
     } else {
@@ -288,7 +289,7 @@ private[spark] class DiskBlockObjectWriter(
     Utils.tryWithSafeFinally {
       if (initialized) {
         writeMetrics.decBytesWritten(reportedPosition)
-        writeMetrics.decRecordsWritten(numRecordsTotalWritten)
+        writeMetrics.decRecordsWritten(numRecordsCommitted + numRecordsWritten)
         closeResources()
       }
     }
@@ -326,7 +327,6 @@ private[spark] class DiskBlockObjectWriter(
    */
   def recordWritten(): Unit = {
     numRecordsWritten += 1
-    numRecordsTotalWritten += 1
     writeMetrics.incRecordsWritten(1)
 
     if (numRecordsWritten % 16384 == 0) {

From a1e94b56e9675514ad2546fdcac8a5af36aa2b62 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 17 Dec 2021 23:57:15 +0800
Subject: [PATCH 6/8] remove an empty line

---
 .../scala/org/apache/spark/storage/DiskBlockObjectWriter.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index df75183ceaa0..aa34273c8830 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -30,7 +30,6 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.PairsWriter
 
-
 /**
  * A class for writing JVM objects directly to a file on disk. This class allows data to be appended
  * to an existing block. For efficiency, it retains the underlying file channel across
From 8fefa0379fb270184a134946416564983a2ae602 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Sat, 18 Dec 2021 00:55:10 +0800
Subject: [PATCH 7/8] refactor test case

---
 .../spark/storage/DiskBlockObjectWriterSuite.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 91af956202ca..b1f9032c247a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -188,15 +188,16 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("calling closeAndDelete() on a partial write file") {
     val (writer, file, writeMetrics) = createWriter()
 
-    writer.write(Long.box(20), Long.box(30))
-    writer.write(Long.box(10), Long.box(50))
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
     val firstSegment = writer.commitAndGet()
     assert(firstSegment.length === file.length())
     assert(writeMetrics.bytesWritten === file.length())
 
-    writer.write(Long.box(40), Long.box(50))
-    writer.write(Long.box(30), Long.box(30))
-
+    for (i <- 1 to 500) {
+      writer.write(i, i)
+    }
     writer.closeAndDelete()
     assert(!file.exists())
     assert(writeMetrics.bytesWritten == 0)

From c98ff1dd4fd69ed80fdf2d05852abd7825085ef5 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 20 Dec 2021 10:27:11 +0800
Subject: [PATCH 8/8] fix comments

---
 .../org/apache/spark/storage/DiskBlockObjectWriter.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index aa34273c8830..3bdae2fe74fa 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -280,9 +280,9 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   /**
-   * Reverts write metrics that haven't been committed yet and deletes the file held
-   * by the current `DiskBlockObjectWriter`. Callers should invoke this function when there
-   * are runtime exceptions in the file writing process and the file is no longer needed.
+   * Reverts write metrics and deletes the file held by the current `DiskBlockObjectWriter`.
+   * Callers should invoke this function when there are runtime exceptions in the file
+   * writing process and the file is no longer needed.
    */
   def closeAndDelete(): Unit = {
     Utils.tryWithSafeFinally {
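
Taken together, the series collapses the old three-step cleanup (revert partial writes, close, delete the file) into a single `closeAndDelete()` call that also zeroes the writer's metrics. The sketch below is illustrative only and not part of the patches: `spillToDisk` and its parameters are hypothetical stand-ins (and `DiskBlockObjectWriter` is `private[spark]`, so real callers live inside Spark core), while `write`, `commitAndGet`, and `closeAndDelete` are the `DiskBlockObjectWriter` methods touched above.

import org.apache.spark.storage.DiskBlockObjectWriter

// Hypothetical spill routine sketching the caller-side pattern the series
// converges on (mirroring the ExternalSorter and ExternalAppendOnlyMap hunks).
def spillToDisk(records: Iterator[(Any, Any)], writer: DiskBlockObjectWriter): Unit = {
  var success = false
  try {
    records.foreach { case (key, value) =>
      writer.write(key, value)
    }
    writer.commitAndGet() // commits the segment and its bytes/records metrics
    success = true
  } finally {
    if (!success) {
      // An exception is propagating. After patches 4 and 5, this single call
      // reverts all metrics recorded by this writer (committed and uncommitted),
      // closes the underlying streams, and deletes the partial file, replacing
      // the old revertPartialWritesAndClose() plus manual file.delete() sequence.
      writer.closeAndDelete()
    }
  }
}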