diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 443830f8d03b6..3cac2d62ce605 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -19,7 +19,7 @@ package org.apache.spark /** * A client that communicates with the cluster manager to request or kill executors. - * This is currently supported only in YARN mode. + * This is currently supported only in YARN and Mesos coarse-grained mode. */ private[spark] trait ExecutorAllocationClient { diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 9385f557c4614..47023b01acb29 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -96,7 +96,8 @@ private[spark] class ExecutorAllocationManager( // TODO: The default value of 1 for spark.executor.cores works right now because dynamic // allocation is only supported for YARN and the default number of cores per executor in YARN is - // 1, but it might need to be attained differently for different cluster managers + // 1, but it might need to be attained differently for different cluster managers. + // For Mesos, see SPARK-6350, which adds a new parameter for setting this value. 
private val tasksPerExecutor = conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3f1a7dd99d635..ba9ba265f969d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -56,7 +56,7 @@ import org.apache.spark.rpc.RpcAddress import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend} -import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{CoarseGrainedMesosSchedulerBackend, FineGrainedMesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ import org.apache.spark.ui.{SparkUI, ConsoleProgressBar} @@ -415,7 +415,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = if (dynamicAllocationEnabled) { assert(supportDynamicAllocation, - "Dynamic allocation of executors is currently only supported in YARN mode") + "Dynamic allocation of executors is currently only supported in YARN " + + "and Mesos coarse-grained modes") Some(new ExecutorAllocationManager(this, listenerBus, conf)) } else { None @@ -1143,10 +1144,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * Return whether dynamically adjusting the amount of resources allocated to - * this application is supported. This is currently only available for YARN. + * this application is supported. This is currently only available for YARN + * and Mesos coarse-grained modes. 
*/ private[spark] def supportDynamicAllocation = - master.contains("yarn") || dynamicAllocationTesting + master.contains("yarn") || master.contains("mesos") || dynamicAllocationTesting /** * :: DeveloperApi :: @@ -1160,11 +1162,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * Express a preference to the cluster manager for a given total number of executors. * This can result in canceling pending requests or filing additional requests. - * This is currently only supported in YARN mode. Return whether the request is received. + * This is currently only supported in YARN and Mesos modes. + * Return whether the request is received. */ private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, - "Requesting executors is currently only supported in YARN mode") + assert(supportDynamicAllocation, + "Requesting executors is currently only supported in YARN and Mesos modes") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestTotalExecutors(numExecutors) @@ -1177,12 +1180,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: * Request an additional number of executors from the cluster manager. - * This is currently only supported in YARN mode. Return whether the request is received. + * This is currently only supported in YARN and Mesos modes. + * Return whether the request is received. 
*/ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { assert(supportDynamicAllocation, - "Requesting executors is currently only supported in YARN mode") + "Requesting executors is currently only supported in YARN and Mesos modes") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1195,12 +1199,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: * Request that the cluster manager kill the specified executors. - * This is currently only supported in YARN mode. Return whether the request is received. + * This is currently only supported in YARN and Mesos modes. + * Return whether the request is received. */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { assert(supportDynamicAllocation, - "Killing executors is currently only supported in YARN mode") + "Killing executors is currently only supported in YARN and Mesos modes") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) @@ -1213,7 +1218,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: * Request that cluster manager the kill the specified executor. - * This is currently only supported in Yarn mode. Return whether the request is received. + * This is currently only supported in Yarn and Mesos modes. + * Return whether the request is received. */ @DeveloperApi override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId) @@ -1403,17 +1409,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def stop() { // Use the stopping variable to ensure no contention for the stop scenario. // Still track the stopped variable for use elsewhere in the code. 
- + if (!stopped.compareAndSet(false, true)) { logInfo("SparkContext already stopped.") return } - + postApplicationEnd() ui.foreach(_.stop()) env.metricsSystem.report() metadataCleaner.cancel() - cleaner.foreach(_.stop()) + cleaner.foreach(_.stop()) executorAllocationManager.foreach(_.stop()) dagScheduler.stop() dagScheduler = null @@ -2267,9 +2273,9 @@ object SparkContext extends Logging { val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false) val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs val backend = if (coarseGrained) { - new CoarseMesosSchedulerBackend(scheduler, sc, url) + new CoarseGrainedMesosSchedulerBackend(scheduler, sc, url) } else { - new MesosSchedulerBackend(scheduler, sc, url) + new FineGrainedMesosSchedulerBackend(scheduler, sc, url) } scheduler.initialize(backend) (backend, scheduler) diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 0075d963711f1..255da54a0a1d9 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -77,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend) val reply = bos.toByteArray ctx.write(reply) } - + override def channelReadComplete(ctx: ChannelHandlerContext): Unit = { ctx.flush() } @@ -97,6 +97,7 @@ private[r] class RBackendHandler(server: RBackend) dos: DataOutputStream): Unit = { var obj: Object = null try { + import scala.language.existentials val cls = if (isStatic) { Class.forName(objId) } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4c49da87af9dc..cdbb5a5f4319a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -284,7 +284,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } // Called by subclasses when notified of a lost worker - def removeExecutor(executorId: String, reason: String) { + def removeExecutor(executorId: String, reason: String): Unit = { try { driverEndpoint.askWithReply[Boolean](RemoveExecutor(executorId, reason)) } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackend.scala new file mode 100644 index 0000000000000..e9065d73be058 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackend.scala @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.io.File +import java.util.{List => JList} +import java.util.Collections +import java.util.concurrent.locks.ReentrantLock + +import com.google.common.collect.HashBiMap + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap => MutableHashMap, HashSet => MutableHashSet} + +import org.apache.mesos._ +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} + +import org.apache.spark.{SparkContext, SparkEnv, SparkException} +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.{Utils, AkkaUtils} + +/** + * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds + * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever + * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the + * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable + * latency. + * + * Unfortunately, there is some duplication with FineGrainedMesosSchedulerBackend + * that is hard to remove. + */ +private[spark] class CoarseGrainedMesosSchedulerBackend( + val scheduler: TaskSchedulerImpl, + val sparkContext: SparkContext, + val master: String) + extends CoarseGrainedSchedulerBackend(scheduler, sparkContext.env.rpcEnv) + with CommonMesosSchedulerBackend { + + val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures + + // Maximum number of cores to acquire (TODO: we'll need more flexible controls here.) 
+ private[mesos] val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + + /** Cores we have acquired with each Mesos task ID */ + private[mesos] val coresByTaskId = MutableHashMap.empty[Int, Int] + + private[mesos] var totalCoresAcquired = 0 + + // How many times tasks on each slave failed? + private[mesos] val failuresBySlaveId = MutableHashMap.empty[String, Int] + + private[mesos] val pendingRemovedSlaveIds = MutableHashSet.empty[String] + + protected val executorBackend = this.getClass + + val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) + + var nextMesosTaskId = 0 + + /** Return a new task id for coarse-grained mode. */ + def newMesosTaskId(): Int = { + val id = nextMesosTaskId + nextMesosTaskId += 1 + id + } + + // ==== Definitions for start(): + + // Nothing to do + protected def preStart(): Unit = {} + + // Nothing to do + protected def postStart(): Unit = {} + + /** @see CommonMesosSchedulerBackend.doStart() */ + override def start(): Unit = { + super.start() + doStart() + } + + /** @see CommonMesosSchedulerBackend.doStop() */ + override def stop(): Unit = { + super.stop() + doStop() + } + + def createCommand(offer: Offer, numCores: Int): CommandInfo = { + val extraCommandArguments = + s" --driver-url $driverUrl" + + s" --executor-id ${offer.getSlaveId.getValue}" + + s" --hostname ${offer.getHostname}" + + s" --cores $numCores" + + s" --app-id $appId" + createCommandInfo(extraCommandArguments) + } + + protected def driverUrl: String = AkkaUtils.address( + AkkaUtils.protocol(sparkContext.env.actorSystem), + SparkEnv.driverActorSystemName, + conf.get("spark.driver.host"), + conf.get("spark.driver.port"), + CoarseGrainedSchedulerBackend.ENDPOINT_NAME) + + override def registered( + d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = { + doRegistered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) + } + + /** + * Method called by Mesos to offer resources on slaves. 
We respond by launching an executor, + * unless we've already launched more than we wanted to. + */ + override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + stateLock.synchronized { + val filters = Filters.newBuilder().setRefuseSeconds(-1).build() + + for (offer <- offers) { + val slaveId = offer.getSlaveId.getValue + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus").toInt + if (taskIdToSlaveId.size < executorLimit && + totalCoresAcquired < maxCores && + mem >= MemoryUtils.calculateTotalMemory(sparkContext) && + cpus >= 1 && + failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && + !slaveIdsWithExecutors.contains(slaveId)) { + // Launch an executor on the slave + val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) + totalCoresAcquired += cpusToUse + val taskId = newMesosTaskId() + taskIdToSlaveId(taskId) = slaveId + slaveIdsWithExecutors += slaveId + coresByTaskId(taskId) = cpusToUse + val task = MesosTaskInfo.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) + .setSlaveId(offer.getSlaveId) + .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) + .setName("Task " + taskId) + .addResources(createResource("cpus", cpusToUse)) + .addResources(createResource("mem", + MemoryUtils.calculateTotalMemory(sparkContext))) + .build() + d.launchTasks( + Collections.singleton(offer.getId), Collections.singletonList(task), filters) + } else { + // Filter it out + driver.declineOffer(offer.getId) + } + } + } + } + + /** Build a Mesos resource protobuf object */ + private def createResource(resourceName: String, quantity: Double): Protos.Resource = { + Resource.newBuilder() + .setName(resourceName) + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) + .build() + } + + + override def statusUpdate(d: SchedulerDriver, status: TaskStatus): Unit = { + val taskId = status.getTaskId.getValue.toInt + val 
state = status.getState + logInfo("Mesos task " + taskId + " is now " + state) + stateLock.synchronized { + if (isFinished(state)) { + val slaveId = taskIdToSlaveId(taskId) + slaveIdsWithExecutors -= slaveId + taskIdToSlaveId -= taskId + // Remove the cores we have remembered for this task, if it's in the hashmap + for (cores <- coresByTaskId.get(taskId)) { + totalCoresAcquired -= cores + coresByTaskId -= taskId + } + // If it was a failure, mark the slave as failed for blacklisting purposes + if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) { + failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 + if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { + logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + + "is Spark installed on it?") + } + } + executorTerminated(d, slaveId, s"Executor finished with state $state") + driver.reviveOffers() // In case we'd rejected everything before but have now lost a node + } + } + } + + override def error(d: SchedulerDriver, message: String): Unit = doError(d, message) + + /** Called when a slave is lost or a Mesos task finished. Update local view on + * what tasks are running and remove the terminated slave from the list of pending + * slave IDs that we might have asked to be killed. It also notifies the driver + * that an executor was removed. 
+ */ + private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String) { + stateLock.synchronized { + if (slaveIdsWithExecutors.contains(slaveId)) { + val slaveIdToTaskId = taskIdToSlaveId.inverse() + if (slaveIdToTaskId.contains(slaveId)) { + val taskId: Long = slaveIdToTaskId.get(slaveId) + taskIdToSlaveId.remove(taskId) + removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason) + } + pendingRemovedSlaveIds -= slaveId + slaveIdsWithExecutors -= slaveId + } + } + } + + private def sparkExecutorId(slaveId: String, taskId: String) = "%s/%s".format(slaveId, taskId) + + override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { + val sid = slaveId.getValue + logInfo("Mesos slave lost: " + sid) + executorTerminated(d, sid, "Mesos slave lost: " + sid) + } + + override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { + logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) + slaveLost(d, s) + } + + override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { + // We don't truly know if we can fulfill the full amount of executors + // since at coarse grain it depends on the amount of slaves available. 
+ logInfo("Capping the total amount of executors to " + requestedTotal) + executorLimit = requestedTotal + true + } + + override def doKillExecutors(executorIds: Seq[String]): Boolean = { + if (driver == null) { + logWarning("Asked to kill executors before the executor was started.") + return false + } + + val slaveIdToTaskId = taskIdToSlaveId.inverse() + for (executorId <- executorIds) { + val slaveId = executorId.split("/")(0) + if (slaveIdToTaskId.contains(slaveId)) { + driver.killTask( + TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build) + pendingRemovedSlaveIds += slaveId + } else { + logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler") + } + } + + assert(pendingRemovedSlaveIds.size <= taskIdToSlaveId.size) + + // We cannot simply decrement from the existing executor limit as we may not be able to + // launch as many executors as the limit. But we assume that if we are notified to kill + // executors, the scheduler wants to set a limit that is less than + // the number of executors that have been launched. Therefore, we take the existing + // number of executors launched and deduct the executors killed as the new limit. + executorLimit = taskIdToSlaveId.size - pendingRemovedSlaveIds.size + true + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala deleted file mode 100644 index b037a4966ced0..0000000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.io.File -import java.util.{List => JList} -import java.util.Collections - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, HashSet} - -import org.apache.mesos.{Scheduler => MScheduler} -import org.apache.mesos._ -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} - -import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException, TaskState} -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.{Utils, AkkaUtils} - -/** - * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds - * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever - * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the - * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable - * latency. - * - * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to - * remove this. 
- */ -private[spark] class CoarseMesosSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext, - master: String) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with MScheduler - with Logging { - - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures - - // Lock used to wait for scheduler to be registered - var isRegistered = false - val registeredLock = new Object() - - // Driver for talking to Mesos - var driver: SchedulerDriver = null - - // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt - - // Cores we have acquired with each Mesos task ID - val coresByTaskId = new HashMap[Int, Int] - var totalCoresAcquired = 0 - - val slaveIdsWithExecutors = new HashSet[String] - - val taskIdToSlaveId = new HashMap[Int, String] - val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed - - - val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) - - var nextMesosTaskId = 0 - - @volatile var appId: String = _ - - def newMesosTaskId(): Int = { - val id = nextMesosTaskId - nextMesosTaskId += 1 - id - } - - override def start() { - super.start() - - synchronized { - new Thread("CoarseMesosSchedulerBackend driver") { - setDaemon(true) - override def run() { - val scheduler = CoarseMesosSchedulerBackend.this - val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build() - driver = new MesosSchedulerDriver(scheduler, fwInfo, master) - try { { - val ret = driver.run() - logInfo("driver.run() returned with code " + ret) - } - } catch { - case e: Exception => logError("driver.run() failed", e) - } - } - }.start() - - waitForRegister() - } - } - - def createCommand(offer: Offer, numCores: Int): CommandInfo = { - val executorSparkHome = conf.getOption("spark.mesos.executor.home") - .orElse(sc.getSparkHome()) - .getOrElse { - throw new 
SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } - val environment = Environment.newBuilder() - val extraClassPath = conf.getOption("spark.executor.extraClassPath") - extraClassPath.foreach { cp => - environment.addVariables( - Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) - } - val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "") - - // Set the environment variable through a command prefix - // to append to the existing value of the variable - val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p => - Utils.libraryPathEnvPrefix(Seq(p)) - }.getOrElse("") - - environment.addVariables( - Environment.Variable.newBuilder() - .setName("SPARK_EXECUTOR_OPTS") - .setValue(extraJavaOpts) - .build()) - - sc.executorEnvs.foreach { case (key, value) => - environment.addVariables(Environment.Variable.newBuilder() - .setName(key) - .setValue(value) - .build()) - } - val command = CommandInfo.newBuilder() - .setEnvironment(environment) - val driverUrl = AkkaUtils.address( - AkkaUtils.protocol(sc.env.actorSystem), - SparkEnv.driverActorSystemName, - conf.get("spark.driver.host"), - conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ENDPOINT_NAME) - - val uri = conf.get("spark.executor.uri", null) - if (uri == null) { - val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath - command.setValue( - "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend" - .format(prefixEnv, runScript) + - s" --driver-url $driverUrl" + - s" --executor-id ${offer.getSlaveId.getValue}" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") - } else { - // Grab everything to the first '.'. We'll use that and '*' to - // glob the directory "correctly". 
- val basename = uri.split('/').last.split('.').head - command.setValue( - s"cd $basename*; $prefixEnv " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + - s" --driver-url $driverUrl" + - s" --executor-id ${offer.getSlaveId.getValue}" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") - command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) - } - command.build() - } - - override def offerRescinded(d: SchedulerDriver, o: OfferID) {} - - override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - appId = frameworkId.getValue - logInfo("Registered as framework ID " + appId) - registeredLock.synchronized { - isRegistered = true - registeredLock.notifyAll() - } - } - - def waitForRegister() { - registeredLock.synchronized { - while (!isRegistered) { - registeredLock.wait() - } - } - } - - override def disconnected(d: SchedulerDriver) {} - - override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} - - /** - * Method called by Mesos to offer resources on slaves. We respond by launching an executor, - * unless we've already launched more than we wanted to. 
- */ - override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(-1).build() - - for (offer <- offers) { - val slaveId = offer.getSlaveId.toString - val mem = getResource(offer.getResourcesList, "mem") - val cpus = getResource(offer.getResourcesList, "cpus").toInt - if (totalCoresAcquired < maxCores && - mem >= MemoryUtils.calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { - // Launch an executor on the slave - val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) - totalCoresAcquired += cpusToUse - val taskId = newMesosTaskId() - taskIdToSlaveId(taskId) = slaveId - slaveIdsWithExecutors += slaveId - coresByTaskId(taskId) = cpusToUse - val task = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) - .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) - .setName("Task " + taskId) - .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", - MemoryUtils.calculateTotalMemory(sc))) - .build() - d.launchTasks( - Collections.singleton(offer.getId), Collections.singletonList(task), filters) - } else { - // Filter it out - d.launchTasks( - Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters) - } - } - } - } - - /** Helper function to pull out a resource from a Mesos Resources protobuf */ - private def getResource(res: JList[Resource], name: String): Double = { - for (r <- res if r.getName == name) { - return r.getScalar.getValue - } - 0 - } - - /** Build a Mesos resource protobuf object */ - private def createResource(resourceName: String, quantity: Double): Protos.Resource = { - Resource.newBuilder() - .setName(resourceName) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) - .build() 
- } - - override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val taskId = status.getTaskId.getValue.toInt - val state = status.getState - logInfo("Mesos task " + taskId + " is now " + state) - synchronized { - if (TaskState.isFinished(TaskState.fromMesos(state))) { - val slaveId = taskIdToSlaveId(taskId) - slaveIdsWithExecutors -= slaveId - taskIdToSlaveId -= taskId - // Remove the cores we have remembered for this task, if it's in the hashmap - for (cores <- coresByTaskId.get(taskId)) { - totalCoresAcquired -= cores - coresByTaskId -= taskId - } - // If it was a failure, mark the slave as failed for blacklisting purposes - if (TaskState.isFailed(TaskState.fromMesos(state))) { - failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 - if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { - logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + - "is Spark installed on it?") - } - } - driver.reviveOffers() // In case we'd rejected everything before but have now lost a node - } - } - } - - override def error(d: SchedulerDriver, message: String) { - logError("Mesos error: " + message) - scheduler.error(message) - } - - override def stop() { - super.stop() - if (driver != null) { - driver.stop() - } - } - - override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { - logInfo("Mesos slave lost: " + slaveId.getValue) - synchronized { - if (slaveIdsWithExecutors.contains(slaveId.getValue)) { - // Note that the slave ID corresponds to the executor ID on that slave - slaveIdsWithExecutors -= slaveId.getValue - removeExecutor(slaveId.getValue, "Mesos slave lost") - } - } - } - - override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { - logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) - slaveLost(d, s) - } - - override def applicationId(): 
String = - Option(appId).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } - -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CommonMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CommonMesosSchedulerBackend.scala new file mode 100644 index 0000000000000..b2a56f3a74900 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CommonMesosSchedulerBackend.scala @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.io.File +import java.util.{List => JList} +import java.util.Collections +import java.util.concurrent.locks.ReentrantLock + +import com.google.common.collect.HashBiMap + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap => MutableHashMap, HashSet => MutableHashSet} + +import org.apache.mesos.{Scheduler => MScheduler} +import org.apache.mesos._ +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} + +import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException} +import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.{Utils, AkkaUtils} + +/** + * Shared code between {@link FineGrainedMesosSchedulerBackend} and + * {@link CoarseGrainedMesosSchedulerBackend}. + */ +trait CommonMesosSchedulerBackend + extends SchedulerBackend + with MScheduler + with Logging { + + // TODO Move these declarations somewhere else? + def resourceOffers(d: SchedulerDriver, offers: JList[Offer]): Unit + def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit + def statusUpdate(d: SchedulerDriver, status: TaskStatus): Unit + def requestExecutors(numAdditionalExecutors: Int): Boolean + def requestTotalExecutors(numAdditionalExecutors: Int): Boolean + def doKillExecutors(executorIds: Seq[String]): Boolean + + val scheduler: TaskSchedulerImpl + val sparkContext: SparkContext + val master: String + + /** Driver for talking to Mesos */ + var driver: SchedulerDriver = null + + // the total number of executors we aim to have + private[mesos] var executorLimit = Int.MaxValue + + /** + * Return the current executor limit, which may be [[Int.MaxValue]]. 
+ */ + def getExecutorLimit: Int = executorLimit + + protected val executorBackend: Class[_] + + private[mesos] val taskIdToSlaveId = HashBiMap.create[Long, String] + + private[mesos] val slaveIdsWithExecutors = MutableHashSet.empty[String] + + def slaveHasExecutor(slaveId: String): Boolean = { + slaveIdsWithExecutors.contains(slaveId) + } + + private def executorBackendName: String = executorBackend.getName + private def executorSimpleBackendName: String = executorBackend.getSimpleName + + @volatile var appId: String = _ + + // Lock used to wait for scheduler to be registered + private var isRegistered = false + private val registeredLock = new Object() + + // Protected lock object protecting other mutable state. Using the intrinsic lock + // may lead to deadlocks since the superclass might also try to lock + protected val stateLock = new ReentrantLock + + // ==== Declarations for doStart(): + + protected def preStart(): Unit + protected def postStart(): Unit + + /** + * We would like to override start here and we almost can, except + * unfortunately, we have to call super.start in + * CoarseGrainedMesosSchedulerBackend.start, to invoke + * CoarseGrainedSchedulerBackend.start), which is concrete. + * However, for FineGrainedMesosSchedulerBackend, we _can't_ call + * super.start, because SchedulerBackend.start is abstract. + * So, all the common logic is implemented in this helper method and each + * concrete class overrides start itself. + */ + protected def doStart(): Unit = { + preStart() + + stateLock.synchronized { + val scheduler = this + new Thread(s"$executorSimpleBackendName driver") { + setDaemon(true) + override def run() { + val fwInfo = FrameworkInfo.newBuilder(). 
+ setUser(sparkContext.sparkUser).setName(sparkContext.appName).build() + driver = new MesosSchedulerDriver(scheduler, fwInfo, master) + try { + val ret = driver.run() + logInfo("driver.run() returned with code " + ret) + } catch { + case e: Exception => logError("driver.run() failed", e) + } + } + }.start() + + waitForRegister() + postStart() + } + } + + /** Like start, we must override stop the same way. */ + protected def doStop(): Unit = { + if (driver != null) { + driver.stop() + } + } + + def createCommandInfo(extraCommandArguments: String): CommandInfo = { + val executorSparkHome = sparkContext.conf.getOption("spark.mesos.executor.home") + .orElse(sparkContext.getSparkHome()) + .getOrElse { + throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") + } + val environment = Environment.newBuilder() + sparkContext.conf.getOption("spark.executor.extraClassPath").foreach { cp => + environment.addVariables( + Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) + } + val extraJavaOpts = sparkContext.conf.get("spark.executor.extraJavaOptions", "") + + // Set the environment variable through a command prefix + // to append to the existing value of the variable + val prefixEnv = sparkContext.conf.getOption("spark.executor.extraLibraryPath").map { p => + Utils.libraryPathEnvPrefix(Seq(p)) + }.getOrElse("") + + environment.addVariables( + Environment.Variable.newBuilder() + .setName("SPARK_EXECUTOR_OPTS") + .setValue(extraJavaOpts) + .build()) + + sparkContext.executorEnvs.foreach { case (key, value) => + environment.addVariables(Environment.Variable.newBuilder() + .setName(key) + .setValue(value) + .build()) + } + val command = CommandInfo.newBuilder() + .setEnvironment(environment) + + val uri = sparkContext.conf.get("spark.executor.uri", null) + if (uri == null) { + val executorPath= new File(executorSparkHome, "./bin/spark-class").getCanonicalPath + command.setValue("%s \"%s\" %s %s".format( + prefixEnv, 
executorPath, executorBackendName, extraCommandArguments)) + } else { + // Grab everything to the first '.'. We'll use that and '*' to + // glob the directory "correctly". + val basename = uri.split('/').last.split('.').head + command.setValue("cd %s*; %s \"%s\" %s %s".format( + basename, prefixEnv, "./bin/spark-class", executorBackendName, extraCommandArguments)) + command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + } + command.build() + } + + /** Handle rescinding of an offer from Mesos. */ + override def offerRescinded(d: SchedulerDriver, o: OfferID): Unit = {} + + /** + * Implements registered for coarse grained, but the fine grained + * implementation wraps it in the separate class loader. + */ + protected def doRegistered( + d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = { + appId = frameworkId.getValue + logInfo("Registered as framework ID " + appId) + registeredLock.synchronized { + isRegistered = true + registeredLock.notifyAll() + } + } + + /** Busy-wait for registration to complete. */ + def waitForRegister(): Unit = { + registeredLock.synchronized { + while (!isRegistered) { + registeredLock.wait() + } + } + } + + override def disconnected(d: SchedulerDriver): Unit = {} + + override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo): Unit = {} + + + /** Helper function to pull out a resource from a Mesos Resources protobuf */ + protected def getResource(res: JList[Resource], name: String): Double = { + for (r <- res if r.getName == name) { + return r.getScalar.getValue + } + 0 + } + + /** Check whether a Mesos task state represents a finished task */ + protected def isFinished(state: MesosTaskState): Boolean = { + state == MesosTaskState.TASK_FINISHED || + state == MesosTaskState.TASK_FAILED || + state == MesosTaskState.TASK_KILLED || + state == MesosTaskState.TASK_LOST + } + + /** + * Implements error for coarse grained, but the fine grained + * implementation wraps it in the separate class loader. 
+ */ + protected def doError(d: SchedulerDriver, message: String): Unit = { + logError("Mesos error: " + message) + scheduler.error(message) + } + + override def frameworkMessage( + d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]): Unit = {} +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackend.scala similarity index 55% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala rename to core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackend.scala index b381436839227..af6ad661dbb84 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackend.scala @@ -25,13 +25,12 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.protobuf.ByteString -import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, ExecutorInfo => MesosExecutorInfo, _} import org.apache.spark.executor.MesosExecutorBackend -import org.apache.spark.{Logging, SparkContext, SparkException, TaskState} +import org.apache.spark.{SparkContext, SparkException, TaskState} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler._ import org.apache.spark.util.Utils @@ -40,113 +39,62 @@ import org.apache.spark.util.Utils * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks * from multiple apps can run on different cores) and in time (a core can switch ownership). 
+ * + * Unfortunately, there is some duplication with CoarseGrainedMesosSchedulerBackend + * that is hard to remove. */ -private[spark] class MesosSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext, - master: String) - extends SchedulerBackend - with MScheduler - with Logging { - - // Lock used to wait for scheduler to be registered - var isRegistered = false - val registeredLock = new Object() - - // Driver for talking to Mesos - var driver: SchedulerDriver = null - - // Which slave IDs we have executors on - val slaveIdsWithExecutors = new HashSet[String] - val taskIdToSlaveId = new HashMap[Long, String] +private[spark] class FineGrainedMesosSchedulerBackend( + val scheduler: TaskSchedulerImpl, + val sparkContext: SparkContext, + val master: String) + extends CommonMesosSchedulerBackend { // An ExecutorInfo for our tasks var execArgs: Array[Byte] = null var classLoader: ClassLoader = null + val executorCores = sparkContext.conf.getInt("spark.mesos.executor.cores", 1) + // The listener bus to publish executor added/removed events. - val listenerBus = sc.listenerBus - - @volatile var appId: String = _ - - override def start() { - synchronized { - classLoader = Thread.currentThread.getContextClassLoader - - new Thread("MesosSchedulerBackend driver") { - setDaemon(true) - override def run() { - val scheduler = MesosSchedulerBackend.this - val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build() - driver = new MesosSchedulerDriver(scheduler, fwInfo, master) - try { - val ret = driver.run() - logInfo("driver.run() returned with code " + ret) - } catch { - case e: Exception => logError("driver.run() failed", e) - } - } - }.start() + val listenerBus = sparkContext.listenerBus - waitForRegister() - } + // ==== Definitions for start(): + + protected val executorBackend = classOf[MesosExecutorBackend] + + // Initialize the classLoader. 
+ protected def preStart(): Unit = { + classLoader = Thread.currentThread.getContextClassLoader + } + + // Nothing to do + protected def postStart(): Unit = {} + + /** @see CommonMesosSchedulerBackend.doStart() */ + override def start(): Unit = { + doStart() + } + + /** @see CommonMesosSchedulerBackend.doStop() */ + override def stop(): Unit = { + doStop() } def createExecutorInfo(execId: String): MesosExecutorInfo = { - val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") - .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } - val environment = Environment.newBuilder() - sc.conf.getOption("spark.executor.extraClassPath").foreach { cp => - environment.addVariables( - Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) - } - val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("") - - val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p => - Utils.libraryPathEnvPrefix(Seq(p)) - }.getOrElse("") - - environment.addVariables( - Environment.Variable.newBuilder() - .setName("SPARK_EXECUTOR_OPTS") - .setValue(extraJavaOpts) - .build()) - sc.executorEnvs.foreach { case (key, value) => - environment.addVariables(Environment.Variable.newBuilder() - .setName(key) - .setValue(value) - .build()) - } - val command = CommandInfo.newBuilder() - .setEnvironment(environment) - val uri = sc.conf.get("spark.executor.uri", null) - val executorBackendName = classOf[MesosExecutorBackend].getName - if (uri == null) { - val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath - command.setValue(s"$prefixEnv $executorPath $executorBackendName") - } else { - // Grab everything to the first '.'. We'll use that and '*' to - // glob the directory "correctly". 
- val basename = uri.split('/').last.split('.').head - command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName") - command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) - } + val command = createCommandInfo("") val cpus = Resource.newBuilder() .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder() - .setValue(scheduler.CPUS_PER_TASK).build()) + .setValue(executorCores).build()) .build() val memory = Resource.newBuilder() .setName("mem") .setType(Value.Type.SCALAR) .setScalar( Value.Scalar.newBuilder() - .setValue(MemoryUtils.calculateTotalMemory(sc)).build()) + .setValue(MemoryUtils.calculateTotalMemory(sparkContext)).build()) .build() MesosExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) @@ -164,7 +112,7 @@ private[spark] class MesosSchedulerBackend( private def createExecArg(): Array[Byte] = { if (execArgs == null) { val props = new HashMap[String, String] - for ((key,value) <- sc.conf.getAll) { + for ((key,value) <- sparkContext.conf.getAll) { props(key) = value } // Serialize the map as an array of (String, String) pairs @@ -173,28 +121,15 @@ private[spark] class MesosSchedulerBackend( execArgs } - override def offerRescinded(d: SchedulerDriver, o: OfferID) {} - - override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { + /** TODO: Is wrapping in the separate class loader necessary? 
*/ + override def registered( + d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = { inClassLoader() { - appId = frameworkId.getValue - logInfo("Registered as framework ID " + appId) - registeredLock.synchronized { - isRegistered = true - registeredLock.notifyAll() - } + doRegistered(d, frameworkId, masterInfo) } } - def waitForRegister() { - registeredLock.synchronized { - while (!isRegistered) { - registeredLock.wait() - } - } - } - - private def inClassLoader()(fun: => Unit) = { + private def inClassLoader()(fun: => Unit): Unit = { val oldClassLoader = Thread.currentThread.getContextClassLoader Thread.currentThread.setContextClassLoader(classLoader) try { @@ -204,39 +139,28 @@ private[spark] class MesosSchedulerBackend( } } - override def disconnected(d: SchedulerDriver) {} - - override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} - /** - * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets - * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that - * tasks are balanced across the cluster. + * Method called by Mesos to offer resources on slaves. We respond by asking our + * active task sets for tasks in order of priority. We fill each node with tasks + * in a round-robin manner so that tasks are balanced across the cluster. */ - override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]): Unit = { inClassLoader() { // Fail-fast on offers we know will be rejected val (usableOffers, unUsableOffers) = offers.partition { o => + val slaveId = o.getSlaveId.getValue val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") - val slaveId = o.getSlaveId.getValue - // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
- (mem >= MemoryUtils.calculateTotalMemory(sc) && - // need at least 1 for executor, 1 for task - cpus >= 2 * scheduler.CPUS_PER_TASK) || - (slaveIdsWithExecutors.contains(slaveId) && - cpus >= scheduler.CPUS_PER_TASK) + val minMemory = MemoryUtils.calculateTotalMemory(sparkContext) + (mem >= minMemory && cpus >= executorCores + scheduler.CPUS_PER_TASK) || + (slaveHasExecutor(slaveId) && cpus >= scheduler.CPUS_PER_TASK) } val workerOffers = usableOffers.map { o => - val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { - getResource(o.getResourcesList, "cpus").toInt - } else { - // If the executor doesn't exist yet, subtract CPU for executor - // TODO(pwendell): Should below just subtract "1"? - getResource(o.getResourcesList, "cpus").toInt - - scheduler.CPUS_PER_TASK - } + // If the executor doesn't exist yet, subtract CPU for executor + val slaveId = o.getSlaveId.getValue + val cpus1 = getResource(o.getResourcesList, "cpus").toInt + val cpus = if (slaveHasExecutor(slaveId)) cpus1 else cpus1 - executorCores new WorkerOffer( o.getSlaveId.getValue, o.getHostname, @@ -268,11 +192,12 @@ private[spark] class MesosSchedulerBackend( val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? 
mesosTasks.foreach { case (slaveId, tasks) => - slaveIdToWorkerOffer.get(slaveId).foreach(o => - listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, + slaveIdToWorkerOffer.get(slaveId).foreach { o => + val executorAdded = SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, // TODO: Add support for log urls for Mesos - new ExecutorInfo(o.host, o.cores, Map.empty))) - ) + new ExecutorInfo(o.host, o.cores, Map.empty)) + listenerBus.post(executorAdded) + } d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } @@ -287,21 +212,13 @@ private[spark] class MesosSchedulerBackend( } } - /** Helper function to pull out a resource from a Mesos Resources protobuf */ - def getResource(res: JList[Resource], name: String): Double = { - for (r <- res if r.getName == name) { - return r.getScalar.getValue - } - 0 - } - /** Turn a Spark TaskDescription into a Mesos task */ def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() val cpuResource = Resource.newBuilder() .setName("cpus") .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build()) + .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build()) .build() MesosTaskInfo.newBuilder() .setTaskId(taskId) @@ -313,11 +230,11 @@ private[spark] class MesosSchedulerBackend( .build() } - override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { + override def statusUpdate(d: SchedulerDriver, status: TaskStatus): Unit = { inClassLoader() { val tid = status.getTaskId.getValue.toLong val state = TaskState.fromMesos(status.getState) - synchronized { + stateLock.synchronized { if (TaskState.isFailed(TaskState.fromMesos(status.getState)) && taskIdToSlaveId.contains(tid)) { // We lost the executor on this slave, so remember that it's gone @@ -331,36 +248,29 @@ private[spark] class MesosSchedulerBackend( } } -
override def error(d: SchedulerDriver, message: String) { + /** TODO: is wrapping in the separate class loader necessary? */ + override def error(d: SchedulerDriver, message: String): Unit = { inClassLoader() { - logError("Mesos error: " + message) - scheduler.error(message) - } - } - - override def stop() { - if (driver != null) { - driver.stop() + doError(d, message) } } - override def reviveOffers() { + override def reviveOffers(): Unit = { driver.reviveOffers() } - override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - /** * Remove executor associated with slaveId in a thread safe manner. */ - private def removeExecutor(slaveId: String, reason: String) = { - synchronized { + private def removeExecutor(slaveId: String, reason: String): Unit = { + stateLock.synchronized { listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason)) slaveIdsWithExecutors -= slaveId } } - private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { + private def recordSlaveLost( + d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason): Unit = { inClassLoader() { logInfo("Mesos slave lost: " + slaveId.getValue) removeExecutor(slaveId.getValue, reason.toString) @@ -368,12 +278,12 @@ private[spark] class MesosSchedulerBackend( } } - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { + override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = { recordSlaveLost(d, slaveId, SlaveLost()) } override def executorLost(d: SchedulerDriver, executorId: ExecutorID, - slaveId: SlaveID, status: Int) { + slaveId: SlaveID, status: Int): Unit = { logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, slaveId.getValue)) recordSlaveLost(d, slaveId, ExecutorExited(status)) @@ -386,13 +296,13 @@ private[spark] class MesosSchedulerBackend( ) } - // TODO: query Mesos for number of cores - override def defaultParallelism(): Int = 
sc.conf.getInt("spark.default.parallelism", 8) + // TODO: Not currently used. + def requestExecutors(numAdditionalExecutors: Int): Boolean = false + def requestTotalExecutors(numAdditionalExecutors: Int): Boolean = false + def doKillExecutors(executorIds: Seq[String]): Boolean = false - override def applicationId(): String = - Option(appId).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } + // TODO: query Mesos for number of cores + override def defaultParallelism(): Int = + sparkContext.conf.getInt("spark.default.parallelism", 8) } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index bbed8ddc6bafc..82eed7f3b4431 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.{FunSuite, PrivateMethodTester} import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend} -import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{CoarseGrainedMesosSchedulerBackend, FineGrainedMesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend class SparkContextSchedulerCreationSuite @@ -166,14 +166,14 @@ class SparkContextSchedulerCreationSuite } test("mesos fine-grained") { - testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false) + testMesos("mesos://localhost:1234", classOf[FineGrainedMesosSchedulerBackend], coarse = false) } test("mesos coarse-grained") { - testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true) + testMesos("mesos://localhost:1234", 
classOf[CoarseGrainedMesosSchedulerBackend], coarse = true) } test("mesos with zookeeper") { - testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false) + testMesos("zk://localhost:1234,localhost:2345", classOf[FineGrainedMesosSchedulerBackend], coarse = false) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..ef5df0b4ba006 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseGrainedMesosSchedulerBackendSuite.scala @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.util +import java.util.Collections +import scala.collection.mutable.ArrayBuffer + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos._ +import org.apache.mesos.SchedulerDriver +import org.apache.spark.{ LocalSparkContext, SparkConf, SparkEnv, SparkContext } +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessage + +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.mockito.{ ArgumentCaptor, Matchers } +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar + +import scala.collection.mutable + +class CoarseGrainedMesosSchedulerBackendSuite + extends FunSuite + with MesosSchedulerBackendSuiteHelper[CoarseGrainedMesosSchedulerBackend] + with LocalSparkContext + with MockitoSugar { + + protected def makeTestMesosSchedulerBackend( + taskScheduler: TaskSchedulerImpl): CoarseGrainedMesosSchedulerBackend = { + val backend = new CoarseGrainedMesosSchedulerBackend( + taskScheduler, taskScheduler.sc, "master") { + override val driverUrl = "" + + // Since we don't call the start() method, we have to initialize this + // ourselves. We use a mock. + driverEndpoint = makeMockDriverEndpoint + } + backend + } + + // TODO: test that expected methods are called on the endpoint. 
+ protected def makeMockDriverEndpoint: RpcEndpointRef = mock[RpcEndpointRef] + + val (taskIDVal1, slaveIDVal1) = ("0", "s1") + val (taskIDVal2, slaveIDVal2) = ("1", "s2") + + def makeMesosExecutorsTest(): (CommonMesosSchedulerBackend, SchedulerDriver) = { + val (backend, driver) = makeBackendAndDriver + + val (minMem, minCPU) = minMemMinCPU(backend.sparkContext) + + val mesosOffers1 = makeOffersList(makeOffer(taskIDVal1, slaveIDVal1, minMem, minCPU)) + + backend.resourceOffers(driver, mesosOffers1) + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers1.get(0).getId)), + any[util.Collection[TaskInfo]], + any[Filters]) + + // Verify we have one executor and the executor limit is 1. + assert(backend.slaveIdsWithExecutors.size === 1) + assert(backend.getExecutorLimit >= 1) + + (backend, driver) // Return so this test can be embedded in others. + } + + def killMesosExecutorDeprecateByOneTest(): (CommonMesosSchedulerBackend, SchedulerDriver) = { + val (backend, driver) = makeMesosExecutorsTest() + + // Calling doKillExecutors should invoke driver.killTask. + val taskID1 = makeTaskID(taskIDVal1) + assert(backend.doKillExecutors(Seq(s"$slaveIDVal1/$taskIDVal1"))) + verify(driver, times(1)).killTask(taskID1) + // Must invoke the status update explicitly here. + // TODO: can we mock other parts of the API so this can be called automatically? + backend.statusUpdate(driver, makeKilledTaskStatus(taskIDVal1, slaveIDVal1)) + + // Verify we don't have any executors. + assert(backend.slaveIdsWithExecutors.size === 0) + // Verify that the executor limit is now 0. 
+ assert(backend.getExecutorLimit === 0) + + val (minMem, minCPU) = minMemMinCPU(backend.sparkContext) + val mesosOffers2 = makeOffersList(makeOffer(taskIDVal2, slaveIDVal2, minMem, minCPU)) + backend.resourceOffers(driver, mesosOffers2) + + verify(driver, times(1)) + .declineOffer(makeOfferID(taskIDVal2)) + + // Verify we didn't launch any new executor + assert(backend.slaveIdsWithExecutors.size === 0) + assert(backend.getExecutorLimit === 0) + + (backend, driver) // Return so this test can be embedded in others. + } + + def increaseAllowedMesosExecutorsTest(): (CommonMesosSchedulerBackend, SchedulerDriver) = { + val (backend, driver) = killMesosExecutorDeprecateByOneTest() + + val (minMem, minCPU) = minMemMinCPU(backend.sparkContext) + val mesosOffers2 = makeOffersList(makeOffer(taskIDVal2, slaveIDVal2, minMem, minCPU)) + + // Now allow one more executor: + backend.requestExecutors(1) + backend.resourceOffers(driver, mesosOffers2) + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)), + any[util.Collection[TaskInfo]], + any[Filters]) + + assert(backend.slaveIdsWithExecutors.size === 1) + assert(backend.getExecutorLimit >= 1) + + (backend, driver) // Return so this test can be embedded in others. 
+ } + + def slaveLostDoesntChangeMaxAllowedMesosExecutorsTest(): Unit = { + val (backend, driver) = increaseAllowedMesosExecutorsTest() + + backend.slaveLost(driver, makeSlaveID(slaveIDVal2)) + assert(backend.slaveIdsWithExecutors.size === 0) + assert(backend.getExecutorLimit >= 1) + } + + def killAndRelaunchTasksTest(): Unit = { + val (backend, driver) = makeBackendAndDriver + val (minMem, minCPU) = minMemMinCPU(backend.sparkContext, 1024) + val offer1 = makeOffer(taskIDVal1, slaveIDVal1, minMem, minCPU) + val mesosOffers = makeOffersList(offer1) + + backend.resourceOffers(driver, mesosOffers) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(offer1.getId)), + anyObject(), + anyObject[Filters]) + assert(backend.slaveIdsWithExecutors.contains(slaveIDVal1)) + + backend.statusUpdate(driver, makeKilledTaskStatus(taskIDVal1, slaveIDVal1)) + assert(!backend.slaveIdsWithExecutors.contains(slaveIDVal1)) + assert(backend.slaveIdsWithExecutors.size === 0) + assert(backend.getExecutorLimit >= 1) + + val offer2 = makeOffer(taskIDVal2, slaveIDVal2, minMem, 1) + mesosOffers.clear() + mesosOffers.add(offer2) + backend.resourceOffers(driver, mesosOffers) + assert(!backend.slaveIdsWithExecutors.contains(slaveIDVal1)) + assert(backend.slaveIdsWithExecutors.contains(slaveIDVal2)) + assert(backend.slaveIdsWithExecutors.size === 1) + assert(backend.getExecutorLimit >= 1) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(offer2.getId)), + anyObject(), + anyObject[Filters]) + + verify(driver, times(1)).reviveOffers() + } + + test("When Mesos offers resources, a Mesos executor is created.") { + makeMesosExecutorsTest() + } + + test("When a Mesos executor is killed, the maximum number of allowed Mesos executors is decremented by one") { + killMesosExecutorDeprecateByOneTest() + } + + test("The maximum number of allowed Mesos executors can be increased by explicitly requesting new Mesos executors") { +
increaseAllowedMesosExecutorsTest() + } + + test("Losing a slave and its Mesos executor doesn't change the maximum allowed number of Mesos executors") { + slaveLostDoesntChangeMaxAllowedMesosExecutorsTest() + } + + test("mesos supports killing and relaunching tasks with executors") { + killAndRelaunchTasksTest() + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..7ca5c76619087 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/FineGrainedMesosSchedulerBackendSuite.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.nio.ByteBuffer +import java.util +import java.util.Collections +import java.util.{ ArrayList => JArrayList } + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.mesos.SchedulerDriver +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.{TaskState => MesosTaskState} +import org.mockito.Mockito._ +import org.mockito.Matchers._ +import org.mockito.{ArgumentCaptor, Matchers} +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext} +import org.apache.spark.executor.MesosExecutorBackend +import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, + TaskDescription, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.scheduler.cluster.ExecutorInfo + +class FineGrainedMesosSchedulerBackendSuite + extends FunSuite + with MesosSchedulerBackendSuiteHelper[FineGrainedMesosSchedulerBackend] + with LocalSparkContext + with MockitoSugar { + + protected def makeTestMesosSchedulerBackend( + taskScheduler: TaskSchedulerImpl): FineGrainedMesosSchedulerBackend = { + new FineGrainedMesosSchedulerBackend(taskScheduler, taskScheduler.sc, "master") + } + + protected def makeTestOffers(minMem: Int, minCpu: Int): (Offer, Offer, Offer, Offer) = { + val goodOffer1 = makeOffer("o1", "s1", minMem, minCpu) + val badOffer1 = makeOffer("o2", "s2", minMem - 1, minCpu) // memory will be too small. + val goodOffer2 = makeOffer("o3", "s3", minMem, minCpu) + val badOffer2 = makeOffer("o4", "s4", minMem, minCpu - 2) // CPUs will be too small. 
+ (goodOffer1, badOffer1, goodOffer2, badOffer2) + } + + protected def checkLaunchTask( + driver: SchedulerDriver, offer: Offer, expectedValue: Int): ArgumentCaptor[util.Collection[TaskInfo]] = { + val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]]) + when( + driver.launchTasks( + Matchers.eq(Collections.singleton(offer.getId)), + capture.capture(), + any(classOf[Filters]) + ) + ).thenReturn(Status.valueOf(expectedValue)) + capture + } + + test("The spark-class location is correctly computed") { + val sc = makeMockSparkContext() + sc.conf.set("spark.mesos.executor.home" , "/mesos-home") + + sc.listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + when(sc.getSparkHome()).thenReturn(Option("/spark-home")) + + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val mesosSchedulerBackend = new FineGrainedMesosSchedulerBackend(taskScheduler, sc, "master") + + // uri is null. + val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id") + assert(executorInfo.getCommand.getValue === s""" "/mesos-home/bin/spark-class" ${classOf[MesosExecutorBackend].getName} """) + + // uri exists. + sc.conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") + val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id") + assert(executorInfo1.getCommand.getValue === s"""cd test-app-1*; "./bin/spark-class" ${classOf[MesosExecutorBackend].getName} """) + } + + // The mock taskScheduler will only accept the first offer. 
+ private val expectedTaskId1 = 1L + private val expectedTaskDescriptions = + Seq(Seq(new TaskDescription(expectedTaskId1, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))))) + + protected def offerResourcesHelper(): + (FineGrainedMesosSchedulerBackend, SchedulerDriver, ArgumentCaptor[util.Collection[TaskInfo]], JArrayList[Offer]) = { + + val (backend, driver) = makeBackendAndDriver() + val taskScheduler = backend.scheduler + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val sc = taskScheduler.sc // a mocked object + when(sc.getSparkHome()).thenReturn(Option("/path")) + sc.listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host_s1", 2, Map.empty))) + + val (minMem, minCpu) = minMemMinCPU(sc) + val (goodOffer1, badOffer1, goodOffer2, badOffer2) = makeTestOffers(minMem, minCpu) + val mesosOffers = makeOffersList(goodOffer1, badOffer1, goodOffer2, badOffer2) + val cores = minCpu - backend.executorCores + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) + expectedWorkerOffers.append(new WorkerOffer( + goodOffer1.getSlaveId.getValue, + goodOffer1.getHostname, + cores + )) + expectedWorkerOffers.append(new WorkerOffer( + goodOffer2.getSlaveId.getValue, + goodOffer2.getHostname, + cores + )) + + when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(expectedTaskDescriptions) + + val capture = checkLaunchTask(driver, goodOffer1, 1) + when(driver.declineOffer(badOffer1.getId)).thenReturn(Status.valueOf(1)) + when(driver.declineOffer(goodOffer2.getId)).thenReturn(Status.valueOf(1)) + when(driver.declineOffer(badOffer2.getId)).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers) + + (backend, driver, capture, mesosOffers) + } + + test("When acceptable Mesos resource offers are received, tasks are launched for them") { + + val (backend, driver, capture, mesosOffers) = offerResourcesHelper() + val goodOffer1 = mesosOffers.get(0) + + verify(driver, times(1)).launchTasks( + 
Matchers.eq(Collections.singleton(goodOffer1.getId)), + capture.capture(), + any(classOf[Filters]) + ) + + assert(capture.getValue.size() === 1) + val taskInfo = capture.getValue.iterator().next() + assert(taskInfo.getName === "n1") + val cpus = taskInfo.getResourcesList.get(0) + assert(cpus.getName === "cpus") + val actualCpus = cpus.getScalar.getValue.toInt + assert(actualCpus === 1) + assert(taskInfo.getSlaveId.getValue === "s1") + } + + test("When unacceptable Mesos resource offers are received, no tasks are launched for them") { + + val (backend, driver, capture, mesosOffers) = offerResourcesHelper() + val badOffer1 = mesosOffers.get(1) + val goodOffer2 = mesosOffers.get(2) + val badOffer2 = mesosOffers.get(3) + + verify(driver, times(1)).declineOffer(badOffer1.getId) + verify(driver, times(1)).declineOffer(goodOffer2.getId) + verify(driver, times(1)).declineOffer(badOffer2.getId) + } + + test("When acceptable Mesos resource offers are received for a node that already has an executor, they are declined") { + + val (backend, driver, capture, mesosOffers) = offerResourcesHelper() + val goodOffer1 = mesosOffers.get(0) + val taskScheduler = backend.scheduler + resetTaskScheduler(taskScheduler) + reset(driver) + + when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq())) + when(driver.declineOffer(goodOffer1.getId)).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, makeOffersList(goodOffer1)) + verify(driver, times(1)).declineOffer(goodOffer1.getId) + } + + test("When acceptable Mesos resource offers are received for a node that had an executor that is now gone, they are accepted") { + + val (backend, driver, capture, mesosOffers) = offerResourcesHelper() + val goodOffer1 = mesosOffers.get(0) + val goodOffer2 = mesosOffers.get(1) + val slaveId1 = goodOffer1.getSlaveId.getValue + val taskScheduler = backend.scheduler + + resetTaskScheduler(taskScheduler) + reset(driver) + + // First, reconfirm that offers are rejected 
while the executor exists. + when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq())) + when(driver.declineOffer(goodOffer1.getId)).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, makeOffersList(goodOffer1)) + verify(driver, times(1)).declineOffer(goodOffer1.getId) + + // Now, kill the executor, re-offer, and confirm an offer is now accepted (again). + resetTaskScheduler(taskScheduler) + reset(driver) + + val (minMem, minCpu) = minMemMinCPU(taskScheduler.sc) + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) + expectedWorkerOffers.append(new WorkerOffer( + goodOffer1.getSlaveId.getValue, + goodOffer1.getHostname, + minCpu - backend.executorCores + )) + + when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(expectedTaskDescriptions) + + backend.statusUpdate(driver, makeKilledTaskStatus(expectedTaskId1.toString, slaveId1, MesosTaskState.TASK_LOST)) + assert(backend.slaveHasExecutor(slaveId1) === false) + checkLaunchTask(driver, goodOffer1, 1) + + backend.resourceOffers(driver, makeOffersList(goodOffer1)) + verify(driver, times(0)).declineOffer(goodOffer1.getId) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala deleted file mode 100644 index f1a4380d349b3..0000000000000 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.nio.ByteBuffer -import java.util -import java.util.Collections - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.mesos.Protos.Value.Scalar -import org.apache.mesos.Protos._ -import org.apache.mesos.SchedulerDriver -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.mockito.{ArgumentCaptor, Matchers} -import org.scalatest.FunSuite -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.executor.MesosExecutorBackend -import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, - TaskDescription, TaskSchedulerImpl, WorkerOffer} -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext} - -class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar { - - test("check spark-class location correctly") { - val conf = new SparkConf - conf.set("spark.mesos.executor.home" , "/mesos-home") - - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.getSparkHome()).thenReturn(Option("/spark-home")) - - when(sc.conf).thenReturn(conf) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) - when(sc.executorMemory).thenReturn(100) - when(sc.listenerBus).thenReturn(listenerBus) - val taskScheduler = mock[TaskSchedulerImpl] - 
when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - - val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master") - - // uri is null. - val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id") - assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") - - // uri exists. - conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") - val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id") - assert(executorInfo1.getCommand.getValue === s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") - } - - test("mesos resource offers result in launching tasks") { - def createOffer(id: Int, mem: Int, cpu: Int) = { - val builder = Offer.newBuilder() - builder.addResourcesBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(mem)) - builder.addResourcesBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(cpu)) - builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() - } - - val driver = mock[SchedulerDriver] - val taskScheduler = mock[TaskSchedulerImpl] - - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.executorMemory).thenReturn(100) - when(sc.getSparkHome()).thenReturn(Option("/path")) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) - when(sc.conf).thenReturn(new SparkConf) - when(sc.listenerBus).thenReturn(listenerBus) - - val minMem = MemoryUtils.calculateTotalMemory(sc).toInt - val minCpu = 4 - - val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer(1, minMem, minCpu)) - 
mesosOffers.add(createOffer(2, minMem - 1, minCpu)) - mesosOffers.add(createOffer(3, minMem, minCpu)) - - val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") - - val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) - expectedWorkerOffers.append(new WorkerOffer( - mesosOffers.get(0).getSlaveId.getValue, - mesosOffers.get(0).getHostname, - 2 - )) - expectedWorkerOffers.append(new WorkerOffer( - mesosOffers.get(2).getSlaveId.getValue, - mesosOffers.get(2).getHostname, - 2 - )) - val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) - when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) - when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - - val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]]) - when( - driver.launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), - capture.capture(), - any(classOf[Filters]) - ) - ).thenReturn(Status.valueOf(1)) - when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1)) - when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1)) - - backend.resourceOffers(driver, mesosOffers) - - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), - capture.capture(), - any(classOf[Filters]) - ) - verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId) - verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId) - assert(capture.getValue.size() == 1) - val taskInfo = capture.getValue.iterator().next() - assert(taskInfo.getName.equals("n1")) - val cpus = taskInfo.getResourcesList.get(0) - assert(cpus.getName.equals("cpus")) - assert(cpus.getScalar.getValue.equals(2.0)) - assert(taskInfo.getSlaveId.getValue.equals("s1")) - - // Unwanted resources offered on an existing node. 
Make sure they are declined - val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer(1, minMem, minCpu)) - reset(taskScheduler) - reset(driver) - when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq())) - when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1)) - - backend.resourceOffers(driver, mesosOffers2) - verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId) - } -} diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuiteHelper.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuiteHelper.scala new file mode 100644 index 0000000000000..1b8f5d346fe17 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuiteHelper.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.util +import java.util.Collections +import java.util.{ ArrayList => JArrayList } +import akka.actor.ActorSystem +import com.typesafe.config.Config +import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} +import org.apache.mesos.SchedulerDriver +import org.apache.spark.scheduler.{ LiveListenerBus, SchedulerBackend, TaskSchedulerImpl } +import org.apache.spark.{ LocalSparkContext, SparkConf, SparkEnv, SparkContext } +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.mockito.{ ArgumentCaptor, Matchers } +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar + +import scala.collection.mutable + +trait MesosSchedulerBackendSuiteHelper[MSB <: CommonMesosSchedulerBackend] { + self: FunSuite with LocalSparkContext with MockitoSugar => + + protected def makeTestMesosSchedulerBackend( + taskScheduler: TaskSchedulerImpl): MSB + + protected def makeOffer(offerId: String, slaveId: String, mem: Int, cpu: Int) = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder().setValue(offerId).build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId)) + .setHostname(s"host_$slaveId").build() + } + + protected def makeOffersList(offers: Offer*): JArrayList[Offer] = { + val mesosOffers = new JArrayList[Offer] + for (o <- offers) mesosOffers.add(o) + mesosOffers + } + + protected def makeMockSparkContext(): SparkContext = { + val sparkConf = new SparkConf + sparkConf.set("spark.driver.host", "driverHost") + sparkConf.set("spark.driver.port", "1234") + val se = mock[SparkEnv] + val sc = 
mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/path")) + + val emptyHashMap = mutable.HashMap.empty[String, String] + when(sc.executorEnvs).thenReturn(emptyHashMap) + when(sc.conf).thenReturn(sparkConf) + when(sc.env).thenReturn(se) + + val listenerBus = mock[LiveListenerBus] + when(sc.listenerBus).thenReturn(listenerBus) + + sc + } + + protected def resetTaskScheduler(taskScheduler: TaskSchedulerImpl): TaskSchedulerImpl = { + val sc = taskScheduler.sc + reset(taskScheduler) + when(taskScheduler.sc).thenReturn(sc) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + taskScheduler + } + + protected def makeMockEnvironment(): (SparkContext, TaskSchedulerImpl, SchedulerDriver) = { + val sc = makeMockSparkContext() + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + (sc, taskScheduler, driver) + } + + protected def makeBackendAndDriver(): (MSB, SchedulerDriver) = { + val (sc, taskScheduler, driver) = makeMockEnvironment() + val backend = makeTestMesosSchedulerBackend(taskScheduler) + backend.driver = driver + (backend, driver) + } + + protected def makeTaskID( id: String): TaskID = TaskID.newBuilder().setValue(id).build() + protected def makeSlaveID(id: String): SlaveID = SlaveID.newBuilder().setValue(id).build() + protected def makeOfferID(id: String): OfferID = OfferID.newBuilder().setValue(id).build() + + // Simulate task killed message, signaling that an executor is no longer running. + protected def makeKilledTaskStatus(taskId: String, slaveId: String, state: MesosTaskState = MesosTaskState.TASK_KILLED) = + TaskStatus.newBuilder() + .setTaskId(makeTaskID(taskId)) + .setSlaveId(makeSlaveID(slaveId)) + .setState(state) + .build + + protected def minMemMinCPU(sc: SparkContext, extraMemory: Int = 0, numCores: Int = 4): (Int,Int) = + (MemoryUtils.calculateTotalMemory(sc).toInt + extraMemory, numCores) + +}