Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private[spark] class YarnClientSchedulerBackend(

private var client: Client = null
private var appId: ApplicationId = null
private var monitorThread: Thread = null
private var monitorThread: MonitorThread = null

/**
* Create a Yarn client to submit an application to the ResourceManager.
Expand Down Expand Up @@ -131,24 +131,42 @@ private[spark] class YarnClientSchedulerBackend(
}
}

/**
* We create this class for SPARK-9519. Basically when we interrupt the monitor thread it's
* because the SparkContext is being shut down(sc.stop() called by user code), but if
* monitorApplication return, it means the Yarn application finished before sc.stop() was called,
* which means we should call sc.stop() here, and we don't allow the monitor to be interrupted
* before SparkContext stops successfully.
*/
private class MonitorThread extends Thread {
private var allowInterrupt = true

override def run() {
try {
val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
logError(s"Yarn application has already exited with state $state!")
allowInterrupt = false
sc.stop()
} catch {
case e: InterruptedException => logInfo("Interrupting monitor thread")
}
}

def stopMonitor(): Unit = {
if (allowInterrupt) {
this.interrupt()
}
}
}

/**
* Monitor the application state in a separate thread.
* If the application has exited for any reason, stop the SparkContext.
* This assumes both `client` and `appId` have already been set.
*/
private def asyncMonitorApplication(): Thread = {
private def asyncMonitorApplication(): MonitorThread = {
assert(client != null && appId != null, "Application has not been submitted yet!")
val t = new Thread {
override def run() {
try {
val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
logError(s"Yarn application has already exited with state $state!")
sc.stop()
} catch {
case e: InterruptedException => logInfo("Interrupting monitor thread")
}
}
}
val t = new MonitorThread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that's tidier. Now that it's its own named class, name and daemon status can be set by the class itself I think.

t.setName("Yarn application state monitor")
t.setDaemon(true)
t
Expand All @@ -160,7 +178,7 @@ private[spark] class YarnClientSchedulerBackend(
override def stop() {
assert(client != null, "Attempted to stop this scheduler before starting it!")
if (monitorThread != null) {
monitorThread.interrupt()
monitorThread.stopMonitor()
}
super.stop()
client.stop()
Expand All @@ -174,5 +192,4 @@ private[spark] class YarnClientSchedulerBackend(
super.applicationId
}
}

}