diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala index 8ffcfc0878a42..c0f9129a423f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -17,69 +17,89 @@ package org.apache.spark.deploy.mesos -import java.net.SocketAddress import java.nio.ByteBuffer +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} -import scala.collection.mutable +import scala.collection.JavaConverters._ import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.ExternalShuffleService import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.shuffle.protocol.BlockTransferMessage -import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver +import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat} import org.apache.spark.network.util.TransportConf +import org.apache.spark.util.ThreadUtils /** * An RPC endpoint that receives registration requests from Spark drivers running on Mesos. * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. */ -private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf) +private[mesos] class MesosExternalShuffleBlockHandler( + transportConf: TransportConf, + cleanerIntervalS: Long) extends ExternalShuffleBlockHandler(transportConf, null) with Logging { - // Stores a map of driver socket addresses to app ids - private val connectedApps = new mutable.HashMap[SocketAddress, String] + ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher") + .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS) + + // Stores a map of app id to app state (timeout value and last heartbeat) + private val connectedApps = new ConcurrentHashMap[String, AppState]() protected override def handleMessage( message: BlockTransferMessage, client: TransportClient, callback: RpcResponseCallback): Unit = { message match { - case RegisterDriverParam(appId) => + case RegisterDriverParam(appId, appState) => val address = client.getSocketAddress - logDebug(s"Received registration request from app $appId (remote address $address).") - if (connectedApps.contains(address)) { - val existingAppId = connectedApps(address) - if (!existingAppId.equals(appId)) { - logError(s"A new app '$appId' has connected to existing address $address, " + - s"removing previously registered app '$existingAppId'.") - applicationRemoved(existingAppId, true) - } + val timeout = appState.heartbeatTimeout + logInfo(s"Received registration request from app $appId (remote address $address, " + + s"heartbeat timeout $timeout ms).") + if (connectedApps.containsKey(appId)) { + logWarning(s"Received a registration request from app $appId, but it was already " + + s"registered") } - connectedApps(address) = appId + connectedApps.put(appId, appState) callback.onSuccess(ByteBuffer.allocate(0)) + case Heartbeat(appId) => + val address = client.getSocketAddress + Option(connectedApps.get(appId)) match { + case Some(existingAppState) => + logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " + + s"address $address).") + existingAppState.lastHeartbeat = System.nanoTime() + case None => + logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " + + s"address $address, appId '$appId').") + } case _ => super.handleMessage(message, client, callback) } } - /** - * On connection termination, clean up shuffle files written by the associated application. - */ - override def connectionTerminated(client: TransportClient): Unit = { - val address = client.getSocketAddress - if (connectedApps.contains(address)) { - val appId = connectedApps(address) - logInfo(s"Application $appId disconnected (address was $address).") - applicationRemoved(appId, true /* cleanupLocalDirs */) - connectedApps.remove(address) - } else { - logWarning(s"Unknown $address disconnected.") - } - } - /** An extractor object for matching [[RegisterDriver]] message. */ private object RegisterDriverParam { - def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId) + def unapply(r: RegisterDriver): Option[(String, AppState)] = + Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) + } + + private object Heartbeat { + def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId) + } + + private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long) + + private class CleanerThread extends Runnable { + override def run(): Unit = { + val now = System.nanoTime() + connectedApps.asScala.foreach { case (appId, appState) => + if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { + logInfo(s"Application $appId timed out. Removing shuffle files.") + connectedApps.remove(appId) + applicationRemoved(appId, true) + } + } + } } } @@ -93,7 +113,8 @@ private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManage protected override def newShuffleBlockHandler( conf: TransportConf): ExternalShuffleBlockHandler = { - new MesosExternalShuffleBlockHandler(conf) + val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s") + new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) } } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/RpcEndpointAddress.scala b/core/src/main/scala/org/apache/spark/rpc/netty/RpcEndpointAddress.scala index d2e94f943aba5..65e9b56f81a40 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/RpcEndpointAddress.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/RpcEndpointAddress.scala @@ -29,7 +29,7 @@ import org.apache.spark.rpc.RpcAddress * @param rpcAddress The socket address of the endpint. * @param name Name of the endpoint. */ -private[netty] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) { +private[spark] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) { require(name != null, "RpcEndpoint name must be provided.") @@ -44,7 +44,11 @@ private[netty] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val nam } } -private[netty] object RpcEndpointAddress { +private[spark] object RpcEndpointAddress { + + def apply(host: String, port: Int, name: String): RpcEndpointAddress = { + new RpcEndpointAddress(host, port, name) + } def apply(sparkUrl: String): RpcEndpointAddress = { try { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 7d08eae0b4871..1a0864270f582 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -18,20 +18,20 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File -import java.util.concurrent.locks.ReentrantLock import java.util.{Collections, List => JList} +import java.util.concurrent.locks.ReentrantLock import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, HashSet} +import scala.collection.mutable +import scala.collection.mutable.{Buffer, HashMap, HashSet} -import com.google.common.collect.HashBiMap -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} -import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState} +import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient -import org.apache.spark.rpc.RpcAddress +import org.apache.spark.rpc.netty.RpcEndpointAddress import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -60,28 +60,38 @@ private[spark] class CoarseMesosSchedulerBackend( // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + private[this] val shutdownTimeoutMS = + conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") + .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0") + + // Synchronization protected by stateLock + private[this] var stopCalled: Boolean = false + // If shuffle service is enabled, the Spark driver will register with the shuffle service. // This is for cleaning up shuffle files reliably. private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) // Cores we have acquired with each Mesos task ID - val coresByTaskId = new HashMap[Int, Int] + val coresByTaskId = new HashMap[String, Int] var totalCoresAcquired = 0 - val slaveIdsWithExecutors = new HashSet[String] - - // Maping from slave Id to hostname - private val slaveIdToHost = new HashMap[String, String] - - val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String] - // How many times tasks on each slave failed - val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int] + // SlaveID -> Slave + // This map accumulates entries for the duration of the job. Slaves are never deleted, because + // we need to maintain e.g. failure state and connection state. + private val slaves = new HashMap[String, Slave] /** - * The total number of executors we aim to have. Undefined when not using dynamic allocation - * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]]. + * The total number of executors we aim to have. Undefined when not using dynamic allocation. + * Initially set to 0 when using dynamic allocation, the executor allocation manager will send + * the real initial limit later. */ - private var executorLimitOption: Option[Int] = None + private var executorLimitOption: Option[Int] = { + if (Utils.isDynamicAllocationEnabled(conf)) { + Some(0) + } else { + None + } + } /** * Return the current executor limit, which may be [[Int.MaxValue]] @@ -89,13 +99,11 @@ private[spark] class CoarseMesosSchedulerBackend( */ private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue) - private val pendingRemovedSlaveIds = new HashSet[String] - // private lock object protecting mutable state above. Using the intrinsic lock // may lead to deadlocks since the superclass might also try to lock private val stateLock = new ReentrantLock - val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) + val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0) // Offer constraints private val slaveOfferConstraints = @@ -105,27 +113,32 @@ private[spark] class CoarseMesosSchedulerBackend( private val rejectOfferDurationForUnmetConstraints = getRejectOfferDurationForUnmetConstraints(sc) - // A client for talking to the external shuffle service, if it is a + // A client for talking to the external shuffle service private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = { if (shuffleServiceEnabled) { - Some(new MesosExternalShuffleClient( - SparkTransportConf.fromSparkConf(conf, "shuffle"), - securityManager, - securityManager.isAuthenticationEnabled(), - securityManager.isSaslEncryptionEnabled())) + Some(getShuffleClient()) } else { None } } + // This method is factored out for testability + protected def getShuffleClient(): MesosExternalShuffleClient = { + new MesosExternalShuffleClient( + SparkTransportConf.fromSparkConf(conf, "shuffle"), + securityManager, + securityManager.isAuthenticationEnabled(), + securityManager.isSaslEncryptionEnabled()) + } + var nextMesosTaskId = 0 @volatile var appId: String = _ - def newMesosTaskId(): Int = { + def newMesosTaskId(): String = { val id = nextMesosTaskId nextMesosTaskId += 1 - id + id.toString } override def start() { @@ -136,11 +149,12 @@ private[spark] class CoarseMesosSchedulerBackend( sc.sparkUser, sc.appName, sc.conf, - sc.ui.map(_.appUIAddress)) + sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)) + ) startScheduler(driver) } - def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = { + def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = { val executorSparkHome = conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) .getOrElse { @@ -179,12 +193,12 @@ private[spark] class CoarseMesosSchedulerBackend( .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) if (uri.isEmpty) { - val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath + val runScript = new File(executorSparkHome, "./bin/spark-class").getPath command.setValue( "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend" .format(prefixEnv, runScript) + s" --driver-url $driverURL" + - s" --executor-id ${offer.getSlaveId.getValue}" + + s" --executor-id $taskId" + s" --hostname ${offer.getHostname}" + s" --cores $numCores" + s" --app-id $appId") @@ -192,12 +206,11 @@ private[spark] class CoarseMesosSchedulerBackend( // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = uri.get.split('/').last.split('.').head - val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString) command.setValue( s"cd $basename*; $prefixEnv " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + + "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + s" --driver-url $driverURL" + - s" --executor-id $executorId" + + s" --executor-id $taskId" + s" --hostname ${offer.getHostname}" + s" --cores $numCores" + s" --app-id $appId") @@ -215,10 +228,10 @@ private[spark] class CoarseMesosSchedulerBackend( if (conf.contains("spark.testing")) { "driverURL" } else { - sc.env.rpcEnv.uriOf( - SparkEnv.driverActorSystemName, - RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt), - CoarseGrainedSchedulerBackend.ENDPOINT_NAME) + RpcEndpointAddress( + conf.get("spark.driver.host"), + conf.get("spark.driver.port").toInt, + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString } } @@ -245,113 +258,221 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { - val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers.asScala) { + if (stopCalled) { + logDebug("Ignoring offers during shutdown") + // Driver should simply return a stopped status on race + // condition between this.stop() and completing here + offers.asScala.map(_.getId).foreach(d.declineOffer) + return + } + + logDebug(s"Received ${offers.size} resource offers.") + + val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => val offerAttributes = toAttributeMap(offer.getAttributesList) - val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } + + declineUnmatchedOffers(d, unmatchedOffers) + handleMatchedOffers(d, matchedOffers) + } + } + + private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { + for (offer <- offers) { + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val filters = Filters.newBuilder() + .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build() + + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" + + s" for $rejectOfferDurationForUnmetConstraints seconds") + + d.declineOffer(offer.getId, filters) + } + } + + /** + * Launches executors on accepted offers, and declines unused offers. Executors are launched + * round-robin on offers. + * + * @param d SchedulerDriver + * @param offers Mesos offers that match attribute constraints + */ + private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { + val tasks = buildMesosTasks(offers) + for (offer <- offers) { + val offerAttributes = toAttributeMap(offer.getAttributesList) + val offerMem = getResource(offer.getResourcesList, "mem") + val offerCpus = getResource(offer.getResourcesList, "cpus") + val id = offer.getId.getValue + + if (tasks.contains(offer.getId)) { // accept + val offerTasks = tasks(offer.getId) + + logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + + for (task <- offerTasks) { + val taskId = task.getTaskId + val mem = getResource(task.getResourcesList, "mem") + val cpus = getResource(task.getResourcesList, "cpus") + + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") + } + + d.launchTasks( + Collections.singleton(offer.getId), + offerTasks.asJava) + } else { // decline + logDebug(s"Declining offer: $id with attributes: $offerAttributes " + + s"mem: $offerMem cpu: $offerCpus") + + d.declineOffer(offer.getId) + } + } + } + + /** + * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize + * per-task memory and IO, tasks are round-robin assigned to offers. + * + * @param offers Mesos offers that match attribute constraints + * @return A map from OfferID to a list of Mesos tasks to launch on that offer + */ + private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { + // offerID -> tasks + val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + + // offerID -> resources + val remainingResources = mutable.Map(offers.map(offer => + (offer.getId.getValue, offer.getResourcesList)): _*) + + var launchTasks = true + + // TODO(mgummelt): combine offers for a single slave + // + // round-robin create executors on the available offers + while (launchTasks) { + launchTasks = false + + for (offer <- offers) { val slaveId = offer.getSlaveId.getValue - val mem = getResource(offer.getResourcesList, "mem") - val cpus = getResource(offer.getResourcesList, "cpus").toInt - val id = offer.getId.getValue - if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { - // Launch an executor on the slave - val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) - totalCoresAcquired += cpusToUse - val taskId = newMesosTaskId() - taskIdToSlaveId.put(taskId, slaveId) - slaveIdsWithExecutors += slaveId - coresByTaskId(taskId) = cpusToUse - // Gather cpu resources from the available resources and use them in the task. - val (remainingResources, cpuResourcesToUse) = - partitionResources(offer.getResourcesList, "cpus", cpusToUse) - val (_, memResourcesToUse) = - partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc)) - val taskBuilder = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) - .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId)) - .setName("Task " + taskId) - .addAllResources(cpuResourcesToUse.asJava) - .addAllResources(memResourcesToUse.asJava) - - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder()) - } - - // Accept the offer and launch the task - logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") - slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname - d.launchTasks( - Collections.singleton(offer.getId), - Collections.singleton(taskBuilder.build()), filters) - } else { - // Decline the offer - logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") - d.declineOffer(offer.getId) + val offerId = offer.getId.getValue + val resources = remainingResources(offerId) + + if (canLaunchTask(slaveId, resources)) { + // Create a task + launchTasks = true + val taskId = newMesosTaskId() + val offerCPUs = getResource(resources, "cpus").toInt + + val taskCPUs = executorCores(offerCPUs) + val taskMemory = executorMemory(sc) + + slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId) + + val (afterCPUResources, cpuResourcesToUse) = + partitionResources(resources, "cpus", taskCPUs) + val (resourcesLeft, memResourcesToUse) = + partitionResources(afterCPUResources.asJava, "mem", taskMemory) + + val taskBuilder = MesosTaskInfo.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) + .setSlaveId(offer.getSlaveId) + .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) + .setName("Task " + taskId) + .addAllResources(cpuResourcesToUse.asJava) + .addAllResources(memResourcesToUse.asJava) + + sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => + MesosSchedulerBackendUtil + .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder) } - } else { - // This offer does not meet constraints. We don't need to see it again. - // Decline the offer for a long period of time. - logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" - + s" for $rejectOfferDurationForUnmetConstraints seconds") - d.declineOffer(offer.getId, Filters.newBuilder() - .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()) + + tasks(offer.getId) ::= taskBuilder.build() + remainingResources(offerId) = resourcesLeft.asJava + totalCoresAcquired += taskCPUs + coresByTaskId(taskId) = taskCPUs } } } + tasks.toMap } + private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = { + val offerMem = getResource(resources, "mem") + val offerCPUs = getResource(resources, "cpus").toInt + val cpus = executorCores(offerCPUs) + val mem = executorMemory(sc) + + cpus > 0 && + cpus <= offerCPUs && + cpus + totalCoresAcquired <= maxCores && + mem <= offerMem && + numExecutors() < executorLimit && + slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES + } + + private def executorCores(offerCPUs: Int): Int = { + sc.conf.getInt("spark.executor.cores", + math.min(offerCPUs, maxCores - totalCoresAcquired)) + } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val taskId = status.getTaskId.getValue.toInt - val state = status.getState - logInfo(s"Mesos task $taskId is now $state") - val slaveId: String = status.getSlaveId.getValue + val taskId = status.getTaskId.getValue + val slaveId = status.getSlaveId.getValue + val state = TaskState.fromMesos(status.getState) + + logInfo(s"Mesos task $taskId is now ${status.getState}") + stateLock.synchronized { + val slave = slaves(slaveId) + // If the shuffle service is enabled, have the driver register with each one of the // shuffle services. This allows the shuffle services to clean up state associated with // this application when the driver exits. There is currently not a great way to detect // this through Mesos, since the shuffle services are set up independently. - if (TaskState.fromMesos(state).equals(TaskState.RUNNING) && - slaveIdToHost.contains(slaveId) && - shuffleServiceEnabled) { + if (state.equals(TaskState.RUNNING) && + shuffleServiceEnabled && + !slave.shuffleRegistered) { assume(mesosExternalShuffleClient.isDefined, "External shuffle client was not instantiated even though shuffle service is enabled.") // TODO: Remove this and allow the MesosExternalShuffleService to detect // framework termination when new Mesos Framework HTTP API is available. val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337) - val hostname = slaveIdToHost.remove(slaveId).get + logDebug(s"Connecting to shuffle service on slave $slaveId, " + - s"host $hostname, port $externalShufflePort for app ${conf.getAppId}") + s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}") + mesosExternalShuffleClient.get - .registerDriverWithShuffleService(hostname, externalShufflePort) + .registerDriverWithShuffleService( + slave.hostname, + externalShufflePort, + sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"), + sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) + slave.shuffleRegistered = true } - if (TaskState.isFinished(TaskState.fromMesos(state))) { - val slaveId = taskIdToSlaveId.get(taskId) - slaveIdsWithExecutors -= slaveId - taskIdToSlaveId.remove(taskId) + if (TaskState.isFinished(state)) { // Remove the cores we have remembered for this task, if it's in the hashmap for (cores <- coresByTaskId.get(taskId)) { totalCoresAcquired -= cores coresByTaskId -= taskId } // If it was a failure, mark the slave as failed for blacklisting purposes - if (TaskState.isFailed(TaskState.fromMesos(state))) { - failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 - if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { + if (TaskState.isFailed(state)) { + slave.taskFailures += 1 + + if (slave.taskFailures >= MAX_SLAVE_FAILURES) { logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " + "is Spark installed on it?") } } - executorTerminated(d, slaveId, s"Executor finished with state $state") + executorTerminated(d, slaveId, taskId, s"Executor finished with state $state") // In case we'd rejected everything before but have now lost a node d.reviveOffers() } @@ -364,7 +485,35 @@ private[spark] class CoarseMesosSchedulerBackend( } override def stop() { - super.stop() + // Make sure we're not launching tasks during shutdown + stateLock.synchronized { + if (stopCalled) { + logWarning("Stop called multiple times, ignoring") + return + } + stopCalled = true + super.stop() + } + + // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them. + // See SPARK-12330 + val startTime = System.nanoTime() + + // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent + while (numExecutors() > 0 && + System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) { + Thread.sleep(100) + } + + if (numExecutors() > 0) { + logWarning(s"Timed out waiting for ${numExecutors()} remaining executors " + + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files " + + "on the mesos nodes.") + } + + // Close the mesos external shuffle client if used + mesosExternalShuffleClient.foreach(_.close()) + if (mesosDriver != null) { mesosDriver.stop() } @@ -373,40 +522,26 @@ private[spark] class CoarseMesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} /** - * Called when a slave is lost or a Mesos task finished. Update local view on - * what tasks are running and remove the terminated slave from the list of pending - * slave IDs that we might have asked to be killed. It also notifies the driver - * that an executor was removed. + * Called when a slave is lost or a Mesos task finished. Updates local view on + * what tasks are running. It also notifies the driver that an executor was removed. */ - private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = { + private def executorTerminated( + d: SchedulerDriver, + slaveId: String, + taskId: String, + reason: String): Unit = { stateLock.synchronized { - if (slaveIdsWithExecutors.contains(slaveId)) { - val slaveIdToTaskId = taskIdToSlaveId.inverse() - if (slaveIdToTaskId.containsKey(slaveId)) { - val taskId: Int = slaveIdToTaskId.get(slaveId) - taskIdToSlaveId.remove(taskId) - removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason)) - } - // TODO: This assumes one Spark executor per Mesos slave, - // which may no longer be true after SPARK-5095 - pendingRemovedSlaveIds -= slaveId - slaveIdsWithExecutors -= slaveId - } + removeExecutor(taskId, SlaveLost(reason)) + slaves(slaveId).taskIDs.remove(taskId) } } - private def sparkExecutorId(slaveId: String, taskId: String): String = { - s"$slaveId/$taskId" - } - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = { logInfo(s"Mesos slave lost: ${slaveId.getValue}") - executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue) } override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = { - logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) - slaveLost(d, s) + logInfo("Mesos executor lost: %s".format(e.getValue)) } override def applicationId(): String = @@ -426,23 +561,26 @@ private[spark] class CoarseMesosSchedulerBackend( override def doKillExecutors(executorIds: Seq[String]): Boolean = { if (mesosDriver == null) { logWarning("Asked to kill executors before the Mesos driver was started.") - return false - } - - val slaveIdToTaskId = taskIdToSlaveId.inverse() - for (executorId <- executorIds) { - val slaveId = executorId.split("/")(0) - if (slaveIdToTaskId.containsKey(slaveId)) { - mesosDriver.killTask( - TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build()) - pendingRemovedSlaveIds += slaveId - } else { - logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler") + false + } else { + for (executorId <- executorIds) { + val taskId = TaskID.newBuilder().setValue(executorId).build() + mesosDriver.killTask(taskId) } + // no need to adjust `executorLimitOption` since the AllocationManager already communicated + // the desired limit through a call to `doRequestTotalExecutors`. + // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]] + true } - // no need to adjust `executorLimitOption` since the AllocationManager already communicated - // the desired limit through a call to `doRequestTotalExecutors`. - // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]] - true } + + private def numExecutors(): Int = { + slaves.values.map(_.taskIDs.size).sum + } +} + +private class Slave(val hostname: String) { + val taskIDs = new HashSet[String]() + var taskFailures = 0 + var shuffleRegistered = false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 573355ba58132..be710f9361b7c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import com.google.common.base.Splitter -import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos} +import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver} import org.apache.mesos.Protos._ import org.apache.mesos.protobuf.{ByteString, GeneratedMessage} import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext} @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils /** * Shared trait for implementing a Mesos Scheduler. This holds common state and helper - * methods the Mesos scheduler will use. + * methods and Mesos scheduler will use. */ private[mesos] trait MesosSchedulerUtils extends Logging { // Lock used to wait for scheduler to be registered @@ -106,44 +106,56 @@ private[mesos] trait MesosSchedulerUtils extends Logging { registerLatch.await() return } + @volatile + var error: Option[Exception] = None + // We create a new thread that will block inside `mesosDriver.run` + // until the scheduler exists new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") { setDaemon(true) - override def run() { - mesosDriver = newDriver try { + mesosDriver = newDriver val ret = mesosDriver.run() logInfo("driver.run() returned with code " + ret) if (ret != null && ret.equals(Status.DRIVER_ABORTED)) { - System.exit(1) + error = Some(new SparkException("Error starting driver, DRIVER_ABORTED")) + markErr() } } catch { - case e: Exception => { + case e: Exception => logError("driver.run() failed", e) - System.exit(1) - } + error = Some(e) + markErr() } } }.start() registerLatch.await() + + // propagate any error to the calling thread. This ensures that SparkContext creation fails + // without leaving a broken context that won't be able to schedule any tasks + error.foreach(throw _) } } - /** - * Signal that the scheduler has registered with Mesos. - */ - protected def getResource(res: JList[Resource], name: String): Double = { + def getResource(res: JList[Resource], name: String): Double = { // A resource can have multiple values in the offer since it can either be from // a specific role or wildcard. res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum } + /** + * Signal that the scheduler has registered with Mesos. + */ protected def markRegistered(): Unit = { registerLatch.countDown() } + protected def markErr(): Unit = { + registerLatch.countDown() + } + def createResource(name: String, amount: Double, role: Option[String] = None): Resource = { val builder = Resource.newBuilder() .setName(name) @@ -170,7 +182,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { var remain = amountToUse var requestedResources = new ArrayBuffer[Resource] val remainingResources = resources.asScala.map { - case r => { + case r => if (remain > 0 && r.getType == Value.Type.SCALAR && r.getScalar.getValue > 0.0 && @@ -182,7 +194,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } else { r } - } } // Filter any resource that has depleted. @@ -214,7 +225,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * @return */ protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = { - offerAttributes.asScala.map(attr => { + offerAttributes.asScala.map { attr => val attrValue = attr.getType match { case Value.Type.SCALAR => attr.getScalar case Value.Type.RANGES => attr.getRanges @@ -222,7 +233,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { case Value.Type.TEXT => attr.getText } (attr.getName, attrValue) - }).toMap + }.toMap } @@ -269,11 +280,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for * multiple values (comma separated). For example: * {{{ - * parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") + * parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") * // would result in * * Map( - * "tachyon" -> Set("true"), + * "os" -> Set("centos7"), * "zone": -> Set("us-east-1a", "us-east-1b") * ) * }}} @@ -317,6 +328,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging { private val MEMORY_OVERHEAD_FRACTION = 0.10 private val MEMORY_OVERHEAD_MINIMUM = 384 + def calculateTotalMemory(sc: SparkContext): Int = executorMemory(sc) + /** * Return the amount of memory to allocate to each executor, taking into account * container overheads. @@ -324,7 +337,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * @return memory requirement as (0.1 * ) or MEMORY_OVERHEAD_MINIMUM * (whichever is larger) */ - def calculateTotalMemory(sc: SparkContext): Int = { + def executorMemory(sc: SparkContext): Int = { sc.conf.getInt("spark.mesos.executor.memoryOverhead", math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + sc.executorMemory diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index 525ee0d3bdc5a..db4dc2d3105d8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -17,26 +17,248 @@ package org.apache.spark.scheduler.cluster.mesos -import java.util import java.util.Collections -import org.apache.mesos.Protos.Value.Scalar -import org.apache.mesos.Protos._ +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.Scalar +import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.mockito.Matchers import org.scalatest.mock.MockitoSugar import org.scalatest.BeforeAndAfter +import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient +import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SecurityManager, SparkFunSuite} class CoarseMesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with BeforeAndAfter { + private var sparkConf: SparkConf = _ + private var driver: SchedulerDriver = _ + private var taskScheduler: TaskSchedulerImpl = _ + private var backend: CoarseMesosSchedulerBackend = _ + private var externalShuffleClient: MesosExternalShuffleClient = _ + private var driverEndpoint: RpcEndpointRef = _ + + test("mesos supports killing and limiting executors") { + setBackend() + sparkConf.set("spark.driver.host", "driverHost") + sparkConf.set("spark.driver.port", "1234") + + val minMem = backend.executorMemory(sc) + val minCpu = 4 + val offers = List((minMem, minCpu)) + + // launches a task on a valid offer + offerResources(offers) + verifyTaskLaunched("o1") + + // kills executors + backend.doRequestTotalExecutors(0) + assert(backend.doKillExecutors(Seq("0"))) + val taskID0 = createTaskId("0") + verify(driver, times(1)).killTask(taskID0) + + // doesn't launch a new task when requested executors == 0 + offerResources(offers, 2) + verifyDeclinedOffer(driver, createOfferId("o2")) + + // Launches a new task when requested executors is positive + backend.doRequestTotalExecutors(2) + offerResources(offers, 2) + verifyTaskLaunched("o2") + } + + test("mesos supports killing and relaunching tasks with executors") { + setBackend() + + // launches a task on a valid offer + val minMem = backend.executorMemory(sc) + 1024 + val minCpu = 4 + val offer1 = (minMem, minCpu) + val offer2 = (minMem, 1) + offerResources(List(offer1, offer2)) + verifyTaskLaunched("o1") + + // accounts for a killed task + val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED) + backend.statusUpdate(driver, status) + verify(driver, times(1)).reviveOffers() + + // Launches a new task on a valid offer from the same slave + offerResources(List(offer2)) + verifyTaskLaunched("o2") + } + + test("mesos supports spark.executor.cores") { + val executorCores = 4 + setBackend(Map("spark.executor.cores" -> executorCores.toString)) + + val executorMemory = backend.executorMemory(sc) + val offers = List((executorMemory * 2, executorCores + 1)) + offerResources(offers) + + val taskInfos = verifyTaskLaunched("o1") + assert(taskInfos.size() == 1) + + val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus") + assert(cpus == executorCores) + } + + test("mesos supports unset spark.executor.cores") { + setBackend() + + val executorMemory = backend.executorMemory(sc) + val offerCores = 10 + offerResources(List((executorMemory * 2, offerCores))) + + val taskInfos = verifyTaskLaunched("o1") + assert(taskInfos.size() == 1) + + val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus") + assert(cpus == offerCores) + } + + test("mesos does not acquire more than spark.cores.max") { + val maxCores = 10 + setBackend(Map("spark.cores.max" -> maxCores.toString)) + + val executorMemory = backend.executorMemory(sc) + offerResources(List((executorMemory, maxCores + 1))) + + val taskInfos = verifyTaskLaunched("o1") + assert(taskInfos.size() == 1) + + val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus") + assert(cpus == maxCores) + } + + test("mesos declines offers that violate attribute constraints") { + setBackend(Map("spark.mesos.constraints" -> "x:true")) + offerResources(List((backend.executorMemory(sc), 4))) + verifyDeclinedOffer(driver, createOfferId("o1"), true) + } + + test("mesos assigns tasks round-robin on offers") { + val executorCores = 4 + val maxCores = executorCores * 2 + setBackend(Map("spark.executor.cores" -> executorCores.toString, + "spark.cores.max" -> maxCores.toString)) + + val executorMemory = backend.executorMemory(sc) + offerResources(List( + (executorMemory * 2, executorCores * 2), + (executorMemory * 2, executorCores * 2))) + + verifyTaskLaunched("o1") + verifyTaskLaunched("o2") + } + + test("mesos creates multiple executors on a single slave") { + val executorCores = 4 + setBackend(Map("spark.executor.cores" -> executorCores.toString)) + + // offer with room for two executors + val executorMemory = backend.executorMemory(sc) + offerResources(List((executorMemory * 2, executorCores * 2))) + + // verify two executors were started on a single offer + val taskInfos = verifyTaskLaunched("o1") + assert(taskInfos.size() == 2) + } + + test("mesos doesn't register twice with the same shuffle service") { + setBackend(Map("spark.shuffle.service.enabled" -> "true")) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + verifyTaskLaunched("o1") + + val offer2 = createOffer("o2", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer2).asJava) + verifyTaskLaunched("o2") + + val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING) + backend.statusUpdate(driver, status1) + + val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING) + backend.statusUpdate(driver, status2) + verify(externalShuffleClient, times(1)) + .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong) + } + + test("mesos kills an executor when told") { + setBackend() + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + verifyTaskLaunched("o1") + + backend.doKillExecutors(List("0")) + verify(driver, times(1)).killTask(createTaskId("0")) + } + + private def verifyDeclinedOffer(driver: SchedulerDriver, + offerId: OfferID, + filter: Boolean = false): Unit = { + if (filter) { + verify(driver, times(1)).declineOffer(Matchers.eq(offerId), anyObject[Filters]) + } else { + verify(driver, times(1)).declineOffer(Matchers.eq(offerId)) + } + } + + private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = { + val mesosOffers = offers.zipWithIndex.map {case (offer, i) => + createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)} + + backend.resourceOffers(driver, mesosOffers.asJava) + } + + private def verifyTaskLaunched(offerId: String): java.util.Collection[TaskInfo] = { + val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]]) + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(createOfferId(offerId))), + captor.capture()) + captor.getValue + } + + private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = { + TaskStatus.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskId).build()) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) + .setState(state) + .build + } + + + private def createOfferId(offerId: String): OfferID = { + OfferID.newBuilder().setValue(offerId).build() + } + + private def createSlaveId(slaveId: String): SlaveID = { + SlaveID.newBuilder().setValue(slaveId).build() + } + + private def createExecutorId(executorId: String): ExecutorID = { + ExecutorID.newBuilder().setValue(executorId).build() + } + + private def createTaskId(taskId: String): TaskID = { + TaskID.newBuilder().setValue(taskId).build() + } + private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = { val builder = Offer.newBuilder() builder.addResourcesBuilder() @@ -47,8 +269,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(cpu)) - builder.setId(OfferID.newBuilder() - .setValue(offerId).build()) + builder.setId(createOfferId(offerId)) .setFrameworkId(FrameworkID.newBuilder() .setValue("f1")) .setSlaveId(SlaveID.newBuilder().setValue(slaveId)) @@ -58,130 +279,61 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite private def createSchedulerBackend( taskScheduler: TaskSchedulerImpl, - driver: SchedulerDriver): CoarseMesosSchedulerBackend = { + driver: SchedulerDriver, + shuffleClient: MesosExternalShuffleClient, + endpoint: RpcEndpointRef): CoarseMesosSchedulerBackend = { val securityManager = mock[SecurityManager] + val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) { override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = driver + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = driver + + override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient + + protected def createDriverEndpointRef( + properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint + + // override to avoid race condition with the driver thread on `mesosDriver` + override def startScheduler(newDriver: SchedulerDriver): Unit = { + mesosDriver = newDriver + } + markRegistered() } backend.start() backend } - var sparkConf: SparkConf = _ - - before { + private def setBackend(sparkConfVars: Map[String, String] = null) { sparkConf = (new SparkConf) .setMaster("local[*]") .setAppName("test-mesos-dynamic-alloc") .setSparkHome("/path") + .set("spark.mesos.driver.webui.url", "http://webui") - sc = new SparkContext(sparkConf) - } - - test("mesos supports killing and limiting executors") { - val driver = mock[SchedulerDriver] - when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - val taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.sc).thenReturn(sc) - - sparkConf.set("spark.driver.host", "driverHost") - sparkConf.set("spark.driver.port", "1234") - - val backend = createSchedulerBackend(taskScheduler, driver) - val minMem = backend.calculateTotalMemory(sc) - val minCpu = 4 - - val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer("o1", "s1", minMem, minCpu)) - - val taskID0 = TaskID.newBuilder().setValue("0").build() - - backend.resourceOffers(driver, mesosOffers) - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), - any[util.Collection[TaskInfo]], - any[Filters]) - - // simulate the allocation manager down-scaling executors - backend.doRequestTotalExecutors(0) - assert(backend.doKillExecutors(Seq("s1/0"))) - verify(driver, times(1)).killTask(taskID0) - - val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer("o2", "s2", minMem, minCpu)) - backend.resourceOffers(driver, mesosOffers2) - - verify(driver, times(1)) - .declineOffer(OfferID.newBuilder().setValue("o2").build()) - - // Verify we didn't launch any new executor - assert(backend.slaveIdsWithExecutors.size === 1) - - backend.doRequestTotalExecutors(2) - backend.resourceOffers(driver, mesosOffers2) - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)), - any[util.Collection[TaskInfo]], - any[Filters]) + if (sparkConfVars != null) { + for (attr <- sparkConfVars) { + sparkConf.set(attr._1, attr._2) + } + } - assert(backend.slaveIdsWithExecutors.size === 2) - backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build()) - assert(backend.slaveIdsWithExecutors.size === 1) - } + sc = new SparkContext(sparkConf) - test("mesos supports killing and relaunching tasks with executors") { - val driver = mock[SchedulerDriver] + driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - val taskScheduler = mock[TaskSchedulerImpl] + taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) + externalShuffleClient = mock[MesosExternalShuffleClient] + driverEndpoint = mock[RpcEndpointRef] - val backend = createSchedulerBackend(taskScheduler, driver) - val minMem = backend.calculateTotalMemory(sc) + 1024 - val minCpu = 4 - - val mesosOffers = new java.util.ArrayList[Offer] - val offer1 = createOffer("o1", "s1", minMem, minCpu) - mesosOffers.add(offer1) - - val offer2 = createOffer("o2", "s1", minMem, 1); - - backend.resourceOffers(driver, mesosOffers) - - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(offer1.getId)), - anyObject(), - anyObject[Filters]) - - // Simulate task killed, executor no longer running - val status = TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue("0").build()) - .setSlaveId(SlaveID.newBuilder().setValue("s1").build()) - .setState(TaskState.TASK_KILLED) - .build - - backend.statusUpdate(driver, status) - assert(!backend.slaveIdsWithExecutors.contains("s1")) - - mesosOffers.clear() - mesosOffers.add(offer2) - backend.resourceOffers(driver, mesosOffers) - assert(backend.slaveIdsWithExecutors.contains("s1")) - - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(offer2.getId)), - anyObject(), - anyObject[Filters]) - - verify(driver, times(1)).reviveOffers() + backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint) } } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 675820308bd4c..2add9c83a73d2 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -19,7 +19,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +46,13 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient { private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class); + private final ScheduledExecutorService heartbeaterThread = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("mesos-external-shuffle-client-heartbeater") + .build()); + /** * Creates an Mesos external shuffle client that wraps the {@link ExternalShuffleClient}. * Please refer to docs on {@link ExternalShuffleClient} for more information. @@ -53,21 +65,59 @@ public MesosExternalShuffleClient( super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled); } - public void registerDriverWithShuffleService(String host, int port) throws IOException { + public void registerDriverWithShuffleService( + String host, + int port, + long heartbeatTimeoutMs, + long heartbeatIntervalMs) throws IOException { + checkInit(); - ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer(); + ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer(); TransportClient client = clientFactory.createClient(host, port); - client.sendRpc(registerDriver, new RpcResponseCallback() { - @Override - public void onSuccess(ByteBuffer response) { - logger.info("Successfully registered app " + appId + " with external shuffle service."); - } - - @Override - public void onFailure(Throwable e) { - logger.warn("Unable to register app " + appId + " with external shuffle service. " + + client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs)); + } + + private class RegisterDriverCallback implements RpcResponseCallback { + private final TransportClient client; + private final long heartbeatIntervalMs; + + private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) { + this.client = client; + this.heartbeatIntervalMs = heartbeatIntervalMs; + } + + @Override + public void onSuccess(ByteBuffer response) { + heartbeaterThread.scheduleAtFixedRate( + new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS); + logger.info("Successfully registered app " + appId + " with external shuffle service."); + } + + @Override + public void onFailure(Throwable e) { + logger.warn("Unable to register app " + appId + " with external shuffle service. " + "Please manually remove shuffle data after driver exit. Error: " + e); - } - }); + } + } + + @Override + public void close() { + heartbeaterThread.shutdownNow(); + super.close(); + } + + private class Heartbeater implements Runnable { + + private final TransportClient client; + + private Heartbeater(TransportClient client) { + this.client = client; + } + + @Override + public void run() { + // TODO: Stop sending heartbeats if the shuffle service has lost the app due to timeout + client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer()); + } } } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index 7fbe3384b4d4f..21c0ff4136aa8 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -24,6 +24,7 @@ import org.apache.spark.network.protocol.Encodable; import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver; +import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat; /** * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or @@ -40,7 +41,8 @@ public abstract class BlockTransferMessage implements Encodable { /** Preceding every serialized message is its type, which allows us to deserialize it. */ public static enum Type { - OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4); + OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4), + HEARTBEAT(5); private final byte id; @@ -64,6 +66,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) { case 2: return RegisterExecutor.decode(buf); case 3: return StreamHandle.decode(buf); case 4: return RegisterDriver.decode(buf); + case 5: return ShuffleServiceHeartbeat.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java index 94a61d6caadc4..b0b6d73ee9cd6 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java @@ -31,33 +31,39 @@ */ public class RegisterDriver extends BlockTransferMessage { private final String appId; + private final long heartbeatTimeoutMs; - public RegisterDriver(String appId) { + public RegisterDriver(String appId, long heartbeatTimeoutMs) { this.appId = appId; + this.heartbeatTimeoutMs = heartbeatTimeoutMs; } public String getAppId() { return appId; } + public long getHeartbeatTimeoutMs() { return heartbeatTimeoutMs; } + @Override protected Type type() { return Type.REGISTER_DRIVER; } @Override public int encodedLength() { - return Encoders.Strings.encodedLength(appId); + return Encoders.Strings.encodedLength(appId) + Long.SIZE / Byte.SIZE; } @Override public void encode(ByteBuf buf) { Encoders.Strings.encode(buf, appId); + buf.writeLong(heartbeatTimeoutMs); } @Override public int hashCode() { - return Objects.hashCode(appId); + return Objects.hashCode(appId, heartbeatTimeoutMs); } public static RegisterDriver decode(ByteBuf buf) { String appId = Encoders.Strings.decode(buf); - return new RegisterDriver(appId); + long heartbeatTimeout = buf.readLong(); + return new RegisterDriver(appId, heartbeatTimeout); } } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java new file mode 100644 index 0000000000000..b30bb9aed55b6 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.protocol.mesos; + +import io.netty.buffer.ByteBuf; +import org.apache.spark.network.protocol.Encoders; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; + +// Needed by ScalaDoc. See SPARK-7726 +import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; + +/** + * A heartbeat sent from the driver to the MesosExternalShuffleService. + */ +public class ShuffleServiceHeartbeat extends BlockTransferMessage { + private final String appId; + + public ShuffleServiceHeartbeat(String appId) { + this.appId = appId; + } + + public String getAppId() { return appId; } + + @Override + protected Type type() { return Type.HEARTBEAT; } + + @Override + public int encodedLength() { return Encoders.Strings.encodedLength(appId); } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, appId); + } + + public static ShuffleServiceHeartbeat decode(ByteBuf buf) { + return new ShuffleServiceHeartbeat(Encoders.Strings.decode(buf)); + } +}