@@ -986,13 +986,15 @@ private[spark] class Client(
    * @param appId ID of the application to monitor.
    * @param returnOnRunning Whether to also return the application state when it is RUNNING.
    * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @param interval How often to poll the YARN RM for application status (in ms).
    * @return A pair of the yarn application state and the final application state.
    */
   def monitorApplication(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
-    val interval = sparkConf.get(REPORT_INTERVAL)
+      logApplicationReport: Boolean = true,
+      interval: Long = sparkConf.get(REPORT_INTERVAL)):

    [Review comment from Contributor: add new param to method description]

+      (YarnApplicationState, FinalApplicationStatus) = {
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
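
Since `interval` is now a defaulted parameter, callers can override the polling
cadence directly instead of changing spark.yarn.report.interval. A minimal,
hypothetical sketch (it assumes an already-constructed Client and a submitted
ApplicationId, and it only compiles inside the org.apache.spark namespace
because Client is private[spark]):

    import org.apache.hadoop.yarn.api.records.ApplicationId
    import org.apache.spark.deploy.yarn.Client

    // Poll the YARN RM every 500 ms until the app reaches RUNNING,
    // overriding the REPORT_INTERVAL-backed default of the parameter.
    def waitUntilRunning(client: Client, appId: ApplicationId): Unit = {
      val (state, finalStatus) =
        client.monitorApplication(appId, returnOnRunning = true, interval = 500L)
      println(s"state=$state, finalStatus=$finalStatus")
    }
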
@@ -127,7 +127,7 @@ package object config {
     .stringConf
     .createOptional
 
-  /* Cluster-mode launcher configuration. */
+  /* Launcher configuration. */
 
   private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
     .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
@@ -136,10 +136,16 @@
     .createWithDefault(true)
 
   private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
-    .doc("Interval between reports of the current app status in cluster mode.")
+    .doc("Interval between reports of the current app status.")
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("1s")
 
+  private[spark] val CLIENT_LAUNCH_MONITOR_INTERVAL =
+    ConfigBuilder("spark.yarn.clientLaunchMonitorInterval")
+      .doc("Interval between requests for the status of the client mode AM when starting the app.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   /* Shared Client-mode AM / Driver configuration. */
 
   private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
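
Because the new entry is declared with timeConf(TimeUnit.MILLISECONDS), it
accepts the same time suffixes as spark.yarn.report.interval and resolves to a
Long number of milliseconds. A sketch of setting and reading it (the 250ms
value is illustrative, and reading a ConfigEntry through SparkConf.get is
private[spark], so this only compiles inside Spark itself):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    val conf = new SparkConf()
      .set("spark.yarn.clientLaunchMonitorInterval", "250ms")

    // timeConf parses the suffix and returns milliseconds.
    val launchInterval: Long = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL) // 250
    val reportInterval: Long = conf.get(REPORT_INTERVAL) // 1000, from the "1s" default
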
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState

 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -77,8 +78,11 @@ private[spark] class YarnClientSchedulerBackend(
    * This assumes both `client` and `appId` have already been set.
    */
   private def waitForApplication(): Unit = {
+    val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL)
+
     assert(client != null && appId.isDefined, "Application has not been submitted yet!")
-    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
+    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true,
+      interval = monitorInterval) // blocking
     if (state == YarnApplicationState.FINISHED ||
         state == YarnApplicationState.FAILED ||
         state == YarnApplicationState.KILLED) {
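
Net effect, assuming no other call sites pass `interval`: only this
client-mode launch path polls at spark.yarn.clientLaunchMonitorInterval, while
every other caller of monitorApplication falls through to the parameter
default and therefore to spark.yarn.report.interval. Sketched side by side:

    // Client-mode launch (this change): explicit interval from the new key.
    client.monitorApplication(appId.get, returnOnRunning = true,
      interval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL))

    // Any other call site: no interval argument, so the default
    // sparkConf.get(REPORT_INTERVAL) applies.
    client.monitorApplication(appId.get)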