From f9819d5cab83d56532ab924a29c7affc7c369dda Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 20 Jan 2017 16:00:18 +0800 Subject: [PATCH 1/2] Fix inconsistent state in DiskBlockObject when exception occurred Change-Id: I837b5135dd67034d74a9832133dc29800c88f089 --- .../spark/storage/DiskBlockObjectWriter.scala | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 3cb12fca7dcc..fb6bbe8612c5 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -128,16 +128,19 @@ private[spark] class DiskBlockObjectWriter( */ private def closeResources(): Unit = { if (initialized) { - mcs.manualClose() - channel = null - mcs = null - bs = null - fos = null - ts = null - objOut = null - initialized = false - streamOpen = false - hasBeenClosed = true + Utils.tryWithSafeFinally { + mcs.manualClose() + } { + channel = null + mcs = null + bs = null + fos = null + ts = null + objOut = null + initialized = false + streamOpen = false + hasBeenClosed = true + } } } @@ -206,18 +209,22 @@ private[spark] class DiskBlockObjectWriter( streamOpen = false closeResources() } + } catch { + case e: Exception => + logError("Uncaught exception while closing file " + file, e) + } - val truncateStream = new FileOutputStream(file, true) - try { - truncateStream.getChannel.truncate(committedPosition) - file - } finally { - truncateStream.close() - } + var truncateStream: FileOutputStream = null + try { + truncateStream = new FileOutputStream(file, true) + truncateStream.getChannel.truncate(committedPosition) + file } catch { case e: Exception => logError("Uncaught exception while reverting partial writes to file " + file, e) file + } finally { + truncateStream.close() } } From 
b0fe795157a41925ba38bba02ee10a79518c8e42 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sun, 22 Jan 2017 15:09:16 +0800 Subject: [PATCH 2/2] Address the comments Change-Id: I6f88a75cf6467508c77cbfc28c25e4fc7c172373 --- .../spark/storage/DiskBlockObjectWriter.scala | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index fb6bbe8612c5..eb3ff926372a 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -202,30 +202,29 @@ private[spark] class DiskBlockObjectWriter( def revertPartialWritesAndClose(): File = { // Discard current writes. We do this by flushing the outstanding writes and then // truncating the file to its initial position. - try { + Utils.tryWithSafeFinally { if (initialized) { writeMetrics.decBytesWritten(reportedPosition - committedPosition) writeMetrics.decRecordsWritten(numRecordsWritten) streamOpen = false closeResources() } - } catch { - case e: Exception => - logError("Uncaught exception while closing file " + file, e) - } - - var truncateStream: FileOutputStream = null - try { - truncateStream = new FileOutputStream(file, true) - truncateStream.getChannel.truncate(committedPosition) - file - } catch { - case e: Exception => - logError("Uncaught exception while reverting partial writes to file " + file, e) - file - } finally { - truncateStream.close() + } { + var truncateStream: FileOutputStream = null + try { + truncateStream = new FileOutputStream(file, true) + truncateStream.getChannel.truncate(committedPosition) + } catch { + case e: Exception => + logError("Uncaught exception while reverting partial writes to file " + file, e) + } finally { + if (truncateStream != null) { + truncateStream.close() + truncateStream = null + } + } } + file } /**