diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 65a1a8fd7e929..b78d8babf3525 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -23,6 +23,8 @@ private[spark] class ApplicationDescription( val memoryPerSlave: Int, val command: Command, var appUiUrl: String, + val heartbeatInterval: Int, + val consecutiveExecutorFailuresThreshold: Int, val eventLogDir: Option[String] = None) extends Serializable { diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c46f84de8444a..142fc5bd4d2aa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -101,6 +101,16 @@ private[deploy] object DeployMessages { case class MasterChangeAcknowledged(appId: String) + /** + * Periodic heartbeat from the app client to the master, used to detect driver death and to + * detect drivers that have not obtained any usable executors. + * + * @param appId the application id. + * @param hasRegisteredExecutors true if the app client has at least one registered executor, + * false otherwise. + */ + case class AppClientHeartbeat(appId: String, hasRegisteredExecutors: Boolean) + // Master to AppClient case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage @@ -136,6 +146,12 @@ private[deploy] object DeployMessages { case object StopAppClient + /** + * Message sent from the heartbeat timer thread to trigger the sending of a heartbeat to the + * Master. 
This is necessary for proper thread-safety + */ + case object TriggerHeartbeat + // Master to Worker & AppClient case class MasterChanged(masterUrl: String, masterWebUiUrl: String) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 696f32a6f5730..6bd73fa52904d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -48,7 +48,8 @@ private[spark] object JsonProtocol { ("memoryperslave" -> obj.desc.memoryPerSlave) ~ ("submitdate" -> obj.submitDate.toString) ~ ("state" -> obj.state.toString) ~ - ("duration" -> obj.duration) + ("duration" -> obj.duration) ~ + ("lastheartbeat" -> obj.lastHeartbeat) } def writeApplicationDescription(obj: ApplicationDescription) = { @@ -56,7 +57,8 @@ private[spark] object JsonProtocol { ("cores" -> obj.maxCores) ~ ("memoryperslave" -> obj.memoryPerSlave) ~ ("user" -> obj.user) ~ - ("command" -> obj.command.toString) + ("command" -> obj.command.toString) ~ + ("heartbeatinterval" -> obj.heartbeatInterval) } def writeExecutorRunner(obj: ExecutorRunner) = { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 98a93d1fcb2a3..16cffa6bc72e7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.client import java.util.concurrent.TimeoutException +import scala.collection.mutable import scala.concurrent.Await import scala.concurrent.duration._ @@ -47,6 +48,8 @@ private[spark] class AppClient( conf: SparkConf) extends Logging { + private val heartbeatInterval = appDescription.heartbeatInterval.milliseconds + val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 @@ -56,11 +59,15 @@ private[spark] class AppClient( var registered = false var activeMasterUrl: String = null + /** Tracks the set of executors currently registered with this application. 
*/ + private val runningExecutors = mutable.BitSet() + class ClientActor extends Actor with ActorLogReceive with Logging { var master: ActorSelection = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times var registrationRetryTimer: Option[Cancellable] = None + var heartbeatTimer: Option[Cancellable] = None override def preStart() { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -111,6 +118,12 @@ private[spark] class AppClient( case x => throw new SparkException("Invalid spark URL: " + x) } + import context.dispatcher + // Cancel any existing heartbeat timer + heartbeatTimer.foreach(_.cancel()) + // Start a new master heartbeat timer + heartbeatTimer = Some(context.system.scheduler.schedule(heartbeatInterval, heartbeatInterval, + self, TriggerHeartbeat)) } private def isPossibleMaster(remoteUrl: Address) = { @@ -134,6 +147,7 @@ private[spark] class AppClient( val fullId = appId + "/" + id logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores)) + runningExecutors += id listener.executorAdded(fullId, workerId, hostPort, cores, memory) case ExecutorUpdated(id, state, message, exitStatus) => @@ -141,6 +155,7 @@ private[spark] class AppClient( val messageText = message.map(s => " (" + s + ")").getOrElse("") logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) if (ExecutorState.isFinished(state)) { + runningExecutors -= id listener.executorRemoved(fullId, message.getOrElse(""), exitStatus) } @@ -161,6 +176,11 @@ private[spark] class AppClient( markDead("Application has been stopped.") sender ! true context.stop(self) + + case TriggerHeartbeat => + if (master != null) { + master ! AppClientHeartbeat(appId, hasRegisteredExecutors = runningExecutors.nonEmpty) + } } /** @@ -181,6 +201,7 @@ private[spark] class AppClient( } override def postStop() { + heartbeatTimer.foreach(_.cancel()) registrationRetryTimer.foreach(_.cancel()) } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index 88a0862b96afe..d3d7e83b7390f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -49,7 +49,8 @@ private[spark] object TestClient { val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription("TestClient", Some(1), 512, - Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored") + Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored", + 10000, 10) val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationFailureDetector.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationFailureDetector.scala new file mode 100644 index 0000000000000..923e3e5411e5a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationFailureDetector.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import org.apache.spark.Logging + +/** + * This class encapsulates the Master's logic for deciding whether to kill a Spark application + * based on executor failures. The goal of this logic is to ensure that buggy applications are + * killed promptly, while preventing applications from being killed due to buggy + * machines / executors. + * + * Thread-safety: this class is not thread-safe; that is acceptable because it is only intended + * to be called from the Master actor. + * + * @param appName the application name + * @param appId the application id + * @param consecutiveExecutorFailuresThreshold the minimum number of consecutive executor + * failures that must occur before an application can be marked as failed + */ +private[master] class ApplicationFailureDetector( + appName: String, + appId: String, + consecutiveExecutorFailuresThreshold: Int) + extends Logging with Serializable { + + require(consecutiveExecutorFailuresThreshold > 0) + + private var consecutiveExecutorFailures = 0 + + /** + * True if the driver has reported that it has at least one registered executor, false otherwise. + */ + private var hasRegisteredExecutors: Boolean = false + + /** + * Called when an application's executor status might have changed. + */ + def updateExecutorStatus(hasRegisteredExecutors: Boolean): Unit = { + this.hasRegisteredExecutors = hasRegisteredExecutors + if (hasRegisteredExecutors) { + consecutiveExecutorFailures = 0 + } + } + + /** + * Called when an executor exits due to a failure. + */ + def onFailedExecutorExit(execId: Int): Unit = { + consecutiveExecutorFailures += 1 + } + + /** + * Ask the failure detector whether the application should be marked as failed. + * + * @return true if the application has failed, false otherwise.
+ */ + def isFailed: Boolean = { + if (consecutiveExecutorFailures >= consecutiveExecutorFailuresThreshold) { + if (!hasRegisteredExecutors) { + logError(s"Application $appName with ID $appId has failed because it has no executors and " + + s"there were $consecutiveExecutorFailures consecutive executor failures") + true + } else { + logWarning(s"$consecutiveExecutorFailures consecutive executor failures occurred for " + + s"application $appName with ID $appId, but not failing it because it reports that it" + + s" has at least one registered executor.") + false + } + } else { + false + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index ad7d81747c377..db9e48b259275 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -43,9 +43,13 @@ private[spark] class ApplicationInfo( @transient var coresGranted: Int = _ @transient var endTime: Long = _ @transient var appSource: ApplicationSource = _ + @transient var lastHeartbeat: Long = _ @transient private var nextExecutorId: Int = _ + val failureDetector = + new ApplicationFailureDetector(desc.name, id, desc.consecutiveExecutorFailuresThreshold) + init() private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException { @@ -61,6 +65,7 @@ private[spark] class ApplicationInfo( appSource = new ApplicationSource(this) nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorInfo] + lastHeartbeat = System.currentTimeMillis() } private def newExecutorId(useID: Option[Int] = None): Int = { @@ -94,17 +99,6 @@ private[spark] class ApplicationInfo( def coresLeft: Int = myMaxCores - coresGranted - private var _retryCount = 0 - - def retryCount = _retryCount - - def incrementRetryCount() = { - _retryCount += 1 - _retryCount - } - - def resetRetryCount() = _retryCount = 0 - def markFinished(endState: ApplicationState.Value) { state = endState endTime = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala index 67e6c5d66af0e..cb5f5dec5f44a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala @@ -22,6 +22,4 @@ private[spark] object ApplicationState extends Enumeration { type ApplicationState = Value val WAITING, RUNNING, FINISHED, FAILED, UNKNOWN = Value - - val MAX_NUM_RETRY = 10 } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7b32c505def9b..9aed3352a63f0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -310,7 +310,6 @@ private[spark] class Master( case Some(exec) => { val appInfo = idToApp(appId) exec.state = state - if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() } exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and app @@ -321,15 +320,12 @@ private[spark] class Master( val normalExit = exitStatus == Some(0) // Only retry certain number of times so we don't go into an infinite loop.
if (!normalExit) { - if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { - schedule() + appInfo.failureDetector.onFailedExecutorExit(execId) + if (appInfo.failureDetector.isFailed) { + logError(s"Removing failed application ${appInfo.desc.name} with ID ${appInfo.id}") + removeApplication(appInfo, ApplicationState.FAILED) } else { - val execs = appInfo.executors.values - if (!execs.exists(_.state == ExecutorState.RUNNING)) { - logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + - s"${appInfo.retryCount} times; removing it") - removeApplication(appInfo, ApplicationState.FAILED) - } + schedule() } } } @@ -364,6 +360,18 @@ private[spark] class Master( } } + case AppClientHeartbeat(appId, hasRegisteredExecutors) => { + idToApp.get(appId) match { + case Some(app) => + app.lastHeartbeat = System.currentTimeMillis() + app.failureDetector.updateExecutorStatus(hasRegisteredExecutors) + logDebug(s"Got heartbeat from app $appId " + + s"(hasRegisteredExecutors = $hasRegisteredExecutors)") + case None => + logWarning(s"Got heartbeat from unknown app $appId") + } + } + case MasterChangeAcknowledged(appId) => { idToApp.get(appId) match { case Some(app) => @@ -419,6 +427,7 @@ private[spark] class Master( case CheckForWorkerTimeOut => { timeOutDeadWorkers() + timeOutDeadApplications() } case RequestWebUIPort => { @@ -761,8 +770,21 @@ private[spark] class Master( appId } + /** Check for, and remove, any dead applications */ + def timeOutDeadApplications(): Unit = { + val currentTime = System.currentTimeMillis() + // Copy the applications into an array so we don't modify the hashset while iterating through it + val toRemove = + apps.filter(app => app.lastHeartbeat < currentTime - app.desc.heartbeatInterval).toArray + for (app <- toRemove) { + logWarning(s"Removing application ${app.desc.name} (id ${app.id}) because we got no " + + s"heartbeat in ${app.desc.heartbeatInterval / 1000.0} seconds") + removeApplication(app, ApplicationState.FAILED) + } + } + /** Check for, and remove, any timed-out workers */ - def timeOutDeadWorkers() { + def timeOutDeadWorkers(): Unit = { // Copy the workers into an array so we don't modify the hashset while iterating through it val currentTime = System.currentTimeMillis() val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 8c7de75600b5f..acefc3083f980 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -69,8 +69,12 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") + val heartbeatInterval = conf.getInt("spark.app.heartbeatInterval", 60000) + val consecutiveExecutorFailuresThreshold = + conf.getInt("spark.app.consecutiveExecutorFailuresThreshold", 10) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir) + appUIAddress, heartbeatInterval = heartbeatInterval, + consecutiveExecutorFailuresThreshold = consecutiveExecutorFailuresThreshold, eventLogDir = sc.eventLogDir) client = new
AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 3f1cd0752e766..1e6ffe327414c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -89,13 +89,14 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc(): ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq()) - new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl") + new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl", 10000, 10) } def createAppInfo() : ApplicationInfo = { val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime, "id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue) appInfo.endTime = JsonConstants.currTimeInMillis + appInfo.lastHeartbeat = JsonConstants.currTimeInMillis appInfo } @@ -155,9 +156,9 @@ object JsonConstants { |{"starttime":3,"id":"id","name":"name", |"cores":4,"user":"%s", |"memoryperslave":1234,"submitdate":"%s", - |"state":"WAITING","duration":%d} + |"state":"WAITING","duration":%d,"lastheartbeat":%d} """.format(System.getProperty("user.name", ""), - submitDate.toString, currTimeInMillis - appInfoStartTime).stripMargin + submitDate.toString, currTimeInMillis - appInfoStartTime, currTimeInMillis).stripMargin val workerInfoJsonStr = """ @@ -171,7 +172,8 @@ object JsonConstants { val appDescJsonStr = """ |{"name":"name","cores":4,"memoryperslave":1234, - |"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"} + |"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())", + |"heartbeatinterval": 10000} """.format(System.getProperty("user.name", "")).stripMargin val executorRunnerJsonStr = diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ApplicationFailureDetectorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ApplicationFailureDetectorSuite.scala new file mode 100644 index 0000000000000..6c5403b104413 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/master/ApplicationFailureDetectorSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.master + +import org.scalatest.{FunSuite, Matchers} + +class ApplicationFailureDetectorSuite extends FunSuite with Matchers { + + test("initially, the application should not be failed") { + val failureDetector = new ApplicationFailureDetector("testApp", "testAppId", 10) + assert(!failureDetector.isFailed) + } + + test("normal operation (no executor failures)") { + val failureDetector = new ApplicationFailureDetector("testApp", "testAppId", 10) + for (execId <- 1 to 100) { + failureDetector.updateExecutorStatus(hasRegisteredExecutors = true) + assert(!failureDetector.isFailed) + } + assert(!failureDetector.isFailed) + } + + test("every other executor launch fails") { + val failureDetector = new ApplicationFailureDetector("testApp", "testAppId", 10) + for (execId <- 1 to 100) { + failureDetector.updateExecutorStatus(hasRegisteredExecutors = true) + assert(!failureDetector.isFailed) + if (execId % 2 == 0) { + failureDetector.onFailedExecutorExit(execId) + assert(!failureDetector.isFailed) + } + } + assert(!failureDetector.isFailed) + } + + test("every executor fails after launch") { + val failureDetector = new ApplicationFailureDetector("testApp", "testAppId", 10) + var failed: Boolean = false + for (execId <- 1 to 100) { + failureDetector.updateExecutorStatus(hasRegisteredExecutors = false) + failureDetector.onFailedExecutorExit(execId) + if (failureDetector.isFailed) { + failed = true + } + } + assert(failed, "Expected the application to be marked as failed") + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 196217062991e..0f7c749f6b8ea 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -31,7 +31,7 @@ class ExecutorRunnerTest extends FunSuite { val appId = "12345-worker321-9876" val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val appDesc = new ApplicationDescription("app name", Some(8), 500, - Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") + Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl", 10000, 10) val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING) val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
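Usage note: the new behavior is driven by two application-level settings read in SparkDeploySchedulerBackend above, spark.app.heartbeatInterval (milliseconds between AppClientHeartbeat messages, default 60000) and spark.app.consecutiveExecutorFailuresThreshold (default 10). A minimal sketch of how a standalone-mode application might tune them; the master URL, application name, and values below are illustrative only, not recommendations:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")        // illustrative standalone master URL
  .setAppName("HeartbeatTuningExample")    // illustrative application name
  // Send an AppClientHeartbeat to the Master every 30 seconds (default: 60000 ms).
  // Note that the Master also uses this interval as the no-heartbeat timeout in
  // timeOutDeadApplications(), so very small values make the app easier to time out.
  .set("spark.app.heartbeatInterval", "30000")
  // Mark the application FAILED after 5 consecutive executor failures while it
  // reports no registered executors (default threshold: 10).
  .set("spark.app.consecutiveExecutorFailuresThreshold", "5")
val sc = new SparkContext(conf)

Lowering the heartbeat interval trades a little extra Master traffic for faster detection of dead drivers, since heartbeats are sent by the AppClient's TriggerHeartbeat timer.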