5 changes: 2 additions & 3 deletions docs/running-on-yarn.md
@@ -133,9 +133,8 @@ To use a custom metrics.properties for the application master and executors, upd
<td><code>spark.yarn.am.waitTime</code></td>
<td><code>100s</code></td>
<td>
In <code>cluster</code> mode, time for the YARN Application Master to wait for the
SparkContext to be initialized. In <code>client</code> mode, time for the YARN Application Master to wait
for the driver to connect to it.
Only used in <code>cluster</code> mode. Time for the YARN Application Master to wait for the
SparkContext to be initialized.
</td>
</tr>
<tr>
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -346,7 +346,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
synchronized {
if (!finished) {
val inShutdown = ShutdownHookManager.inShutdown()
if (registered) {
if (registered || !isClusterMode) {
Contributor:
Why would we need to add the non-cluster-mode check here?

Contributor Author:
Because otherwise the client mode AM will exit with "EXIT_SC_NOT_INITED" in certain cases, which doesn't really make a lot of sense.

Contributor:
I see, thanks for the explanation.
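For context, a condensed sketch of the shutdown path this thread is about; the member names follow the diff, but the exact else-branch body is an assumption based on the comment, not the verbatim Spark code:

```scala
// Sketch of ApplicationMaster.finish() with the new condition. In client mode the
// AM never waits on a SparkContext, so reporting EXIT_SC_NOT_INITED there would be
// misleading; only the cluster-mode AM should fall into that branch.
def finish(status: FinalApplicationStatus, code: Int): Unit = synchronized {
  if (!finished) {
    if (registered || !isClusterMode) {
      // Normal path: report whatever status/code the caller asked for.
      exitCode = code
      finalStatus = status
    } else {
      // Cluster mode only: the user app never got a SparkContext initialized.
      finalStatus = FinalApplicationStatus.FAILED
      exitCode = ApplicationMaster.EXIT_SC_NOT_INITED
    }
    finished = true
  }
}
```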

exitCode = code
finalStatus = status
} else {
@@ -389,37 +389,40 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

private def registerAM(
host: String,
port: Int,
_sparkConf: SparkConf,
_rpcEnv: RpcEnv,
driverRef: RpcEndpointRef,
uiAddress: Option[String]) = {
uiAddress: Option[String]): Unit = {
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

val driverUrl = RpcEndpointAddress(
_sparkConf.get("spark.driver.host"),
_sparkConf.get("spark.driver.port").toInt,
client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
registered = true
}

private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
Contributor:
What is the purpose of separating this into two methods? Sorry, I don't quite get the point.

Contributor Author:
Explained in the PR description: YARN produces an unhelpful error message if an error happens before the AM is registered. This moves registration of the AM to an earlier spot.
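In outline, the new startup flow looks roughly like this (a condensed sketch of the runDriver() changes further down in this diff, not additional code in the PR):

```scala
// Condensed sketch: register with the RM first, so a later failure is reported with
// the AM's own diagnostics instead of a generic YARN error, then create the allocator.
private def runDriverSketch(sc: SparkContext): Unit = {
  val userConf = sc.getConf
  val host = userConf.get("spark.driver.host")
  val port = userConf.get("spark.driver.port").toInt

  // Step 1: register the AM with the ResourceManager as early as possible.
  registerAM(host, port, userConf, sc.ui.map(_.webUrl))

  // Step 2: only afterwards connect to the driver and create the executor allocator.
  val driverRef = rpcEnv.setupEndpointRef(
    RpcAddress(host, port),
    YarnSchedulerBackend.ENDPOINT_NAME)
  createAllocator(driverRef, userConf)
}
```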

val appId = client.getAttemptId().getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.
// Use placeholders for information that changes such as executor IDs.
logInfo {
val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
dummyRunner.launchContextDebugInfo()
}

allocator = client.register(driverUrl,
driverRef,
allocator = client.createAllocator(
yarnConf,
_sparkConf,
uiAddress,
historyAddress,
driverUrl,
driverRef,
securityMgr,
localResources)

@@ -434,15 +437,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
reporterThread = launchReporterThread()
}

/**
* @return An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
*/
private def createSchedulerRef(host: String, port: String): RpcEndpointRef = {
rpcEnv.setupEndpointRef(
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
}

private def runDriver(): Unit = {
addAmIpFilter(None)
userClassThread = startUserApplication()
@@ -456,11 +450,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
val driverRef = createSchedulerRef(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"))
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl))
registered = true

val userConf = sc.getConf
val host = userConf.get("spark.driver.host")
val port = userConf.get("spark.driver.port").toInt
registerAM(host, port, userConf, sc.ui.map(_.webUrl))

val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(host, port),
YarnSchedulerBackend.ENDPOINT_NAME)
createAllocator(driverRef, userConf)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
@@ -486,10 +485,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val amCores = sparkConf.get(AM_CORES)
rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)
val driverRef = waitForSparkDriver()

// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))

// The driver should be up and listening, so unlike cluster mode, just try to connect to it
// with no waiting or retrying.
val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(driverHost, driverPort),
YarnSchedulerBackend.ENDPOINT_NAME)
addAmIpFilter(Some(driverRef))
registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"))
registered = true
createAllocator(driverRef, sparkConf)

// In client mode the actor will stop the reporter thread.
reporterThread.join()
@@ -600,40 +607,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}

private def waitForSparkDriver(): RpcEndpointRef = {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
val hostport = args.userArgs(0)
val (driverHost, driverPort) = Utils.parseHostPort(hostport)

// Spark driver should already be up since it launched us, but we don't want to
// wait forever, so wait 100 seconds max to match the cluster mode setting.
val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME)
val deadline = System.currentTimeMillis + totalWaitTimeMs

while (!driverUp && !finished && System.currentTimeMillis < deadline) {
try {
val socket = new Socket(driverHost, driverPort)
socket.close()
logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
driverUp = true
} catch {
case e: Exception =>
logError("Failed to connect to driver at %s:%s, retrying ...".
format(driverHost, driverPort))
Thread.sleep(100L)
}
}

if (!driverUp) {
throw new SparkException("Failed to connect to driver!")
}

sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
createSchedulerRef(driverHost, driverPort.toString)
}

/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1019,8 +1019,7 @@ private[spark] class Client(
appId: ApplicationId,
returnOnRunning: Boolean = false,
logApplicationReport: Boolean = true,
interval: Long = sparkConf.get(REPORT_INTERVAL)):
(YarnApplicationState, FinalApplicationStatus) = {
interval: Long = sparkConf.get(REPORT_INTERVAL)): YarnAppReport = {
var lastState: YarnApplicationState = null
while (true) {
Thread.sleep(interval)
Expand All @@ -1031,11 +1030,13 @@ private[spark] class Client(
case e: ApplicationNotFoundException =>
logError(s"Application $appId not found.")
cleanupStagingDir(appId)
return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None)
case NonFatal(e) =>
logError(s"Failed to contact YARN for application $appId.", e)
val msg = s"Failed to contact YARN for application $appId."
logError(msg, e)
// Don't necessarily clean up staging dir because status is unknown
return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
return YarnAppReport(YarnApplicationState.FAILED, FinalApplicationStatus.FAILED,
Some(msg))
}
val state = report.getYarnApplicationState

@@ -1073,14 +1074,14 @@ private[spark] class Client(
}

if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
Contributor:
Nit: should this be a 4-space or 2-space indent?

Contributor Author:
Continuation lines of conditions are generally double-indented (to clearly separate them from the rest of the code).
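For example (an illustration of that style only, using the condition from this hunk):

```scala
if (state == YarnApplicationState.FINISHED ||
    state == YarnApplicationState.FAILED ||
    state == YarnApplicationState.KILLED) {  // continuation lines: double indent (4 spaces)
  cleanupStagingDir(appId)                   // body: single indent (2 spaces)
}
```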

cleanupStagingDir(appId)
return (state, report.getFinalApplicationStatus)
return createAppReport(report)
}

if (returnOnRunning && state == YarnApplicationState.RUNNING) {
return (state, report.getFinalApplicationStatus)
return createAppReport(report)
}

lastState = state
@@ -1129,16 +1130,17 @@ private[spark] class Client(
throw new SparkException(s"Application $appId finished with status: $state")
}
} else {
val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
if (yarnApplicationState == YarnApplicationState.FAILED ||
finalApplicationStatus == FinalApplicationStatus.FAILED) {
val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
diags.foreach { err =>
logError(s"Application diagnostics message: $err")
}
throw new SparkException(s"Application $appId finished with failed status")
}
if (yarnApplicationState == YarnApplicationState.KILLED ||
finalApplicationStatus == FinalApplicationStatus.KILLED) {
if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
throw new SparkException(s"Application $appId is killed")
}
if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
if (finalState == FinalApplicationStatus.UNDEFINED) {
throw new SparkException(s"The final status of application $appId is undefined")
}
}
@@ -1477,6 +1479,12 @@ private object Client extends Logging {
uri.startsWith(s"$LOCAL_SCHEME:")
}

def createAppReport(report: ApplicationReport): YarnAppReport = {
val diags = report.getDiagnostics()
val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None
YarnAppReport(report.getYarnApplicationState(), report.getFinalApplicationStatus(), diagsOpt)
}

}

private[spark] class YarnClusterApplication extends SparkApplication {
@@ -1491,3 +1499,8 @@ private[spark] class YarnClusterApplication extends SparkApplication {
}

}

private[spark] case class YarnAppReport(
appState: YarnApplicationState,
finalState: FinalApplicationStatus,
diagnostics: Option[String])
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -42,23 +42,20 @@ private[spark] class YarnRMClient extends Logging {
/**
* Registers the application master with the RM.
*
* @param driverHost Host name where driver is running.
* @param driverPort Port where driver is listening.
* @param conf The Yarn configuration.
* @param sparkConf The Spark configuration.
* @param uiAddress Address of the SparkUI.
* @param uiHistoryAddress Address of the application on the History Server.
* @param securityMgr The security manager.
* @param localResources Map with information about files distributed via YARN's cache.
*/
def register(
driverUrl: String,
driverRef: RpcEndpointRef,
driverHost: String,
driverPort: Int,
Contributor:
Hi @vanzin, during our internal porting we found this parameter misleading.

It should be amHost and amRpcPort to be more accurate.
When running in client mode, the value passed here is the ApplicationMaster's, not the driver's.
Do you think it's worth another JIRA to resolve this issue?
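A hypothetical version of the signature with that renaming, purely to illustrate the suggestion (not part of this PR):

```scala
// Hypothetical rename: what gets registered with the RM is really the AM's own
// host/port; it only happens to match the driver's in cluster mode.
def register(
    amHost: String,
    amRpcPort: Int,  // -1 in client mode, since the AM doesn't listen for connections
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    uiAddress: Option[String],
    uiHistoryAddress: String): Unit = {
  // body unchanged from the register() shown in this diff
}
```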

conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
): YarnAllocator = {
uiHistoryAddress: String): Unit = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
@@ -70,10 +67,19 @@ private[spark] class YarnRMClient extends Logging {

logInfo("Registering the ApplicationMaster")
synchronized {
amClient.registerApplicationMaster(driverRef.address.host, driverRef.address.port,
trackingUrl)
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
registered = true
}
}

def createAllocator(
conf: YarnConfiguration,
sparkConf: SparkConf,
driverUrl: String,
driverRef: RpcEndpointRef,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]): YarnAllocator = {
require(registered, "Must register AM before creating allocator.")
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
}
@@ -88,6 +94,9 @@ private[spark] class YarnRMClient extends Logging {
if (registered) {
amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
}
if (amClient != null) {
amClient.stop()
}
}

/** Returns the attempt ID. */
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.yarn.api.records.YarnApplicationState

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkAppHandle
@@ -75,13 +75,23 @@ private[spark] class YarnClientSchedulerBackend(
val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL)

assert(client != null && appId.isDefined, "Application has not been submitted yet!")
val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true,
interval = monitorInterval) // blocking
val YarnAppReport(state, _, diags) = client.monitorApplication(appId.get,
returnOnRunning = true, interval = monitorInterval)
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
throw new SparkException("Yarn application has already ended! " +
"It might have been killed or unable to launch application master.")
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
val genericMessage = "The YARN application has already ended! " +
"It might have been killed or the Application Master may have failed to start. " +
"Check the YARN application logs for more details."
val exceptionMsg = diags match {
case Some(msg) =>
logError(genericMessage)
msg

case None =>
genericMessage
}
throw new SparkException(exceptionMsg)
}
if (state == YarnApplicationState.RUNNING) {
logInfo(s"Application ${appId.get} has started running.")
@@ -100,8 +110,13 @@ private[spark] class YarnClientSchedulerBackend(

override def run() {
try {
val (state, _) = client.monitorApplication(appId.get, logApplicationReport = false)
logError(s"Yarn application has already exited with state $state!")
val YarnAppReport(_, state, diags) =
client.monitorApplication(appId.get, logApplicationReport = true)
logError(s"YARN application has exited unexpectedly with state $state! " +
"Check the YARN application logs for more details.")
diags.foreach { err =>
logError(s"Diagnostics message: $err")
}
allowInterrupt = false
sc.stop()
} catch {
@@ -124,7 +139,7 @@ private[spark] class YarnClientSchedulerBackend(
private def asyncMonitorApplication(): MonitorThread = {
assert(client != null && appId.isDefined, "Application has not been submitted yet!")
val t = new MonitorThread
t.setName("Yarn application state monitor")
t.setName("YARN application state monitor")
t.setDaemon(true)
t
}