From 3aba9d2bfed3b3a58dcd536886ea2de2dbfd8c62 Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Fri, 2 Dec 2022 15:53:47 +0800
Subject: [PATCH 1/3] Reject block manager re-registration when the executor
 has been lost (SPARK-41360)

Only allow a block manager re-registration when the scheduler backend still
recognizes the executor as active; a re-registration from a terminating or
stopped executor is meaningless and harmful. A rejected re-registration is
signaled back to the executor with the sentinel executor id "invalid".
---
 .../apache/spark/storage/BlockManager.scala   |  8 ++--
 .../apache/spark/storage/BlockManagerId.scala |  2 +
 .../spark/storage/BlockManagerMaster.scala    | 20 +++++++--
 .../storage/BlockManagerMasterEndpoint.scala  | 41 +++++++++++++++----
 .../spark/storage/BlockManagerMessages.scala  |  3 +-
 .../spark/storage/BlockManagerSuite.scala     | 38 +++++++++++++++++
 6 files changed, 98 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d5fde96b146b..3fd8f8770f01 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -637,9 +637,11 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, storageEndpoint)
-    reportAllBlocks()
+    val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
+      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
+    if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
+      reportAllBlocks()
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index c6a4457d8f91..12e416bbb368 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -147,4 +147,6 @@ private[spark] object BlockManagerId {
   }
 
   private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"
+
+  private[spark] val INVALID_EXECUTOR_ID = "invalid"
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 40008e6afbff..0ee3dc249d5b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -74,11 +74,25 @@ class BlockManagerMaster(
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      storageEndpoint: RpcEndpointRef): BlockManagerId = {
+      storageEndpoint: RpcEndpointRef,
+      isReRegister: Boolean = false): BlockManagerId = {
     logInfo(s"Registering BlockManager $id")
     val updatedId = driverEndpoint.askSync[BlockManagerId](
-      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint))
-    logInfo(s"Registered BlockManager $updatedId")
+      RegisterBlockManager(
+        id,
+        localDirs,
+        maxOnHeapMemSize,
+        maxOffHeapMemSize,
+        storageEndpoint,
+        isReRegister
+      )
+    )
+    if (updatedId.executorId == BlockManagerId.INVALID_EXECUTOR_ID) {
+      assert(isReRegister, "Got an invalid executor id from a non re-registration case")
+      logInfo(s"Re-registration of BlockManager $id failed")
+    } else {
+      logInfo(s"Registered BlockManager $updatedId")
+    }
     updatedId
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index adeb507941c0..486360c04cf6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -117,8 +117,10 @@ class BlockManagerMasterEndpoint(
     RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv)
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint) =>
-      context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint))
+    case RegisterBlockManager(
+        id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister) =>
+      context.reply(
+        register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister))
 
     case _updateBlockInfo @
         UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -572,7 +574,8 @@ class BlockManagerMasterEndpoint(
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      storageEndpoint: RpcEndpointRef): BlockManagerId = {
+      storageEndpoint: RpcEndpointRef,
+      isReRegister: Boolean): BlockManagerId = {
     // the dummy id is not expected to contain the topology information.
     // we get that info here and respond back with a more fleshed out block manager id
     val id = BlockManagerId(
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: Only allow block manager re-registration if the scheduler backend still
+    // recognizes the executor as active. Otherwise, a re-registration from a terminating or
+    // stopped executor is meaningless and harmful.
+    lazy val isExecutorAlive =
+      driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {
       blockManagerIdByExecutor.get(id.executorId) match {
         case Some(oldId) =>
           // A block manager of the same executor already exists, so remove it (assumed dead)
@@ -616,10 +624,29 @@ class BlockManagerMasterEndpoint(
       if (pushBasedShuffleEnabled) {
         addMergerLocation(id)
       }
+      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
+        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
-      Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
-    id
+    val updatedId = if (isReRegister && !isExecutorAlive) {
+      assert(!blockManagerInfo.contains(id),
+        "BlockManager re-registration shouldn't succeed when the executor is lost")
+
+      logInfo(s"BlockManager ($id) re-registration is rejected since " +
+        s"the executor (${id.executorId}) has been lost")
+
+      // Use "invalid" as the returned executor id to indicate to the block manager
+      // that the re-registration failed. It's a bit hacky, but fine since the returned
+      // block manager id won't be accessed in the re-registration case. We also use
+      // this "invalid" executor id to print better logs and to skip the block report.
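
Note: the sentinel-based rejection protocol introduced by this patch can be
summarized with the following minimal, self-contained Scala sketch. The types
and names here are simplified stand-ins for illustration only, not the actual
Spark API:

    object ReRegistrationSketch {
      // Simplified stand-in for org.apache.spark.storage.BlockManagerId.
      case class BlockManagerId(executorId: String, host: String, port: Int)
      object BlockManagerId { val INVALID_EXECUTOR_ID = "invalid" }

      // Driver side: a first-time registration always proceeds, while a
      // re-registration is rejected when the scheduler backend no longer
      // recognizes the executor as alive. Rejection is signaled by returning
      // a BlockManagerId carrying the sentinel executor id.
      def register(
          id: BlockManagerId,
          isReRegister: Boolean,
          isExecutorAlive: => Boolean): BlockManagerId = {
        if (isReRegister && !isExecutorAlive) {
          BlockManagerId(BlockManagerId.INVALID_EXECUTOR_ID, id.host, id.port)
        } else {
          id  // normal (re-)registration path
        }
      }

      def main(args: Array[String]): Unit = {
        val id = BlockManagerId("exec-1", "localhost", 1234)
        // A re-registration from a lost executor yields the sentinel id.
        assert(register(id, isReRegister = true, isExecutorAlive = false)
          .executorId == BlockManagerId.INVALID_EXECUTOR_ID)
        // A fresh registration is returned unchanged.
        assert(register(id, isReRegister = false, isExecutorAlive = false) == id)
      }
    }

The laziness of isExecutorAlive in the real patch matters: the extra round
trip to the scheduler backend is only paid on the re-registration path.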
+      BlockManagerId(
+        BlockManagerId.INVALID_EXECUTOR_ID,
+        id.host,
+        id.port,
+        id.topologyInfo)
+    } else {
+      id
+    }
+    updatedId
   }
 
   private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index afe416a55ed0..e047b61fcb1c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages {
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      sender: RpcEndpointRef)
+      sender: RpcEndpointRef,
+      isReRegister: Boolean)
     extends ToBlockManagerMaster
 
   case class UpdateBlockInfo(
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index c8914761b949..23a652daa378 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -361,6 +361,44 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     master.removeShuffle(0, true)
   }
 
+  test("SPARK-41360: Avoid block manager re-registration if the executor has been lost") {
+    // Set up a DriverEndpoint which always returns isExecutorAlive=false
+    rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
+      new RpcEndpoint {
+        override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
+
+        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+          case CoarseGrainedClusterMessages.RegisterExecutor(_, _, _, _, _, _, _, _) =>
+            context.reply(true)
+          case CoarseGrainedClusterMessages.IsExecutorAlive(_) =>
+            // always report the executor as dead
+            context.reply(false)
+        }
+      }
+    )
+
+    // Set up a block manager endpoint and endpoint reference
+    val bmRef = rpcEnv.setupEndpoint("bm-0", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
+
+      private def reply[T](context: RpcCallContext, response: T): Unit = {
+        context.reply(response)
+      }
+
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+        case RemoveRdd(_) => reply(context, 1)
+        case RemoveBroadcast(_, _) => reply(context, 1)
+        case RemoveShuffle(_) => reply(context, true)
+      }
+    })
+    val bmId = BlockManagerId("exec-0", "localhost", 1234, None)
+    // Register the block manager with isReRegister = true
+    val updatedId = master.registerBlockManager(
+      bmId, Array.empty, 2000, 0, bmRef, isReRegister = true)
+    // The re-registration should fail since the executor is considered dead by the driver
+    assert(updatedId.executorId === BlockManagerId.INVALID_EXECUTOR_ID)
+  }
+
   test("StorageLevel object caching") {
     val level1 = StorageLevel(false, false, false, 3)
     // this should return the same object as level1

From 757fb4795c54a2e141ac759925e9621ef6b6fd03 Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Mon, 5 Dec 2022 11:11:46 +0800
Subject: [PATCH 2/3] Adapt existing tests to the new registerBlockManager
 signature

Pass the extra isReRegister argument where registerBlockManager calls are
verified, and set up a DriverEndpoint that answers IsExecutorAlive for the
test block manager.
---
 .../spark/storage/BlockManagerSuite.scala | 20 +++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 23a652daa378..842b66193f29 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -295,7 +295,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     eventually(timeout(5.seconds)) {
       // make sure both bm1 and bm2 are registered at driver side BlockManagerMaster
       verify(master, times(2))
-        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
       assert(driverEndpoint.askSync[Boolean](
         CoarseGrainedClusterMessages.IsExecutorAlive(bm1Id.executorId)))
       assert(driverEndpoint.askSync[Boolean](
@@ -707,6 +707,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
+    // Set up a DriverEndpoint which reports the executor as alive (required by SPARK-41360)
+    rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
+      new RpcEndpoint {
+        override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
+
+        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+          case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
+            if (executorId == store.blockManagerId.executorId) {
+              context.reply(true)
+            } else {
+              context.reply(false)
+            }
+        }
+      }
+    )
+
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
 
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -2245,7 +2261,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       }.getMessage
       assert(e.contains("TimeoutException"))
       verify(master, times(0))
-        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
       server.close()
     }
   }

From 13d7d18bf64dd9332e4742216badec5a6b90b7af Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Sat, 10 Dec 2022 11:38:12 +0800
Subject: [PATCH 3/3] Exit the executor when block manager re-registration
 fails

If the master rejects the re-registration (i.e., returns the "invalid"
sentinel executor id), the executor has already been considered lost, so
exit instead of continuing to run with an unregistered block manager.
---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3fd8f8770f01..1067ee155673 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -641,6 +641,9 @@ private[spark] class BlockManager(
       maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
     if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
       reportAllBlocks()
+    } else {
+      logError("Exiting executor due to block manager re-registration failure")
+      System.exit(-1)
     }
   }
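
Note: taken together, the three patches give the executor-side reregister()
the following overall behavior. This is a minimal, self-contained Scala
sketch with simplified stand-ins (Master, reportAllBlocks, and exitExecutor
are placeholders introduced here for illustration), not the actual Spark API:

    object ReregisterSketch {
      val INVALID_EXECUTOR_ID = "invalid"
      case class BlockManagerId(executorId: String)

      trait Master {
        // Returns the (possibly sentinel) id assigned by the driver.
        def registerBlockManager(id: BlockManagerId, isReRegister: Boolean): BlockManagerId
      }

      def reportAllBlocks(): Unit = println("reporting all blocks to the master")
      def exitExecutor(): Unit = println("exiting executor")  // System.exit(-1) in the patch

      // Executor side: report blocks only when the re-registration succeeded;
      // a sentinel executor id means the driver considers this executor lost,
      // so the executor exits instead of reporting stale blocks.
      def reregister(master: Master, blockManagerId: BlockManagerId): Unit = {
        val id = master.registerBlockManager(blockManagerId, isReRegister = true)
        if (id.executorId != INVALID_EXECUTOR_ID) reportAllBlocks() else exitExecutor()
      }

      def main(args: Array[String]): Unit = {
        val rejectingMaster = new Master {
          def registerBlockManager(id: BlockManagerId, isReRegister: Boolean): BlockManagerId =
            BlockManagerId(INVALID_EXECUTOR_ID)  // driver rejects: executor is lost
        }
        reregister(rejectingMaster, BlockManagerId("exec-1"))  // prints "exiting executor"
      }
    }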