2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -89,7 +89,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)

   /* Find out driver status then exit the JVM */
   def pollAndReportStatus(driverId: String) {
-    println(s"... waiting before polling master for driver state")
+    println("... waiting before polling master for driver state")
     Thread.sleep(5000)
     println("... polling master for driver state")
     val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
@@ -241,7 +241,7 @@ private[deploy] class StandaloneRestClient extends Logging {
       }
     } else {
       val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
-      logError("Application submission failed" + failMessage)
+      logError(s"Application submission failed$failMessage")
Contributor:
It looks like it's missing a space between "failed" and the message, but then again it wasn't here before. What does this message look like?

Contributor Author:
The space is actually added in the map operation:
.map { ": " + _ }

I hadn't checked what the message looks like before. I think it is not a problem.

Contributor:
Yeah, it should be fine.
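A quick standalone check of that map behavior, resolving the question above (a sketch with a hypothetical message string, not part of the patch):

    // Option(...).map { ": " + _ } prepends the separator only when a message
    // is present, so the log line never needs its own trailing space.
    val withMessage = Option("quota exceeded").map { ": " + _ }.getOrElse("")
    // withMessage == ": quota exceeded" -> "Application submission failed: quota exceeded"
    val noMessage = Option(null: String).map { ": " + _ }.getOrElse("")
    // noMessage == ""                   -> "Application submission failed"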

     }
   }
9 changes: 9 additions & 0 deletions docs/running-on-yarn.md
@@ -196,6 +196,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   It should be no larger than the global number of max attempts in the YARN configuration.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.submit.waitAppCompletion</code></td>
Contributor:
Is the purpose of making this configurable to provide backward compatibility? I wonder if the new behavior should just be the default; I can't imagine someone actually going out of her way to set this to false explicitly. On the other hand, people may rely on the existing behavior to detect when the application has finished, but this is somewhat of an arbitrary behavior of Spark for the user to rely on. What do others think?

Contributor:
Yeah, the intention was backwards compatibility. As you say, if they are relying on the current behavior in other scripting, it would now break.

Contributor:
My opinion is that it would be fairly reasonable for users to have relied on the current behavior, so we shouldn't break it.

+  <td>true</td>
+  <td>
+  In YARN cluster mode, controls whether the client waits to exit until the application completes.
+  If set to true, the client process will stay alive reporting the application's status.
+  Otherwise, the client process will exit after submission.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN
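For context, a minimal sketch of how a user might opt out of the blocking behavior the table above documents (the app name is hypothetical; the config key and its default of true come from the docs):

    import org.apache.spark.SparkConf

    // Equivalent on the command line:
    //   spark-submit --conf spark.yarn.submit.waitAppCompletion=false ...
    val conf = new SparkConf()
      .setAppName("MyApp") // hypothetical application name
      .set("spark.yarn.submit.waitAppCompletion", "false")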
83 changes: 50 additions & 33 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -66,6 +66,8 @@ private[spark] class Client(
   private val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
   private val isClusterMode = args.isClusterMode
+  private val fireAndForget = isClusterMode &&
+    !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
 
 
   def stop(): Unit = yarnClient.stop()
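A small truth-table sketch of the new gate, standalone and with `isClusterMode` passed in as a plain parameter rather than read from the real `ClientArguments` (an illustration, not the patch itself):

    import org.apache.spark.SparkConf

    // fireAndForget holds only in cluster mode AND when the user has
    // explicitly set spark.yarn.submit.waitAppCompletion=false.
    def fireAndForget(isClusterMode: Boolean, conf: SparkConf): Boolean =
      isClusterMode && !conf.getBoolean("spark.yarn.submit.waitAppCompletion", true)

    val optedOut = new SparkConf(false).set("spark.yarn.submit.waitAppCompletion", "false")
    // fireAndForget(true, optedOut)              == true   (cluster mode, opted out)
    // fireAndForget(false, optedOut)             == false  (client mode always waits here)
    // fireAndForget(true, new SparkConf(false))  == false  (default is to wait)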
@@ -564,31 +566,13 @@ private[spark] class Client(

     if (logApplicationReport) {
       logInfo(s"Application report for $appId (state: $state)")
-      val details = Seq[(String, String)](
-        ("client token", getClientToken(report)),
-        ("diagnostics", report.getDiagnostics),
-        ("ApplicationMaster host", report.getHost),
-        ("ApplicationMaster RPC port", report.getRpcPort.toString),
-        ("queue", report.getQueue),
-        ("start time", report.getStartTime.toString),
-        ("final status", report.getFinalApplicationStatus.toString),
-        ("tracking URL", report.getTrackingUrl),
-        ("user", report.getUser)
-      )
-
-      // Use more loggable format if value is null or empty
-      val formattedDetails = details
-        .map { case (k, v) =>
-          val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
-          s"\n\t $k: $newValue" }
-        .mkString("")
-
       // If DEBUG is enabled, log report details every iteration
       // Otherwise, log them every time the application changes state
       if (log.isDebugEnabled) {
-        logDebug(formattedDetails)
+        logDebug(formatReportDetails(report))
       } else if (lastState != state) {
-        logInfo(formattedDetails)
+        logInfo(formatReportDetails(report))
       }
     }

@@ -609,24 +593,57 @@ private[spark] class Client(
     throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def formatReportDetails(report: ApplicationReport): String = {
+    val details = Seq[(String, String)](
+      ("client token", getClientToken(report)),
+      ("diagnostics", report.getDiagnostics),
+      ("ApplicationMaster host", report.getHost),
+      ("ApplicationMaster RPC port", report.getRpcPort.toString),
+      ("queue", report.getQueue),
+      ("start time", report.getStartTime.toString),
+      ("final status", report.getFinalApplicationStatus.toString),
+      ("tracking URL", report.getTrackingUrl),
+      ("user", report.getUser)
+    )
+
+    // Use more loggable format if value is null or empty
+    details.map { case (k, v) =>
+      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+      s"\n\t $k: $newValue"
+    }.mkString("")
+  }
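A standalone sketch of the same formatting logic with hypothetical report values, showing the null/empty-to-"N/A" substitution without needing a real ApplicationReport:

    // Both null and "" render as "N/A"; real values pass through unchanged.
    val details = Seq[(String, String)](
      ("client token", null),
      ("diagnostics", ""),
      ("queue", "default")
    )
    val formatted = details.map { case (k, v) =>
      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
      s"\n\t $k: $newValue"
    }.mkString("")
    // formatted ==
    //   "\n\t client token: N/A\n\t diagnostics: N/A\n\t queue: default"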

   /**
-   * Submit an application to the ResourceManager and monitor its state.
-   * This continues until the application has exited for any reason.
+   * Submit an application to the ResourceManager.
+   * If spark.yarn.submit.waitAppCompletion is set to true, the client will stay alive
+   * reporting the application's status until the application has exited for any reason.
+   * Otherwise, the client process will exit after submission.
+   * If the application finishes with a failed, killed, or undefined status,
+   * throw an appropriate SparkException.
    */
   def run(): Unit = {
-    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
-    if (yarnApplicationState == YarnApplicationState.FAILED ||
-      finalApplicationStatus == FinalApplicationStatus.FAILED) {
-      throw new SparkException("Application finished with failed status")
-    }
-    if (yarnApplicationState == YarnApplicationState.KILLED ||
-      finalApplicationStatus == FinalApplicationStatus.KILLED) {
-      throw new SparkException("Application is killed")
-    }
-    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
-      throw new SparkException("The final status of application is undefined")
-    }
+    val appId = submitApplication()
+    if (fireAndForget) {
+      val report = getApplicationReport(appId)
+      val state = report.getYarnApplicationState
+      logInfo(s"Application report for $appId (state: $state)")
+      logInfo(formatReportDetails(report))
+      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+        throw new SparkException(s"Application $appId finished with status: $state")
+      }
+    } else {
+      val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
+      if (yarnApplicationState == YarnApplicationState.FAILED ||
+        finalApplicationStatus == FinalApplicationStatus.FAILED) {
+        throw new SparkException(s"Application $appId finished with failed status")
+      }
+      if (yarnApplicationState == YarnApplicationState.KILLED ||
+        finalApplicationStatus == FinalApplicationStatus.KILLED) {
+        throw new SparkException(s"Application $appId is killed")
+      }
+      if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+        throw new SparkException(s"The final status of application $appId is undefined")
+      }
+    }
   }
 }
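For illustration, a hedged sketch of how a driver script might consume this contract; the Client constructor shape and surrounding setup are assumed here, not taken from this diff:

    // run() returns normally on success (or right after submission in
    // fire-and-forget mode) and throws SparkException otherwise.
    try {
      new Client(args, sparkConf).run()
      // In fire-and-forget mode this point is reached as soon as the app is
      // submitted and is not already FAILED or KILLED.
    } catch {
      case e: SparkException =>
        // FAILED, KILLED, or UNDEFINED final status ends up here.
        sys.exit(1)
    }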