From e57e270906da88599cd240bff3d0fce6334bd954 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Wed, 3 Dec 2014 15:19:50 +0800
Subject: [PATCH 1/3] add check info is already remove or not while having gotten info.syn

---
 .../apache/spark/storage/BlockManager.scala | 36 ++++++++++---------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 308c59eda594d..1b7a1a326dc47 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1010,9 +1010,9 @@ private[spark] class BlockManager(
     info.synchronized {
       // required ? As of now, this will be invoked only for blocks which are ready
       // But in case this changes in future, adding for consistency sake.
-      if (!info.waitForReady()) {
+      if (blockInfo.get(blockId).isEmpty || !info.waitForReady()) {
         // If we get here, the block write failed.
-        logWarning(s"Block $blockId was marked as failure. Nothing to drop")
+        logWarning(s"Block $blockId was marked as failure or already dropped. Nothing to drop")
         return None
       }
 
@@ -1089,15 +1089,17 @@ private[spark] class BlockManager(
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
-        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-        val removedFromMemory = memoryStore.remove(blockId)
-        val removedFromDisk = diskStore.remove(blockId)
-        val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
-        if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
-          logWarning(s"Block $blockId could not be removed as it was not found in either " +
-            "the disk, memory, or tachyon store")
+        if (blockInfo.get(blockId).isEmpty) {
+          // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+          val removedFromMemory = memoryStore.remove(blockId)
+          val removedFromDisk = diskStore.remove(blockId)
+          val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
+          if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
+            logWarning(s"Block $blockId could not be removed as it was not found in either " +
+              "the disk, memory, or tachyon store")
+          }
+          blockInfo.remove(blockId)
         }
-        blockInfo.remove(blockId)
         if (tellMaster && info.tellMaster) {
           val status = getCurrentBlockStatus(blockId, info)
           reportBlockStatus(blockId, info, status)
@@ -1126,12 +1128,14 @@ private[spark] class BlockManager(
       val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
       if (time < cleanupTime && shouldDrop(id)) {
         info.synchronized {
-          val level = info.level
-          if (level.useMemory) { memoryStore.remove(id) }
-          if (level.useDisk) { diskStore.remove(id) }
-          if (level.useOffHeap) { tachyonStore.remove(id) }
-          iterator.remove()
-          logInfo(s"Dropped block $id")
+          if (blockInfo.get(id).isEmpty) {
+            val level = info.level
+            if (level.useMemory) { memoryStore.remove(id) }
+            if (level.useDisk) { diskStore.remove(id) }
+            if (level.useOffHeap) { tachyonStore.remove(id) }
+            iterator.remove()
+            logInfo(s"Dropped block $id")
+          }
         }
         val status = getCurrentBlockStatus(id, info)
         reportBlockStatus(id, info, status)

From 55fa4ba1e41eb36b1c4f867efbdd35c9b8a4f131 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Mon, 8 Dec 2014 13:09:55 +0800
Subject: [PATCH 2/3] refine code

---
 .../apache/spark/storage/BlockManager.scala | 39 +++++++++----------
 1 file changed, 19 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1b7a1a326dc47..8199d1c3efa57 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1010,9 +1010,12 @@ private[spark] class BlockManager(
     info.synchronized {
       // required ? As of now, this will be invoked only for blocks which are ready
       // But in case this changes in future, adding for consistency sake.
-      if (blockInfo.get(blockId).isEmpty || !info.waitForReady()) {
+      if (blockInfo.get(blockId).isEmpty) {
+        logWarning(s"Block $blockId was already dropped.")
+        return None
+      } else if(!info.waitForReady()) {
         // If we get here, the block write failed.
-        logWarning(s"Block $blockId was marked as failure or already dropped. Nothing to drop")
+        logWarning(s"Block $blockId was marked as failure. Nothing to drop")
         return None
       }
 
@@ -1089,17 +1092,15 @@ private[spark] class BlockManager(
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
-        if (blockInfo.get(blockId).isEmpty) {
-          // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-          val removedFromMemory = memoryStore.remove(blockId)
-          val removedFromDisk = diskStore.remove(blockId)
-          val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
-          if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
-            logWarning(s"Block $blockId could not be removed as it was not found in either " +
-              "the disk, memory, or tachyon store")
-          }
-          blockInfo.remove(blockId)
+        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+        val removedFromMemory = memoryStore.remove(blockId)
+        val removedFromDisk = diskStore.remove(blockId)
+        val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
+        if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
+          logWarning(s"Block $blockId could not be removed as it was not found in either " +
+            "the disk, memory, or tachyon store")
         }
+        blockInfo.remove(blockId)
         if (tellMaster && info.tellMaster) {
           val status = getCurrentBlockStatus(blockId, info)
           reportBlockStatus(blockId, info, status)
@@ -1128,14 +1129,12 @@ private[spark] class BlockManager(
       val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
       if (time < cleanupTime && shouldDrop(id)) {
         info.synchronized {
-          if (blockInfo.get(id).isEmpty) {
-            val level = info.level
-            if (level.useMemory) { memoryStore.remove(id) }
-            if (level.useDisk) { diskStore.remove(id) }
-            if (level.useOffHeap) { tachyonStore.remove(id) }
-            iterator.remove()
-            logInfo(s"Dropped block $id")
-          }
+          val level = info.level
+          if (level.useMemory) { memoryStore.remove(id) }
+          if (level.useDisk) { diskStore.remove(id) }
+          if (level.useOffHeap) { tachyonStore.remove(id) }
+          iterator.remove()
+          logInfo(s"Dropped block $id")
         }
         val status = getCurrentBlockStatus(id, info)
         reportBlockStatus(id, info, status)

From edb989dac31dd5672c7677090ae637a2d7328ed8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8%83%A1=E7=8F=8A=5D?=
Date: Tue, 9 Dec 2014 10:24:53 +0800
Subject: [PATCH 3/3] Refine code style and comments position

---
 .../scala/org/apache/spark/storage/BlockManager.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8199d1c3efa57..d7b184f8a10e9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1010,15 +1010,14 @@ private[spark] class BlockManager(
     info.synchronized {
       // required ? As of now, this will be invoked only for blocks which are ready
       // But in case this changes in future, adding for consistency sake.
-      if (blockInfo.get(blockId).isEmpty) {
-        logWarning(s"Block $blockId was already dropped.")
-        return None
-      } else if(!info.waitForReady()) {
+      if (!info.waitForReady()) {
         // If we get here, the block write failed.
         logWarning(s"Block $blockId was marked as failure. Nothing to drop")
         return None
+      } else if (blockInfo.get(blockId).isEmpty) {
+        logWarning(s"Block $blockId was already dropped.")
+        return None
       }
-
       var blockIsUpdated = false
       val level = info.level
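For reference, the net effect of the three patches on the dropFromMemory guard is to re-check blockInfo inside info.synchronized, so a block removed by another thread between the initial lookup and taking the lock is skipped rather than dropped twice. The sketch below is a minimal, self-contained illustration of that double-check ordering; BlockInfo, blockInfo, and waitForReady here are simplified stand-ins modeled on the hunks above, not the real BlockManager types or Spark API.

```scala
import scala.collection.concurrent.TrieMap

object DropGuardSketch {
  // Simplified stand-in for the real block metadata class.
  final class BlockInfo(val ready: Boolean) {
    def waitForReady(): Boolean = ready // true iff the block write succeeded
  }

  private val blockInfo = new TrieMap[String, BlockInfo]()

  /** Guard ordering after PATCH 3/3: failed write first, then "already dropped". */
  def dropFromMemory(blockId: String): Option[String] = {
    val info = blockInfo.get(blockId).orNull
    if (info == null) return None
    info.synchronized {
      if (!info.waitForReady()) {
        println(s"Block $blockId was marked as failure. Nothing to drop")
        return None
      } else if (blockInfo.get(blockId).isEmpty) {
        // Re-check under the lock: another thread removed the entry after our lookup.
        println(s"Block $blockId was already dropped.")
        return None
      }
      Some(blockId) // stand-in for the real drop-to-disk / status-reporting work
    }
  }

  def main(args: Array[String]): Unit = {
    blockInfo.put("rdd_0_0", new BlockInfo(ready = true))
    println(dropFromMemory("rdd_0_0")) // Some(rdd_0_0)
    blockInfo.remove("rdd_0_0")
    println(dropFromMemory("rdd_0_0")) // None: entry already gone
  }
}
```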