[SPARK-33799][CORE] Handle excluded executors/nodes in ExecutorMonitor #30795
Conversation
cc @tgravescs @mridulm @jiangxb1987 @attilapiros Could you please take a look?

Kubernetes integration test starting

Kubernetes integration test status success

Test build #132865 has finished for PR 30795 at commit

Retest this please
This issue is registered as an Improvement, but the content seems to be needed in branch-3.1. Are you targeting Apache Spark 3.1.0? If so, please adjust the JIRA accordingly, @Ngone51.
cc @HyukjinKwon since he is the release manager of Apache Spark 3.1.0.
Kubernetes integration test starting

Kubernetes integration test status failure

Test build #132904 has finished for PR 30795 at commit

I don't think we should port this to

@dongjoon-hyun @HyukjinKwon Thanks for correcting.
So there were changes that attempted to help with this, dealing with unschedulableTaskSets, that went in a while ago. It seems like they are trying to solve similar things, so we should see overlap. I don't know if I'll have time to review today, but will try tomorrow.
Thanks for working on this @Ngone51!
Also, +CC @tgravescs
Yeah, I also noticed that change (#28287). This PR can reduce the chances of getting into the bad situation mentioned in #28287, since it essentially replaces those excluded executors with new healthy executors, so a TaskSet gets more opportunities to launch tasks. Besides, with this PR we'd launch the new healthy executors earlier compared to the solution in #28287, which helps improve scheduling efficiency. However, this PR cannot 100% replace #28287, because we don't handle
cc @venkata91 FYI
@mridulm Thanks for the review. I've addressed the comments.

Kubernetes integration test starting

Kubernetes integration test status success

Test build #132991 has finished for PR 30795 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #133004 has finished for PR 30795 at commit
tgravescs left a comment
At some point I would really like to see the dynamic allocation manager put into the scheduler itself. We keep adding things like this, where we track more and more things twice, disconnected via message passing that could be dropped. This is one reason I said, in the previous PR that helps with this, that it should really have more knowledge of the excluded listings. That is obviously a bunch of work, though.
Maybe I'm missing something here, but I think this also has an issue with the removal. Since you changed the definition of executorCountWithResourceProfile to not include the excluded nodes, if the excluded nodes hit the idle timeout we could end up keeping those excluded nodes around. I.e., min=3, we have 5 active and 2 are excluded; we think we only have 3, so the 2 are never removed. I think we want to take the excluded nodes into account here and remove them if idle. See removeExecutors in ExecutorAllocationManager.
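To make the accounting concern concrete, here is a minimal sketch in plain Scala. The types and helper are illustrative stand-ins, not the actual ExecutorMonitor/ExecutorAllocationManager internals:

```scala
// Illustrative model only: not the real Spark types. Shows removal accounting
// that counts excluded executors against the total, so idle excluded
// executors can still be reclaimed.
case class ExecState(id: String, idle: Boolean, excluded: Boolean)

def idleExecutorsToRemove(execs: Seq[ExecState], minExecutors: Int): Seq[String] = {
  // Count every live executor, excluded or not, against minExecutors;
  // otherwise the min=3, 5 active, 2 excluded case never frees the 2.
  val surplus = math.max(0, execs.size - minExecutors)
  execs.filter(_.idle)
    .sortBy(e => !e.excluded) // false sorts first, so excluded executors go first
    .take(surplus)
    .map(_.id)
}
```

With 5 executors, 2 of them idle and excluded, and minExecutors = 3, this yields the two excluded IDs rather than keeping them around.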
```scala
// Increase the maxNumExecutors by adding the excluded executors so that manager can
// launch new executors to replace the excluded executors.
val exclude = executorMonitor.excludedExecutorCount
val maxOverheadExecutors = maxNumExecutors + exclude
```
So I don't agree with this, at least not how it's defined. The user defined the maximum number of executors to use, and this requests more than that. I realize that some are excluded, but this also comes down to a resource utilization question. If I'm in a multi-tenant environment, I want to make sure one job doesn't take over the entire cluster, and max is one way to do this. I think we would either need to redefine max, which isn't great for backwards compatibility and could result in unexpected behavior, or add another config around the excluded nodes. That config would either simply allow going over, or allow going over by X. The downside is that the default would be 0 or false, so you would have to configure it if you set max and want to use this feature. But I don't see a lot of jobs setting max unless they are trying to be nice in multi-tenant environments, so it seems OK as long as it's in the release notes, etc.
You will notice the other logic for unschedulableTaskSets does not increase this; it just increases the number we ask for.
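If the extra-config route were taken, it could look something like the sketch below. The key name, wording, and default are purely hypothetical; nothing like this exists in Spark or in this PR (and `ConfigBuilder` is private to Spark, so this would live inside Spark's config package):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical config: cap on how far dynamic allocation may exceed
// maxExecutors in order to replace excluded executors. Default 0 keeps
// today's behavior, so users must opt in.
val DYN_ALLOCATION_MAX_EXCLUDED_OVERHEAD =
  ConfigBuilder("spark.dynamicAllocation.maxExcludedExecutorOverhead")
    .doc("Extra executors, beyond spark.dynamicAllocation.maxExecutors, that " +
      "may be requested to replace excluded executors. 0 disables the overhead.")
    .intConf
    .createWithDefault(0)
```

The cap would then be maxNumExecutors plus this configured value, instead of unconditionally adding every excluded executor.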
Makes sense to me. Adding an extra conf would be a good choice.
Although, I'm rethinking this change. It only takes effect when users set the max explicitly and the cluster reaches the max. (By default, max is Int.MaxValue, so we won't normally reach it.) However, we still want to replace those excluded executors even if the cluster doesn't reach the max. For example, max/2 executors may be enough for task scheduling, and the TaskScheduler also thinks there are max/2 executors without realizing X of them are actually excluded.
So I think what we actually need here is to forcibly replace excluded executors when dynamic allocation and exclusion (but not kill) are both enabled. And it should not be related to the max value.
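A rough sketch of that gating condition, using plain SparkConf lookups. The first two keys are real Spark configs and the third is the one named in the PR description, but wiring them up as a single guard is my reading of the comment, not code from the PR:

```scala
import org.apache.spark.SparkConf

// Replace excluded executors only when dynamic allocation and exclusion are
// both enabled but excluded executors are NOT killed; when they are killed,
// new executors are requested anyway to take their place.
def shouldReplaceExcluded(conf: SparkConf): Boolean =
  conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
  conf.getBoolean("spark.excludeOnFailure.enabled", false) &&
  !conf.getBoolean("spark.excludeOnFailure.killExcludedExecutors", false)
```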
```scala
var pendingRemoval: Boolean = false
var decommissioning: Boolean = false
var hasActiveShuffle: Boolean = false
// whether the executor is temporarily excluded by the `HealthTracker`
```
We should expand this comment to state that the executor is excluded for the entire application (I realize HealthTracker implies this, but I would like to be more explicit), and that this does not include executors excluded at the stage level.
I completely agree, we should look towards merging DRA into the scheduler - the async eventing is not helping matters, and frankly has outlived its usefulness.

Yea, I have mentioned this issue in the PR description. In this case, the better way is to remove the excluded executors first. I thought it could be a follow-up if this PR gets approved.

+1.

Catching up from vacation: I think this still needs the comments addressed, so just ping me once it's ready.

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
This PR proposes to handle exclusion events in `ExecutorMonitor` so it doesn't count excluded executors as available executors for running tasks. The main changes include:

- Handle `onExecutorExcluded`/`onExecutorUnexcluded`/`onNodeExcluded`/`onNodeUnexcluded` inside `ExecutorMonitor` (see the sketch below).
- Change `ExecutorAllocationManager` to request at most (maxNumExecutors + excludedExecutors) executors.

Note that this improvement only takes effect when both the dynamic allocation and exclusion features are enabled, but with `spark.excludeOnFailure.killExcludedExecutors=false`. We don't want to handle the excluded executors specifically when we do kill excluded executors, because in that case we assume that new executors will be launched later to replace those killed executors.
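As a rough illustration of the first bullet, the monitor's exclusion bookkeeping could look like the following. The method names mirror the listener events named above, but the class, state, and helpers are simplified stand-ins, not the actual `ExecutorMonitor` implementation:

```scala
import scala.collection.mutable

// Simplified sketch of exclusion tracking; the real ExecutorMonitor keeps
// far more per-executor state (idle timers, shuffle info, etc.).
class ExclusionTracker {
  private val excludedExecutors = mutable.Set[String]()
  private val excludedNodes = mutable.Set[String]()

  def onExecutorExcluded(executorId: String): Unit = excludedExecutors += executorId
  def onExecutorUnexcluded(executorId: String): Unit = excludedExecutors -= executorId
  def onNodeExcluded(host: String): Unit = excludedNodes += host
  def onNodeUnexcluded(host: String): Unit = excludedNodes -= host

  // An executor on an excluded node is effectively excluded as well.
  def isExcluded(executorId: String, host: String): Boolean =
    excludedExecutors.contains(executorId) || excludedNodes.contains(host)

  // What ExecutorAllocationManager would add on top of maxNumExecutors.
  def excludedExecutorCount: Int = excludedExecutors.size
}
```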
Why are the changes needed?
Currently, the excluded executors are counted as available executors for running tasks. But that's not correct, since the `TaskScheduler` never schedules tasks on those excluded executors. As a result, they can lower the scheduling efficiency of the `TaskScheduler`. In the worst case, a TaskSet cannot be scheduled anywhere and then has to go through the `getCompletelyExcludedTaskIfAny(...)` path, which is inefficient.
This PR makes Spark aware of the lack of executors at the dynamic allocation level, so we can launch new executors before the `TaskScheduler` realizes the problem, which eases the worst case and improves scheduling efficiency.
Besides, this also prevents the `ExecutorAllocationManager` from getting into a fake `minExecutors` state when removing idle executors. For example, suppose we have 5 executors (2 excluded) and minExecutors=3, and we need to remove 2 idle but not excluded executors. We'd then end up with 3 executors, 2 of them excluded, so only one executor can actually launch tasks. (This is worth a follow-up to kill the idle excluded executors first if this PR gets approved.) This PR avoids the problem since we'd remove the excluded executors in the first place.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit tests.