diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index ceda8a3ae240..c9e68c3bfd05 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -133,9 +133,8 @@ To use a custom metrics.properties for the application master and executors, upd
<td><code>spark.yarn.am.waitTime</code></td>
<td><code>100s</code></td>
<td>
- In cluster mode, time for the YARN Application Master to wait for the
- SparkContext to be initialized. In client mode, time for the YARN Application Master to wait
- for the driver to connect to it.
+ Only used in cluster mode. Time for the YARN Application Master to wait for the
+ SparkContext to be initialized.
</td>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 595077e7e809..3d6ee50b070a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -346,7 +346,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
synchronized {
if (!finished) {
val inShutdown = ShutdownHookManager.inShutdown()
- if (registered) {
+ if (registered || !isClusterMode) {
exitCode = code
finalStatus = status
} else {
@@ -389,37 +389,40 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
private def registerAM(
+ host: String,
+ port: Int,
_sparkConf: SparkConf,
- _rpcEnv: RpcEnv,
- driverRef: RpcEndpointRef,
- uiAddress: Option[String]) = {
+ uiAddress: Option[String]): Unit = {
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)
- val driverUrl = RpcEndpointAddress(
- _sparkConf.get("spark.driver.host"),
- _sparkConf.get("spark.driver.port").toInt,
+ client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
+ registered = true
+ }
+
+ private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
+ val appId = client.getAttemptId().getApplicationId().toString()
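+ // The driver URL points at the driver's CoarseGrainedSchedulerBackend endpoint, which launched executors connect back to.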
+ val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.
// Use placeholders for information that changes such as executor IDs.
logInfo {
- val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
- val executorCores = sparkConf.get(EXECUTOR_CORES)
- val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "",
+ val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
+ val executorCores = _sparkConf.get(EXECUTOR_CORES)
+ val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "",
"", executorMemory, executorCores, appId, securityMgr, localResources)
dummyRunner.launchContextDebugInfo()
}
- allocator = client.register(driverUrl,
- driverRef,
+ allocator = client.createAllocator(
yarnConf,
_sparkConf,
- uiAddress,
- historyAddress,
+ driverUrl,
+ driverRef,
securityMgr,
localResources)
@@ -434,15 +437,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
reporterThread = launchReporterThread()
}
- /**
- * @return An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
- */
- private def createSchedulerRef(host: String, port: String): RpcEndpointRef = {
- rpcEnv.setupEndpointRef(
- RpcAddress(host, port.toInt),
- YarnSchedulerBackend.ENDPOINT_NAME)
- }
-
private def runDriver(): Unit = {
addAmIpFilter(None)
userClassThread = startUserApplication()
@@ -456,11 +450,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
- val driverRef = createSchedulerRef(
- sc.getConf.get("spark.driver.host"),
- sc.getConf.get("spark.driver.port"))
- registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl))
- registered = true
+
+ val userConf = sc.getConf
+ val host = userConf.get("spark.driver.host")
+ val port = userConf.get("spark.driver.port").toInt
+ registerAM(host, port, userConf, sc.ui.map(_.webUrl))
+
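+ // With the AM registered, look up the driver's YarnSchedulerBackend endpoint and start the allocator.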
+ val driverRef = rpcEnv.setupEndpointRef(
+ RpcAddress(host, port),
+ YarnSchedulerBackend.ENDPOINT_NAME)
+ createAllocator(driverRef, userConf)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
@@ -486,10 +485,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val amCores = sparkConf.get(AM_CORES)
rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)
- val driverRef = waitForSparkDriver()
+
+ // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
+ registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))
+
+ // The driver should be up and listening, so unlike cluster mode, just try to connect to it
+ // with no waiting or retrying.
+ val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
+ val driverRef = rpcEnv.setupEndpointRef(
+ RpcAddress(driverHost, driverPort),
+ YarnSchedulerBackend.ENDPOINT_NAME)
addAmIpFilter(Some(driverRef))
- registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"))
- registered = true
+ createAllocator(driverRef, sparkConf)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
@@ -600,40 +607,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}
- private def waitForSparkDriver(): RpcEndpointRef = {
- logInfo("Waiting for Spark driver to be reachable.")
- var driverUp = false
- val hostport = args.userArgs(0)
- val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-
- // Spark driver should already be up since it launched us, but we don't want to
- // wait forever, so wait 100 seconds max to match the cluster mode setting.
- val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME)
- val deadline = System.currentTimeMillis + totalWaitTimeMs
-
- while (!driverUp && !finished && System.currentTimeMillis < deadline) {
- try {
- val socket = new Socket(driverHost, driverPort)
- socket.close()
- logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
- driverUp = true
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver at %s:%s, retrying ...".
- format(driverHost, driverPort))
- Thread.sleep(100L)
- }
- }
-
- if (!driverUp) {
- throw new SparkException("Failed to connect to driver!")
- }
-
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
- createSchedulerRef(driverHost, driverPort.toString)
- }
-
/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index bafb129032b4..8da9cd51cd12 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1019,8 +1019,7 @@ private[spark] class Client(
appId: ApplicationId,
returnOnRunning: Boolean = false,
logApplicationReport: Boolean = true,
- interval: Long = sparkConf.get(REPORT_INTERVAL)):
- (YarnApplicationState, FinalApplicationStatus) = {
+ interval: Long = sparkConf.get(REPORT_INTERVAL)): YarnAppReport = {
var lastState: YarnApplicationState = null
while (true) {
Thread.sleep(interval)
@@ -1031,11 +1030,13 @@ private[spark] class Client(
case e: ApplicationNotFoundException =>
logError(s"Application $appId not found.")
cleanupStagingDir(appId)
- return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
+ return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None)
case NonFatal(e) =>
- logError(s"Failed to contact YARN for application $appId.", e)
+ val msg = s"Failed to contact YARN for application $appId."
+ logError(msg, e)
// Don't necessarily clean up staging dir because status is unknown
- return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
+ return YarnAppReport(YarnApplicationState.FAILED, FinalApplicationStatus.FAILED,
+ Some(msg))
}
val state = report.getYarnApplicationState
@@ -1073,14 +1074,14 @@ private[spark] class Client(
}
if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
+ state == YarnApplicationState.FAILED ||
+ state == YarnApplicationState.KILLED) {
cleanupStagingDir(appId)
- return (state, report.getFinalApplicationStatus)
+ return createAppReport(report)
}
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
- return (state, report.getFinalApplicationStatus)
+ return createAppReport(report)
}
lastState = state
@@ -1129,16 +1130,17 @@ private[spark] class Client(
throw new SparkException(s"Application $appId finished with status: $state")
}
} else {
- val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
- if (yarnApplicationState == YarnApplicationState.FAILED ||
- finalApplicationStatus == FinalApplicationStatus.FAILED) {
+ val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
+ if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
+ diags.foreach { err =>
+ logError(s"Application diagnostics message: $err")
+ }
throw new SparkException(s"Application $appId finished with failed status")
}
- if (yarnApplicationState == YarnApplicationState.KILLED ||
- finalApplicationStatus == FinalApplicationStatus.KILLED) {
+ if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
throw new SparkException(s"Application $appId is killed")
}
- if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+ if (finalState == FinalApplicationStatus.UNDEFINED) {
throw new SparkException(s"The final status of application $appId is undefined")
}
}
@@ -1477,6 +1479,12 @@ private object Client extends Logging {
uri.startsWith(s"$LOCAL_SCHEME:")
}
+ def createAppReport(report: ApplicationReport): YarnAppReport = {
+ val diags = report.getDiagnostics()
+ val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None
+ YarnAppReport(report.getYarnApplicationState(), report.getFinalApplicationStatus(), diagsOpt)
+ }
+
}
private[spark] class YarnClusterApplication extends SparkApplication {
@@ -1491,3 +1499,8 @@ private[spark] class YarnClusterApplication extends SparkApplication {
}
}
+
+private[spark] case class YarnAppReport(
+ appState: YarnApplicationState,
+ finalState: FinalApplicationStatus,
+ diagnostics: Option[String])
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 17234b120ae1..b59dcf158d87 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -42,23 +42,20 @@ private[spark] class YarnRMClient extends Logging {
/**
* Registers the application master with the RM.
*
+ * @param driverHost Host name where driver is running.
+ * @param driverPort Port where driver is listening.
* @param conf The Yarn configuration.
* @param sparkConf The Spark configuration.
* @param uiAddress Address of the SparkUI.
* @param uiHistoryAddress Address of the application on the History Server.
- * @param securityMgr The security manager.
- * @param localResources Map with information about files distributed via YARN's cache.
*/
def register(
- driverUrl: String,
- driverRef: RpcEndpointRef,
+ driverHost: String,
+ driverPort: Int,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
- uiHistoryAddress: String,
- securityMgr: SecurityManager,
- localResources: Map[String, LocalResource]
- ): YarnAllocator = {
+ uiHistoryAddress: String): Unit = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
@@ -70,10 +67,19 @@ private[spark] class YarnRMClient extends Logging {
logInfo("Registering the ApplicationMaster")
synchronized {
- amClient.registerApplicationMaster(driverRef.address.host, driverRef.address.port,
- trackingUrl)
+ amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
registered = true
}
+ }
+
+ def createAllocator(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ driverUrl: String,
+ driverRef: RpcEndpointRef,
+ securityMgr: SecurityManager,
+ localResources: Map[String, LocalResource]): YarnAllocator = {
+ require(registered, "Must register AM before creating allocator.")
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
}
@@ -88,6 +94,9 @@ private[spark] class YarnRMClient extends Logging {
if (registered) {
amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
}
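+ // Stop the AM-RM client whenever it was created, whether or not registration succeeded.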
+ if (amClient != null) {
+ amClient.stop()
+ }
}
/** Returns the attempt ID. */
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 06e54a2eaf95..f1a8df00f9c5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkAppHandle
@@ -75,13 +75,23 @@ private[spark] class YarnClientSchedulerBackend(
val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL)
assert(client != null && appId.isDefined, "Application has not been submitted yet!")
- val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true,
- interval = monitorInterval) // blocking
+ val YarnAppReport(state, _, diags) = client.monitorApplication(appId.get,
+ returnOnRunning = true, interval = monitorInterval)
if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- throw new SparkException("Yarn application has already ended! " +
- "It might have been killed or unable to launch application master.")
+ state == YarnApplicationState.FAILED ||
+ state == YarnApplicationState.KILLED) {
+ val genericMessage = "The YARN application has already ended! " +
+ "It might have been killed or the Application Master may have failed to start. " +
+ "Check the YARN application logs for more details."
+ val exceptionMsg = diags match {
+ case Some(msg) =>
+ logError(genericMessage)
+ msg
+
+ case None =>
+ genericMessage
+ }
+ throw new SparkException(exceptionMsg)
}
if (state == YarnApplicationState.RUNNING) {
logInfo(s"Application ${appId.get} has started running.")
@@ -100,8 +110,13 @@ private[spark] class YarnClientSchedulerBackend(
override def run() {
try {
- val (state, _) = client.monitorApplication(appId.get, logApplicationReport = false)
- logError(s"Yarn application has already exited with state $state!")
+ val YarnAppReport(_, state, diags) =
+ client.monitorApplication(appId.get, logApplicationReport = true)
+ logError(s"YARN application has exited unexpectedly with state $state! " +
+ "Check the YARN application logs for more details.")
+ diags.foreach { err =>
+ logError(s"Diagnostics message: $err")
+ }
allowInterrupt = false
sc.stop()
} catch {
@@ -124,7 +139,7 @@ private[spark] class YarnClientSchedulerBackend(
private def asyncMonitorApplication(): MonitorThread = {
assert(client != null && appId.isDefined, "Application has not been submitted yet!")
val t = new MonitorThread
- t.setName("Yarn application state monitor")
+ t.setName("YARN application state monitor")
t.setDaemon(true)
t
}