diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f8aac3008cef..fded8475a091 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -265,19 +265,16 @@ private[deploy] class Master(
 
             val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (!normalExit) {
-              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
-                schedule()
-              } else {
-                val execs = appInfo.executors.values
-                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
-                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
-                    s"${appInfo.retryCount} times; removing it")
-                  removeApplication(appInfo, ApplicationState.FAILED)
-                }
+            if (!normalExit && appInfo.incrementRetryCount() >= ApplicationState.MAX_NUM_RETRY) {
+              val execs = appInfo.executors.values
+              if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                  s"${appInfo.retryCount} times; removing it")
+                removeApplication(appInfo, ApplicationState.FAILED)
               }
             }
           }
+          schedule()
         case None =>
           logWarning(s"Got status update for unknown executor $appId/$execId")
       }
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 0515e6e3a631..6beae842b04d 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -134,6 +134,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     }
   }
 
+  test("repeatedly failing task that crashes JVM with a zero exit code (SPARK-16925)") {
+    // Ensures that a task that causes the JVM to exit with a zero exit code will cause the
+    // Spark job to eventually fail.
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+    // Check that the cluster is still usable:
+    sc.parallelize(1 to 10).count()
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()
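
Reviewer note: the key behavioral change in the Master.scala hunk is that schedule() now runs after every executor exit event, not only after abnormal exits still under the retry limit, so an executor that repeatedly exits with status 0 can no longer leave the application hung without replacements. Below is a minimal standalone Scala sketch of that control-flow difference; the names (ScheduleOnExitSketch, oldSchedules, newSchedules, maxRetries) are hypothetical stand-ins for illustration, not Spark's actual API.

// Sketch only: contrasts when the old and new code paths invoke schedule().
object ScheduleOnExitSketch {
  val maxRetries = 10 // stand-in for ApplicationState.MAX_NUM_RETRY

  // Old behavior: schedule() ran only for abnormal exits below the retry limit,
  // so an executor that kept exiting cleanly (status 0) never triggered a reschedule.
  def oldSchedules(normalExit: Boolean, retries: Int): Boolean =
    !normalExit && retries < maxRetries

  // New behavior: schedule() runs after every executor exit event, regardless of status.
  def newSchedules(normalExit: Boolean, retries: Int): Boolean = true

  def main(args: Array[String]): Unit = {
    // The SPARK-16925 hang: a zero exit code meant the old code neither removed
    // the application nor scheduled a replacement executor.
    assert(!oldSchedules(normalExit = true, retries = 0))
    assert(newSchedules(normalExit = true, retries = 0))
    println("clean executor exits now trigger schedule()")
  }
}

With replacement executors now being scheduled, the System.exit(0) task in the new DistributedSuite test is retried until it exhausts the task-failure limit (spark.task.maxFailures, 4 by default), which is why the test asserts that the SparkException message contains "failed 4 times".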