Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package org.apache.spark.scheduler

import scala.util.{Failure, Success, Try}

import org.apache.spark.Logging

import sun.misc.{Signal, SignalHandler}

/**
* An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
* results to the given handler function.
Expand All @@ -28,6 +34,28 @@ private[spark] class JobWaiter[T](
resultHandler: (Int, T) => Unit)
extends JobListener {

private val sigint: Signal = new Signal("INT")
@volatile
private var _originalHandler: SignalHandler = null

def attachSigintHandler(): Unit = {
_originalHandler = Signal.handle(sigint, new SignalHandler with Logging {
override def handle(signal: Signal): Unit = {
logInfo("Cancelling running job.. This might take some time, so be patient. " +
"Press Ctrl-C again to kill JVM.")
// Detach sigint handler so that pressing ctrl-c again will interrupt the jvm.
detachSigintHandler()
cancel()
}
})
}

def detachSigintHandler(): Unit = {
if (_originalHandler != null) {
Signal.handle(sigint, _originalHandler)
}
}

private var finishedTasks = 0

// Is the job as a whole finished (succeeded or failed)?
Expand Down Expand Up @@ -69,9 +97,14 @@ private[spark] class JobWaiter[T](
}

def awaitResult(): JobResult = synchronized {
val attachTry = Try(attachSigintHandler())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened if the current thread is not MainThread ?

while (!_jobFinished) {
this.wait()
}
attachTry match {
case _: Success[_] => detachSigintHandler()
case _: Failure[_] => // Ignore error. Signal handler is on a best effort basis.
}
return jobResult
}
}