From 32f0b75df434703b0fd253a75a07c06cbac8276a Mon Sep 17 00:00:00 2001 From: zhoukang Date: Tue, 5 Sep 2017 18:38:10 +0800 Subject: [PATCH 1/2] Uniform calling for getFile and print root cause for doPut --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 ++++++ .../scala/org/apache/spark/storage/DiskBlockManager.scala | 6 +++--- .../src/main/scala/org/apache/spark/storage/DiskStore.scala | 6 +++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index aaacabe79ace4..9968a48647bd1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -988,6 +988,12 @@ private[spark] class BlockManager( logWarning(s"Putting block $blockId failed") } res + } catch { + // Since removeBlockInternal may throw exception, + // we should print exception first to show root cause. + case e: Throwable => + logWarning(s"Putting block $blockId failed due to exception $e.") + throw e } finally { // This cleanup is performed in a finally block rather than a `catch` to avoid having to // catch and properly re-throw InterruptedException. diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 3d43e3c367aac..3daa85d9bd1cf 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -53,7 +53,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). - def getFile(filename: String): File = { + private def getFileInternal(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) val dirId = hash % localDirs.length @@ -77,11 +77,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea new File(subDir, filename) } - def getFile(blockId: BlockId): File = getFile(blockId.name) + def getFile(blockId: BlockId): File = getFileInternal(blockId.name) /** Check if disk block manager has a block. */ def containsBlock(blockId: BlockId): Boolean = { - getFile(blockId.name).exists() + getFileInternal(blockId.name).exists() } /** List all the files currently stored on disk by the disk manager. */ diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 95d70479ef017..e0b57bb94a8be 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -100,7 +100,7 @@ private[spark] class DiskStore( } def getBytes(blockId: BlockId): BlockData = { - val file = diskManager.getFile(blockId.name) + val file = diskManager.getFile(blockId) val blockSize = getSize(blockId) securityManager.getIOEncryptionKey() match { @@ -116,7 +116,7 @@ private[spark] class DiskStore( def remove(blockId: BlockId): Boolean = { blockSizes.remove(blockId.name) - val file = diskManager.getFile(blockId.name) + val file = diskManager.getFile(blockId) if (file.exists()) { val ret = file.delete() if (!ret) { @@ -129,7 +129,7 @@ private[spark] class DiskStore( } def contains(blockId: BlockId): Boolean = { - val file = diskManager.getFile(blockId.name) + val file = diskManager.getFile(blockId) file.exists() } From 082ab2a579e641c1f92e34d6814fad5ed176e1a5 Mon Sep 17 00:00:00 2001 From: zhoukang Date: Sat, 9 Sep 2017 08:55:49 +0800 Subject: [PATCH 2/2] Split prs --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 ------ .../scala/org/apache/spark/storage/DiskBlockManager.scala | 6 +++--- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 9968a48647bd1..aaacabe79ace4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -988,12 +988,6 @@ private[spark] class BlockManager( logWarning(s"Putting block $blockId failed") } res - } catch { - // Since removeBlockInternal may throw exception, - // we should print exception first to show root cause. - case e: Throwable => - logWarning(s"Putting block $blockId failed due to exception $e.") - throw e } finally { // This cleanup is performed in a finally block rather than a `catch` to avoid having to // catch and properly re-throw InterruptedException. diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 3daa85d9bd1cf..e7320b2c46ada 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -53,7 +53,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). - private def getFileInternal(filename: String): File = { + private def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) val dirId = hash % localDirs.length @@ -77,11 +77,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea new File(subDir, filename) } - def getFile(blockId: BlockId): File = getFileInternal(blockId.name) + def getFile(blockId: BlockId): File = getFile(blockId.name) /** Check if disk block manager has a block. */ def containsBlock(blockId: BlockId): Boolean = { - getFileInternal(blockId.name).exists() + getFile(blockId.name).exists() } /** List all the files currently stored on disk by the disk manager. */