
Conversation

ericl (Contributor) commented Mar 4, 2017

What changes were proposed in this pull request?

This refactors the task kill path to allow specifying a reason for the task kill. The reason is propagated opaquely through events, and will show up in the UI automatically as `(N killed: $reason)` and `TaskKilled: $reason`. Without this change, there is no way to surface this feedback to the user through the UI.

Currently used reasons are "stage cancelled", "another attempt succeeded", and "killed via SparkContext.killTask". The user can also specify a custom reason through SparkContext.killTask.
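For illustration, a minimal sketch of the user-facing call, using the killTaskAttempt name and boolean result that the API converged on by the end of this review (the task ID and reason string here are hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: assumes killTaskAttempt(taskId, interruptThread, reason): Boolean,
// the shape this API settled on during review.
object KillDemo extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("kill-demo"))

  val taskId = 42L // hypothetical ID of a running task attempt, e.g. read off the UI
  val wasKilled: Boolean =
    sc.killTaskAttempt(taskId, interruptThread = true, reason = "straggling on a bad node")
  // The reason then shows up in the UI as "TaskKilled: straggling on a bad node".

  sc.stop()
}
```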

cc @rxin

In the stage overview UI the reasons are summarized:
![1](https://cloud.githubusercontent.com/assets/14922/23929209/a83b2862-08e1-11e7-8b3e-ae1967bbe2e5.png)

Within the stage UI you can see individual task kill reasons:
![2](https://cloud.githubusercontent.com/assets/14922/23929200/9a798692-08e1-11e7-8697-72b27ad8a287.png)

How was this patch tested?

Existing tests, tried killing some stages in the UI and verified the messages are as expected.

SparkQA commented Mar 4, 2017

Test build #73912 has finished for PR 17166 at commit e9178b6.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TaskKilled(reason: String, override val shouldRetry: Boolean) extends TaskFailedReason
  • case class KillTask(

SparkQA commented Mar 5, 2017

Test build #73913 has finished for PR 17166 at commit ba7cbd0.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 5, 2017

Test build #73916 has finished for PR 17166 at commit 1a716aa.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TaskKilled(reason: String, override val shouldRetry: Boolean) extends TaskFailedReason
  • case class KillTask(

Commits pushed: "fix compile", "really fix compile"
SparkQA commented Mar 5, 2017

Test build #73917 has finished for PR 17166 at commit 91b8aef.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TaskKilled(reason: String, override val shouldRetry: Boolean) extends TaskFailedReason
  • case class KillTask(

mridulm (Contributor) commented Mar 5, 2017

What is the rationale for this change? Is it to propagate the task kill reason to the UI?
The only use I see is the one line at https://github.com/apache/spark/pull/17166/files#diff-b8adb646ef90f616c34eb5c98d1ebd16R357.
Or did I miss some other use for this?

ericl (Author) commented Mar 5, 2017

Yes (updated the PR description) -- without this change, there is no way to provide this feedback to the user through the UI.

This also lets you choose whether the task will be rescheduled.

mridulm (Contributor) commented Mar 5, 2017

If I did not miss it, there is no way for the user to provide this information currently, right?
Or is that coming in a subsequent PR?

ericl (Author) commented Mar 5, 2017 via email

mridulm (Contributor) commented Mar 6, 2017

In that case, let us make it a more complete PR, with the proposed API changes included, so that we can evaluate the merits of the change in total.

ericl (Author) commented Mar 7, 2017

Added killTask(id: TaskId, reason: String) to SparkContext and a corresponding test.

cc @JoshRosen for the API changes. As discussed offline, it's very hard to preserve binary compatibility here since we have to move from a case object to a case class to add a reason.

* @param shouldRetry Whether the scheduler should retry the task.
*/
def killTask(
taskId: Long, executorId: String, interruptThread: Boolean, reason: String,
Contributor:

Please follow the convention defined in the "Indentation" section at http://spark.apache.org/contributing.html for long parameter lists. This happens in a bunch of methods in this PR.

ericl (Author):

fixed

 @DeveloperApi
-case object TaskKilled extends TaskFailedReason {
-  override def toErrorString: String = "TaskKilled (killed intentionally)"
+case class TaskKilled(reason: String, override val shouldRetry: Boolean) extends TaskFailedReason {
JoshRosen (Contributor), Mar 7, 2017:

I think it's a little weird for shouldRetry to be part of this interface: whether a killed task should be retried is already handled by existing scheduling logic and it's a little confusing to be setting it on a per-task basis rather than, say, having different events which are used to distinguish between speculative tasks being killed and jobs being canceled/aborted. Basically, it's confusing to me because this isn't the source-of-truth on whether we can/will re-try and its purpose here is therefore a bit unclear to me.

As discussed offline, I think that we may be able to update the logic in TaskSchedulerImpl.handleFailedTask (the only place which reads this field) in order to eliminate the need for this field in this class.

ericl (Author), Mar 7, 2017:

@kayousterhout does removing this check seem safe to you? It looks like the only case taskState != TaskState.KILLED guards against here is cancelled speculative tasks. Since those are relatively rare, it seems ok to call revive offers in those cases unconditionally. Tasks from cancelled stages and jobs should still be dropped here by the remaining isZombie check.

Contributor:

I need to do some git blaming to be 100% sure this is OK...I'll take a look tomorrow

Contributor:

IIRC, KILLED was always used internally for killing tasks that did not need a retry, hence the check.
Good point @kayousterhout; we might need to revisit the use of KILLED to ensure this does not break now that developer-invoked kills might need a retry.

Contributor:

I looked at this more and this is fine. In any case it's not harmful to call reviveOffers -- just may result in wasted work.

Contributor:

@kayousterhout Are we making the change that killed tasks can/should be retried? If yes, this is a behavior change, and we need to do the same in TSM.handleFailedTask().

This is what I mentioned w.r.t. killed tasks not resulting in task resubmission.

Contributor:

@mridulm It's possible that the makeOffers() call causes a different job's tasks to be executed on the given executor. Fundamentally, the problem is that the killed task needs to be re-scheduled on a different executor, and the only way to guarantee that the task gets offered new/different executors is to do a full reviveOffers() call (which is why the code in question exists in the first place).

Contributor:

What if Eric changes TSM.handleFailedTask to return a boolean value indicating whether the failed task needs to be re-scheduled? Then we could use that to decide whether to call reviveOffers.

ericl (Author), Mar 23, 2017:

There might be a simple solution here to avoid extra overhead on speculative tasks. We just need to check if the task index has been marked as successful -- if so, we can skip calling reviveOffers(). How does this look?

-    if (!taskSetManager.isZombie) {
+    if (!taskSetManager.isZombie && !taskSetManager.someAttemptSucceeded(tid)) {
        reviveOffers()

Then in TaskSetManager,

+  def someAttemptSucceeded(tid: Long): Boolean = {
+    successful(taskInfos(tid).index)
+  }

Contributor:

Oh that sounds great to me @ericl and minimally invasive!

ericl (Author):

Great, I updated the PR to include this.

SparkQA commented Mar 7, 2017

Test build #74059 has finished for PR 17166 at commit 90d2c98.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TaskKilled(reason: String) extends TaskFailedReason

SparkQA commented Mar 7, 2017

Test build #74053 has finished for PR 17166 at commit a58d391.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 7, 2017

Test build #74057 has finished for PR 17166 at commit 02d81b5.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

*
* @param taskId the task ID to kill
* @param reason the reason for killing the task, which should be a short string
*/
Contributor:

Hm, I don't think we should automatically retry just because a reason was provided. Perhaps this:

def killTask(taskId: Long, reason: String): Unit

def killTaskAndRetry(taskId: Long, reason: String): Unit

Contributor:

Same thing for the lower-level DAG scheduler API.

ericl (Author), Mar 7, 2017:

Well, it turns out there's no good reason not to retry. The task will eventually get retried anyway, unless the stage is cancelled. The previous code seems to be just a performance optimization that avoided calling reviveOffers for speculative task completions.

Contributor:

Ah, OK. For some reason I read it as: killTask(long) is kill without retry, and killTask(long, string) is kill with retry.

Contributor:

What about calling it "killAndRescheduleTask" then? Otherwise kill is a little misleading -- since where we use it elsewhere (to kill a stage) it implies no retry

Contributor:

I am unclear about the expectations from the API.

  • What is the expectation when a task is being killed?
    • Is it specifically for the task attempt being referenced, or all attempts of the task?
    • If the latter, do we discard already-succeeded tasks?
    • "killAndRescheduleTask" implies the task will be rescheduled, which might not occur if this was a speculative task (or it already completed): would be good to clarify.
  • Is this expected to be exposed via the UI?
    • How is it to be leveraged (if not via the UI)?
  • How does one get the taskId - via SparkListener.onTaskStart?
    • Any other means? (IIRC there is no other way to get at it, but good to clarify for API users.)

Contributor:

Given Mridul's points maybe killTaskAttempt is a better name? IMO specifying "attempt" in the name makes it sound less permanent than killTask (which to me sounds like it won't be retried)

ericl (Author):

> What is the expectation when a task is being killed? Is it specifically for the task being referenced, or all attempts of the task?

The current task attempt (which is uniquely identified by the task ID). I updated the docs as suggested here.

> "killAndRescheduleTask" implies it will be rescheduled, which might not occur if this was a speculative task (or already completed): would be good to clarify.

Went with killTaskAttempt.

> Is this expected to be exposed via the UI? How is it to be leveraged (if not via the UI)?

For now, you can look at the Spark UI, find the task ID, and call killTaskAttempt on it. It would be nice to have this as a button on the executor page in a follow-up. You can also have a listener that kills tasks, as suggested.
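As a sketch of that listener approach (the class name and kill policy are hypothetical; assumes the killTaskAttempt signature this PR merged):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// Hypothetical policy: kill any second (or later) attempt of tasks in one stage.
class ExtraAttemptKiller(sc: SparkContext, watchedStageId: Int) extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    val info = taskStart.taskInfo
    if (taskStart.stageId == watchedStageId && info.attemptNumber > 0) {
      // The reason surfaces in the UI as "TaskKilled: <reason>" per this PR.
      sc.killTaskAttempt(info.taskId, interruptThread = true,
        reason = "extra attempts disallowed for this stage")
    }
  }
}

// Usage: sc.addSparkListener(new ExtraAttemptKiller(sc, watchedStageId = 3))
```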

}

-  def killTask(taskId: Long, interruptThread: Boolean): Unit = {
+  def killTask(
Contributor:

This fits on one line?

  def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {

ericl (Author):

Fixed


/**
* Kill a given task. It will be retried.
*/
Contributor:

Similar to the public API, we should separate retry from reason...

case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

-case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
+case class KillTask(
Contributor:

this also seems to fit in one line?

  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean, reason: String)

ericl (Author):

Fixed

SparkQA commented Mar 7, 2017

Test build #74073 has started for PR 17166 at commit f03de61.

SparkQA commented Mar 7, 2017

Test build #74065 has finished for PR 17166 at commit 170fa34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TaskKilled(reason: String) extends TaskFailedReason


private[spark] override def killTaskIfInterrupted(): Unit = {
  if (maybeKillReason.isDefined) {
    throw new TaskKilledException(maybeKillReason.get)
Contributor:

This is not thread safe. While technically we do not allow the kill reason to be reset to None right now, so this might be fine, it can lead to future issues.

Either make all access/updates to the kill reason synchronized, or capture maybeKillReason to a local variable and use that in both the if and the throw.
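For concreteness, a minimal sketch of the capture-to-a-local option, so the check and the throw observe one consistent snapshot of the volatile field:

```scala
// Sketch only, for the snippet above: one volatile read into a local means the
// isDefined check and the .get cannot observe different values of the field.
private[spark] override def killTaskIfInterrupted(): Unit = {
  val reason = maybeKillReason // single volatile read
  if (reason.isDefined) {
    throw new TaskKilledException(reason.get)
  }
}
```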

ericl (Author):

Done

 // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
 // for the task.
-throw new TaskKilledException
+throw new TaskKilledException(maybeKillReason.get)
Contributor:

Same as above here - atomic use of maybeKillReason required.

ericl (Author):

Fixed

-// Whether the corresponding task has been killed.
-@volatile private var interrupted: Boolean = false
+// If defined, the corresponding task has been killed for the contained reason.
+@volatile private var maybeKillReason: Option[String] = None
Contributor:

nit: Overloading maybeKillReason to indicate interrupted status smells a bit; but might be ok for now.

ericl (Author):

Yeah, the reason here is to allow this to be set atomically.

Contributor:

How about calling this reasonIfKilled, here and elsewhere? (If you strongly prefer the existing name, fine to leave as-is -- I just slightly prefer making it more obvious that this and the fact that the task has been killed are tightly intertwined.)

In any case, can you expand the comment a bit to the one you used below: "If specified, this task has been killed and this option contains the reason."

ericl (Author):

Done

 // A flag to indicate whether the task is killed. This is used in case context is not yet
 // initialized when kill() is invoked.
-@volatile @transient private var _killed = false
+@volatile @transient private var _maybeKillReason: String = null
Contributor:

Any reason to make this a String and not an Option[String], like the other places where it is defined/used?

ericl (Author), Mar 21, 2017:

This one sometimes gets deserialized to null (it is @transient), so it seemed cleaner to use a bare string rather than have an Option that can be null.
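A small standalone illustration of that behavior (names are hypothetical): Java serialization skips @transient fields and restores them to the JVM default, null, without re-running the Scala initializer, so an Option-typed field comes back as null rather than None.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Holder extends Serializable {
  // Skipped during serialization; restored to the default value for references (null),
  // because field initializers do not re-run on readObject.
  @transient var maybeReason: Option[String] = None
}

object TransientOptionDemo extends App {
  val out = new ByteArrayOutputStream()
  new ObjectOutputStream(out).writeObject(new Holder)
  val in = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
  val restored = in.readObject().asInstanceOf[Holder]
  println(restored.maybeReason == null) // prints: true (null, not None)
}
```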

Contributor:

Can you update the comment here?

ericl (Author):

Done

SparkContextSuite.taskSucceeded = true
}
}
assert(SparkContextSuite.taskSucceeded)
Contributor:

Both the listener and the task are setting taskSucceeded? That does not look right...
I am assuming we need one failure to be raised with the appropriate message, and one task success, to ensure listener success.
Additionally, re-execution of the task to indicate success of the task (though this aspect should be covered in some other test already).

-case object TaskKilled extends TaskFailedReason {
-  override def toErrorString: String = "TaskKilled (killed intentionally)"
+case class TaskKilled(reason: String) extends TaskFailedReason {
+  override def toErrorString: String = s"TaskKilled ($reason)"
Contributor:

That is unfortunate, but it looks like it can't be helped if we need this feature.
Probably something to keep in mind with future use of case objects!

Thanks for clarifying.

 logDebug("Exception thrown after task interruption", e)
-throw new TaskKilledException
+context.killTaskIfInterrupted()
+null // not reached
Contributor:

nit: It would be good if we could directly throw the exception here instead of relying on killTaskIfInterrupted to do the right thing (the task is already interrupted according to the case check).
Not only would that remove the unreachable null, it would also ensure that future changes to killTaskIfInterrupted, interrupt resets, etc. do not break this.

ericl (Author):

Done

Contributor:

@ericl Actually that is not correct.
Killed tasks were not candidates for resubmission on failure, and hence there is no need to revive offers when task kills are detected.

If they are to be made candidates, we need to make this expectation explicit elsewhere also, to be consistent.

SparkQA commented Mar 21, 2017

Test build #74993 has finished for PR 17166 at commit 884a3ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TaskKilledException(val reason: String) extends RuntimeException

mridulm (Contributor) commented Mar 21, 2017

Hi @kayousterhout,
Can you take over reviewing this PR? I might be tied up with other things for the next couple of weeks, and I don't want @ericl's work to be blocked on me.

Thanks

ericl (Author) left a comment:

@mridulm addressed the comments around atomic access. Some of the changes are a bit awkward since we access that variable during exception handling, but maybe it's ok since we don't expect to hit this case.


ericl (Author):

There is no need, but reviving offers has no effect either way. Those tasks will not be resubmitted even if reviveOffers() is called (in fact, reviveOffers() is called periodically on a timer thread, so if this was an issue we should have already seen it).

SparkQA commented Mar 21, 2017

Test build #74999 has finished for PR 17166 at commit 203a900.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 21, 2017

Test build #75000 has finished for PR 17166 at commit 6e8593b.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 22, 2017

Test build #75005 has finished for PR 17166 at commit 5707715.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kayousterhout (Contributor) left a comment:

This is looking good (and happy to take over the review @mridulm); just a few last minor clean up comments.


 case e: Exception if context.isInterrupted =>
   logDebug("Exception thrown after task interruption", e)
-  throw new TaskKilledException
+  throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason"))
Contributor:

why do you need the getOrElse here? (since isInterrupted is true, shouldn't this always be defined?)

ericl (Author):

@mridulm pointed out that should the kill reason get reset to None by a concurrent thread, this would crash. However, it is true that this can't happen in the current implementation.

If you think it's clearer, we could throw an AssertionError in this case.

Contributor:

Hm ok if Mridul wants this then fine to leave as-is

Contributor:

@kayousterhout I actually had not considered this, but maybeKillReason is also used in Executor and other places; this was a nice catch by @ericl.


case _: InterruptedException if task.killed =>
  logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
  val killReason = task.maybeKillReason.getOrElse("unknown reason")
Contributor:

Can you change if task.killed to if task.maybeKillReason.isDefined, and then just do .get here? Then you could get rid of the task.killed variable and avoid the weird dependency between task.killed being set and task.maybeKillReason being defined.

def killed: Boolean = _maybeKillReason != null

/**
* If this task has been killed, contains the reason for the kill.
Contributor:

As above, can you make the comment "If specified, this task has been killed and this option contains the reason." (assuming that you get rid of the killed variable)

ericl (Author):

Done

}

override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
logInfo(s"Killing task ($reason): $taskId")
Contributor:

super nit but can you make this s"Killing task $taskId ($reason)"? This is somewhat more consistent with task-level logging elsewhere

ericl (Author):

Done

override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
  logInfo(s"Killing task ($reason): $taskId")
  val execId = taskIdToExecutorId.getOrElse(
    taskId, throw new IllegalArgumentException("Task not found: " + taskId))
Contributor:

Similarly, how about s"Cannot kill task $taskId because no task with that ID was found."?

Contributor:

Also, it's kind of ugly that this throws an exception (it seems like it could be an unhappy surprise to the user that their SparkContext threw an exception / died). How about instead changing the killTaskAttempt calls to return a boolean that's true if the task was successfully killed (and then returning false here)?

ericl (Author):

Done

Contributor:

Without this change, the job could hang: if just one task was left, and that task got killed, I don't think reviveOffers would ever be called.

@mridulm I'm not that concerned about the extra calls to reviveOffers. In the worst case, if every task in a job is speculated (which of course can't actually happen), this leads to 2x the number of calls to reviveOffers -- so it still doesn't change the asymptotic time complexity even in the worst case.

There are already a bunch of cases where we're pretty conservative with reviveOffers, in the sense that we call it even though we might not need to (e.g., when an executor dies, even if there aren't any tasks that need to be run; or every time there are speculative tasks available to run, even if there aren't any resources to run them on), so this change is in keeping with that pattern.

kayousterhout (Contributor), Mar 22, 2017:

Also, I spent a while making sure that everything is ok in TSM.handleFailedTask @mridulm, and all the code there seems to handle resubmission automatically (it just didn't happen previously, when we used TaskKilled for speculative tasks, because we have a check not to re-run tasks if one copy succeeded already)

ericl (Author) left a comment:

Thanks for the comments, ptal.


// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
val killReason = reasonIfKilled
Contributor:

why re-name the variable here (instead of just using reasonIfKilled below)?

ericl (Author):

If we assign to a temporary, then there is no risk of seeing concurrent mutations of the value as we access it below (though, this cannot currently happen).

Contributor:

Ugh in retrospect I think TaskContext should have just clearly documented that an invariant of reasonIfKilled is that, once set, it won't be un-set, and then we'd avoid all of these corner cases. But not worth changing now.

  backend.killTask(taskId, execId.get, interruptThread, reason)
  true
} else {
  logInfo(s"Could not kill task $taskId because no task with that ID was found.")
Contributor:

logWarn?

ericl (Author):

Done

kayousterhout (Contributor):
LGTM. I'll merge once tests pass.

SparkQA commented Mar 23, 2017

Test build #75064 has finished for PR 17166 at commit a37c09b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 23, 2017

Test build #75069 has finished for PR 17166 at commit 71b41b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kayousterhout (Contributor):
LGTM -- this looks great. Thanks for coming up with a simple way to address @mridulm's feedback Eric!

mridulm (Contributor) left a comment:

LGTM, thanks for the changes @ericl !
This definitely looks much better to me now.


     reason: TaskFailedReason): Unit = synchronized {
   taskSetManager.handleFailedTask(tid, taskState, reason)
-  if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
+  if (!taskSetManager.isZombie && !taskSetManager.someAttemptSucceeded(tid)) {
Contributor:

This should do it IMO, thanks!

SparkQA commented Mar 23, 2017

Test build #75117 has finished for PR 17166 at commit 3ec3633.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 24, 2017

Test build #75130 has finished for PR 17166 at commit 8c4381f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kayousterhout (Contributor):
I merged this to master. I realized that the PR description is still from an old version of the change, so I modified the commit message to add that this also adds the SparkContext.killTaskAttempt method. Thanks for all of the work here @ericl!

@asfgit asfgit closed this in 8e55804 Mar 24, 2017
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018
This commit adds a killTaskAttempt method to SparkContext, to allow users to
kill tasks so that they can be re-scheduled elsewhere.

This also refactors the task kill path to allow specifying a reason for the task kill. The reason is propagated opaquely through events, and will show up in the UI automatically as `(N killed: $reason)` and `TaskKilled: $reason`. Without this change, there is no way to provide the user feedback through the UI.

Currently used reasons are "stage cancelled", "another attempt succeeded", and "killed via SparkContext.killTask". The user can also specify a custom reason through `SparkContext.killTask`.

cc rxin

In the stage overview UI the reasons are summarized:
![1](https://cloud.githubusercontent.com/assets/14922/23929209/a83b2862-08e1-11e7-8b3e-ae1967bbe2e5.png)

Within the stage UI you can see individual task kill reasons:
![2](https://cloud.githubusercontent.com/assets/14922/23929200/9a798692-08e1-11e7-8697-72b27ad8a287.png)

Existing tests, tried killing some stages in the UI and verified the messages are as expected.

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekl@google.com>

Closes apache#17166 from ericl/kill-reason.