From a5936ba700e0efef90b14e817817fc2fdc382bf5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 8 Sep 2016 18:38:59 -0700 Subject: [PATCH 1/6] Add internal version of removeBlock that can be called while holding lock. --- .../apache/spark/storage/BlockManager.scala | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0614646771bd..faa8a0552d01 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1316,21 +1316,29 @@ private[spark] class BlockManager( // The block has already been removed; do nothing. logWarning(s"Asked to remove block $blockId, which does not exist") case Some(info) => - // Removals are idempotent in disk store and memory store. At worst, we get a warning. - val removedFromMemory = memoryStore.remove(blockId) - val removedFromDisk = diskStore.remove(blockId) - if (!removedFromMemory && !removedFromDisk) { - logWarning(s"Block $blockId could not be removed as it was not found in either " + - "the disk, memory, or external block store") - } - blockInfoManager.removeBlock(blockId) - val removeBlockStatus = getCurrentBlockStatus(blockId, info) - if (tellMaster && info.tellMaster) { - reportBlockStatus(blockId, info, removeBlockStatus) - } - Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus) - } + removeBlockInternal(blockId, info, tellMaster) + } + } + + /** + * Internal version of [[removeBlock()]] which assumes that the caller already holds a write + * lock on the block. + */ + private def removeBlockInternal(blockId: BlockId, info: BlockInfo, tellMaster: Boolean): Unit = { + // Removals are idempotent in disk store and memory store. At worst, we get a warning. + val removedFromMemory = memoryStore.remove(blockId) + val removedFromDisk = diskStore.remove(blockId) + if (!removedFromMemory && !removedFromDisk) { + logWarning(s"Block $blockId could not be removed as it was not found in either " + + "the disk, memory, or external block store") + } + blockInfoManager.removeBlock(blockId) + val removeBlockStatus = getCurrentBlockStatus(blockId, info) + if (tellMaster && info.tellMaster) { + reportBlockStatus(blockId, info, removeBlockStatus) + } + Option(TaskContext.get()).foreach { c => + c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus) } } From f3890a53d969b09b8a9260c739c17d5610f895aa Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 9 Sep 2016 10:57:08 -0700 Subject: [PATCH 2/6] BlockStatus should always be empty following removal. --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index faa8a0552d01..ef1804062db0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -374,7 +374,7 @@ private[spark] class BlockManager( info.synchronized { info.level match { case null => - BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L) + BlockStatus.empty case level => val inMem = level.useMemory && memoryStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) @@ -1333,12 +1333,11 @@ private[spark] class BlockManager( "the disk, memory, or external block store") } blockInfoManager.removeBlock(blockId) - val removeBlockStatus = getCurrentBlockStatus(blockId, info) if (tellMaster && info.tellMaster) { - reportBlockStatus(blockId, info, removeBlockStatus) + reportBlockStatus(blockId, info, BlockStatus.empty) } Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus) + c.taskMetrics().incUpdatedBlockStatuses(blockId -> BlockStatus.empty) } } From dca6a8a5dd88b25ba764431af6514287ad2dbca4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 9 Sep 2016 11:19:03 -0700 Subject: [PATCH 3/6] Move if(info.tellMaster) check out of reportBlockStatus. --- .../apache/spark/storage/BlockManager.scala | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index ef1804062db0..a1dd58e0d50b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -217,7 +217,7 @@ private[spark] class BlockManager( logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.") for ((blockId, info) <- blockInfoManager.entries) { val status = getCurrentBlockStatus(blockId, info) - if (!tryToReportBlockStatus(blockId, info, status)) { + if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) { logError(s"Failed to report $blockId to master; giving up.") return } @@ -333,10 +333,9 @@ private[spark] class BlockManager( */ private def reportBlockStatus( blockId: BlockId, - info: BlockInfo, status: BlockStatus, droppedMemorySize: Long = 0L): Unit = { - val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize) + val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize) if (needReregister) { logInfo(s"Got told to re-register updating block $blockId") // Re-registering will report our new block for free. @@ -352,17 +351,12 @@ private[spark] class BlockManager( */ private def tryToReportBlockStatus( blockId: BlockId, - info: BlockInfo, status: BlockStatus, droppedMemorySize: Long = 0L): Boolean = { - if (info.tellMaster) { - val storageLevel = status.storageLevel - val inMemSize = Math.max(status.memSize, droppedMemorySize) - val onDiskSize = status.diskSize - master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) - } else { - true - } + val storageLevel = status.storageLevel + val inMemSize = Math.max(status.memSize, droppedMemorySize) + val onDiskSize = status.diskSize + master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) } /** @@ -807,8 +801,8 @@ private[spark] class BlockManager( // Now that the block is in either the memory or disk store, // tell the master about it. info.size = size - if (tellMaster) { - reportBlockStatus(blockId, info, putBlockStatus) + if (tellMaster && info.tellMaster) { + reportBlockStatus(blockId, putBlockStatus) } Option(TaskContext.get()).foreach { c => c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus) @@ -964,8 +958,8 @@ private[spark] class BlockManager( // Now that the block is in either the memory, externalBlockStore, or disk store, // tell the master about it. info.size = size - if (tellMaster) { - reportBlockStatus(blockId, info, putBlockStatus) + if (tellMaster && info.tellMaster) { + reportBlockStatus(blockId, putBlockStatus) } Option(TaskContext.get()).foreach { c => c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus) @@ -1271,7 +1265,7 @@ private[spark] class BlockManager( val status = getCurrentBlockStatus(blockId, info) if (info.tellMaster) { - reportBlockStatus(blockId, info, status, droppedMemorySize) + reportBlockStatus(blockId, status, droppedMemorySize) } if (blockIsUpdated) { Option(TaskContext.get()).foreach { c => @@ -1334,7 +1328,7 @@ private[spark] class BlockManager( } blockInfoManager.removeBlock(blockId) if (tellMaster && info.tellMaster) { - reportBlockStatus(blockId, info, BlockStatus.empty) + reportBlockStatus(blockId, BlockStatus.empty) } Option(TaskContext.get()).foreach { c => c.taskMetrics().incUpdatedBlockStatuses(blockId -> BlockStatus.empty) From 1406048601dc3c4504c421ba02922a18bcfdcdf4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 9 Sep 2016 16:28:09 -0700 Subject: [PATCH 4/6] Remove more references to the external block store --- .../scala/org/apache/spark/storage/BlockManager.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a1dd58e0d50b..01114f2ad142 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -298,7 +298,7 @@ private[spark] class BlockManager( /** * Get the BlockStatus for the block identified by the given ID, if it exists. - * NOTE: This is mainly for testing, and it doesn't fetch information from external block store. + * NOTE: This is mainly for testing. */ def getStatus(blockId: BlockId): Option[BlockStatus] = { blockInfoManager.get(blockId).map { info => @@ -955,8 +955,7 @@ private[spark] class BlockManager( val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { - // Now that the block is in either the memory, externalBlockStore, or disk store, - // tell the master about it. + // Now that the block is in either the memory or disk store, tell the master about it. info.size = size if (tellMaster && info.tellMaster) { reportBlockStatus(blockId, putBlockStatus) @@ -1323,8 +1322,7 @@ private[spark] class BlockManager( val removedFromMemory = memoryStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId) if (!removedFromMemory && !removedFromDisk) { - logWarning(s"Block $blockId could not be removed as it was not found in either " + - "the disk, memory, or external block store") + logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory") } blockInfoManager.removeBlock(blockId) if (tellMaster && info.tellMaster) { From 41daee846abe8324bc6f8f3994a6edd61d72ec95 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 9 Sep 2016 17:29:03 -0700 Subject: [PATCH 5/6] Remove info parameter from removeBlockInternal() --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 01114f2ad142..706523533f44 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1309,7 +1309,7 @@ private[spark] class BlockManager( // The block has already been removed; do nothing. logWarning(s"Asked to remove block $blockId, which does not exist") case Some(info) => - removeBlockInternal(blockId, info, tellMaster) + removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster) } } @@ -1317,7 +1317,7 @@ private[spark] class BlockManager( * Internal version of [[removeBlock()]] which assumes that the caller already holds a write * lock on the block. */ - private def removeBlockInternal(blockId: BlockId, info: BlockInfo, tellMaster: Boolean): Unit = { + private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = { // Removals are idempotent in disk store and memory store. At worst, we get a warning. val removedFromMemory = memoryStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId) @@ -1325,7 +1325,7 @@ private[spark] class BlockManager( logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory") } blockInfoManager.removeBlock(blockId) - if (tellMaster && info.tellMaster) { + if (tellMaster) { reportBlockStatus(blockId, BlockStatus.empty) } Option(TaskContext.get()).foreach { c => From c3cc277af163c86cf6c238f475b8374118753198 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 9 Sep 2016 17:33:01 -0700 Subject: [PATCH 6/6] Move TaskMetrics block status update into helper function. --- .../apache/spark/storage/BlockManager.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 706523533f44..9e63777caf03 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -804,9 +804,7 @@ private[spark] class BlockManager( if (tellMaster && info.tellMaster) { reportBlockStatus(blockId, putBlockStatus) } - Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus) - } + addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus) } logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) if (level.replication > 1) { @@ -960,9 +958,7 @@ private[spark] class BlockManager( if (tellMaster && info.tellMaster) { reportBlockStatus(blockId, putBlockStatus) } - Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus) - } + addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus) logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) if (level.replication > 1) { val remoteStartTime = System.currentTimeMillis @@ -1267,9 +1263,7 @@ private[spark] class BlockManager( reportBlockStatus(blockId, status, droppedMemorySize) } if (blockIsUpdated) { - Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(blockId -> status) - } + addUpdatedBlockStatusToTaskMetrics(blockId, status) } status.storageLevel } @@ -1310,6 +1304,7 @@ private[spark] class BlockManager( logWarning(s"Asked to remove block $blockId, which does not exist") case Some(info) => removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster) + addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty) } } @@ -1328,8 +1323,11 @@ private[spark] class BlockManager( if (tellMaster) { reportBlockStatus(blockId, BlockStatus.empty) } + } + + private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = { Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(blockId -> BlockStatus.empty) + c.taskMetrics().incUpdatedBlockStatuses(blockId -> status) } }