
Conversation

@JoshRosen (Contributor) commented Dec 7, 2016

What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high level, task killing now launches a new thread which attempts to kill the task, then watches the task and periodically checks whether it has actually stopped. The TaskReaper will periodically re-attempt to call TaskRunner.kill() and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.
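To make the mechanism concrete, here is a minimal sketch of the polling loop described above (not the exact diff); `taskRunner`, `logWarning`, `killPollingIntervalMs`, and `killTimeoutMs` are assumed to come from the surrounding Executor code, and the exception message is illustrative:

```scala
// Sketch of the watchdog loop run by a TaskReaper on its own thread.
private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {
  override def run(): Unit = {
    val startTimeMs = System.currentTimeMillis()
    def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
    while (!taskRunner.isFinished && (killTimeoutMs <= 0 || elapsedTimeMs < killTimeoutMs)) {
      taskRunner.kill(interruptThread = interruptThread)  // re-attempt the kill
      taskRunner.synchronized {
        // Wait for up to one polling interval; the TaskRunner notifies this monitor when it finishes.
        if (!taskRunner.isFinished) taskRunner.wait(killPollingIntervalMs)
      }
      if (!taskRunner.isFinished) {
        logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
        // Optionally take and log a thread dump of the task's (renamed) thread here.
      }
    }
    if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {
      // In non-local mode an uncaught exception kills the executor JVM, freeing the zombie's resources.
      throw new SparkException(
        s"Killing executor JVM because killed task ${taskRunner.taskId} could not be stopped " +
        s"within $killTimeoutMs ms.")
    }
  }
}
```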

This feature is flagged off by default and is controlled by four new configurations under the spark.task.reaper.* namespace. See the updated configuration.md doc for details.
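For reference, a hedged example of turning the feature on through a SparkConf; the key names below match the reaper configs that appear later in this thread and in configuration.md, but double-check them against the merged docs:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; the feature is off by default.
val conf = new SparkConf()
  .set("spark.task.reaper.enabled", "true")          // launch a TaskReaper for every killed task
  .set("spark.task.reaper.pollingInterval", "10s")   // how often to re-kill, log, and thread-dump
  .set("spark.task.reaper.threadDump", "true")       // log a stacktrace of the stuck task thread
  .set("spark.task.reaper.killTimeout", "-1")        // if > 0, kill the executor JVM after this long
```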

How was this patch tested?

Tested via a new test case in JobCancellationSuite, plus manual testing.

}
if (!taskRunner.isFinished) {
logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
if (takeThreadDump) {
JoshRosen (author):

This try is a precautionary measure to avoid potential issues where the ThreadMXBean can't give us a thread dump.

}

override def run(): Unit = {
Thread.currentThread().setName(threadName)
JoshRosen (author):

Task IDs should be unique, so this thread name should be unique as well. Hence I don't think it's super important to reset the thread's name when returning it to the task thread pool: the thread will just be renamed as soon as it's recycled for a new task, and if the task has already exited then it will be clear from the thread state / context that this is just a completed task's thread that has been returned to the pool.

@SparkQA commented Dec 7, 2016

Test build #69776 has started for PR 16189 at commit a46f9c2.

serializedTask: ByteBuffer)
extends Runnable {

val threadName = s"Executor task launch worker for task $taskId"
JoshRosen (author):

This naming scheme was intentionally chosen to match the pattern that we use for sorting threads in the executor thread dump page. I'll manually verify that this worked as expected there.

}
}
if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs) {
if (isLocal) {
JoshRosen (author):

Even if we did throw an exception here, it wouldn't exit the JVM in local mode because we don't set an uncaught exception handler in local mode (see code further up in this file).

// Block until both tasks of job A have started and cancel job A.
sem.acquire(2)
// Small delay to ensure tasks actually start executing the task body
Thread.sleep(1000)
JoshRosen (author):

This is slightly ugly but it's needed to avoid a race where this regression test can spuriously pass (and thereby fail to test anything) in case we cancel a task right after it has launched on the executor but before the UDF in the task has actually run.

Contributor:

CountDownLatch for this?

Member:

@mridulm that doesn't work since driver and executors are not in the same process.

JoshRosen (author):

Yeah, in practice this particular usage isn't flaky.

Contributor:

Ah, missed the obvious - thanks for clarifying.

taskRunner.kill(interruptThread)
}
}
runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))
JoshRosen (author):

A careful reviewer will notice that it's possible for killTask to be called twice for the same task, either via multiple calls to killTask here or via a call to killTask followed by a later killAllTasks call. I think this should technically be okay in this first draft of the patch, since having multiple TaskReapers for the same task should be fine, but I can also appreciate how it could cause resource exhaustion in the pathological case where killTask is spammed continuously. If we think it's important to avoid multiple reapers in that case, a simple solution would be to add a synchronized method on TaskRunner which submits a TaskReaper on the first kill request and is a no-op on subsequent requests.

Contributor:

Or access TaskReapers via something like a guava LoadingCache.

JoshRosen (author):

One corner-case that I just thought of: what should happen if the first call to killTask sets interruptThread = false and the second call sets it to true? If we used a loading cache naively then we would miss the second call. Instead, it might make sense to have the loading cache key be a tuple of (tid, interrupt).

Contributor:

And the more extreme case is if interruptThread gets repeatedly reset, sometimes true and sometimes false. Probably the only tractable logic will be that if interruptThread ever gets set to true, then the Task can be interrupted regardless of whether or not interruptThread is subsequently reset to false.

JoshRosen (author):

I found a nice way to handle this case by adding a taskReaperForTask map which maps from task id to the most recently created reaper. I then synchronize on this map to implement the "create a new task reaper if one doesn't exist or if the existing one is non-interrupting and the requested one should interrupt" logic.
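Roughly, the logic described here could look like the sketch below (names and structure are approximations of the idea, not the exact diff):

```scala
// Sketch: create a new TaskReaper only if none exists for this task, or if the existing
// one is non-interrupting and this kill request asks for an interrupt.
val maybeNewTaskReaper: Option[TaskReaper] = taskReaperForTask.synchronized {
  val shouldCreateReaper = taskReaperForTask.get(taskId) match {
    case None => true
    case Some(existing) => interruptThread && !existing.interruptThread
  }
  if (shouldCreateReaper) {
    val reaper = new TaskReaper(taskRunner, interruptThread = interruptThread)
    taskReaperForTask(taskId) = reaper
    Some(reaper)
  } else {
    None
  }
}
// Submit outside the synchronized block so we don't hold the lock while scheduling the reaper.
maybeNewTaskReaper.foreach(taskReaperPool.execute)
```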

JoshRosen (author):

Given the current defaults, it's probably best to flag this feature off by default; I can then turn it on in my production environment. For a bit of context, my motivation for this patch is to add a last-ditch safety mechanism to prevent zombie executors from causing resource starvation on multi-tenant production clusters. This watchdog-timer mechanism would have prevented an outage caused by uncancellable tasks.

Contributor:

@JoshRosen Agree, off by default (with existing impl in place) would be good.

JoshRosen (author):

For maximum conservatism, I agree that it probably makes sense to flag this off by default. Even if we don't have the "kill the JVM" behavior it's still useful to have the watchdog to pull thread dumps and write logs to indicate what's happening with the zombie tasks, so I think that the killing and polling should be controlled by separate flags. Let me figure out how to control these separately.

Contributor:

Another note: compared to earlier, where we simply iterated over the values (TaskRunner), here we iterate over the keys (taskId) and do a point lookup for each taskId - this does increase the cost a bit and is worth keeping in mind (it is iteration plus lookups on a ConcurrentHashMap, not a regular HashMap).
Having said that, this should be uncommon enough not to be an issue.

JoshRosen (author):

@mridulm, I'm pretty sure that any performance difference here is practically unmeasurable in real-world situations: the size of runningTasks is bounded by the number of executor cores / task slots, which is typically small (on the order of tens to hundreds, not thousands), so it doesn't really matter if the individual lookup operations are ever-so-slightly slower.

Also, I'm not sure that killAllTasks() is ever actually called in practice, let alone called frequently, so even if it were much slower we'd never notice.

*/
private class TaskReaper(taskRunner: TaskRunner, interruptThread: Boolean) extends Runnable {

private[this] val killPollingFrequencyMs: Long =
JoshRosen (author):

I'm on the fence about documenting these publicly, but I'm willing to do so and would appreciate naming suggestions.


private[this] val killPollingFrequencyMs: Long =
conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")

JoshRosen (author):

My goal here was to let users set this to -1 to disable killing of the executor JVM. I'll add a test to make sure that this flag actually behaves that way.

JoshRosen (author):

Added a test.


while (!taskRunner.isFinished && elapsedTimeMs < killTimeoutMs) {
taskRunner.kill(interruptThread = interruptThread)
taskRunner.synchronized {
Contributor:

Why is the sleep inside synchronized?
Also, it would be good to use wait/notify instead of sleep, so that the TaskReapers are released proactively as soon as the task finishes and the number of threads in the cached pool does not grow too high.

JoshRosen (author):

Yeah, I meant to use wait() (was prototyping this quickly late at night, hence this oversight). Will change now.
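A minimal sketch of the wait/notify approach being discussed, assuming the TaskRunner notifies its own monitor when it finishes:

```scala
// Reaper side: block on the TaskRunner's monitor instead of sleeping, so the reaper wakes up
// as soon as the task finishes (or after the polling interval, whichever comes first).
taskRunner.synchronized {
  if (!taskRunner.isFinished) {
    taskRunner.wait(killPollingIntervalMs)
  }
}

// TaskRunner side, at the end of run() (e.g. in a finally block): wake up any waiting reapers.
// this.synchronized { finished = true; notifyAll() }
```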

if (takeThreadDump) {
try {
val threads = Utils.getThreadDump()
threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
Contributor:

This is expensive just to get a single thread's stack trace.
Why not keep track of the thread id, dump only that single thread (instead of all threads), and validate that the threadName is the same as expected?
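The reviewer's suggestion could look roughly like this sketch, which uses the JDK ThreadMXBean directly; capturing taskThreadId when the task starts is an assumption here, not something the current diff does:

```scala
import java.lang.management.ManagementFactory

// Dump only the task's own thread rather than every thread in the JVM.
def singleThreadDump(taskThreadId: Long, expectedName: String): Option[String] = {
  val info = ManagementFactory.getThreadMXBean.getThreadInfo(taskThreadId, Int.MaxValue)
  // The thread may have exited or been recycled for another task, so verify the name.
  Option(info)
    .filter(_.getThreadName == expectedName)
    .map(_.getStackTrace.mkString("\n"))
}
```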

conf.getBoolean("spark.task.threadDumpKilledTasks", true)

override def run(): Unit = {
val startTimeMs = System.currentTimeMillis()
Contributor:

Use Clock instead?

JoshRosen (author):

I don't think that using Clock offers a huge improvement here unless we're going to use it to mock out time in unit tests, which may be difficult given the structure of this code and the difficulty of dependency injection here.

Contributor:

Exactly - for testing.
If not unit tests, at least functional tests should be done.

}
}
}
}
Contributor:

Why not move the logging into a finally block so that we still get the status even if the method throws?
(It makes sense to keep the throw here, of course.)

JoshRosen (author):

Which finally block are you referring to here? The uncaught exception handler in the executor JVM?

mridulm (Contributor) commented Dec 8, 2016:

I meant having a try/finally around the entire run method - and logging if we were unable to kill.
Currently it logs only here - but it is possible to exit the method earlier due to exceptions, etc.

JoshRosen (author):

I thought about this and it seems like there are only two possibilities here:

  1. We're running in local mode, in which case we don't actually want to throw an exception to kill the JVM and even if we did throw then it would keep on running because there's not an uncaught exception handler here.
  2. We're running in a separate JVM, in which case any exception thrown in this thread and not caught will cause the JVM to exit. The only place in the body of this code that might actually throw unexpected exceptions is the taskThreadDump, which is already in a try-catch block to prevent exceptions from bubbling up.

Thus the only purpose of a finally block would be to detect whether it was reached via an exception path and to log a warning stating that task kill progress will no longer be monitored. Basically, I'm not sure what the finally block buys us in terms of actionable / useful logs, and it only adds complexity because we'd then need to be careful not to throw from the finally block in case it was entered via an exception, etc.

JoshRosen (author):

That said, however, it may be useful to have a finally to perform cleanup in a map that I'm about to add / to guarantee that entries cannot possibly be leaked in local mode.
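The cleanup being alluded to could look like the following sketch, assuming the taskReaperForTask map mentioned above:

```scala
override def run(): Unit = {
  try {
    // ... kill / poll / thread-dump logic ...
  } finally {
    // Remove this reaper's entry, but only if it hasn't already been replaced by a
    // newer reaper for the same task, so entries can't leak (including in local mode).
    taskReaperForTask.synchronized {
      taskReaperForTask.get(taskId).foreach { reaperInMap =>
        if (reaperInMap eq this) {
          taskReaperForTask.remove(taskId)
        }
      }
    }
  }
}
```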

@SparkQA commented Dec 7, 2016

Test build #69803 has finished for PR 16189 at commit 000eaef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 7, 2016

Test build #69814 has finished for PR 16189 at commit 9c0e12f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 7, 2016

Test build #69815 has finished for PR 16189 at commit fcaf6fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 9, 2016

Test build #69912 has started for PR 16189 at commit 99f7b4f.

@JoshRosen JoshRosen changed the title [SPARK-18761][CORE][WIP] Introduce "task reaper" to oversee task killing in executors [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors Dec 9, 2016
@SparkQA commented Dec 9, 2016

Test build #69914 has started for PR 16189 at commit 7c65a74.

val startTimeMs = System.currentTimeMillis()
def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
try {
while (!taskRunner.isFinished && (elapsedTimeMs < killTimeoutMs || killTimeoutMs <= 0)) {
JoshRosen (author):

In the case where interruptThread = false, taskRunner.kill() will be idempotent and subsequent calls won't have any effect. In the case where we do interrupt, however, the introduction of this polling loop means that we'll interrupt the same task multiple times. Note that this could, in principle, have happened before, but in practice I think it never would have.

Are there cases where back-to-back interrupts could cause user code to break in really bad ways? I'm wondering whether you might have a case where you, say, are interrupted while issuing a SQL query, use a finally block to do some kind of rollback, and then have that rollback / cleanup step itself be interrupted a little while later. If this is a scenario that's concerning, then maybe the subsequent polls should only take periodic thread dumps and not interrupt. In that case, however, we'd need to make sure that back-to-back killTask(interrupt=true) calls still interrupt once on the first call, so the logic for avoiding the creation of a second TaskReaper would need to change a bit so that we issue a one-time interrupt when we decide that the current TaskReaper subsumes the interrupt=true one that we'd otherwise create.

Contributor:

In the case where we do interrupt, however, the introduction of this polling loop means that we'll interrupt the same task multiple times

My 2c: if the application code doesn't respond to the first interrupt immediately, chances are very low that it would respond to the following interrupts (it may have gotten stuck in some dead loop or a blocking JNI call), so sending multiple interrupts may not be necessary.

JoshRosen (author):

That's a good point. I'll update this tomorrow to only interrupt once.
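A sketch of the "interrupt only once" change being agreed on here: the interrupt is delivered on the first pass through the loop, and later passes just re-set the kill flag, log, and optionally thread-dump. The loop condition and helpers are assumed from the surrounding TaskReaper code.

```scala
// Interrupt the task thread at most once, then keep polling without re-interrupting.
var interrupted = false
while (!taskRunner.isFinished && (killTimeoutMs <= 0 || elapsedTimeMs < killTimeoutMs)) {
  // Only pass interruptThread through on the first iteration.
  taskRunner.kill(interruptThread = interruptThread && !interrupted)
  interrupted = true
  taskRunner.synchronized {
    if (!taskRunner.isFinished) taskRunner.wait(killPollingIntervalMs)
  }
  if (!taskRunner.isFinished) {
    logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
  }
}
```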

private val executorSource = new ExecutorSource(threadPool, executorId)
// Pool used for threads that supervise task killing / cancellation
private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")
// For tasks which are in the process of being killed, this map the most recently created
Contributor:

That sentence no verb.

lins05 (Contributor) left a comment:

When the executor is killed by the reaper, the driver side would see these messages:

16/12/11 05:48:44 ERROR TaskSchedulerImpl: Lost executor 1 on 172.16.90.3: Remote
RPC client disassociated. Likely due to containers exceeding thresholds, or
network issues. Check driver logs for WARN messages.

16/12/11 05:48:44 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1,
172.16.90.3, executor 1): ExecutorLostFailure (executor 1 exited caused by one of
the running tasks) Reason: Remote RPC client disassociated. Likely due to
containers exceeding thresholds, or network issues. Check driver logs for WARN
messages.

16/12/11 05:48:44 ERROR TaskSchedulerImpl: Lost executor 0 on 172.16.90.3: Remote
RPC client disassociated. Likely due to containers exceeding thresholds, or
network issues. Check driver logs for WARN messages.

16/12/11 05:48:44 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
172.16.90.3, executor 0): ExecutorLostFailure (executor 0 exited caused by one of
the running tasks) Reason: Remote RPC client disassociated. Likely due to
containers exceeding thresholds, or network issues. Check driver logs for WARN
messages.

The above messages are not helpful for showing the real reason for the executor loss. Can we improve this?

}
}
}

Contributor:

I see we also have (elapsedTimeMs < killTimeoutMs || killTimeoutMs <= 0) above; what about extracting killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs into a method, e.g. def timeoutExceeded(): Boolean?
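The suggested helper would be a one-liner along these lines (placement inside TaskReaper is assumed):

```scala
// True once a positive kill timeout has been exceeded; a non-positive timeout never expires.
private def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs
```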


private[this] val taskId: Long = taskRunner.taskId

private[this] val killPollingFrequencyMs: Long =
Contributor:

Maybe the name killPollingInterval would be slightly better? Also I think the poll interval should not be larger than spark.task.killTimeout.

JoshRosen (author):

+1 on the naming suggestion; I'll do this tomorrow.

@SparkQA commented Dec 19, 2016

Test build #70368 has finished for PR 16189 at commit d87c8f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 19, 2016

Test build #70371 has finished for PR 16189 at commit 9cff80f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ghost pushed a commit to dbtsai/spark that referenced this pull request Dec 20, 2016
…DD & UnsafeSorter

## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in apache#16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16340 from JoshRosen/sql-task-interruption.
asfgit pushed a commit that referenced this pull request Dec 20, 2016
…DD & UnsafeSorter

## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16340 from JoshRosen/sql-task-interruption.

(cherry picked from commit 5857b9a)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
.set("spark.task.reaper.enabled", "true")
.set("spark.task.reaper.killTimeout", "-1")
.set("spark.task.reaper.PollingInterval", "1s")
.set("spark.deploy.maxExecutorRetries", "1")
Contributor:

We set it to 1 to make sure that the JVM is not killed, right? (If we did kill the JVM, the application would be removed because spark.deploy.maxExecutorRetries is 1.)

@yhuai (Contributor) commented Dec 20, 2016

LGTM!

@yhuai (Contributor) commented Dec 20, 2016

Thank you for those comments. I am merging this to master.

@asfgit asfgit closed this in fa829ce Dec 20, 2016
@mridulm (Contributor) commented Dec 20, 2016

Sounds good @JoshRosen.
In general @yhuai it would have been better to give reviewers some more time to get to ongoing conversations before committing a patch under active review, unless it's a hotfix. Thanks.

@yhuai (Contributor) commented Dec 20, 2016

@mridulm Sure. Also, please feel free to leave more comments :)

JoshRosen added a commit to JoshRosen/spark that referenced this pull request Dec 20, 2016
…DD & UnsafeSorter

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in apache#16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16340 from JoshRosen/sql-task-interruption.
@JoshRosen JoshRosen deleted the cancellation branch December 20, 2016 19:52
asfgit pushed a commit that referenced this pull request Dec 20, 2016
…n executors

## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.

This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details.

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16189 from JoshRosen/cancellation.
JoshRosen added a commit to JoshRosen/spark that referenced this pull request Dec 20, 2016
…n executors

Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.

This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details.

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16189 from JoshRosen/cancellation.
@JoshRosen (author):

This cherry-picked cleanly into branch-2.1 but there were some merge conflicts for branch-2.0, so I've opened #16358 for the branch-2.0 backport.

asfgit pushed a commit that referenced this pull request Dec 20, 2016
…ling in executors

Branch-2.0 backport of #16189; original description follows:

## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.

This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details.

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16358 from JoshRosen/cancellation-branch-2.0.
asfgit pushed a commit that referenced this pull request Dec 21, 2016
…anRDD, JDBCRDD & UnsafeSorter

This is a branch-2.0 backport of #16340; the original description follows:

## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16357 from JoshRosen/sql-task-interruption-branch-2.0.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…DD & UnsafeSorter

## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in apache#16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16340 from JoshRosen/sql-task-interruption.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…n executors

## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.

This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details.

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16189 from JoshRosen/cancellation.