diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 6c18cf1b184b..49a75b26d79b 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,9 +80,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = sc.conf.get(
-    config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d7f7eedc7f33..b3eecdee8f1d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -82,6 +82,15 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
+  private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
+  private val blockManagerInfoCleaner = {
+    val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
+    val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
+    executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
+      TimeUnit.MILLISECONDS)
+    executor
+  }
+
   val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
 
   val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -190,12 +199,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        blockManagerInfo.get(bmId).foreach { bmInfo =>
+        aliveBlockManagerInfo(bmId).foreach { bmInfo =>
           bmInfo.removeBlock(blockId)
         }
       }
     }
-    val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
+    val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
       bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
         case e: IOException =>
           logWarning(s"Error trying to remove RDD ${removeMsg.rddId} " +
@@ -224,7 +233,7 @@
     // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
-      blockManagerInfo.values.map { bm =>
+      allAliveBlockManagerInfos.map { bm =>
         bm.slaveEndpoint.ask[Boolean](removeMsg)
       }.toSeq
     )
@@ -237,7 +246,7 @@
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
-    val requiredBlockManagers = blockManagerInfo.values.filter { info =>
+    val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
     val futures = requiredBlockManagers.map { bm =>
@@ -255,12 +264,23 @@
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
+    // SPARK-35011: Not removing the info from the blockManagerInfo map, but only setting the
+    // removal timestamp of the executor in BlockManagerInfo. This info will be removed from the
+    // blockManagerInfo map by the blockManagerInfoCleaner once
+    // now() - info.executorRemovalTs > executorTimeoutMs.
+    //
+    // We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
+    // while an executor is shutting down. This unwanted reregistration causes inconsistent
+    // bookkeeping of executors in Spark.
+    // Delaying the removal until the blockManagerInfoCleaner decides to drop it ensures that
+    // BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
+    // executor to re-register when it handles a BlockManagerHeartbeat message.
+    info.setExecutorRemovalTs()
+
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
 
-    // Remove it from blockManagerInfo and remove all the blocks.
-    blockManagerInfo.remove(blockManagerId)
-
+    // Remove all the blocks.
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
@@ -281,7 +301,7 @@
       val i = (new Random(blockId.hashCode)).nextInt(locations.size)
       val blockLocations = locations.toSeq
       val candidateBMId = blockLocations(i)
-      blockManagerInfo.get(candidateBMId).foreach { bm =>
+      aliveBlockManagerInfo(candidateBMId).foreach { bm =>
         val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
         val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
         bm.slaveEndpoint.ask[Boolean](replicateMsg)
@@ -305,12 +325,11 @@
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        val blockManager = blockManagerInfo.get(blockManagerId)
-        if (blockManager.isDefined) {
-          // Remove the block from the slave's BlockManager.
+        aliveBlockManagerInfo(blockManagerId).foreach { bm =>
+          // Remove the block from the BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
-          blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
+          bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
         }
       }
     }
@@ -318,14 +337,14 @@
 
   // Return a map from the block manager id to max memory and remaining memory.
   private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    blockManagerInfo.map { case(blockManagerId, info) =>
-      (blockManagerId, (info.maxMem, info.remainingMem))
+    allAliveBlockManagerInfos.map { info =>
+      (info.blockManagerId, (info.maxMem, info.remainingMem))
     }.toMap
   }
 
   private def storageStatus: Array[StorageStatus] = {
-    blockManagerInfo.map { case (blockManagerId, info) =>
-      new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+    allAliveBlockManagerInfos.map { info =>
+      new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
         Some(info.maxOffHeapMem), info.blocks.asScala)
     }.toArray
   }
@@ -347,7 +366,7 @@
      * Futures to avoid potential deadlocks. This can arise if there exists a block manager
      * that is also waiting for this master endpoint's response to a previous message.
      */
-    blockManagerInfo.values.map { info =>
+    allAliveBlockManagerInfos.map { info =>
       val blockStatusFuture =
         if (askSlaves) {
           info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
@@ -371,7 +390,7 @@
       askSlaves: Boolean): Future[Seq[BlockId]] = {
     val getMatchingBlockIds = GetMatchingBlockIds(filter)
     Future.sequence(
-      blockManagerInfo.values.map { info =>
+      allAliveBlockManagerInfos.map { info =>
         val future =
           if (askSlaves) {
             info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -433,9 +452,10 @@
 
       blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
         maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus)
+
+      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
+        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
-      Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     id
   }
 
@@ -505,7 +525,7 @@
     if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
       Option(blockStatusByShuffleService(bmId).get(blockId))
     } else {
-      blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
+      aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
     }
   }
 
@@ -516,8 +536,7 @@
       // can be used to access this block even when the original executor is already stopped.
       loc.host == requesterHost &&
         (loc.port == externalShuffleServicePort ||
-          blockManagerInfo
-            .get(loc)
+          aliveBlockManagerInfo(loc)
             .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
             .getOrElse(false))
     }.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -534,7 +553,7 @@
 
   /** Get the list of the peers of the given block manager */
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    val blockManagerIds = blockManagerInfo.keySet
+    val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
     if (blockManagerIds.contains(blockManagerId)) {
       blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
     } else {
@@ -548,7 +567,7 @@ private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId)
+      info <- aliveBlockManagerInfo(blockManagerId)
     ) yield {
       info.slaveEndpoint
     }
   }
@@ -556,7 +575,27 @@
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    blockManagerInfoCleaner.shutdownNow()
+  }
+
+  private def cleanBlockManagerInfo(): Unit = {
+    logDebug("Cleaning blockManagerInfo")
+    val now = System.currentTimeMillis()
+    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
+      // executorRemovalTs is always set once the BM is no longer alive, so .get is safe here
+      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
+    }.keys
+    expiredBmIds.foreach { bmId =>
+      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
+      blockManagerInfo.remove(bmId)
+    }
   }
+
+  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
+    blockManagerInfo.get(bmId).filter(_.isAlive)
+
+  @inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
+    blockManagerInfo.values.filter(_.isAlive)
 }
 
 @DeveloperApi
@@ -584,6 +623,7 @@ private[spark] class BlockManagerInfo(
 
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
+  private var _executorRemovalTs: Option[Long] = None
 
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -699,4 +739,16 @@ private[spark] class BlockManagerInfo(
   def clear(): Unit = {
     _blocks.clear()
   }
+
+  def executorRemovalTs: Option[Long] = _executorRemovalTs
+
+  def isAlive: Boolean = _executorRemovalTs.isEmpty
+
+  def setExecutorRemovalTs(): Unit = {
+    if (!isAlive) {
+      logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
+    } else {
+      _executorRemovalTs = Some(System.currentTimeMillis())
+    }
+  }
 }
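To make the new life cycle easier to follow, here is a small, self-contained Scala sketch of the bookkeeping idea above. It is not the Spark classes themselves: the names, the plain ScheduledExecutorService, and the hard-coded 5-second timeout are illustrative assumptions. Removal only stamps a timestamp, lookups filter on aliveness, and a periodic cleaner drops the entry once the timeout has elapsed.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Hypothetical, simplified model of the delayed-removal bookkeeping (not Spark code).
object DelayedRemovalSketch {

  final class Info(val id: String) {
    private var removalTs: Option[Long] = None
    def isAlive: Boolean = removalTs.isEmpty
    def removedAt: Option[Long] = removalTs
    def markRemoved(now: Long): Unit = if (isAlive) removalTs = Some(now)
  }

  private val executorTimeoutMs = 5000L                     // e.g. spark.storage.blockManagerSlaveTimeoutMs = "5s"
  private val infos = mutable.HashMap.empty[String, Info]   // stands in for blockManagerInfo

  def register(id: String): Unit = synchronized { infos(id) = new Info(id) }

  // Analogue of removeBlockManager: the entry stays in the map, only the timestamp is set.
  def remove(id: String): Unit = synchronized {
    infos.get(id).foreach(_.markRemoved(System.currentTimeMillis()))
  }

  // Analogue of aliveBlockManagerInfo: callers never see entries that are pending cleanup.
  def alive(id: String): Option[Info] = synchronized { infos.get(id).filter(_.isAlive) }

  // Periodic cleaner scheduled at half the timeout, like blockManagerInfo-cleaner above.
  private val cleaningDelayMs = Math.floorDiv(executorTimeoutMs, 2L)
  private val cleaner = Executors.newSingleThreadScheduledExecutor()
  cleaner.scheduleWithFixedDelay(() => synchronized {
    val now = System.currentTimeMillis()
    val expired = infos.collect {
      case (id, info) if !info.isAlive && now - info.removedAt.get > executorTimeoutMs => id
    }
    expired.foreach(infos.remove)
  }, cleaningDelayMs, cleaningDelayMs, TimeUnit.MILLISECONDS)
}
```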
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b13ce4c9d550..137d68116b0d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2934,6 +2934,13 @@ private[spark] object Utils extends Logging {
     props.forEach((k, v) => resultProps.put(k, v))
     resultProps
   }
+
+  def executorTimeoutMs(conf: SparkConf): Long = {
+    // `spark.network.timeout` is specified in seconds, while
+    // `spark.storage.blockManagerSlaveTimeoutMs` is in milliseconds.
+    conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
+      .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
+  }
 }
 
 private[util] object CallerContext extends Logging {
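For reference, a minimal sketch of the fallback order the new Utils.executorTimeoutMs helper implements, written against plain Options instead of SparkConf. The helper name below is a stand-in and the 120-second network timeout is just an assumed example value, not something read from the patch.

```scala
// Hypothetical stand-in for the fallback logic; names and defaults are assumptions.
object TimeoutFallbackSketch {
  def resolveExecutorTimeoutMs(
      blockManagerSlaveTimeoutMs: Option[Long], // spark.storage.blockManagerSlaveTimeoutMs (already in ms)
      networkTimeoutSeconds: Long               // spark.network.timeout (interpreted as seconds)
    ): Long = {
    blockManagerSlaveTimeoutMs.getOrElse(networkTimeoutSeconds * 1000L)
  }

  def main(args: Array[String]): Unit = {
    println(resolveExecutorTimeoutMs(Some(5000L), 120L)) // 5000: explicit block manager timeout wins
    println(resolveExecutorTimeoutMs(None, 120L))        // 120000: falls back to the network timeout
  }
}
```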
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9cf531383ce3..8246da656cd2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration._
-import scala.language.implicitConversions
+import scala.language.{implicitConversions, postfixOps}
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.RandomUtils
@@ -93,6 +93,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(MEMORY_STORAGE_FRACTION, 0.999)
       .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
+      .set(STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "5s")
   }
 
   private def makeBlockManager(
@@ -481,7 +482,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
   }
 
-  test("reregistration on heart beat") {
+  test("no reregistration on heart beat until executor timeout") {
     val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
@@ -492,10 +493,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
-
     val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
-    assert(reregister)
+    assert(reregister == false, "master told to re-register")
+
+    eventually(timeout(10 seconds), interval(1 seconds)) {
+      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
+        BlockManagerHeartbeat(store.blockManagerId))
+      assert(reregister, "master did not tell to re-register")
+    }
   }
 
   test("reregistration on block update") {
@@ -509,6 +515,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
+    eventually(timeout(10 seconds), interval(1 seconds)) {
+      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
+        BlockManagerHeartbeat(store.blockManagerId))
+      assert(reregister, "master did not tell to re-register")
+    }
+
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
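As a rough, illustrative sanity check (not part of the patch) that the 10-second `eventually` window used above is long enough given the 5-second block manager timeout configured in this suite:

```scala
// Back-of-the-envelope timing only; the value names below are not from the patch.
object TestTimingSketch {
  def main(args: Array[String]): Unit = {
    val executorTimeoutMs = 5000L                              // "5s" from STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT above
    val cleaningDelayMs = Math.floorDiv(executorTimeoutMs, 2L) // 2500, as in blockManagerInfo-cleaner
    val worstCasePurgeMs = executorTimeoutMs + cleaningDelayMs // ~7500 ms until the stale entry is purged
    assert(worstCasePurgeMs < 10000L)                          // comfortably inside the 10s eventually window
    println(s"worst-case purge latency: $worstCasePurgeMs ms")
  }
}
```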