@@ -17,9 +17,11 @@

package org.apache.spark.deploy.yarn

import java.io.IOException
import java.net.Socket
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -34,6 +36,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.util.ShutdownHookManager

/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -56,10 +59,14 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)

private var yarnAllocator: YarnAllocationHandler = _
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private val fs = FileSystem.get(yarnConf)

private var driverClosed: Boolean = false
@volatile private var driverClosed: Boolean = false
private var isFinished: Boolean = false
private var registered: Boolean = false
@volatile private var isLastAMRetry: Boolean = true

// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
@@ -100,7 +107,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

// Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

appAttemptId = getApplicationAttemptId()
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()

synchronized {
@@ -141,14 +152,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val interval = math.min(timeoutInterval / 2, schedulerInterval)

reporterThread = launchReporterThread(interval)
isLastAMRetry = true

// Wait for the reporter thread to finish.
reporterThread.join()

finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
actorSystem.shutdown()

logInfo("Exited")

System.exit(0)
}

@@ -318,10 +330,43 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
}

/**
* Clean up the staging directory.
*/
private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
val stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
if (stagingDir == null) {
logError("Staging directory is null")
return
}
stagingDirPath = new Path(stagingDir)
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
}
} catch {
case ioe: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
}
}

// The shutdown hook that runs when a signal is received AND during normal close of the JVM.
class AppMasterShutdownHook(appMaster: ExecutorLauncher) extends Runnable {

def run() {
logInfo("AppMaster received a signal.")
// we need to clean up staging dir before HDFS is shut down
// make sure we don't delete it until this is the last AM
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}

}


object ExecutorLauncher {
object ExecutorLauncher extends Logging {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
SparkHadoopUtil.get.runAsSparkUser { () =>
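The hunks above register a Hadoop shutdown hook so the YARN staging directory is deleted even if the JVM is killed, but only when this is the last AM attempt (an earlier attempt's staged files are still needed by a retry). The same change is applied to the stable-API variant of ExecutorLauncher below. A minimal, self-contained sketch of the pattern follows; the object name `StagingCleanupSketch` and its parameters are illustrative and not part of the patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.util.ShutdownHookManager

// Sketch only: register a Hadoop shutdown hook that deletes the YARN staging
// directory, guarded so it fires only on the final AM attempt.
object StagingCleanupSketch {
  def install(conf: Configuration, stagingDir: String, attemptId: Int, maxAttempts: Int): Unit = {
    val isLastAttempt = attemptId >= maxAttempts
    // Priority 30 runs this hook ahead of the FileSystem shutdown hook, so HDFS is still usable.
    ShutdownHookManager.get().addShutdownHook(new Runnable {
      override def run(): Unit = {
        if (isLastAttempt && stagingDir != null) {
          FileSystem.get(conf).delete(new Path(stagingDir), true) // recursive delete
        }
      }
    }, 30)
  }
}
```

In the patch itself the guard is the `@volatile` field `isLastAMRetry`, since it is written by the launcher's main thread and read from the JVM shutdown-hook thread.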
@@ -17,8 +17,11 @@

package org.apache.spark.deploy.yarn

import java.io.IOException
import java.net.Socket
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -54,9 +57,14 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)

private var yarnAllocator: YarnAllocationHandler = _
private var driverClosed: Boolean = false
private val maxAppAttempts: Int = conf.getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
private val fs = FileSystem.get(yarnConf)

@volatile private var driverClosed: Boolean = false
private var isFinished: Boolean = false
private var registered: Boolean = false
@volatile private var isLastAMRetry: Boolean = true

private var amClient: AMRMClient[ContainerRequest] = _

@@ -99,11 +107,16 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

// Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

appAttemptId = ApplicationMaster.getApplicationAttemptId()
logInfo("ApplicationAttemptId: " + appAttemptId)
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
amClient = AMRMClient.createAMRMClient()
amClient.init(yarnConf)
amClient.start()

appAttemptId = ApplicationMaster.getApplicationAttemptId()
synchronized {
if (!isFinished) {
registerApplicationMaster()
@@ -129,7 +142,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val interval = math.min(timeoutInterval / 2, schedulerInterval)

reporterThread = launchReporterThread(interval)

isLastAMRetry = true

// Wait for the reporter thread to finish.
reporterThread.join()
@@ -280,9 +293,42 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
}

/**
* Clean up the staging directory.
*/
private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
val stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
if (stagingDir == null) {
logError("Staging directory is null")
return
}
stagingDirPath = new Path(stagingDir)
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
}
} catch {
case ioe: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
}
}

// The shutdown hook that runs when a signal is received AND during normal close of the JVM.
class AppMasterShutdownHook(appMaster: ExecutorLauncher) extends Runnable {

def run() {
logInfo("AppMaster received a signal.")
// we need to clean up staging dir before HDFS is shut down
// make sure we don't delete it until this is the last AM
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}

}

object ExecutorLauncher {
object ExecutorLauncher extends Logging {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
SparkHadoopUtil.get.runAsSparkUser { () =>
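Both variants compute `isLastAMRetry` the same way; only the configuration key differs between YARN API generations (the alpha client reads `RM_AM_MAX_RETRIES`, the stable client `RM_AM_MAX_ATTEMPTS`). A hedged sketch of that check against the stable API, with an illustrative object name and `currentAttemptId` parameter that are not part of the patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Sketch only: true when the ResourceManager will not launch another attempt
// after the current one, so it is safe to delete shared staging files.
object AttemptCheckSketch {
  def isLastAttempt(conf: Configuration, currentAttemptId: Int): Boolean = {
    val maxAttempts = conf.getInt(
      YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
    currentAttemptId >= maxAttempts
  }
}
```

The patch also sets `isLastAMRetry = true` once the reporter thread is running, so the normal shutdown path always performs the cleanup regardless of the attempt count.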