diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 13241b77bf97..4340bf49f484 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -198,6 +198,15 @@ case object TaskKilled extends TaskFailedReason {
   override def toErrorString: String = "TaskKilled (killed intentionally)"
 }
 
+/**
+ * :: DeveloperApi ::
+ * Task caught an OutOfMemoryError and needs to be rescheduled.
+ */
+@DeveloperApi
+case object TaskOutOfMemory extends TaskFailedReason {
+  override def toErrorString: String = "TaskOutOfMemory (task caught OutOfMemoryError)"
+}
+
 /**
  * :: DeveloperApi ::
  * Task requested the driver to commit, but was denied.
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9e88d488c037..1d17cb42ebad 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -113,8 +113,12 @@
   // Executor for the heartbeat task.
   private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
 
+  private val shutdownHookLock = new Object()
+
   startDriverHeartbeater()
 
+  addShutdownHook()
+
   def launchTask(
       context: ExecutorBackend,
       taskId: Long,
@@ -145,6 +149,24 @@
     }
   }
 
+  private def addShutdownHook(): AnyRef = {
+    ShutdownHookManager.addShutdownHook(ShutdownHookManager.EXECUTOR_SHUTDOWN_PRIORITY) { () =>
+      // Synchronized with the OOM task handler, which is sending its status update to the
+      // driver. Note that the assumption here is that the OOM thread is still running and
+      // will get the lock before this hook does.
+      shutdownHookLock.synchronized {
+        // Clean up all currently running tasks so that they do not throw undesirable errors
+        // during shutdown. Note that kill() only interrupts the running threads; the tasks
+        // are actually killed once those interrupts are handled.
+        runningTasks.values().asScala.foreach(t => t.kill(true))
+      }
+      logInfo("Shutdown hook called")
+
+      // Sleep to make sure the status update from the OOM handler is flushed to the driver.
+      Thread.sleep(conf.getInt("spark.executor.shutdownHookDelay", 300))
+    }
+  }
+
   /** Returns the total amount of time this JVM process has spent in garbage collection. */
   private def computeTotalGcTime(): Long = {
     ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionTime).sum
@@ -291,6 +313,12 @@
           val reason = cDE.toTaskEndReason
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
+        case oom: OutOfMemoryError =>
+          shutdownHookLock.synchronized {
+            logError("Out of memory error in " + Thread.currentThread(), oom)
+            execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskOutOfMemory))
+          }
+
         case t: Throwable =>
          // Attempt to exit cleanly by informing the driver of our failure.
          // If anything goes wrong (or this was a fatal exception), we will delegate to
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index f7e84a2c2e14..5ba1f6d82a46 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -144,10 +144,11 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
 
   private def addShutdownHook(): AnyRef = {
-    ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
-      logInfo("Shutdown hook called")
-      DiskBlockManager.this.doStop()
-    }
+    ShutdownHookManager.addShutdownHook(
+      ShutdownHookManager.DISK_BLOCK_MANAGER_SHUTDOWN_PRIORITY) { () =>
+      logInfo("Shutdown hook called")
+      DiskBlockManager.this.doStop()
+    }
   }
 
   /** Cleanup local dirs and stop shuffle sender. */
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index db4a8b304ec3..f1aa3b93051f 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -45,6 +45,18 @@ private[spark] object ShutdownHookManager extends Logging {
    */
   val TEMP_DIR_SHUTDOWN_PRIORITY = 25
 
+  /**
+   * The disk block manager's shutdown priority; it must be higher than the temp directory's.
+   */
+  val DISK_BLOCK_MANAGER_SHUTDOWN_PRIORITY = 26
+
+  /**
+   * The shutdown priority at which the Executor cleans up its currently running tasks. It
+   * should be higher than the DiskBlockManager shutdown priority, because tasks may throw
+   * unhandled exceptions if the temp directory cleanup happens in parallel.
+   */
+  val EXECUTOR_SHUTDOWN_PRIORITY = 27
+
   private lazy val shutdownHooks = {
     val manager = new SparkShutdownHookManager()
     manager.install()
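
For reviewers: the patch relies on Spark's rule that shutdown hooks with a higher priority value run earlier. The standalone sketch below is not part of the patch; it is a toy PriorityQueue-based model of that ordering, showing why EXECUTOR_SHUTDOWN_PRIORITY (27) > DISK_BLOCK_MANAGER_SHUTDOWN_PRIORITY (26) > TEMP_DIR_SHUTDOWN_PRIORITY (25) produces the intended sequence: kill tasks first, then stop the DiskBlockManager, then delete the temp directories.

import java.util.PriorityQueue

object ShutdownOrderSketch {
  // Priority values copied from the patch above.
  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
  val DISK_BLOCK_MANAGER_SHUTDOWN_PRIORITY = 26
  val EXECUTOR_SHUTDOWN_PRIORITY = 27

  // Reverse natural ordering so that poll() returns the highest-priority hook
  // first, matching the "higher priority runs earlier" rule.
  private case class Hook(priority: Int, body: () => Unit) extends Comparable[Hook] {
    override def compareTo(other: Hook): Int = other.priority - priority
  }

  private val hooks = new PriorityQueue[Hook]()

  def addShutdownHook(priority: Int)(body: () => Unit): Unit = hooks.add(Hook(priority, body))

  def runAll(): Unit = while (!hooks.isEmpty) hooks.poll().body()

  def main(args: Array[String]): Unit = {
    addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () => println("3. delete temp dirs") }
    addShutdownHook(EXECUTOR_SHUTDOWN_PRIORITY) { () => println("1. kill running tasks") }
    addShutdownHook(DISK_BLOCK_MANAGER_SHUTDOWN_PRIORITY) { () => println("2. stop DiskBlockManager") }
    runAll() // prints 1, 2, 3 in that order
  }
}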
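
The locking protocol in Executor.scala can be hard to see in diff form, so here is a minimal, self-contained model using plain JVM threads and none of Spark's classes: the thread that caught the OutOfMemoryError takes shutdownHookLock first and finishes sending its FAILED/TaskOutOfMemory update; the shutdown hook then has to acquire the same lock before killing the remaining tasks, so the report is never cut off mid-send. As in the patch, this assumes the OOM thread wins the race for the lock.

object OomHandshakeSketch {
  private val shutdownHookLock = new Object()

  def main(args: Array[String]): Unit = {
    // Stand-in for the task thread that caught the OutOfMemoryError: it grabs
    // the lock and reports the failure before anything else can run.
    val oomReporter = new Thread(new Runnable {
      override def run(): Unit = shutdownHookLock.synchronized {
        println("task thread: sending TaskOutOfMemory status update")
        Thread.sleep(100) // stand-in for execBackend.statusUpdate(...)
        println("task thread: status update sent")
      }
    })
    // Stand-in for the executor shutdown hook: it blocks on the same lock, so
    // it kills the remaining tasks only after the failure report has gone out.
    val shutdownHook = new Thread(new Runnable {
      override def run(): Unit = shutdownHookLock.synchronized {
        println("shutdown hook: killing remaining running tasks")
      }
    })
    oomReporter.start()
    Thread.sleep(10) // let the OOM thread win the race for the lock, as the patch assumes
    shutdownHook.start()
    oomReporter.join()
    shutdownHook.join()
  }
}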