diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ff5d796ee2766..a65fe58a275ff 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1002,8 +1002,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, - "Requesting executors is currently only supported in YARN mode") + assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting, + "Requesting executors is currently only supported in YARN or Mesos coarse-grained mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1020,8 +1020,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, - "Killing executors is currently only supported in YARN mode") + assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting, + "Killing executors is currently only supported in YARN or Mesos mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala index b9798963bab0a..d61019ca816fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -31,7 +31,6 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler * * Optionally 
requires SASL authentication in order to read. See [[SecurityManager]]. */ -private[worker] class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager) extends Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala new file mode 100644 index 0000000000000..2eebe48cc3454 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.{SparkConf, Logging, SecurityManager}
+import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
+import org.apache.spark.util.{Utils, SignalLogger}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.mesos.Protos._
+import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService
+import scala.collection.JavaConversions._
+import scala.io.Source
+import java.io.{File, PrintWriter}
+
+/**
+ * The Coarse grained Mesos executor backend is responsible for launching the shuffle service
+ * and the CoarseGrainedExecutorBackend actor.
+ * This is assuming the scheduler detected that the shuffle service is enabled and launches
+ * this class instead of CoarseGrainedExecutorBackend directly.
+ *
+ * The shuffle service is started once, when the executor registers, and is only stopped on
+ * shutdown. Each launched task runs the command carried in the task data as a child process;
+ * at most one child process is tracked at a time.
+ */
+private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf)
+  extends MesosExecutor
+  with Logging {
+
+  // Started on first registration and kept alive across tasks until shutdown.
+  private var shuffleService: StandaloneWorkerShuffleService = null
+  // Driver handed over in registered(); used for the status updates sent by cleanup().
+  private var driver: ExecutorDriver = null
+  // Child process and Mesos task currently tracked; both are reset to null by cleanup().
+  private var executorProc: Process = null
+  private var taskId: TaskID = null
+  // Set by killTask so that the waiter thread reports TASK_KILLED rather than a failure.
+  @volatile var killed = false
+
+  /**
+   * Remembers the driver and starts the shuffle service if it has not been started yet.
+   */
+  override def registered(
+      driver: ExecutorDriver,
+      executorInfo: ExecutorInfo,
+      frameworkInfo: FrameworkInfo,
+      slaveInfo: SlaveInfo) {
+    this.driver = driver
+    logInfo("Coarse Grained Mesos Executor '" + executorInfo.getExecutorId.getValue +
+      "' is registered.")
+
+    if (shuffleService == null) {
+      // This backend is only launched when the shuffle service is wanted, so the flag is forced
+      // on in the local conf before startIfEnabled() is called.
+      sparkConf.set("spark.shuffle.service.enabled", "true")
+      shuffleService = new StandaloneWorkerShuffleService(sparkConf, new SecurityManager(sparkConf))
+      shuffleService.startIfEnabled()
+    }
+  }
+
+  /**
+   * Starts the command serialized in the task data as a child process and reports TASK_RUNNING.
+   * A launch request received while another child process is still tracked is answered with
+   * TASK_FAILED. The child's stdout/stderr are forwarded to this process, and a waiter thread
+   * reports the final task state through cleanup() once the child exits.
+   */
+  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
+    if (executorProc != null) {
+      logError("Received LaunchTask while executor is already running")
+      val status = TaskStatus.newBuilder()
+        .setTaskId(taskInfo.getTaskId)
+        .setSlaveId(taskInfo.getSlaveId)
+        .setState(TaskState.TASK_FAILED)
+        .setMessage("Received LaunchTask while executor is already running")
+        .build()
+      d.sendStatusUpdate(status)
+      return
+    }
+
+    killed = false
+
+    logInfo("Launching task id: " + taskInfo.getTaskId.getValue)
+
+    // We are launching the CoarseGrainedExecutorBackend via subprocess
+    // because the backend is designed to run in its own process.
+    // Since it's a shared class we are preserving the existing behavior
+    // and launching it as a subprocess here.
+    val command = "exec " + Utils.deserialize[String](taskInfo.getData().toByteArray)
+
+    logInfo("Running command: " + command)
+
+    // Mesos only works on Linux platforms, so we assume bash is available if Mesos is used.
+    val pb = new ProcessBuilder("/bin/bash", "-c", command)
+
+    val currentEnvVars = pb.environment()
+    for (variable <- taskInfo.getExecutor.getCommand.getEnvironment.getVariablesList()) {
+      currentEnvVars.put(variable.getName, variable.getValue)
+    }
+
+    // The helper threads below use this local reference instead of the executorProc field:
+    // cleanup() resets the field to null, possibly before the reader threads get to run.
+    val proc = pb.start()
+    executorProc = proc
+    // Record the task id before the waiter thread is started, because cleanup() asserts that
+    // it is set; a child that exits immediately would otherwise trip that assertion.
+    taskId = taskInfo.getTaskId
+
+    new Thread("stderr reader for task " + taskInfo.getTaskId.getValue) {
+      override def run() {
+        for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
+          System.err.println(line)
+        }
+      }
+    }.start()
+
+    new Thread("stdout reader for task " + taskInfo.getTaskId.getValue) {
+      override def run() {
+        for (line <- Source.fromInputStream(proc.getInputStream).getLines) {
+          System.out.println(line)
+        }
+      }
+    }.start()
+
+    driver.sendStatusUpdate(TaskStatus.newBuilder()
+      .setState(TaskState.TASK_RUNNING)
+      .setTaskId(taskInfo.getTaskId)
+      .build)
+
+    new Thread("process waiter for mesos executor for task " + taskInfo.getTaskId.getValue) {
+      override def run() {
+        proc.waitFor()
+        val (state, msg) = if (killed) {
+          (TaskState.TASK_KILLED, "")
+        } else if (proc.exitValue() == 0) {
+          (TaskState.TASK_FINISHED, "")
+        } else {
+          (TaskState.TASK_FAILED, "Exited with status: " + proc.exitValue().toString)
+        }
+        // We leave the shuffle service running after the task.
+        cleanup(state, msg)
+      }
+    }.start()
+  }
+
+  /** Logs an error reported by Mesos; no other action is taken. */
+  override def error(d: ExecutorDriver, message: String) {
+    logError("Error from Mesos: " + message)
+  }
+
+  /**
+   * Destroys the child process if `t` is the task currently tracked; otherwise only logs.
+   */
+  override def killTask(d: ExecutorDriver, t: TaskID) {
+    // Snapshot both fields once: the waiter thread may run cleanup() concurrently and reset
+    // them to null between two reads.
+    val currentTaskId = taskId
+    val proc = executorProc
+    if (currentTaskId == null || proc == null) {
+      logError("Received killtask when no process is initialized")
+    } else if (!currentTaskId.getValue.equals(t.getValue)) {
+      logError("Asked to kill task '" + t.getValue + "' but executor is running task '" +
+        currentTaskId.getValue + "'")
+    } else {
+      killed = true
+      // We only destroy the coarse grained executor but leave the shuffle
+      // service running for other tasks that might be reusing this executor.
+      // This is no-op if the process already finished.
+      proc.destroy()
+    }
+  }
+
+  /**
+   * Sends the final status of the tracked task to Mesos and forgets the child process and task
+   * id. Only logs when no driver has been registered or when no process is tracked.
+   *
+   * @param state final Mesos task state to report
+   * @param msg optional message attached to the status update
+   */
+  def cleanup(state: TaskState, msg: String = ""): Unit = synchronized {
+    if (driver == null) {
+      logError("Cleaning up process but driver is not initialized")
+      return
+    }
+
+    if (executorProc == null) {
+      logDebug("Process is not started or already cleaned up")
+      return
+    }
+
+    assert(taskId != null)
+
+    driver.sendStatusUpdate(TaskStatus.newBuilder()
+      .setState(state)
+      .setMessage(msg)
+      .setTaskId(taskId)
+      .build)
+
+    executorProc = null
+    taskId = null
+  }
+
+  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
+
+  override def disconnected(d: ExecutorDriver) {}
+
+  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
+
+  /** Kills the tracked child process, if any, and stops the shuffle service. */
+  override def shutdown(d: ExecutorDriver) {
+    if (executorProc != null) {
+      killTask(d, taskId)
+    }
+
+    if (shuffleService != null) {
+      shuffleService.stop()
+      shuffleService = null
+    }
+  }
+}
+
+private[spark] object CoarseGrainedMesosExecutorBackend extends Logging {
+  /** Entry point: loads the Mesos native library and runs the executor driver. */
+  def main(args: Array[String]) {
+    SignalLogger.register(log)
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      MesosNativeLibrary.load()
+      val sparkConf = new SparkConf()
+      // Create a new Executor and start it running
+      val runner = new CoarseGrainedMesosExecutorBackend(sparkConf)
+      new MesosExecutorDriver(runner).run()
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 5289661eb896b..ff12505a16ce4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -32,6 +32,13 @@ import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils +import com.google.common.collect.{BiMap, HashBiMap} +import org.apache.mesos.protobuf.ByteString +import java.util.concurrent.locks.ReentrantLock + +case class SlaveStatus(var executorRunning: Boolean, var taskRunning: Boolean) { + def finished = !taskRunning && !executorRunning +} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -51,7 +58,7 @@ private[spark] class CoarseMesosSchedulerBackend( with MScheduler with Logging { - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures + val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures // Lock used to wait for scheduler to be registered var isRegistered = false @@ -61,22 +68,31 @@ private[spark] class CoarseMesosSchedulerBackend( var driver: SchedulerDriver = null // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] var totalCoresAcquired = 0 - val slaveIdsWithExecutors = new
HashSet[String] + val slaveStatuses = new HashMap[String, SlaveStatus] + + val taskIdToSlaveId = HashBiMap.create[Int, String]() - val taskIdToSlaveId = new HashMap[Int, String] val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed + val pendingRemovedSlaveIds = new HashSet[String] val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) var nextMesosTaskId = 0 + var executorLimit: Option[Int] = None + + // Introducing a new lock for protecting above shared state. We avoid locking on the class object + // as it can result in deadlock when synchronized method in this class calls a parent's method + // that is also trying to lock on the class object. + var stateLock = new ReentrantLock() + @volatile var appId: String = _ def newMesosTaskId(): Int = { @@ -91,6 +107,7 @@ private[spark] class CoarseMesosSchedulerBackend( synchronized { new Thread("CoarseMesosSchedulerBackend driver") { setDaemon(true) + override def run() { val scheduler = CoarseMesosSchedulerBackend.this val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build() @@ -109,12 +126,27 @@ private[spark] class CoarseMesosSchedulerBackend( } } - def createCommand(offer: Offer, numCores: Int): CommandInfo = { - val executorSparkHome = conf.getOption("spark.mesos.executor.home") - .orElse(sc.getSparkHome()) - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } + // Set the environment variable through a command prefix + // to append to the existing value of the variable + lazy val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p => + Utils.libraryPathEnvPrefix(Seq(p)) + }.getOrElse("") + + lazy val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + conf.get("spark.driver.host"), + conf.get("spark.driver.port"), + CoarseGrainedSchedulerBackend.ACTOR_NAME) + + lazy val executorUri = conf.get("spark.executor.uri", null) + + 
lazy val executorSparkHome = conf.getOption("spark.mesos.executor.home") + .orElse(sc.getSparkHome()) + .getOrElse { + throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") + } + + def createBaseCommand(offer: Offer): CommandInfo.Builder = { val environment = Environment.newBuilder() val extraClassPath = conf.getOption("spark.executor.extraClassPath") extraClassPath.foreach { cp => @@ -123,12 +155,6 @@ private[spark] class CoarseMesosSchedulerBackend( } val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "") - // Set the environment variable through a command prefix - // to append to the existing value of the variable - val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p => - Utils.libraryPathEnvPrefix(Seq(p)) - }.getOrElse("") - environment.addVariables( Environment.Variable.newBuilder() .setName("SPARK_EXECUTOR_OPTS") @@ -143,30 +169,63 @@ private[spark] class CoarseMesosSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( - SparkEnv.driverActorSystemName, - conf.get("spark.driver.host"), - conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) - - val uri = conf.get("spark.executor.uri", null) - if (uri == null) { + + if (executorUri != null) { + command.addUris(CommandInfo.URI.newBuilder().setValue(executorUri)) + } + + command + } + + def sparkExecutorId(slaveId: String, taskId: String) = "%s/%s".format(slaveId, taskId) + + def createTaskCommandString( + offer: Offer, + numCores: Int, + taskId: String, + executorUri: String = executorUri, + sparkHome: String = null): String = { + val sparkClassCommand = if (executorUri == null && sparkHome == null) { + "./bin/spark-class " + } else if (executorUri == null) { + val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath + "%s \"%s\" ".format(prefixEnv, runScript) + } else { + // Grab everything to the first '.'. 
We'll use that and '*' to + // glob the directory "correctly". + val basename = executorUri.split('/').last.split('.').head + "cd %s*; %s ./bin/spark-class ".format(basename, prefixEnv) + } + sparkClassCommand + + "org.apache.spark.executor.CoarseGrainedExecutorBackend " + + "%s %s %s %d %s".format( + driverUrl, sparkExecutorId(offer.getSlaveId.getValue, taskId), + offer.getHostname, numCores, appId) + } + + def createTaskCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = { + val command = createBaseCommand(offer) + command.setValue(createTaskCommandString(offer, numCores, taskId)).build + } + + def createExecutorCommand(offer: Offer): CommandInfo = { + val command = createBaseCommand(offer) + + if (executorUri == null) { val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath command.setValue( - "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format( - prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue, - offer.getHostname, numCores, appId)) + ("%s \"%s\" org.apache.spark.executor.CoarseGrainedMesosExecutorBackend").format( + prefixEnv, runScript)) } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". 
- val basename = uri.split('/').last.split('.').head + val basename = executorUri.split('/').last.split('.').head command.setValue( ("cd %s*; %s " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s") - .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue, - offer.getHostname, numCores, appId)) - command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + "./bin/spark-class org.apache.spark.executor.CoarseGrainedMesosExecutorBackend ") + .format(basename, prefixEnv)) } + command.build() } @@ -193,45 +252,107 @@ private[spark] class CoarseMesosSchedulerBackend( override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} + lazy val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) + + lazy val shuffleServiceMem = if (shuffleServiceEnabled) { + conf.getInt("spark.mesos.shuffle.service.mem", 1024) + } else { + 0 + } + + lazy val shuffleServiceCpu = if (shuffleServiceEnabled) { + conf.getInt("spark.mesos.shuffle.service.cpu", 1) + } else { + 0 + } + /** * Method called by Mesos to offer resources on slaves. We respond by launching an executor, * unless we've already launched more than we wanted to. 
*/ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - synchronized { + stateLock.synchronized { val filters = Filters.newBuilder().setRefuseSeconds(-1).build() for (offer <- offers) { - val slaveId = offer.getSlaveId.toString + val slaveId = offer.getSlaveId.getValue val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt - if (totalCoresAcquired < maxCores && - mem >= MemoryUtils.calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { + logTrace("Received offer id: " + offer.getId.getValue + ", cpu: " + cpus.toString + + ", mem: " + mem.toString) + + var minCpuRequired = 1 + var minMemRequired = MemoryUtils.calculateTotalMemory(sc) + var shuffleResourcesUsed = false + + if (!slaveStatuses.contains(slaveId) || !slaveStatuses(slaveId).executorRunning) { + minCpuRequired += shuffleServiceCpu + minMemRequired += shuffleServiceMem + shuffleResourcesUsed = true + } + + if (taskIdToSlaveId.size < executorLimit.getOrElse(Int.MaxValue) && + totalCoresAcquired < maxCores && + failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && + (!slaveStatuses.contains(slaveId) || !slaveStatuses(slaveId).taskRunning) && + mem >= minMemRequired && + cpus >= minCpuRequired) { + // Launch an executor on the slave val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) totalCoresAcquired += cpusToUse val taskId = newMesosTaskId() taskIdToSlaveId(taskId) = slaveId - slaveIdsWithExecutors += slaveId coresByTaskId(taskId) = cpusToUse - val task = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) + val taskIdString: String = taskId.toString + val builder = MesosTaskInfo.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskIdString).build()) .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) .setName("Task " 
+ taskId) - .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", + + if (!shuffleServiceEnabled) { + slaveStatuses(slaveId) = SlaveStatus(false, true) + builder + .setCommand(createTaskCommand(offer, cpusToUse + extraCoresPerSlave, taskIdString)) + .addResources(createResource("cpus", cpusToUse)) + .addResources(createResource("mem", MemoryUtils.calculateTotalMemory(sc))) - .build() + } else { + slaveStatuses(slaveId) = SlaveStatus(true, true) + + // Deduct the executor resources that the remains is for the task. + val (taskCpus, taskMem) = if (shuffleResourcesUsed) { + (cpusToUse - shuffleServiceCpu, mem - shuffleServiceMem) + } else { + (cpusToUse, mem) + } + + builder.setExecutor( + ExecutorInfo.newBuilder() + .setExecutorId(ExecutorID.newBuilder().setValue(slaveId).build) + .addResources(createResource("cpus", shuffleServiceCpu)) + .addResources(createResource("mem", shuffleServiceMem)) + .setCommand(createExecutorCommand(offer)).build()) + .addResources(createResource("cpus", taskCpus)) + .addResources(createResource("mem", taskMem)) + + builder.setData( + ByteString.copyFrom( + Utils.serialize( + createTaskCommandString(offer, taskCpus, taskIdString, null, null)))) + } + + logTrace("Launching task with offer id: " + offer.getId.getValue + + ", task: " + builder.build()) + + val status = slaveStatuses(slaveId) + status.taskRunning = true + status.executorRunning = true + d.launchTasks( - Collections.singleton(offer.getId), Collections.singletonList(task), filters) + Collections.singleton(offer.getId), Collections.singletonList(builder.build()), filters) } else { - // Filter it out - d.launchTasks( - Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters) + d.declineOffer(offer.getId) } } } @@ -265,12 +386,15 @@ private[spark] class CoarseMesosSchedulerBackend( override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { val taskId = status.getTaskId.getValue.toInt val state = status.getState 
- logInfo("Mesos task " + taskId + " is now " + state) - synchronized { + stateLock.synchronized { + if (!taskIdToSlaveId.containsKey(taskId)) { + return + } + + logInfo("Mesos task " + taskId + " is now " + state) + if (isFinished(state)) { val slaveId = taskIdToSlaveId(taskId) - slaveIdsWithExecutors -= slaveId - taskIdToSlaveId -= taskId // Remove the cores we have remembered for this task, if it's in the hashmap for (cores <- coresByTaskId.get(taskId)) { totalCoresAcquired -= cores @@ -281,9 +405,11 @@ private[spark] class CoarseMesosSchedulerBackend( failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + - "is Spark installed on it?") + "is Spark installed on it?") } } + + executorTerminated(d, status.getSlaveId.getValue, "Executor finished with state: " + state) driver.reviveOffers() // In case we'd rejected everything before but have now lost a node } } @@ -303,17 +429,31 @@ private[spark] class CoarseMesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { - logInfo("Mesos slave lost: " + slaveId.getValue) - synchronized { - if (slaveIdsWithExecutors.contains(slaveId.getValue)) { - // Note that the slave ID corresponds to the executor ID on that slave - slaveIdsWithExecutors -= slaveId.getValue - removeExecutor(slaveId.getValue, "Mesos slave lost") + def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String) { + stateLock.synchronized { + if (slaveStatuses.contains(slaveId)) { + val slaveIdToTaskId: BiMap[String, Int] = taskIdToSlaveId.inverse() + if (slaveIdToTaskId.contains(slaveId)) { + val taskId: Int = slaveIdToTaskId.get(slaveId) + taskIdToSlaveId.remove(taskId) + removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason) + } + pendingRemovedSlaveIds 
-= slaveId + val status = slaveStatuses(slaveId) + status.taskRunning = false + if (status.finished) { + slaveStatuses -= slaveId + } } } } }
+  /**
+   * Called by Mesos when a slave is lost. Removes the executor that was running on the slave
+   * and then drops the slave's status entry unconditionally.
+   */
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    logInfo("Mesos slave lost: " + slaveId.getValue)
+    // slaveStatuses is guarded by stateLock everywhere else in this class, so it must not be
+    // mutated outside of it here. The monitor is reentrant, so the nested acquisition inside
+    // executorTerminated is safe.
+    stateLock.synchronized {
+      executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
+      slaveStatuses -= slaveId.getValue
+    }
+  }
+ override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) slaveLost(d, s) @@ -325,4 +465,40 @@ private[spark] class CoarseMesosSchedulerBackend( super.applicationId }
+  /**
+   * Caps the number of executors this backend will launch at `requestedTotal`.
+   *
+   * @return always true; see the comment in the body for why the request cannot be verified
+   */
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = stateLock.synchronized {
+    // We don't truly know if we can fulfill the full amount of executors
+    // since at coarse grain it depends on the amount of slaves available.
+    logInfo("Capping the total amount of executors to " + requestedTotal)
+    executorLimit = Option(requestedTotal)
+    true
+  }
+
+  /**
+   * Asks Mesos to kill the task backing each given executor and lowers the executor limit so
+   * that the killed executors are not replaced by new ones.
+   *
+   * @param executorIds Spark executor ids in the "slaveId/taskId" form built by sparkExecutorId
+   * @return false if the Mesos driver is not set yet, true otherwise
+   */
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = stateLock.synchronized {
+    if (driver == null) {
+      logWarning("Asked to kill executors before the executor was started.")
+      false
+    } else {
+      val slaveIdToTaskId = taskIdToSlaveId.inverse()
+      for (executorId <- executorIds) {
+        // The slave id is the part of the executor id before the first '/'.
+        val slaveId = executorId.split("/")(0)
+        if (slaveIdToTaskId.contains(slaveId)) {
+          driver.killTask(
+            TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build)
+          pendingRemovedSlaveIds += slaveId
+        } else {
+          logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+        }
+      }
+
+      assert(pendingRemovedSlaveIds.size <= taskIdToSlaveId.size)
+
+      // We cannot simply decrement the existing executor limit, as we may not have been able
+      // to launch as many executors as the limit allows. But we assume that if we are asked to
+      // kill executors, the scheduler wants a limit lower than the number of executors that
+      // have been launched. Therefore, the new limit is the number of executors launched minus
+      // the number of executors pending removal.
+      executorLimit = Option(taskIdToSlaveId.size - pendingRemovedSlaveIds.size)
+      true
+    }
+  }
} diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..96787b771eda6 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.scheduler.mesos + +import org.scalatest.FunSuite +import org.apache.spark.{SparkEnv, SparkConf, SparkContext, LocalSparkContext} +import org.scalatest.mock.EasyMockSugar +import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, CoarseMesosSchedulerBackend} +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.Scalar +import org.easymock.EasyMock +import org.apache.mesos.SchedulerDriver +import org.apache.spark.scheduler.TaskSchedulerImpl +import scala.collection.mutable +import akka.actor.ActorSystem +import java.util.Collections + +class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { + def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int) = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder().setValue(offerId).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId)).setHostname(s"host${slaveId}").build() + } + + test("mesos supports killing and limiting executors") { + val driver = EasyMock.createMock(classOf[SchedulerDriver]) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + + val se = EasyMock.createMock(classOf[SparkEnv]) + val actorSystem = EasyMock.createMock(classOf[ActorSystem]) + val sparkConf = new SparkConf + EasyMock.expect(se.actorSystem).andReturn(actorSystem) + EasyMock.replay(se) + val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() + EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() + EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() + 
EasyMock.expect(sc.conf).andReturn(sparkConf).anyTimes() + EasyMock.expect(sc.env).andReturn(se) + EasyMock.replay(sc) + + EasyMock.expect(taskScheduler.sc).andReturn(sc) + EasyMock.replay(taskScheduler) + + sparkConf.set("spark.driver.host", "driverHost") + sparkConf.set("spark.driver.port", "1234") + + val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val minCpu = 4 + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(createOffer("o1", "s1", minMem, minCpu)) + + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)), + EasyMock.anyObject(), + EasyMock.anyObject(classOf[Filters]) + ) + ).andReturn(Status.valueOf(1)).once + + EasyMock.expect( + driver.killTask(TaskID.newBuilder().setValue("0").build()) + ).andReturn(Status.valueOf(1)) + + EasyMock.expect( + driver.declineOffer(OfferID.newBuilder().setValue("o2").build()) + ).andReturn(Status.valueOf(1)) + + EasyMock.replay(driver) + + val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") + backend.driver = driver + backend.resourceOffers(driver, mesosOffers) + + assert(backend.doKillExecutors(Seq("s1/0"))) + assert(backend.executorLimit.get.equals(0)) + + val mesosOffers2 = new java.util.ArrayList[Offer] + mesosOffers2.add(createOffer("o2", "s2", minMem, minCpu)) + backend.resourceOffers(driver, mesosOffers2) + // Verify we didn't launch any new executor + assert(backend.slaveStatuses.size.equals(1)) + assert(backend.slaveStatuses.values.iterator.next().taskRunning.equals(true)) + assert(backend.pendingRemovedSlaveIds.size.equals(1)) + + EasyMock.verify(driver) + + EasyMock.reset(driver) + + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(mesosOffers2.get(0).getId)), + EasyMock.anyObject(), + EasyMock.anyObject(classOf[Filters]) + ) + ).andReturn(Status.valueOf(1)).once + + EasyMock.replay(driver) + + backend.doRequestTotalExecutors(2) + backend.resourceOffers(driver, mesosOffers2) + 
assert(backend.slaveStatuses.size.equals(2)) + backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build()) + assert(backend.slaveStatuses.size.equals(1)) + assert(backend.pendingRemovedSlaveIds.size.equals(0)) + + EasyMock.verify(driver) + } + + test("mesos supports killing and relaunching tasks with executors") { + val driver = EasyMock.createMock(classOf[SchedulerDriver]) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + + val se = EasyMock.createMock(classOf[SparkEnv]) + val actorSystem = EasyMock.createMock(classOf[ActorSystem]) + val sparkConf = new SparkConf + EasyMock.expect(se.actorSystem).andReturn(actorSystem) + EasyMock.replay(se) + val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() + EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() + EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() + EasyMock.expect(sc.conf).andReturn(sparkConf).anyTimes() + EasyMock.expect(sc.env).andReturn(se) + EasyMock.replay(sc) + + EasyMock.expect(taskScheduler.sc).andReturn(sc) + EasyMock.replay(taskScheduler) + + // Enable shuffle service so it will require extra resources + sparkConf.set("spark.shuffle.service.enabled", "true") + sparkConf.set("spark.driver.host", "driverHost") + sparkConf.set("spark.driver.port", "1234") + + val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + 1024 + val minCpu = 4 + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(createOffer("o1", "s1", minMem, minCpu)) + + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)), + EasyMock.anyObject(), + EasyMock.anyObject(classOf[Filters]) + ) + ).andReturn(Status.valueOf(1)).once + + val offer2 = createOffer("o2", "s1", minMem, 1); + + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(offer2.getId)), + EasyMock.anyObject(), + EasyMock.anyObject(classOf[Filters]) + ) 
+ ).andReturn(Status.valueOf(1)).once + + EasyMock.expect(driver.reviveOffers()).andReturn(Status.valueOf(1)).once + + EasyMock.replay(driver) + + val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") + backend.driver = driver + backend.resourceOffers(driver, mesosOffers) + + // Simulate task killed, but executor is still running + val status = TaskStatus.newBuilder() + .setTaskId(TaskID.newBuilder().setValue("0").build()) + .setSlaveId(SlaveID.newBuilder().setValue("s1").build()) + .setState(TaskState.TASK_KILLED) + .build + + backend.statusUpdate(driver, status) + assert(backend.slaveStatuses("s1").taskRunning.equals(false)) + assert(backend.slaveStatuses("s1").executorRunning.equals(true)) + + mesosOffers.clear() + mesosOffers.add(offer2) + backend.resourceOffers(driver, mesosOffers) + assert(backend.slaveStatuses("s1").taskRunning.equals(true)) + assert(backend.slaveStatuses("s1").executorRunning.equals(true)) + + EasyMock.verify(driver) + } +} diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 78358499fd01f..8d9bf26ec5ff0 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -226,6 +226,22 @@ See the [configuration page](configuration.html) for information on Spark config The final total amount of memory allocated is the maximum value between executor memory plus memoryOverhead, and overhead fraction (1.07) plus the executor memory. + + spark.mesos.shuffle.service.cpu + 1 + + The amount of CPU that the Mesos coarse grained scheduler will request for launching the shuffle service. The shuffle service is launched on each slave. + This will only apply if shuffle service is enabled and is running under coarse grained mode. + + + + spark.mesos.shuffle.service.mem + 1024 + + The amount of memory that Mesos coarse grained scheduler will request for launching the shuffle service. The shuffle service is launched on each slave. 
+ This only applies when the shuffle service is enabled and Spark is running in coarse-grained mode. + + # Troubleshooting and Debugging