Skip to content

Commit b761a84

Browse files
committed
Check for existing executor
1 parent d2fbc02 commit b761a84

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,14 +317,17 @@ class BlockManagerMasterEndpoint(
317317
val blocksToDeleteByShuffleService =
318318
new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
319319
mapOutputTracker.shuffleStatuses.get(shuffleId).map { shuffleStatus =>
320-
shuffleStatus.mapStatuses
321-
.filter(_.location.port == externalShuffleServicePort)
322-
.foreach { mapStatus =>
320+
shuffleStatus.mapStatuses.foreach { mapStatus =>
321+
// Port should always be external shuffle port if external shuffle is enabled
322+
val isShufflePort = mapStatus.location.port == externalShuffleServicePort
323+
val executorDeallocated = !blockManagerIdByExecutor.contains(mapStatus.location.executorId)
324+
if (isShufflePort && executorDeallocated) {
323325
val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
324326
new mutable.HashSet[BlockId])
325327
val blocksToDel =
326328
shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
327329
blocksToDel.foreach(blocks.add(_))
330+
}
328331
}
329332
}
330333

0 commit comments

Comments
 (0)