11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -637,9 +637,14 @@ private[spark] class BlockManager(
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
maxOffHeapMemory, storageEndpoint)
reportAllBlocks()
val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
reportAllBlocks()
} else {
logError("Exiting executor due to block manager re-registration failure")
System.exit(-1)
}
Contributor

Do we want to terminate in case of INVALID_EXECUTOR_ID ?

Member Author

Good question. Ideally, a lost executor should be terminated in the end anyway (whether killed by the driver or exiting proactively on its own)... if the lost executor fails to terminate, there must be something wrong with it. So I think terminating here won't make any difference to the result.

Contributor

If it is failing to terminate, every heartbeat will end up returning BlockManagerId.INVALID_EXECUTOR_ID from the driver, right?
Wondering if there is a reason to keep it around.

Contributor

Note: if we do change this, it introduces a new code path.
So I don't want to block the PR on this discussion, but it would be good to understand this case better :-)

Member Author
@Ngone51 (Dec 9, 2022)

The case we met is that the executor failed to be killed by both StopExecutor and ExecutorRunner.killProcess. On second thought, maybe killing it from the inside (via System.exit) would give us one more chance to handle this case?

Member Author

Added the termination logic here. cc @mridulm

> Added the termination logic here. cc @mridulm

Hi @Ngone51, for our Spark Structured Streaming jobs, we have seen a lot of executors getting killed by this termination logic with error code -1, which will eventually kill the driver due to the spark.max.executor.failures configuration. We've been seeing this very frequently for streaming jobs that perform aggressive scaling, as they mark a lot of executors idle at the same time. Just want to ask: why are we using the -1 error code here if the executor is already killing itself?

}

/**
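
For context (not part of this diff): reregister() is normally triggered from the executor heartbeat path, when the driver's heartbeat response indicates it no longer knows this block manager. A minimal, hedged sketch of that control flow, using simplified stand-in types rather than the real org.apache.spark internals:

    // Hedged sketch only; HeartbeatResponse and the stubs below are simplified stand-ins.
    final case class HeartbeatResponse(reregisterBlockManager: Boolean)

    final class BlockManagerStub {
      def reregister(): Unit =
        // With this PR: either re-register and report all blocks, or receive
        // BlockManagerId.INVALID_EXECUTOR_ID back from the driver and exit the executor.
        println("re-registering block manager with the master")
    }

    final class HeartbeatHandlerSketch(blockManager: BlockManagerStub) {
      // In the real Executor, the response comes back from the heartbeat RPC to the driver.
      def onHeartbeatResponse(response: HeartbeatResponse): Unit =
        if (response.reregisterBlockManager) blockManager.reregister()
    }

    object HeartbeatHandlerSketch {
      def main(args: Array[String]): Unit =
        new HeartbeatHandlerSketch(new BlockManagerStub)
          .onHeartbeatResponse(HeartbeatResponse(reregisterBlockManager = true))
    }
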
@@ -147,4 +147,6 @@ private[spark] object BlockManagerId {
}

private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"

private[spark] val INVALID_EXECUTOR_ID = "invalid"
}
@@ -74,11 +74,25 @@ class BlockManagerMaster(
localDirs: Array[String],
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
storageEndpoint: RpcEndpointRef): BlockManagerId = {
storageEndpoint: RpcEndpointRef,
isReRegister: Boolean = false): BlockManagerId = {
logInfo(s"Registering BlockManager $id")
val updatedId = driverEndpoint.askSync[BlockManagerId](
RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint))
logInfo(s"Registered BlockManager $updatedId")
RegisterBlockManager(
id,
localDirs,
maxOnHeapMemSize,
maxOffHeapMemSize,
storageEndpoint,
isReRegister
)
)
if (updatedId.executorId == BlockManagerId.INVALID_EXECUTOR_ID) {
assert(isReRegister, "Got invalid executor id from non re-register case")
logInfo(s"Re-register BlockManager $id failed")
} else {
logInfo(s"Registered BlockManager $updatedId")
}
updatedId
}

@@ -117,8 +117,10 @@ class BlockManagerMasterEndpoint(
RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv)

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint) =>
context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint))
case RegisterBlockManager(
id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister) =>
context.reply(
register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister))

case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -572,7 +574,8 @@ class BlockManagerMasterEndpoint(
localDirs: Array[String],
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
storageEndpoint: RpcEndpointRef): BlockManagerId = {
storageEndpoint: RpcEndpointRef,
isReRegister: Boolean): BlockManagerId = {
// the dummy id is not expected to contain the topology information.
// we get that info here and respond back with a more fleshed out block manager id
val id = BlockManagerId(
@@ -583,7 +586,12 @@

val time = System.currentTimeMillis()
executorIdToLocalDirs.put(id.executorId, localDirs)
if (!blockManagerInfo.contains(id)) {
// SPARK-41360: For the block manager re-registration, we should only allow it when
// the executor is recognized as active by the scheduler backend. Otherwise, this kind
// of re-registration from the terminating/stopped executor is meaningless and harmful.
lazy val isExecutorAlive =
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
Contributor

We have to be careful with this change.
The scheduler backend does call into the block manager master, but as of today, these are non-blocking calls. So this sync call is fine right now, but it could become a potential deadlock as the code evolves.

if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {
Contributor
@mridulm (Dec 3, 2022)

If this is an issue only for terminating executors, we can detect that on the executor side and propagate it in the registration request, right? Or are there other cases as well?
Otherwise, a transient network partition could result in losing all executors?

Member Author

> If this is an issue only for terminating executors, we can detect that on the executor side and propagate it in the registration request, right?

What do you mean? The terminating executor can detect itself as being terminating?

Actually, it's not only terminating executors: executors lost due to long GC, or executors that the driver failed to kill (where the executor could be an orphan rather than terminated), also apply here, as long as the driver considers the executor lost.

Contributor

ShutdownHookManager.inShutdown should tell us if it is in the process of shutting down, and prevent the call to reregister?

For the other cases (lost due to long GC, lost due to network partitions, etc.), they are legitimate candidates for re-registration.

Member Author

> ShutdownHookManager.inShutdown should tell us if it is in the process of shutting down, and prevent the call to reregister?

OK, I see.

> For the other cases (lost due to long GC, lost due to network partitions, etc.), they are legitimate candidates for re-registration.

Note that there's a prerequisite for re-registration in this PR: the executor should already be lost in the driver's view. In that case, block manager re-registration is meaningless since the executor won't reconnect to the driver.

Contributor
@mridulm (Dec 6, 2022)

Won't the driver remove it in case of a heartbeat expiry even though the executor did not disconnect? (for the same reasons as above: long GC pause, network partition, etc.)

(Sorry for the delay, I will get back to this PR later this week; not in good health.)

Member Author
@Ngone51 (Dec 6, 2022)

> Won't the driver remove it in case of a heartbeat expiry even though the executor did not disconnect? (for the same reasons as above: long GC pause, network partition, etc.)

It will. But in these cases, the driver could fail to kill the executor.

(@mridulm No worries, take care :))
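
For reference, a hedged sketch of the executor-side alternative raised above (guarding re-registration on ShutdownHookManager.inShutdown), which this PR does not adopt; it would only cover executors that are actually shutting down, not the long-GC or failed-to-kill cases discussed in this thread. Since ShutdownHookManager is private[spark], the sketch assumes it is compiled inside the org.apache.spark.storage package, and doReregister stands in for the existing reregister() body:

    // Hedged sketch only; not part of this PR.
    package org.apache.spark.storage

    import org.apache.spark.util.ShutdownHookManager

    private[spark] object ReregisterGuardSketch {
      def reregisterUnlessShuttingDown(doReregister: () => Unit): Unit = {
        if (ShutdownHookManager.inShutdown()) {
          // A terminating executor gains nothing from re-registering, so skip it.
          println("Skipping block manager re-registration: executor JVM is shutting down")
        } else {
          doReregister()
        }
      }
    }
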

blockManagerIdByExecutor.get(id.executorId) match {
case Some(oldId) =>
// A block manager of the same executor already exists, so remove it (assumed dead)
@@ -616,10 +624,29 @@
if (pushBasedShuffleEnabled) {
addMergerLocation(id)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id,
maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
id
val updatedId = if (isReRegister && !isExecutorAlive) {
assert(!blockManagerInfo.contains(id),
"BlockManager re-registration shouldn't succeed when the executor is lost")
Contributor
@mridulm (Dec 3, 2022)

Does this assertion need to always hold?
I will need to take another look at the code, but I vaguely think there are corner cases here... might be good to check up on this. (I will too next week.)

Member Author

> Does this assertion need to always hold?

It does.


logInfo(s"BlockManager ($id) re-registration is rejected since " +
s"the executor (${id.executorId}) has been lost")

// Use "invalid" as the return executor id to indicate the block manager that
// re-registration failed. It's a bit hacky but fine since the returned block
// manager id won't be accessed in the case of re-registration. And we'll use
// this "invalid" executor id to print better logs and avoid blocks reporting.
BlockManagerId(
BlockManagerId.INVALID_EXECUTOR_ID,
id.host,
id.port,
id.topologyInfo)
} else {
id
}
updatedId
}

private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId)
@@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages {
localDirs: Array[String],
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
sender: RpcEndpointRef)
sender: RpcEndpointRef,
isReRegister: Boolean)
Member
@HyukjinKwon (Dec 13, 2022)

Not sure why and how, but it seems like branch-3.3 complains about this in the MiMa binary compatibility test (https://github.com/apache/spark/actions/runs/3683054520/jobs/6231260546).

[error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.13:3.2.0! Found 4 potential problems (filtered 921)
[error]  * method copy(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager in class org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.copy")
[error]  * method this(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)Unit in class org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.this")
[error]  * the type hierarchy of object org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager is different in current version. Missing types {scala.runtime.AbstractFunction5}
[error]    filter with: ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.BlockManagerMessages$RegisterBlockManager$")
[error]  * method apply(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager in object org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.apply")
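
For reference, a hedged sketch (not the actual #39052 patch) of what this kind of MiMa fix usually looks like: the filters suggested in the log above get added to the exclusion list Spark keeps in project/MimaExcludes.scala. The surrounding object name below is made up for illustration:

    import com.typesafe.tools.mima.core._

    object RegisterBlockManagerMimaExcludesSketch {
      // RegisterBlockManager gained a new field (isReRegister), which changes the generated
      // copy/apply/constructor signatures, so the checks against older releases need excludes.
      val excludes = Seq(
        ProblemFilters.exclude[DirectMissingMethodProblem](
          "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.copy"),
        ProblemFilters.exclude[DirectMissingMethodProblem](
          "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.this"),
        ProblemFilters.exclude[MissingTypesProblem](
          "org.apache.spark.storage.BlockManagerMessages$RegisterBlockManager$"),
        ProblemFilters.exclude[DirectMissingMethodProblem](
          "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.apply")
      )
    }
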

Member

Let me just make a quick follow-up to fix this in all branches. This was detected with Scala 2.13, FWIW.

Member

Here: #39052

Member Author

Thanks @HyukjinKwon

extends ToBlockManagerMaster

case class UpdateBlockInfo(
@@ -295,7 +295,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
eventually(timeout(5.seconds)) {
// make sure both bm1 and bm2 are registered at driver side BlockManagerMaster
verify(master, times(2))
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
assert(driverEndpoint.askSync[Boolean](
CoarseGrainedClusterMessages.IsExecutorAlive(bm1Id.executorId)))
assert(driverEndpoint.askSync[Boolean](
@@ -361,6 +361,44 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
master.removeShuffle(0, true)
}

test("SPARK-41360: Avoid block manager re-registration if the executor has been lost") {
// Set up a DriverEndpoint which always returns isExecutorAlive=false
rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
new RpcEndpoint {
override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case CoarseGrainedClusterMessages.RegisterExecutor(executorId, _, _, _, _, _, _, _) =>
context.reply(true)
case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
// always return false
context.reply(false)
}
}
)

// Set up a block manager endpoint and endpoint reference
val bmRef = rpcEnv.setupEndpoint(s"bm-0", new RpcEndpoint {
override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv

private def reply[T](context: RpcCallContext, response: T): Unit = {
context.reply(response)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RemoveRdd(_) => reply(context, 1)
case RemoveBroadcast(_, _) => reply(context, 1)
case RemoveShuffle(_) => reply(context, true)
}
})
val bmId = BlockManagerId(s"exec-0", "localhost", 1234, None)
// Register the block manager with isReRegister = true
val updatedId = master.registerBlockManager(
bmId, Array.empty, 2000, 0, bmRef, isReRegister = true)
// The re-registration should fail since the executor is considered as dead by DriverEndpoint
assert(updatedId.executorId === BlockManagerId.INVALID_EXECUTOR_ID)
}

test("StorageLevel object caching") {
val level1 = StorageLevel(false, false, false, 3)
// this should return the same object as level1
@@ -669,6 +707,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)

// Set up a DriverEndpoint which simulates the executor is alive (required by SPARK-41360)
rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
new RpcEndpoint {
override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
if (executorId == store.blockManagerId.executorId) {
context.reply(true)
} else {
context.reply(false)
}
}
}
)

store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.getLocations("a1").size > 0, "master was not told about a1")

@@ -2207,7 +2261,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
}.getMessage
assert(e.contains("TimeoutException"))
verify(master, times(0))
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
server.close()
}
}