diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 72c7143edcd71..4ed1fa72789ca 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.IOException
 import java.net.Socket
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -34,6 +36,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.util.ShutdownHookManager
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -56,10 +59,14 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = _
+  private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+    YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+  private val fs = FileSystem.get(yarnConf)
 
-  private var driverClosed: Boolean = false
+  @volatile private var driverClosed: Boolean = false
   private var isFinished: Boolean = false
   private var registered: Boolean = false
+  @volatile private var isLastAMRetry: Boolean = true
 
   // Default to numExecutors * 2, with minimum of 3
   private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
@@ -100,7 +107,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // then user specified and /tmp.
     System.setProperty("spark.local.dir", getLocalDirs())
 
+    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
+    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
+
     appAttemptId = getApplicationAttemptId()
+    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
     resourceManager = registerWithResourceManager()
 
     synchronized {
@@ -141,14 +152,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     val interval = math.min(timeoutInterval / 2, schedulerInterval)
 
     reporterThread = launchReporterThread(interval)
 
+    isLastAMRetry = true
     // Wait for the reporter thread to Finish.
     reporterThread.join()
 
     finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
     actorSystem.shutdown()
 
-    logInfo("Exited")
+    System.exit(0)
 
   }
@@ -318,10 +330,43 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     }
   }
 
+  /**
+   * Clean up the staging directory.
+   */
+  private def cleanupStagingDir() {
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+      if (!preserveFiles) {
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+        if (stagingDirPath == null) {
+          logError("Staging directory is null")
+          return
+        }
+        logInfo("Deleting staging directory " + stagingDirPath)
+        fs.delete(stagingDirPath, true)
+      }
+    } catch {
+      case ioe: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
+  // The shutdown hook that runs when a signal is received AND during normal shutdown of the JVM.
+  class AppMasterShutdownHook(appMaster: ExecutorLauncher) extends Runnable {
+
+    def run() {
+      logInfo("AppMaster received a signal.")
+      // We need to clean up the staging dir before HDFS is shut down.
+      // Make sure we don't delete it until this is the last AM.
+      if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+    }
+  }
+
 }
 
-object ExecutorLauncher {
+object ExecutorLauncher extends Logging {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
     SparkHadoopUtil.get.runAsSparkUser { () =>
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a7585748b7f88..7ceb8642d59b4 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.IOException
 import java.net.Socket
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -54,9 +57,14 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = _
 
-  private var driverClosed: Boolean = false
+  private val maxAppAttempts: Int = conf.getInt(
+    YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+  private val fs = FileSystem.get(yarnConf)
+
+  @volatile private var driverClosed: Boolean = false
   private var isFinished: Boolean = false
   private var registered: Boolean = false
+  @volatile private var isLastAMRetry: Boolean = true
 
   private var amClient: AMRMClient[ContainerRequest] = _
@@ -99,11 +107,16 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // then user specified and /tmp.
     System.setProperty("spark.local.dir", getLocalDirs())
 
+    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
+    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
+
+    appAttemptId = ApplicationMaster.getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + appAttemptId)
+    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
     amClient = AMRMClient.createAMRMClient()
     amClient.init(yarnConf)
     amClient.start()
-    appAttemptId = ApplicationMaster.getApplicationAttemptId()
 
     synchronized {
       if (!isFinished) {
         registerApplicationMaster()
@@ -129,7 +142,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val interval = math.min(timeoutInterval / 2, schedulerInterval)
 
     reporterThread = launchReporterThread(interval)
-
+    isLastAMRetry = true
     // Wait for the reporter thread to Finish.
     reporterThread.join()
 
@@ -280,9 +293,42 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   }
 
+  /**
+   * Clean up the staging directory.
+   */
+  private def cleanupStagingDir() {
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+      if (!preserveFiles) {
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+        if (stagingDirPath == null) {
+          logError("Staging directory is null")
+          return
+        }
+        logInfo("Deleting staging directory " + stagingDirPath)
+        fs.delete(stagingDirPath, true)
+      }
+    } catch {
+      case ioe: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
+  // The shutdown hook that runs when a signal is received AND during normal shutdown of the JVM.
+  class AppMasterShutdownHook(appMaster: ExecutorLauncher) extends Runnable {
+
+    def run() {
+      logInfo("AppMaster received a signal.")
+      // We need to clean up the staging dir before HDFS is shut down.
+      // Make sure we don't delete it until this is the last AM.
+      if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+    }
+  }
+
 }
 
-object ExecutorLauncher {
+object ExecutorLauncher extends Logging {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
     SparkHadoopUtil.get.runAsSparkUser { () =>
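For reference, the pattern both files introduce can be illustrated outside the patch: register a cleanup task with Hadoop's ShutdownHookManager at priority 30 so it runs before the FileSystem shutdown hook, and only delete the staging directory on the last AM attempt. The sketch below is a minimal, hypothetical standalone version; the object name, the isLastAttempt flag, and the println logging are placeholders that do not appear in the patch, which keeps this logic inside ExecutorLauncher and uses Spark's Logging.

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.util.ShutdownHookManager

// Hypothetical standalone sketch; the patch embeds this logic in ExecutorLauncher.
object StagingDirCleanupSketch {

  // Stand-in for the patch's isLastAMRetry flag; a real AM derives it from the attempt id.
  @volatile var isLastAttempt: Boolean = true

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val fs = FileSystem.get(conf)

    // Priority 30 runs this hook before Hadoop's FileSystem shutdown hook,
    // so HDFS is still reachable when the staging directory is deleted.
    ShutdownHookManager.get().addShutdownHook(new Runnable {
      override def run(): Unit = {
        if (isLastAttempt) {
          // Read the env var first; new Path(null) would throw before any null check.
          Option(System.getenv("SPARK_YARN_STAGING_DIR")).foreach { dir =>
            try {
              println(s"Deleting staging directory $dir")
              fs.delete(new Path(dir), true)
            } catch {
              case ioe: IOException =>
                println(s"Failed to clean up staging dir $dir: $ioe")
            }
          }
        }
      }
    }, 30)

    // ... application work runs here; the hook fires when the JVM shuts down.
  }
}
```

Wrapping the environment lookup in Option before constructing the Path avoids an IllegalArgumentException when SPARK_YARN_STAGING_DIR is unset, which the IOException handler would not catch.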