[SPARK-8425][CORE] Application Level Blacklisting #14079
Conversation
This PR makes the following changes:
1. Create a new BlacklistTracker and a BlacklistStrategy interface to support more complex use cases for the blacklist mechanism.
2. Make the YARN allocator aware of node blacklist information.
3. Implement three strategies for convenience; users can also define their own:
   - SingleTaskStrategy: retains the default behavior before this change.
   - AdvanceSingleTaskStrategy: enhances SingleTaskStrategy with stage-level node blacklisting.
   - ExecutorAndNodeStrategy: lets different task sets share blacklist information.
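For readers skimming the thread, here is a rough, hypothetical sketch of what a pluggable strategy interface along the lines of this description could look like; the trait and method names below are illustrative only and are not taken from the patch.

```scala
// Hypothetical sketch only -- names are illustrative, not from the actual patch.
trait BlacklistStrategy {
  // Record a task failure for the given stage on the given executor/host.
  def onTaskFailure(stageId: Int, indexInTaskSet: Int, executorId: String, host: String): Unit

  // Executors that should not be offered tasks right now.
  def blacklistedExecutors: Set[String]

  // Nodes that should not be offered tasks (also useful to pass to the YARN allocator).
  def blacklistedNodes: Set[String]

  // Drop entries whose blacklist timeout has expired.
  def expireTimedOutEntries(nowMs: Long): Unit
}
```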
|
@kayousterhout @markhamstra @tgravescs @mwws I think this is finally ready for review. I have some minor updates left, but I wanted to get this in your hands now. The main remaining item is testing on a cluster (I would appreciate any input from you on this as well, Tom). One big implementation change I'd like to highlight: the BlacklistTracker no longer requires locks. Though it is accessed by multiple threads, it is (almost) always from some place in TaskSchedulerImpl, which already holds a lock on the taskScheduler. This also requires expiring executors while we're doing other work (rather than in a background thread) -- I chose to do it inside the call to ... The one exception to holding a lock on the taskScheduler is the YARN backend -- it needs the full set of blacklisted nodes, and it gets that without a lock on the task scheduler. But this was pretty easy to work around. I'll drop a few inline comments as well. |
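To illustrate the expiry-on-access idea (expiring stale entries while already doing scheduling work under the TaskSchedulerImpl lock, instead of in a background thread), here is a simplified, hypothetical sketch; the class and method names are made up for illustration and this is not the actual BlacklistTracker code.

```scala
import scala.collection.mutable

// Simplified, hypothetical sketch of expiry-on-access; not the real BlacklistTracker.
class SimpleBlacklist(timeoutMs: Long, clock: () => Long = () => System.currentTimeMillis()) {
  // executorId -> time at which its blacklist entry expires
  private val execToExpiryTime = mutable.HashMap[String, Long]()

  def blacklistExecutor(executorId: String): Unit = {
    execToExpiryTime(executorId) = clock() + timeoutMs
  }

  // Callers are assumed to already hold the TaskSchedulerImpl lock, so expiry can
  // happen inline here instead of in a separate background thread.
  def isExecutorBlacklisted(executorId: String): Boolean = {
    expireTimedOut()
    execToExpiryTime.contains(executorId)
  }

  private def expireTimedOut(): Unit = {
    val now = clock()
    execToExpiryTime.retain { case (_, expiry) => expiry > now }
  }
}
```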
|
Test build #61873 has finished for PR 14079 at commit
|
} else {
  blacklistTracker.taskSetFailed(manager.taskSet.stageId)
  logInfo(s"Removed TaskSet ${manager.taskSet.id}, since it failed, from pool" +
    s" ${manager.parent.name}")
Changing the log msg is unrelated to blacklisting, but this msg had always annoyed / confused me, so I thought it was worth updating since I needed success anyway.
|
I took another look at having BlacklistTracker just be an Option, rather than having a NoopBlacklist. After some other cleanup, I decided it made more sense to go back to the Option, but it's in one commit, so it's easy to go either way: a34e9ae |
|
Test build #61931 has finished for PR 14079 at commit
|
    indexInTaskSet: Int): Boolean = {
  // intentionally avoiding .getOrElse(..., new HashMap()) to avoid lots of object
  // creation, since this method gets called a *lot*
  stageIdToExecToFailures.get(stageId) match {
Wonder if something like this isn't easier to follow:
stageIdToExecToFailures.get(stageId)
  .flatMap(_.get(executorId))
  .map(_.failuresByTask.contains(indexInTaskSet))
  .getOrElse(false)
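For anyone comparing the two forms side by side, here is a self-contained sketch with stand-in types (`ExecFailures` and the surrounding map are simplified stand-ins, not the PR's actual classes); the first method mirrors the match-based version in the diff, the second the Option-chaining suggestion above.

```scala
import scala.collection.mutable.HashMap

object BlacklistLookupSketch {
  // Simplified stand-in for the per-executor failure record in the PR.
  case class ExecFailures(failuresByTask: HashMap[Int, Int])

  val stageIdToExecToFailures = HashMap[Int, HashMap[String, ExecFailures]]()

  // Match-based form: the PR's inline comment notes this avoids the object creation of
  // .getOrElse(..., new HashMap()) on a hot code path.
  def isBlacklistedWithMatch(stageId: Int, executorId: String, indexInTaskSet: Int): Boolean = {
    stageIdToExecToFailures.get(stageId) match {
      case Some(execToFailures) =>
        execToFailures.get(executorId) match {
          case Some(failures) => failures.failuresByTask.contains(indexInTaskSet)
          case None => false
        }
      case None => false
    }
  }

  // Equivalent Option-chaining form suggested in the review.
  def isBlacklistedWithChain(stageId: Int, executorId: String, indexInTaskSet: Int): Boolean = {
    stageIdToExecToFailures.get(stageId)
      .flatMap(_.get(executorId))
      .map(_.failuresByTask.contains(indexInTaskSet))
      .getOrElse(false)
  }
}
```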
|
thanks for the review @kayousterhout. I also added a test case to BlacklistTrackerSuite, "task failure timeout works as expected for long-running tasksets", to cover your point about long-running task sets. |
val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
firstTaskAttempts.foreach { task => logInfo(s"scheduled $task on ${task.executorId}") }
assert(firstTaskAttempts.isEmpty)
assert(firstTaskAttempts.size === 1)
Can you also check that the executor ID is executor4?
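If it helps, the extra check could look roughly like this (ScalaTest-style sketch, assuming the offer ends up scheduling exactly one attempt):

```scala
// Sketch of the additional assertion; assumes exactly one task attempt was scheduled.
assert(firstTaskAttempts.size === 1)
assert(firstTaskAttempts.head.executorId === "executor4")
```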
|
Test build #70102 has finished for PR 14079 at commit
|
|
Test build #70120 has finished for PR 14079 at commit
|
|
Jenkins, retest this please |
|
Test build #70122 has finished for PR 14079 at commit
|
There is a small race in SchedulerIntegrationSuite. The test assumes that the task scheduler thread processing the last task will finish before the DAGScheduler processes the task event and notifies the job waiter, but that is not 100% guaranteed. I ran the test locally a bunch of times and it never failed, though admittedly it had never failed locally for me before either. However, I am nearly 100% certain this is what caused the failure of one Jenkins build: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/68694/consoleFull (which is long gone now, sorry -- I fixed it as part of apache#14079 initially). Author: Imran Rashid <irashid@cloudera.com> Closes apache#16270 from squito/sched_integ_flakiness.
|
LGTM!!!!! 🎉 🎉 🎉 🎉 🎉 Nice work on this -- this will be awesome to have in. |
|
Great! I've been working on some additional changes on top of this: UI representation for blacklisted executors (SPARK-16654), and implicit killing of blacklisted executors (SPARK-16554). I'll be sending pull requests for those soon after this is merged. |
|
Test build #70194 has finished for PR 14079 at commit
|
|
thanks @kayousterhout ! appreciate all the time you've spent helping out on this issue. merged to master |
|
@squito Scala 2.10 is broken: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-sbt-scala-2.10/3394/console Could you submit a PR to fix it? Thanks! |
## What changes were proposed in this pull request?

This builds upon the blacklisting introduced in SPARK-17675 to add blacklisting of executors and nodes for an entire Spark application. Resources are blacklisted based on tasks that fail, in tasksets that eventually complete successfully; they are automatically returned to the pool of active resources based on a timeout. Full details are available in a design doc attached to the jira.

## How was this patch tested?

Added unit tests, ran them via Jenkins, also ran a handful of them in a loop to check for flakiness. The added tests include:
- verifying BlacklistTracker works correctly
- verifying TaskSchedulerImpl interacts with BlacklistTracker correctly (via a mock BlacklistTracker)
- an integration test for the entire scheduler with blacklisting in a few different scenarios

Author: Imran Rashid <irashid@cloudera.com>
Author: mwws <wei.mao@intel.com>

Closes apache#14079 from squito/blacklist-SPARK-8425.
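For anyone wanting to try this out, a hedged configuration sketch follows; the `spark.blacklist.*` keys below follow the settings documented for releases that include this feature, but treat the exact names and values as illustrative here rather than authoritative.

```scala
import org.apache.spark.SparkConf

// Illustrative configuration sketch; key names follow the documented spark.blacklist.* settings.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  // How long an executor or node stays blacklisted before returning to the active pool.
  .set("spark.blacklist.timeout", "1h")
  // Application-level thresholds of the kind this change introduces.
  .set("spark.blacklist.application.maxFailedTasksPerExecutor", "2")
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")
```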