[SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost #38876
Conversation
```scala
// of re-registration from the terminating/stopped executor is meaningless and harmful.
lazy val isExecutorAlive =
  driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {
```
If this is an issue only for terminating executors, we can detect that on the executor side and propagate it in the registration request, right? Or are there other cases as well?
Otherwise, couldn't a transient network partition result in losing all executors?
> If this is an issue only for terminating executors, we can detect that on the executor side and propagate it in the registration request, right?

What do you mean? That the terminating executor can detect that it is terminating by itself?
Actually, it's not only terminating executors: executors lost due to long GC, or executors that the driver failed to kill (where the executor could be an orphan rather than terminated), also apply here, as long as the driver considers the executor lost.
`ShutdownHookManager.inShutdown` should tell if it is in the process of shutting down - and prevent the call to re-register?
For the other cases - lost due to long GC, lost due to network partitions, etc. - they are legitimate candidates for re-registration.
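A minimal sketch of that suggestion on the executor side (hypothetical; only `ShutdownHookManager.inShutdown()` and `BlockManager.reregister()` are existing Spark APIs, the wrapper itself is illustrative):

```scala
import org.apache.spark.storage.BlockManager
import org.apache.spark.util.ShutdownHookManager

// Hypothetical guard: skip re-registration once the executor JVM is running its
// shutdown hooks, since the driver has (or will soon have) removed this executor.
def maybeReregister(blockManager: BlockManager): Unit = {
  if (!ShutdownHookManager.inShutdown()) {
    blockManager.reregister()
  }
}
```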
> `ShutdownHookManager.inShutdown` should tell if it is in the process of shutting down - and prevent the call to re-register?

Ok, I see.

> For the other cases - lost due to long GC, lost due to network partitions, etc. - they are legitimate candidates for re-registration.

Note that there's a prerequisite for the re-registration handled in this PR: the executor should already be lost from the driver's view. In that case, block manager re-registration is meaningless since the executor won't reconnect to the driver.
Won't the driver remove the executor in case of a heartbeat expiry, even though the executor did not disconnect? (for the same reasons as above - long GC pause, network partition, etc.)
(Sorry for the delay, I will get back to this PR later this week - not in good health)
> Won't the driver remove the executor in case of a heartbeat expiry, even though the executor did not disconnect? (for the same reasons as above - long GC pause, network partition, etc.)

It will. But in these cases, the driver could fail to kill the executor.
(@mridulm No worries, take care :))
```scala
// the executor is recognized as active by the scheduler backend. Otherwise, this kind
// of re-registration from the terminating/stopped executor is meaningless and harmful.
lazy val isExecutorAlive =
  driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
```
We have to be careful with this change.
The scheduler backend does call into the block manager master - but as of today, those are non-blocking calls. So this sync call is fine right now - but it can become a potential deadlock as the code evolves.
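For reference, a non-blocking variant would roughly look like the sketch below (hypothetical; it reuses `driverEndpoint` and `id` from the snippet above and only shows `ask` returning a `Future` instead of `askSync` - the registration decision would then have to be completed asynchronously):

```scala
import scala.concurrent.Future

// Hypothetical non-blocking check: ask returns a Future instead of blocking the
// BlockManagerMasterEndpoint thread; the caller must handle the result in a callback.
val isExecutorAliveFuture: Future[Boolean] =
  driverEndpoint.ask[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
```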
```scala
  id
val updatedId = if (isReRegister && !isExecutorAlive) {
  assert(!blockManagerInfo.contains(id),
    "BlockManager re-registration shouldn't succeed when the executor is lost")
```
Does this assertion need to always hold?
I will need to look at the code again a bit, but I vaguely think there are corner cases here ... might be good to check up on this. (I will too, next week.)
> Does this assertion need to always hold?
It does.
Thanks for working on this @Ngone51 !
Left a few comments - I definitely need to spend some time and look at it in more detail next week, so any thoughts from you to elaborate on the change would be very helpful.
mridulm
left a comment
Looks good to me.
Would be good if someone else also takes a look - given how sensitive this change can become.
Hopefully @JoshRosen or @attilapiros can also take a look.
```scala
  maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
  reportAllBlocks()
}
```
Do we want to terminate in case of INVALID_EXECUTOR_ID ?
Good question. Ideally, a lost executor should be terminated in the end anyway (whether killed by the driver or exiting itself proactively)... if the lost executor fails to terminate, there must be something wrong with it. So I think terminating here won't make any difference to the result.
If it is failing to terminate, every heartbeat will end up returning BlockManagerId.INVALID_EXECUTOR_ID from the driver, right?
Wondering if there is a reason to keep it around.
Note - if we do change this - it is the introduction of a new code path.
So I don't want to block the PR on this discussion - but it would be good to understand this case better :-)
The case we met is that the executor failed to be killed by both StopExecutor and ExecutorRunner.killProcess. On second thought, maybe killing it from inside (via System.exit) would give us one more chance to handle this case?
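A rough sketch of that idea (not the exact code added in this PR; the argument names before `maxOnHeapMemory` are assumptions, the rest follows the diff above, and the exit code is only illustrative):

```scala
// Hypothetical executor-side handling: if the driver answers the re-registration with
// the sentinel INVALID_EXECUTOR_ID, the executor is already considered lost, so it
// exits on its own instead of lingering as an orphan process.
val returnedId = master.registerBlockManager(
  blockManagerId, localDirs, maxOnHeapMemory, maxOffHeapMemory, storageEndpoint,
  isReRegister = true)
if (returnedId.executorId == BlockManagerId.INVALID_EXECUTOR_ID) {
  System.exit(-1)  // illustrative exit code; see the discussion about -1 below
}
```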
Added the termination logic here. cc @mridulm
> Added the termination logic here. cc @mridulm

Hi @Ngone51, for our Spark Structured Streaming jobs, we have seen a lot of executors getting killed by this termination logic with error code -1, which eventually kills the driver due to the spark.max.executor.failures configuration. We've been seeing this very frequently for streaming jobs that perform aggressive scaling, as they mark a lot of executors idle at the same time. Just want to ask: why are we using the -1 error code here if the executor is already killing itself?
jiangxb1987
left a comment
LGTM, thanks for fixing it!
mridulm
left a comment
Thanks for fixing this @Ngone51 !
Merged to master, branch-3.3 and branch-3.2

Thank you, @Ngone51, @mridulm, @jiangxb1987.
```diff
   maxOffHeapMemSize: Long,
-  sender: RpcEndpointRef)
+  sender: RpcEndpointRef,
+  isReRegister: Boolean)
```
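For context, after this change the message has roughly the shape below (a reconstruction, not a copy of the source; field names not visible in the diff or in the MiMa output further down are assumptions):

```scala
// Approximate shape of the message after this PR; the added isReRegister flag changes
// the case class's constructor/apply/copy signatures.
case class RegisterBlockManager(
    blockManagerId: BlockManagerId,
    localDirs: Array[String],
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    sender: RpcEndpointRef,
    isReRegister: Boolean)
  extends ToBlockManagerMaster
```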
Not sure why and how, but it seems branch-3.3 complains about this in the MiMa binary compatibility test (https://github.com/apache/spark/actions/runs/3683054520/jobs/6231260546).
```
[error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.13:3.2.0! Found 4 potential problems (filtered 921)
[error]  * method copy(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager in class org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.copy")
[error]  * method this(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)Unit in class org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.this")
[error]  * the type hierarchy of object org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager is different in current version. Missing types {scala.runtime.AbstractFunction5}
[error]    filter with: ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.BlockManagerMessages$RegisterBlockManager$")
[error]  * method apply(org.apache.spark.storage.BlockManagerId,Array[java.lang.String],Long,Long,org.apache.spark.rpc.RpcEndpointRef)org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager in object org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.apply")
```
Let me just make a quick followup to fix this in all branches. This was detected with Scala 2.13 FWIW.
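The follow-up roughly adds the suggested filters to project/MimaExcludes.scala, as sketched below (the filter strings are copied from the "filter with:" hints in the error output above; the exact placement within the excludes list is not shown here):

```scala
// Sketch of the MiMa exclusions for the RegisterBlockManager signature change.
ProblemFilters.exclude[DirectMissingMethodProblem](
  "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem](
  "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.this"),
ProblemFilters.exclude[MissingTypesProblem](
  "org.apache.spark.storage.BlockManagerMessages$RegisterBlockManager$"),
ProblemFilters.exclude[DirectMissingMethodProblem](
  "org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.apply")
```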
Here: #39052
Thanks @HyukjinKwon
…gisterBlockManager in MiMa

### What changes were proposed in this pull request?
This PR is a followup of #38876 that excludes BlockManagerMessages.RegisterBlockManager in the MiMa compatibility check.

### Why are the changes needed?
It fails in the MiMa check, presumably with Scala 2.13, in other branches. Should be safer to exclude them all in the affected branches.

### Does this PR introduce _any_ user-facing change?
No, dev-only.

### How was this patch tested?
Filters copied from error messages. Will monitor the build in other branches.

Closes #39052 from HyukjinKwon/SPARK-41360-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This PR mainly proposes to reject block manager re-registration if the executor has already been considered lost/dead by the scheduler backend.
Along with the major proposal, this PR also includes a few other changes:
- Only post the `SparkListenerBlockManagerAdded` event when the registration succeeds
- Return an "invalid" executor id when the re-registration fails
- Do not report all blocks when the re-registration fails

Why are the changes needed?
BlockManager re-registration from a lost executor (a terminated/terminating executor or an orphan executor) has led to some known issues, e.g., a false-active executor showing up in the UI (SPARK-35011) and block fetching to the dead executor. And since there's no re-registration from the lost executor itself, it's meaningless to have BlockManager re-registration when the executor is already lost.
Regarding the corner case where the re-registration event comes earlier, before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration will only be required when the BlockManager master doesn't see the block manager in `blockManagerInfo`. And the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first, before the re-registration event comes.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test