From 03b48330c91ba576b97048121087c8a0f5c5eb42 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Fri, 14 Aug 2015 14:07:31 -0700 Subject: [PATCH 1/7] Add signal handler to JobWaiter to handle ctrl-c --- .../apache/spark/scheduler/JobWaiter.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 382b09422a4a0..51b3cae8268d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -17,6 +17,10 @@ package org.apache.spark.scheduler +import sun.misc.{Signal, SignalHandler} + +import org.apache.spark.Logging + /** * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their * results to the given handler function. @@ -28,6 +32,26 @@ private[spark] class JobWaiter[T]( resultHandler: (Int, T) => Unit) extends JobListener { + private val sigint: Signal = new Signal("INT") + @volatile + private var _originalHandler: SignalHandler = _ + + def attachSigintHandler(): SignalHandler = { + Signal.handle(sigint, new SignalHandler with Logging { + override def handle(signal: Signal): Unit = { + logInfo("Cancelling running job.. This might take some time, so be patient. " + + "Press Ctrl-C again to kill JVM.") + // Detach sigint handler so that pressing ctrl-c again will interrupt the jvm. + detachSigintHandler(_originalHandler) + cancel() + } + }) + } + + def detachSigintHandler(originalHandler: SignalHandler): Unit = { + Signal.handle(sigint, originalHandler) + } + private var finishedTasks = 0 // Is the job as a whole finished (succeeded or failed)? @@ -69,9 +93,11 @@ private[spark] class JobWaiter[T]( } def awaitResult(): JobResult = synchronized { + _originalHandler = attachSigintHandler() while (!_jobFinished) { this.wait() } + detachSigintHandler(_originalHandler) return jobResult } } From eb5340b53d23fdd949c034827cf55194c63b83e3 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Sun, 16 Aug 2015 23:14:53 -0700 Subject: [PATCH 2/7] Invoke Signal.handle via reflection --- .../apache/spark/scheduler/JobWaiter.scala | 65 ++++++++++++++----- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 51b3cae8268d2..965de9565331d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -17,9 +17,9 @@ package org.apache.spark.scheduler -import sun.misc.{Signal, SignalHandler} +import java.lang.reflect.{InvocationHandler, Method, Proxy} -import org.apache.spark.Logging +import org.apache.spark.util.Utils /** * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their @@ -32,24 +32,55 @@ private[spark] class JobWaiter[T]( resultHandler: (Int, T) => Unit) extends JobListener { - private val sigint: Signal = new Signal("INT") + // Original signal handler that is overridden. @volatile - private var _originalHandler: SignalHandler = _ + private var _originalHandler: Object = _ - def attachSigintHandler(): SignalHandler = { - Signal.handle(sigint, new SignalHandler with Logging { - override def handle(signal: Signal): Unit = { - logInfo("Cancelling running job.. This might take some time, so be patient. " + - "Press Ctrl-C again to kill JVM.") - // Detach sigint handler so that pressing ctrl-c again will interrupt the jvm. - detachSigintHandler(_originalHandler) - cancel() - } - }) + // Override default signal handler for ctrl-c if sun.misc Signal handler classes exist. + private def attachSigintHandler(): Object = { + try { + val signalClazz = Utils.classForName("sun.misc.Signal") + val signalHandlerClazz = Utils.classForName("sun.misc.SignalHandler") + val newHandler = Proxy.newProxyInstance(Utils.getContextOrSparkClassLoader, + Array(signalHandlerClazz), new InvocationHandler { + override def invoke(proxy: Any, method: Method, args: Array[AnyRef]): AnyRef = { + // scalastyle:off println + println("Cancelling running job.. This might take some time, so be patient.\n" + + "Press Ctrl-C again to kill JVM.") + // scalastyle:on println + // Detach sigint handler so that pressing ctrl-c again will interrupt jvm. + detachSigintHandler() + cancel() + null + } + }) + signalClazz.getMethod("handle", signalClazz, signalHandlerClazz) + .invoke( + null, + signalClazz.getConstructor(classOf[String]).newInstance("INT").asInstanceOf[Object], + newHandler) + } catch { + // Ignore. sun.misc Signal handler classes don't exist. + case _: ClassNotFoundException => null + case e: Exception => throw e + } } - def detachSigintHandler(originalHandler: SignalHandler): Unit = { - Signal.handle(sigint, originalHandler) + // Reset signal handler to default + private def detachSigintHandler(): Unit = { + try { + val signalClazz = Utils.classForName("sun.misc.Signal") + val signalHandlerClazz = Utils.classForName("sun.misc.SignalHandler") + signalClazz.getMethod("handle", signalClazz, signalHandlerClazz) + .invoke( + null, + signalClazz.getConstructor(classOf[String]).newInstance("INT").asInstanceOf[Object], + _originalHandler) + } catch { + // Ignore. sun.misc Signal handler classes don't exist. + case _: ClassNotFoundException => + case e: Exception => throw e + } } private var finishedTasks = 0 @@ -97,7 +128,7 @@ private[spark] class JobWaiter[T]( while (!_jobFinished) { this.wait() } - detachSigintHandler(_originalHandler) + detachSigintHandler() return jobResult } } From ff8bdea7ac5ef25529a3a539ca78ed73978d4b04 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Mon, 17 Aug 2015 08:21:57 -0700 Subject: [PATCH 3/7] Revert "Invoke Signal.handle via reflection" This reverts commit 1cae5da93e44a82a6da7828203ce090e5afb1107. --- .../apache/spark/scheduler/JobWaiter.scala | 65 +++++-------------- 1 file changed, 17 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 965de9565331d..51b3cae8268d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -17,9 +17,9 @@ package org.apache.spark.scheduler -import java.lang.reflect.{InvocationHandler, Method, Proxy} +import sun.misc.{Signal, SignalHandler} -import org.apache.spark.util.Utils +import org.apache.spark.Logging /** * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their @@ -32,55 +32,24 @@ private[spark] class JobWaiter[T]( resultHandler: (Int, T) => Unit) extends JobListener { - // Original signal handler that is overridden. + private val sigint: Signal = new Signal("INT") @volatile - private var _originalHandler: Object = _ + private var _originalHandler: SignalHandler = _ - // Override default signal handler for ctrl-c if sun.misc Signal handler classes exist. - private def attachSigintHandler(): Object = { - try { - val signalClazz = Utils.classForName("sun.misc.Signal") - val signalHandlerClazz = Utils.classForName("sun.misc.SignalHandler") - val newHandler = Proxy.newProxyInstance(Utils.getContextOrSparkClassLoader, - Array(signalHandlerClazz), new InvocationHandler { - override def invoke(proxy: Any, method: Method, args: Array[AnyRef]): AnyRef = { - // scalastyle:off println - println("Cancelling running job.. This might take some time, so be patient.\n" + - "Press Ctrl-C again to kill JVM.") - // scalastyle:on println - // Detach sigint handler so that pressing ctrl-c again will interrupt jvm. - detachSigintHandler() - cancel() - null - } - }) - signalClazz.getMethod("handle", signalClazz, signalHandlerClazz) - .invoke( - null, - signalClazz.getConstructor(classOf[String]).newInstance("INT").asInstanceOf[Object], - newHandler) - } catch { - // Ignore. sun.misc Signal handler classes don't exist. - case _: ClassNotFoundException => null - case e: Exception => throw e - } + def attachSigintHandler(): SignalHandler = { + Signal.handle(sigint, new SignalHandler with Logging { + override def handle(signal: Signal): Unit = { + logInfo("Cancelling running job.. This might take some time, so be patient. " + + "Press Ctrl-C again to kill JVM.") + // Detach sigint handler so that pressing ctrl-c again will interrupt the jvm. + detachSigintHandler(_originalHandler) + cancel() + } + }) } - // Reset signal handler to default - private def detachSigintHandler(): Unit = { - try { - val signalClazz = Utils.classForName("sun.misc.Signal") - val signalHandlerClazz = Utils.classForName("sun.misc.SignalHandler") - signalClazz.getMethod("handle", signalClazz, signalHandlerClazz) - .invoke( - null, - signalClazz.getConstructor(classOf[String]).newInstance("INT").asInstanceOf[Object], - _originalHandler) - } catch { - // Ignore. sun.misc Signal handler classes don't exist. - case _: ClassNotFoundException => - case e: Exception => throw e - } + def detachSigintHandler(originalHandler: SignalHandler): Unit = { + Signal.handle(sigint, originalHandler) } private var finishedTasks = 0 @@ -128,7 +97,7 @@ private[spark] class JobWaiter[T]( while (!_jobFinished) { this.wait() } - detachSigintHandler() + detachSigintHandler(_originalHandler) return jobResult } } From e5faddad3e9106824aec6d82b79be4378a910563 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Mon, 17 Aug 2015 08:22:28 -0700 Subject: [PATCH 4/7] Fix import ordering --- .../src/main/scala/org/apache/spark/scheduler/JobWaiter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 51b3cae8268d2..552dd1fa0de8c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -17,10 +17,10 @@ package org.apache.spark.scheduler -import sun.misc.{Signal, SignalHandler} - import org.apache.spark.Logging +import sun.misc.{Signal, SignalHandler} + /** * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their * results to the given handler function. From 14a9d5b34236f09a15ba6c177c5ed592bd0c9bf4 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Mon, 17 Aug 2015 08:28:04 -0700 Subject: [PATCH 5/7] Remove SignalHanlder parameter from detachSigintHandler --- .../main/scala/org/apache/spark/scheduler/JobWaiter.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 552dd1fa0de8c..7688dd2e86dbf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -42,14 +42,14 @@ private[spark] class JobWaiter[T]( logInfo("Cancelling running job.. This might take some time, so be patient. " + "Press Ctrl-C again to kill JVM.") // Detach sigint handler so that pressing ctrl-c again will interrupt the jvm. - detachSigintHandler(_originalHandler) + detachSigintHandler() cancel() } }) } - def detachSigintHandler(originalHandler: SignalHandler): Unit = { - Signal.handle(sigint, originalHandler) + def detachSigintHandler(): Unit = { + Signal.handle(sigint, _originalHandler) } private var finishedTasks = 0 @@ -97,7 +97,7 @@ private[spark] class JobWaiter[T]( while (!_jobFinished) { this.wait() } - detachSigintHandler(_originalHandler) + detachSigintHandler() return jobResult } } From 368e4eb77ddffe9378c101058b0f24813811978a Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Mon, 17 Aug 2015 10:42:05 -0700 Subject: [PATCH 6/7] Fix unit tests --- .../scala/org/apache/spark/scheduler/JobWaiter.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 7688dd2e86dbf..6352509a519c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import scala.util.Try + import org.apache.spark.Logging import sun.misc.{Signal, SignalHandler} @@ -34,7 +36,7 @@ private[spark] class JobWaiter[T]( private val sigint: Signal = new Signal("INT") @volatile - private var _originalHandler: SignalHandler = _ + private var _originalHandler: SignalHandler = null def attachSigintHandler(): SignalHandler = { Signal.handle(sigint, new SignalHandler with Logging { @@ -49,7 +51,9 @@ private[spark] class JobWaiter[T]( } def detachSigintHandler(): Unit = { - Signal.handle(sigint, _originalHandler) + if (_originalHandler != null) { + Signal.handle(sigint, _originalHandler) + } } private var finishedTasks = 0 @@ -93,11 +97,11 @@ private[spark] class JobWaiter[T]( } def awaitResult(): JobResult = synchronized { - _originalHandler = attachSigintHandler() + Try(_originalHandler = attachSigintHandler()) while (!_jobFinished) { this.wait() } - detachSigintHandler() + Try(detachSigintHandler()) return jobResult } } From 1ba208a09937d36727bd18a51008b1f70d888714 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Fri, 11 Sep 2015 13:37:51 -0700 Subject: [PATCH 7/7] Code clean up using scala try, success, and failure --- .../org/apache/spark/scheduler/JobWaiter.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 6352509a519c5..d83d2173ca057 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import scala.util.Try +import scala.util.{Failure, Success, Try} import org.apache.spark.Logging @@ -38,8 +38,8 @@ private[spark] class JobWaiter[T]( @volatile private var _originalHandler: SignalHandler = null - def attachSigintHandler(): SignalHandler = { - Signal.handle(sigint, new SignalHandler with Logging { + def attachSigintHandler(): Unit = { + _originalHandler = Signal.handle(sigint, new SignalHandler with Logging { override def handle(signal: Signal): Unit = { logInfo("Cancelling running job.. This might take some time, so be patient. " + "Press Ctrl-C again to kill JVM.") @@ -97,11 +97,14 @@ private[spark] class JobWaiter[T]( } def awaitResult(): JobResult = synchronized { - Try(_originalHandler = attachSigintHandler()) + val attachTry = Try(attachSigintHandler()) while (!_jobFinished) { this.wait() } - Try(detachSigintHandler()) + attachTry match { + case _: Success[_] => detachSigintHandler() + case _: Failure[_] => // Ignore error. Signal handler is on a best effort basis. + } return jobResult } }