From 636fd782c9b18840d5fabb59f625993ad5d73fc5 Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Thu, 3 Dec 2015 15:51:34 +0530 Subject: [PATCH 1/3] Spark on Yarn handle AM being told command from RM When RM throws ApplicationAttemptNotFoundException for allocate invocation, making the ApplicationMaster to finish immediately without any retries. --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 50ae7ffeec4c..507f90cab947 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException import org.apache.spark.rpc._ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv, @@ -370,6 +371,12 @@ private[spark] class ApplicationMaster( failureCount = 0 } catch { case i: InterruptedException => + case a: ApplicationAttemptNotFoundException => { + val message = "ApplicationAttemptNotFoundException was thrown from Reporter thread."; + logError(message, a) + finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, + message) + } case e: Throwable => { failureCount += 1 if (!NonFatal(e) || failureCount >= reporterMaxFailures) { From d4511812905ffc572b1dbccbaa463516f2b77280 Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Tue, 8 Dec 2015 20:05:10 +0530 Subject: [PATCH 2/3] Removed referring ApplicationAttemptNotFoundException class directly and handling it as part of Throwable case. --- .../spark/deploy/yarn/ApplicationMaster.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 507f90cab947..63f508df0fd9 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException import org.apache.spark.rpc._ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv, @@ -371,15 +370,16 @@ private[spark] class ApplicationMaster( failureCount = 0 } catch { case i: InterruptedException => - case a: ApplicationAttemptNotFoundException => { - val message = "ApplicationAttemptNotFoundException was thrown from Reporter thread."; - logError(message, a) - finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, - message) - } case e: Throwable => { failureCount += 1 - if (!NonFatal(e) || failureCount >= reporterMaxFailures) { + if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException".equals( + e.getClass().getName())) { + val message = "ApplicationAttemptNotFoundException was thrown " + + "from Reporter thread." + logError(message, e) + finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, + message) + } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + s"$failureCount time(s) from Reporter thread.") From 620b33d5a606bab31a69944c85d65004c0e661ba Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Tue, 15 Dec 2015 16:49:26 +0530 Subject: [PATCH 3/3] Updated with the review comments. --- .../apache/spark/deploy/yarn/ApplicationMaster.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 63f508df0fd9..1ecdec734a4f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -372,13 +372,13 @@ private[spark] class ApplicationMaster( case i: InterruptedException => case e: Throwable => { failureCount += 1 - if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException".equals( - e.getClass().getName())) { - val message = "ApplicationAttemptNotFoundException was thrown " + - "from Reporter thread." - logError(message, e) + // this exception was introduced in hadoop 2.4 and this code would not compile + // with earlier versions if we refer it directly. + if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" == + e.getClass().getName()) { + logError("Exception from Reporter thread.", e) finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, - message) + e.getMessage) } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +