From d4b929b49b7962131e514783ab1ca1024244b48e Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Mon, 20 Oct 2014 15:30:46 +0800 Subject: [PATCH 1/4] [SPARK-4005][CORE] handle message replies in receive instead of in the individual private methods --- .../storage/BlockManagerMasterActor.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 088f06e389d83..3696eeddf3ad7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -73,9 +73,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case UpdateBlockInfo( blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) => - // TODO: Ideally we want to handle all the message replies in receive instead of in the - // individual private methods. - updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) + sender ! updateBlockInfo( + blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) case GetLocations(blockId) => sender ! getLocations(blockId) @@ -351,23 +350,23 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus storageLevel: StorageLevel, memSize: Long, diskSize: Long, - tachyonSize: Long) { + tachyonSize: Long): Boolean = { + var updated = true if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { // We intentionally do not register the master (except in local mode), // so we should not indicate failure. - sender ! true + // do nothing here, "updated == true". } else { - sender ! false + updated = false } - return + return updated } if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() - sender ! true - return + return updated } blockManagerInfo(blockManagerId).updateBlockInfo( @@ -391,7 +390,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus if (locations.size == 0) { blockLocations.remove(blockId) } - sender ! true + updated } private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { From 242166b028af60624699f3a8410703a4505119c9 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Fri, 5 Dec 2014 09:20:52 +0800 Subject: [PATCH 2/4] modified accroding to the comments --- .../spark/storage/BlockManagerMasterActor.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 3696eeddf3ad7..0c678d54bc201 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -352,21 +352,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus diskSize: Long, tachyonSize: Long): Boolean = { - var updated = true if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { // We intentionally do not register the master (except in local mode), // so we should not indicate failure. - // do nothing here, "updated == true". + return true } else { - updated = false + return false } - return updated + return true } if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() - return updated + return true } blockManagerInfo(blockManagerId).updateBlockInfo( @@ -390,7 +389,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus if (locations.size == 0) { blockLocations.remove(blockId) } - updated + true } private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { From bf518cd7004bdc767deda7f8484d28b60ef83925 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Fri, 5 Dec 2014 09:28:37 +0800 Subject: [PATCH 3/4] change the indent --- .../org/apache/spark/storage/BlockManagerMasterActor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 0c678d54bc201..609b7b7214173 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -74,7 +74,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case UpdateBlockInfo( blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) => sender ! updateBlockInfo( - blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) + blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) case GetLocations(blockId) => sender ! getLocations(blockId) From 9b06f0ae7e69fba6a87f56ee97ffad6fd0a20e4b Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Fri, 5 Dec 2014 11:14:52 +0800 Subject: [PATCH 4/4] remove the unreachable code --- .../scala/org/apache/spark/storage/BlockManagerMasterActor.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 609b7b7214173..37022f25759d5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -360,7 +360,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } else { return false } - return true } if (blockId == null) {