From 568f46f6cd4ed38ddf8a018d8d532f9be2228045 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 1 Apr 2015 13:50:25 +0800 Subject: [PATCH 1/7] YarnClientSchedulerBackend.asyncMonitorApplication should reuse Client.monitorApplication --- .../scheduler/cluster/YarnClientSchedulerBackend.scala | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 8abdc26b43806..f9b9d0e8ee88d 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -128,14 +128,7 @@ private[spark] class YarnClientSchedulerBackend( val t = new Thread { override def run() { while (!stopping) { - var state: YarnApplicationState = null - try { - val report = client.getApplicationReport(appId) - state = report.getYarnApplicationState() - } catch { - case e: ApplicationNotFoundException => - state = YarnApplicationState.KILLED - } + val (state, _) = client.monitorApplication(appId, logApplicationReport = false) if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED || state == YarnApplicationState.FAILED) { @@ -143,7 +136,6 @@ private[spark] class YarnClientSchedulerBackend( sc.stop() stopping = true } - Thread.sleep(1000L) } Thread.currentThread().interrupt() } From 6b47ff7c21daf0db42e9a7f3233daf90bb70ee63 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 1 Apr 2015 14:17:14 +0800 Subject: [PATCH 2/7] Update code --- .../cluster/YarnClientSchedulerBackend.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 
f9b9d0e8ee88d..3dd85d0d97b02 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -127,15 +127,11 @@ private[spark] class YarnClientSchedulerBackend( assert(client != null && appId != null, "Application has not been submitted yet!") val t = new Thread { override def run() { - while (!stopping) { - val (state, _) = client.monitorApplication(appId, logApplicationReport = false) - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.KILLED || - state == YarnApplicationState.FAILED) { - logError(s"Yarn application has already exited with state $state!") - sc.stop() - stopping = true - } + val (state, _) = client.monitorApplication(appId, logApplicationReport = false) + if (!stopping) { + logError(s"Yarn application has already exited with state $state!") + sc.stop() + stopping = true } Thread.currentThread().interrupt() } From 6483a2acb99fd44821fff6e668daf44c8133f2d5 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 3 Apr 2015 11:15:33 +0800 Subject: [PATCH 3/7] Catch exception --- .../org/apache/spark/deploy/yarn/Client.scala | 91 ++++++++++--------- 1 file changed, 50 insertions(+), 41 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 61f8fc3f5a014..061426960677e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, 
SecurityManager, SparkConf, SparkContext, SparkException} @@ -559,50 +560,56 @@ private[spark] class Client( var lastState: YarnApplicationState = null while (true) { Thread.sleep(interval) - val report = getApplicationReport(appId) - val state = report.getYarnApplicationState - - if (logApplicationReport) { - logInfo(s"Application report for $appId (state: $state)") - val details = Seq[(String, String)]( - ("client token", getClientToken(report)), - ("diagnostics", report.getDiagnostics), - ("ApplicationMaster host", report.getHost), - ("ApplicationMaster RPC port", report.getRpcPort.toString), - ("queue", report.getQueue), - ("start time", report.getStartTime.toString), - ("final status", report.getFinalApplicationStatus.toString), - ("tracking URL", report.getTrackingUrl), - ("user", report.getUser) - ) - - // Use more loggable format if value is null or empty - val formattedDetails = details - .map { case (k, v) => - val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") - s"\n\t $k: $newValue" } - .mkString("") - - // If DEBUG is enabled, log report details every iteration - // Otherwise, log them every time the application changes state - if (log.isDebugEnabled) { - logDebug(formattedDetails) - } else if (lastState != state) { - logInfo(formattedDetails) + try { + val report = getApplicationReport(appId) + val state = report.getYarnApplicationState + + if (logApplicationReport) { + logInfo(s"Application report for $appId (state: $state)") + val details = Seq[(String, String)]( + ("client token", getClientToken(report)), + ("diagnostics", report.getDiagnostics), + ("ApplicationMaster host", report.getHost), + ("ApplicationMaster RPC port", report.getRpcPort.toString), + ("queue", report.getQueue), + ("start time", report.getStartTime.toString), + ("final status", report.getFinalApplicationStatus.toString), + ("tracking URL", report.getTrackingUrl), + ("user", report.getUser) + ) + + // Use more loggable format if value is null or empty + val 
formattedDetails = details + .map { case (k, v) => + val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") + s"\n\t $k: $newValue" } + .mkString("") + + // If DEBUG is enabled, log report details every iteration + // Otherwise, log them every time the application changes state + if (log.isDebugEnabled) { + logDebug(formattedDetails) + } else if (lastState != state) { + logInfo(formattedDetails) + } } - } - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - return (state, report.getFinalApplicationStatus) - } + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + return (state, report.getFinalApplicationStatus) + } - if (returnOnRunning && state == YarnApplicationState.RUNNING) { - return (state, report.getFinalApplicationStatus) - } + if (returnOnRunning && state == YarnApplicationState.RUNNING) { + return (state, report.getFinalApplicationStatus) + } - lastState = state + lastState = state + } catch { + case e: ApplicationNotFoundException => + logError(s"Application $appId not found.") + return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) + } } // Never reached, but keeps compiler happy @@ -809,7 +816,9 @@ object Client extends Logging { } } addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env) - populateHadoopClasspath(conf, env) + if (sparkConf.getBoolean("spark.yarn.includeClusterHadoopClasspath", true)) { + populateHadoopClasspath(conf, env) + } sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env)) } From ee2b2fdbbff930f5cad423ad6088ee3f2bed0379 Mon Sep 17 00:00:00 2001 From: Sephiroth-Lin Date: Fri, 3 Apr 2015 11:41:09 +0800 Subject: [PATCH 4/7] update --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 061426960677e..6086009f00cd4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -816,9 +816,7 @@ object Client extends Logging { } } addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env) - if (sparkConf.getBoolean("spark.yarn.includeClusterHadoopClasspath", true)) { - populateHadoopClasspath(conf, env) - } + populateHadoopClasspath(conf, env) sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env)) } From aaacb42e0a8d3851da3283ade68fdbd39e3b0f92 Mon Sep 17 00:00:00 2001 From: Sephiroth-Lin Date: Sun, 5 Apr 2015 21:32:19 +0800 Subject: [PATCH 5/7] don't wrap the entire block in the try --- .../org/apache/spark/deploy/yarn/Client.scala | 91 ++++++++++--------- 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6086009f00cd4..27e20042279c4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -560,56 +560,57 @@ private[spark] class Client( var lastState: YarnApplicationState = null while (true) { Thread.sleep(interval) - try { - val report = getApplicationReport(appId) - val state = report.getYarnApplicationState - - if (logApplicationReport) { - logInfo(s"Application report for $appId (state: $state)") - val details = Seq[(String, String)]( - ("client token", getClientToken(report)), - ("diagnostics", report.getDiagnostics), - ("ApplicationMaster host", report.getHost), - ("ApplicationMaster RPC port", report.getRpcPort.toString), - ("queue", report.getQueue), - ("start time", report.getStartTime.toString), - ("final status", report.getFinalApplicationStatus.toString), - ("tracking URL", report.getTrackingUrl), - ("user", report.getUser) - ) - - // Use 
more loggable format if value is null or empty - val formattedDetails = details - .map { case (k, v) => - val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") - s"\n\t $k: $newValue" } - .mkString("") - - // If DEBUG is enabled, log report details every iteration - // Otherwise, log them every time the application changes state - if (log.isDebugEnabled) { - logDebug(formattedDetails) - } else if (lastState != state) { - logInfo(formattedDetails) - } + val report: ApplicationReport = + try { + getApplicationReport(appId) + } catch { + case e: ApplicationNotFoundException => + logError(s"Application $appId not found.") + return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) } - - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - return (state, report.getFinalApplicationStatus) + val state = report.getYarnApplicationState + + if (logApplicationReport) { + logInfo(s"Application report for $appId (state: $state)") + val details = Seq[(String, String)]( + ("client token", getClientToken(report)), + ("diagnostics", report.getDiagnostics), + ("ApplicationMaster host", report.getHost), + ("ApplicationMaster RPC port", report.getRpcPort.toString), + ("queue", report.getQueue), + ("start time", report.getStartTime.toString), + ("final status", report.getFinalApplicationStatus.toString), + ("tracking URL", report.getTrackingUrl), + ("user", report.getUser) + ) + + // Use more loggable format if value is null or empty + val formattedDetails = details + .map { case (k, v) => + val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") + s"\n\t $k: $newValue" } + .mkString("") + + // If DEBUG is enabled, log report details every iteration + // Otherwise, log them every time the application changes state + if (log.isDebugEnabled) { + logDebug(formattedDetails) + } else if (lastState != state) { + logInfo(formattedDetails) } + } - if (returnOnRunning && state == 
YarnApplicationState.RUNNING) { - return (state, report.getFinalApplicationStatus) - } + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + return (state, report.getFinalApplicationStatus) + } - lastState = state - } catch { - case e: ApplicationNotFoundException => - logError(s"Application $appId not found.") - return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) + if (returnOnRunning && state == YarnApplicationState.RUNNING) { + return (state, report.getFinalApplicationStatus) } + + lastState = state } // Never reached, but keeps compiler happy From d4298a19f7a7345b37c3f2fcb9f0d5b1abfb2583 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 7 Apr 2015 10:29:27 +0800 Subject: [PATCH 6/7] Unused, don't push --- .../org/apache/spark/deploy/yarn/Client.scala | 90 +++++++++---------- 1 file changed, 41 insertions(+), 49 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 061426960677e..b33a1ada73289 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} @@ -560,56 +559,51 @@ private[spark] class Client( var lastState: YarnApplicationState = null while (true) { Thread.sleep(interval) - try { - val report = getApplicationReport(appId) - val state = report.getYarnApplicationState - - if (logApplicationReport) { - logInfo(s"Application report for 
$appId (state: $state)") - val details = Seq[(String, String)]( - ("client token", getClientToken(report)), - ("diagnostics", report.getDiagnostics), - ("ApplicationMaster host", report.getHost), - ("ApplicationMaster RPC port", report.getRpcPort.toString), - ("queue", report.getQueue), - ("start time", report.getStartTime.toString), - ("final status", report.getFinalApplicationStatus.toString), - ("tracking URL", report.getTrackingUrl), - ("user", report.getUser) - ) - - // Use more loggable format if value is null or empty - val formattedDetails = details - .map { case (k, v) => - val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") - s"\n\t $k: $newValue" } - .mkString("") - - // If DEBUG is enabled, log report details every iteration - // Otherwise, log them every time the application changes state - if (log.isDebugEnabled) { - logDebug(formattedDetails) - } else if (lastState != state) { - logInfo(formattedDetails) - } + val report = getApplicationReport(appId) + val state = report.getYarnApplicationState + + if (logApplicationReport) { + logInfo(s"Application report for $appId (state: $state)") + val details = Seq[(String, String)]( + ("client token", getClientToken(report)), + ("diagnostics", report.getDiagnostics), + ("ApplicationMaster host", report.getHost), + ("ApplicationMaster RPC port", report.getRpcPort.toString), + ("queue", report.getQueue), + ("start time", report.getStartTime.toString), + ("final status", report.getFinalApplicationStatus.toString), + ("tracking URL", report.getTrackingUrl), + ("user", report.getUser) + ) + + // Use more loggable format if value is null or empty + val formattedDetails = details + .map { case (k, v) => + val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") + s"\n\t $k: $newValue" } - - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - return (state, report.getFinalApplicationStatus) + .mkString("") + + // If DEBUG is 
enabled, log report details every iteration + // Otherwise, log them every time the application changes state + if (log.isDebugEnabled) { + logDebug(formattedDetails) + } else if (lastState != state) { + logInfo(formattedDetails) } + } - if (returnOnRunning && state == YarnApplicationState.RUNNING) { - return (state, report.getFinalApplicationStatus) - } + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + return (state, report.getFinalApplicationStatus) + } - lastState = state - } catch { - case e: ApplicationNotFoundException => - logError(s"Application $appId not found.") - return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) + if (returnOnRunning && state == YarnApplicationState.RUNNING) { + return (state, report.getFinalApplicationStatus) } + + lastState = state } // Never reached, but keeps compiler happy @@ -816,9 +810,7 @@ object Client extends Logging { } } addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env) - if (sparkConf.getBoolean("spark.yarn.includeClusterHadoopClasspath", true)) { - populateHadoopClasspath(conf, env) - } + populateHadoopClasspath(conf, env) sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env)) } From 52b29fee2d776320c9d6d27436d0ac241fa3f82e Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 8 Apr 2015 10:55:03 +0800 Subject: [PATCH 7/7] Interrupt thread when we call stop() --- .../org/apache/spark/deploy/yarn/Client.scala | 10 +++++++++- .../cluster/YarnClientSchedulerBackend.scala | 18 ++++++++---------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b33a1ada73289..5940a47e04ebd 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,6 +40,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} @@ -559,7 +560,14 @@ private[spark] class Client( var lastState: YarnApplicationState = null while (true) { Thread.sleep(interval) - val report = getApplicationReport(appId) + val report: ApplicationReport = + try { + getApplicationReport(appId) + } catch { + case e: ApplicationNotFoundException => + logError(s"Application $appId not found.") + return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) + } val state = report.getYarnApplicationState if (logApplicationReport) { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 3dd85d0d97b02..407dc1ac4d37d 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -34,7 +34,7 @@ private[spark] class YarnClientSchedulerBackend( private var client: Client = null private var appId: ApplicationId = null - @volatile private var stopping: Boolean = false + private var monitorThread: Thread = null /** * Create a Yarn client to submit an application to the ResourceManager. 
@@ -57,7 +57,8 @@ private[spark] class YarnClientSchedulerBackend( client = new Client(args, conf) appId = client.submitApplication() waitForApplication() - asyncMonitorApplication() + monitorThread = asyncMonitorApplication() + monitorThread.start() } /** @@ -123,22 +124,19 @@ private[spark] class YarnClientSchedulerBackend( * If the application has exited for any reason, stop the SparkContext. * This assumes both `client` and `appId` have already been set. */ - private def asyncMonitorApplication(): Unit = { + private def asyncMonitorApplication(): Thread = { assert(client != null && appId != null, "Application has not been submitted yet!") val t = new Thread { override def run() { val (state, _) = client.monitorApplication(appId, logApplicationReport = false) - if (!stopping) { - logError(s"Yarn application has already exited with state $state!") - sc.stop() - stopping = true - } + logError(s"Yarn application has already exited with state $state!") + sc.stop() Thread.currentThread().interrupt() } } t.setName("Yarn application state monitor") t.setDaemon(true) - t.start() + t } /** @@ -146,7 +144,7 @@ private[spark] class YarnClientSchedulerBackend( */ override def stop() { assert(client != null, "Attempted to stop this scheduler before starting it!") - stopping = true + monitorThread.interrupt() super.stop() client.stop() logInfo("Stopped")