diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 382b09422a4a0..d83d2173ca057 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -17,6 +17,12 @@
 
 package org.apache.spark.scheduler
 
+import scala.util.Try
+
+import sun.misc.{Signal, SignalHandler}
+
+import org.apache.spark.Logging
+
 /**
  * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
  * results to the given handler function.
@@ -28,6 +34,38 @@ private[spark] class JobWaiter[T](
     resultHandler: (Int, T) => Unit)
   extends JobListener {
 
+  // NOTE(review): signal handlers are process-wide, so two jobs awaited
+  // concurrently will clobber each other's handler. Best effort only.
+  private val sigint: Signal = new Signal("INT")
+
+  // Handler that was active before attachSigintHandler(); restored on detach.
+  // Written by the awaitResult thread, read on the signal-dispatch thread.
+  @volatile
+  private var _originalHandler: SignalHandler = null
+
+  /**
+   * Installs a SIGINT handler that cancels this job on the first Ctrl-C and
+   * immediately detaches itself so that a second Ctrl-C kills the JVM.
+   */
+  def attachSigintHandler(): Unit = {
+    _originalHandler = Signal.handle(sigint, new SignalHandler with Logging {
+      override def handle(signal: Signal): Unit = {
+        logInfo("Cancelling running job.. This might take some time, so be patient. " +
+          "Press Ctrl-C again to kill JVM.")
+        // Detach sigint handler so that pressing ctrl-c again will interrupt the jvm.
+        detachSigintHandler()
+        cancel()
+      }
+    })
+  }
+
+  /** Restores the SIGINT handler that was active before [[attachSigintHandler]]. */
+  def detachSigintHandler(): Unit = {
+    if (_originalHandler != null) {
+      Signal.handle(sigint, _originalHandler)
+    }
+  }
+
   private var finishedTasks = 0
 
   // Is the job as a whole finished (succeeded or failed)?
@@ -69,9 +107,19 @@ private[spark] class JobWaiter[T](
   }
 
   def awaitResult(): JobResult = synchronized {
-    while (!_jobFinished) {
-      this.wait()
-    }
-    return jobResult
+    // Attaching is best effort: if the handler cannot be installed, still wait.
+    val attachTry = Try(attachSigintHandler())
+    try {
+      while (!_jobFinished) {
+        this.wait()
+      }
+    } finally {
+      // Restore the previous handler even if wait() throws (e.g. interruption),
+      // so the job-cancelling handler never outlives this call.
+      if (attachTry.isSuccess) {
+        detachSigintHandler()
+      }
+    }
+    jobResult
   }
 }