Commit cc7aea0

committed: Applying review comments (from Imran)
1 parent f0e141d commit cc7aea0

4 files changed, +8 -22 lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 4 additions & 4 deletions
@@ -491,7 +491,7 @@ private[spark] class ExecutorAllocationManager(
     newExecutorTotal = numExistingExecutors
     if (testing || executorsRemoved.nonEmpty) {
       executorsRemoved.foreach { removedExecutorId =>
-        // If it is a cached block, it uses cachedExecutorIdleTimeoutS for timeout
+        // If it has an exclusive cached block then cachedExecutorIdleTimeoutS is used for timeout
         val idleTimeout = if (blockManagerMaster.hasExclusiveCachedBlocks(removedExecutorId)) {
           cachedExecutorIdleTimeoutS
         } else {
@@ -605,9 +605,9 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
-        // Note that it is not necessary to query the executors since all the cached
-        // blocks we are concerned with are reported to the driver. Note that this
-        // does not include broadcast blocks.
+        // Note that it is not necessary to query the executors since all the cached blocks we are
+        // concerned with are reported to the driver. This does not include broadcast blocks and
+        // non-exclusive blocks which are also available via the external shuffle service.
         val hasCachedBlocks = blockManagerMaster.hasExclusiveCachedBlocks(executorId)
         val now = clock.getTimeMillis()
         val timeout = {
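
For readers skimming the diff, a minimal standalone sketch of the timeout selection these two hunks touch. Only hasExclusiveCachedBlocks and cachedExecutorIdleTimeoutS appear in the diff above; the trait, the executorIdleTimeoutS fallback and the hard-coded values are illustrative assumptions, not the real configuration handling in ExecutorAllocationManager.

// Sketch only: the trait stands in for BlockManagerMaster and the constants stand in
// for values the real code reads from Spark configuration (assumed numbers).
trait HasExclusiveCachedBlocksQuery {
  def hasExclusiveCachedBlocks(executorId: String): Boolean
}

object IdleTimeoutSketch {
  val executorIdleTimeoutS: Long = 60L         // assumed fallback idle timeout
  val cachedExecutorIdleTimeoutS: Long = 600L  // assumed timeout for executors with cached data

  // Executors that still hold blocks only they can serve get the longer timeout, so
  // dynamic allocation does not discard exclusively cached data too eagerly.
  def idleTimeout(master: HasExclusiveCachedBlocksQuery, executorId: String): Long =
    if (master.hasExclusiveCachedBlocks(executorId)) cachedExecutorIdleTimeoutS
    else executorIdleTimeoutS
}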

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 2 additions & 1 deletion
@@ -223,7 +223,8 @@ class BlockManagerMaster(
   }

   /**
-   * Find out if the executor has cached blocks which are only available via this executor.
+   * Find out if the executor has cached blocks which are not available via the external shuffle
+   * service.
    * This method does not consider broadcast blocks, since they are not reported the master.
    */
  def hasExclusiveCachedBlocks(executorId: String): Boolean = {
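
As a rough illustration of what this query amounts to on the master side (not the actual BlockManagerMaster implementation, which goes through the driver endpoint): an executor has exclusive cached blocks when its tracked set of such blocks is non-empty. The map parameter and the object name below are hypothetical; the per-executor set corresponds to the _exclusiveCachedBlocks field shown in the next file.

import org.apache.spark.storage.BlockId

object HasExclusiveCachedBlocksSketch {
  // exclusiveBlocksByExecutor is a stand-in for the bookkeeping the master keeps per
  // executor (compare the _exclusiveCachedBlocks set in BlockManagerInfo below).
  def hasExclusiveCachedBlocks(
      exclusiveBlocksByExecutor: Map[String, collection.Set[BlockId]],
      executorId: String): Boolean =
    exclusiveBlocksByExecutor.get(executorId).exists(_.nonEmpty)
}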

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 2 additions & 2 deletions
@@ -545,8 +545,8 @@ private[spark] class BlockManagerInfo(
   private val _blocks = new JHashMap[BlockId, BlockStatus]

   /**
-   * Cached blocks held exclusively by this BlockManager. This does not include broadcast blocks
-   * and local disk persisted blocks when external shuffle service is enabled.
+   * Cached blocks which are not available via the external shuffle service.
+   * This does not include broadcast blocks.
    */
   private val _exclusiveCachedBlocks = new mutable.HashSet[BlockId]
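
To make the new wording concrete, a hedged sketch of when a cached RDD block counts as "exclusive": the external shuffle service can only serve blocks persisted on local disk, so a block stays exclusive to its executor if the service is disabled or the block has no disk copy. The object, method and parameter names below are made up for illustration; the behaviour matches the deleted MEMORY_AND_DISK test further down (disk-backed block plus enabled service is not exclusive).

import org.apache.spark.storage.{BlockId, RDDBlockId}

object ExclusiveCachedBlockSketch {
  // An RDD block is exclusive when the external shuffle service cannot serve it:
  // the service is disabled, or the block is not persisted on local disk.
  def isExclusiveCachedBlock(
      blockId: BlockId,
      diskSize: Long,
      externalShuffleServiceEnabled: Boolean): Boolean =
    blockId.isRDD && !(externalShuffleServiceEnabled && diskSize > 0)

  def main(args: Array[String]): Unit = {
    val block = RDDBlockId(0, 0)
    println(isExclusiveCachedBlock(block, diskSize = 400, externalShuffleServiceEnabled = true))   // false
    println(isExclusiveCachedBlock(block, diskSize = 400, externalShuffleServiceEnabled = false))  // true
  }
}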

core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala

Lines changed: 0 additions & 15 deletions
@@ -63,21 +63,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     }
   }

-  testWithShuffleServiceOnOff("RDD block with MEMORY_AND_DISK") { (svcEnabled, bmInfo) =>
-    val rddId: BlockId = RDDBlockId(0, 0)
-    bmInfo.updateBlockInfo(
-      rddId, StorageLevel.MEMORY_AND_DISK, memSize = 200, diskSize = 400)
-    assert(bmInfo.blocks.asScala ===
-      Map(rddId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
-    val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
-    assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
-    assert(bmInfo.remainingMem === 29800)
-    if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
-        Map(rddId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
-    }
-  }
-
   testWithShuffleServiceOnOff("RDD block with DISK_ONLY") { (svcEnabled, bmInfo) =>
     val rddId: BlockId = RDDBlockId(0, 0)
     bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
