diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 49a75b26d79b1..6c18cf1b184b6 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
+  private val executorTimeoutMs = sc.conf.get(
+    config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT
+  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index b3eecdee8f1de..d7f7eedc7f33b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -82,15 +82,6 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
-  private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
-  private val blockManagerInfoCleaner = {
-    val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
-    val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
-    executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
-      TimeUnit.MILLISECONDS)
-    executor
-  }
-
   val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
 
   val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -199,12 +190,12 @@ class BlockManagerMasterEndpoint(
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        aliveBlockManagerInfo(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).foreach { bmInfo =>
           bmInfo.removeBlock(blockId)
         }
       }
     }
-    val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
+    val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
       bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
         case e: IOException =>
           logWarning(s"Error trying to remove RDD ${removeMsg.rddId} " +
@@ -233,7 +224,7 @@ class BlockManagerMasterEndpoint(
     // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
-      allAliveBlockManagerInfos.map { bm =>
+      blockManagerInfo.values.map { bm =>
         bm.slaveEndpoint.ask[Boolean](removeMsg)
       }.toSeq
     )
@@ -246,7 +237,7 @@ class BlockManagerMasterEndpoint(
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
-    val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
+    val requiredBlockManagers = blockManagerInfo.values.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
     val futures = requiredBlockManagers.map { bm =>
@@ -264,23 +255,12 @@ class BlockManagerMasterEndpoint(
 
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
-    // SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal
-    // timestamp of the executor in BlockManagerInfo. This info will be removed from
-    // blockManagerInfo map by the blockManagerInfoCleaner once
-    // now() - info.executorRemovalTs > executorTimeoutMs.
-    //
-    // We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
-    // while a executor is shutting. This unwanted reregistration causes inconsistent bookkeeping
-    // of executors in Spark.
-    // Delaying this removal until blockManagerInfoCleaner decides to remove it ensures
-    // BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
-    // executor to reregister on BlockManagerHeartbeat message.
-    info.setExecutorRemovalTs()
-
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
 
-    // remove all the blocks.
+    // Remove it from blockManagerInfo and remove all the blocks.
+    blockManagerInfo.remove(blockManagerId)
+
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
@@ -301,7 +281,7 @@ class BlockManagerMasterEndpoint(
       val i = (new Random(blockId.hashCode)).nextInt(locations.size)
       val blockLocations = locations.toSeq
       val candidateBMId = blockLocations(i)
-      aliveBlockManagerInfo(candidateBMId).foreach { bm =>
+      blockManagerInfo.get(candidateBMId).foreach { bm =>
         val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
         val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
         bm.slaveEndpoint.ask[Boolean](replicateMsg)
@@ -325,11 +305,12 @@ class BlockManagerMasterEndpoint(
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        aliveBlockManagerInfo(blockManagerId).foreach { bm =>
-          // Remove the block from the BlockManager.
+        val blockManager = blockManagerInfo.get(blockManagerId)
+        if (blockManager.isDefined) {
+          // Remove the block from the slave's BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
-          bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
+          blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
         }
       }
     }
@@ -337,14 +318,14 @@ class BlockManagerMasterEndpoint(
 
   // Return a map from the block manager id to max memory and remaining memory.
   private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    allAliveBlockManagerInfos.map { info =>
-      (info.blockManagerId, (info.maxMem, info.remainingMem))
+    blockManagerInfo.map { case(blockManagerId, info) =>
+      (blockManagerId, (info.maxMem, info.remainingMem))
     }.toMap
   }
 
   private def storageStatus: Array[StorageStatus] = {
-    allAliveBlockManagerInfos.map { info =>
-      new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+    blockManagerInfo.map { case (blockManagerId, info) =>
+      new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
         Some(info.maxOffHeapMem), info.blocks.asScala)
     }.toArray
   }
@@ -366,7 +347,7 @@ class BlockManagerMasterEndpoint(
    * Futures to avoid potential deadlocks. This can arise if there exists a block manager
    * that is also waiting for this master endpoint's response to a previous message.
    */
-    allAliveBlockManagerInfos.map { info =>
+    blockManagerInfo.values.map { info =>
       val blockStatusFuture =
         if (askSlaves) {
           info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
@@ -390,7 +371,7 @@ class BlockManagerMasterEndpoint(
       askSlaves: Boolean): Future[Seq[BlockId]] = {
     val getMatchingBlockIds = GetMatchingBlockIds(filter)
     Future.sequence(
-      allAliveBlockManagerInfos.map { info =>
+      blockManagerInfo.values.map { info =>
         val future =
           if (askSlaves) {
             info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -452,10 +433,9 @@ class BlockManagerMasterEndpoint(
 
       blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
         maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus)
-
-      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
-        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
+    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
+      Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     id
   }
 
@@ -525,7 +505,7 @@ class BlockManagerMasterEndpoint(
     if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
       Option(blockStatusByShuffleService(bmId).get(blockId))
     } else {
-      aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
+      blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
     }
   }
 
@@ -536,7 +516,8 @@ class BlockManagerMasterEndpoint(
         // can be used to access this block even when the original executor is already stopped.
         loc.host == requesterHost &&
           (loc.port == externalShuffleServicePort ||
-            aliveBlockManagerInfo(loc)
+            blockManagerInfo
+              .get(loc)
               .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
               .getOrElse(false))
       }.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -553,7 +534,7 @@ class BlockManagerMasterEndpoint(
 
   /** Get the list of the peers of the given block manager */
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
+    val blockManagerIds = blockManagerInfo.keySet
     if (blockManagerIds.contains(blockManagerId)) {
       blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
     } else {
@@ -567,7 +548,7 @@ class BlockManagerMasterEndpoint(
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- aliveBlockManagerInfo(blockManagerId)
+      info <- blockManagerInfo.get(blockManagerId)
     ) yield {
       info.slaveEndpoint
     }
@@ -575,27 +556,7 @@ class BlockManagerMasterEndpoint(
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
-    blockManagerInfoCleaner.shutdownNow()
-  }
-
-  private def cleanBlockManagerInfo(): Unit = {
-    logDebug("Cleaning blockManagerInfo")
-    val now = System.currentTimeMillis()
-    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
-      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
-      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
-    }.keys
-    expiredBmIds.foreach { bmId =>
-      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
-      blockManagerInfo.remove(bmId)
-    }
   }
-
-  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
-    blockManagerInfo.get(bmId).filter(_.isAlive)
-
-  @inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
-    blockManagerInfo.values.filter(_.isAlive)
 }
 
 @DeveloperApi
@@ -623,7 +584,6 @@ private[spark] class BlockManagerInfo(
 
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
-  private var _executorRemovalTs: Option[Long] = None
 
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -739,16 +699,4 @@ private[spark] class BlockManagerInfo(
   def clear(): Unit = {
     _blocks.clear()
   }
-
-  def executorRemovalTs: Option[Long] = _executorRemovalTs
-
-  def isAlive: Boolean = _executorRemovalTs.isEmpty
-
-  def setExecutorRemovalTs(): Unit = {
-    if (!isAlive) {
-      logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
-    } else {
-      _executorRemovalTs = Some(System.currentTimeMillis())
-    }
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 137d68116b0d5..b13ce4c9d550c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2934,13 +2934,6 @@ private[spark] object Utils extends Logging {
     props.forEach((k, v) => resultProps.put(k, v))
     resultProps
   }
-
-  def executorTimeoutMs(conf: SparkConf): Long = {
-    // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
-    // "milliseconds"
-    conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
-      .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
-  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 8246da656cd25..9cf531383ce31 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.RandomUtils
@@ -93,7 +93,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(MEMORY_STORAGE_FRACTION, 0.999)
       .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
-      .set(STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "5s")
   }
 
   private def makeBlockManager(
@@ -482,7 +481,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
   }
 
-  test("no reregistration on heart beat until executor timeout") {
+  test("reregistration on heart beat") {
     val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
@@ -493,15 +492,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
     val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
-    assert(reregister == false, "master told to re-register")
-
-    eventually(timeout(10 seconds), interval(1 seconds)) {
-      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
-        BlockManagerHeartbeat(store.blockManagerId))
-      assert(reregister, "master did not tell to re-register")
-    }
+    assert(reregister)
   }
 
   test("reregistration on block update") {
@@ -515,12 +509,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    eventually(timeout(10 seconds), interval(1 seconds)) {
-      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
-        BlockManagerHeartbeat(store.blockManagerId))
-      assert(reregister, "master did not tell to re-register")
-    }
-
    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
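
Note on the restored timeout fallback: with `Utils.executorTimeoutMs` deleted, HeartbeatReceiver once again resolves the executor timeout inline, preferring the millisecond-valued `spark.storage.blockManagerSlaveTimeoutMs` and falling back to the second-valued `spark.network.timeout`. Below is a minimal, self-contained sketch of that precedence for reference only; it is not part of the patch. The literal config keys and the 120s default are assumptions mirroring Spark's documented defaults, a plain `Map` stands in for `SparkConf`, and a bare seconds-to-millis multiplication stands in for `Utils.timeStringAsMs`.

```scala
// Sketch only: models the fallback HeartbeatReceiver now performs inline.
object ExecutorTimeoutSketch {
  def resolveExecutorTimeoutMs(conf: Map[String, String]): Long = {
    // spark.storage.blockManagerSlaveTimeoutMs is already in milliseconds...
    conf.get("spark.storage.blockManagerSlaveTimeoutMs").map(_.toLong)
      // ...while spark.network.timeout is interpreted in seconds (assumed
      // default: 120), hence the explicit conversion when falling back.
      .getOrElse(conf.getOrElse("spark.network.timeout", "120").toLong * 1000L)
  }

  def main(args: Array[String]): Unit = {
    assert(resolveExecutorTimeoutMs(Map.empty) == 120000L)
    assert(resolveExecutorTimeoutMs(Map("spark.network.timeout" -> "60")) == 60000L)
    assert(resolveExecutorTimeoutMs(
      Map("spark.storage.blockManagerSlaveTimeoutMs" -> "5000")) == 5000L)
  }
}
```

This precedence is also why the test diff drops the suite-wide `STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT` override of "5s": it existed only so the `eventually` blocks could observe the executor-timeout window, and both are removed together with the delayed `blockManagerInfo` cleanup.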