@@ -230,7 +230,7 @@ private[spark] class ExecutorAllocationManager(
}
}
}
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)

client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
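
Note on the scheduling change above: java.util.concurrent's scheduleAtFixedRate anchors every run to the original start time, so a schedule() pass that overruns the interval is followed by back-to-back catch-up runs, whereas scheduleWithFixedDelay always waits the full interval after the previous pass finishes. A minimal standalone sketch of that behaviour (class name, timings, and the sleeping task are illustrative, not from the patch):

import java.util.concurrent.{Executors, TimeUnit}

object FixedDelayVsFixedRate {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit = {
        println(s"tick at ${System.currentTimeMillis()}")
        Thread.sleep(300) // simulate a pass that overruns the 100 ms interval
      }
    }
    // With scheduleAtFixedRate, overdue executions fire back-to-back to catch up.
    // With scheduleWithFixedDelay, the next run starts a full 100 ms after the
    // previous one finishes, so a slow pass never triggers a burst of queued passes.
    executor.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS)
    Thread.sleep(2000)
    executor.shutdownNow()
  }
}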

@@ -21,6 +21,8 @@ import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import org.apache.spark.SparkConf
@@ -79,11 +81,6 @@ private[spark] class StandaloneAppClient(
private val registrationRetryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")

// A thread pool to perform receive then reply actions in a thread so as not to block the
// event loop.
private val askAndReplyThreadPool =
ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")

override def onStart(): Unit = {
try {
registerWithMaster(1)
@@ -220,19 +217,13 @@ private[spark] class StandaloneAppClient(
endpointRef: RpcEndpointRef,
context: RpcCallContext,
msg: T): Unit = {
// Create a thread to ask a message and reply with the result. Allow thread to be
// Ask a message and create a thread to reply with the result. Allow thread to be
// interrupted during shutdown, otherwise context must be notified of NonFatal errors.
askAndReplyThreadPool.execute(new Runnable {
override def run(): Unit = {
try {
context.reply(endpointRef.askWithRetry[Boolean](msg))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(t) =>
context.sendFailure(t)
}
}
})
endpointRef.ask[Boolean](msg).andThen {
case Success(b) => context.reply(b)
case Failure(ie: InterruptedException) => // Cancelled
case Failure(NonFatal(t)) => context.sendFailure(t)
}(ThreadUtils.sameThread)
}
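
Rather than parking a pooled thread on a blocking askWithRetry, the reply is now attached as a callback on the Future returned by ask and run on ThreadUtils.sameThread. A rough standalone sketch of that callback style, where the Promise stands in for the RPC reply context and the inline executor stands in for ThreadUtils.sameThread (both are illustrative, not the patch's code):

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object AskAndReplySketch {
  // Illustrative same-thread executor: callbacks run on the completing thread.
  val sameThread: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = throw cause
  }

  def main(args: Array[String]): Unit = {
    val reply = Promise[Boolean]()                      // stand-in for context.reply / sendFailure
    val ask: Future[Boolean] = Future.successful(true)  // stand-in for endpointRef.ask[Boolean](msg)

    // No dedicated thread pool: the callback simply forwards the result (or the
    // failure) once the ask completes, on whatever thread completed it.
    ask.andThen {
      case Success(b) => reply.success(b)
      case Failure(t) => reply.failure(t)
    }(sameThread)

    println(reply.future.value) // Some(Success(true))
  }
}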

override def onDisconnected(address: RpcAddress): Unit = {
@@ -272,7 +263,6 @@ private[spark] class StandaloneAppClient(
registrationRetryThread.shutdownNow()
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
askAndReplyThreadPool.shutdownNow()
}

}
@@ -301,25 +291,25 @@ private[spark] class StandaloneAppClient(
*
* @return whether the request is acknowledged.
*/
def requestTotalExecutors(requestedTotal: Int): Boolean = {
def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
if (endpoint.get != null && appId.get != null) {
endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
false
Future.successful(false)
}
}

/**
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
def killExecutors(executorIds: Seq[String]): Boolean = {
def killExecutors(executorIds: Seq[String]): Future[Boolean] = {
if (endpoint.get != null && appId.get != null) {
endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
endpoint.get.ask[Boolean](KillExecutors(appId.get, executorIds))
} else {
logWarning("Attempted to kill executors before driver fully initialized.")
false
Future.successful(false)
}
}
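
Since these client methods now hand back a Future[Boolean] instead of a blocking result, callers decide whether to await or to attach a callback; the scheduler backend further down chooses to block with a timeout. A hedged sketch of the non-blocking option (the killExecutors stand-in here is illustrative, not the real client):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object ClientFutureUsage {
  // Illustrative stand-in for StandaloneAppClient.killExecutors.
  def killExecutors(ids: Seq[String]): Future[Boolean] = Future { ids.nonEmpty }

  def main(args: Array[String]): Unit = {
    killExecutors(Seq("0", "1")).onComplete {
      case Success(acknowledged) => println(s"kill acknowledged: $acknowledged")
      case Failure(t)            => println(s"kill request failed: ${t.getMessage}")
    }
    Thread.sleep(200) // give the global pool a moment before the JVM exits
  }
}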


@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
import scala.concurrent.duration.Duration

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.internal.Logging
@@ -49,6 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected val totalRegisteredExecutors = new AtomicInteger(0)
protected val conf = scheduler.sc.conf
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is, a double between 0 and 1.
private val _minRegisteredRatio =
@@ -272,6 +275,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

// Remove a disconnected slave from the cluster
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
logDebug(s"Asked to remove executor $executorId with reason $reason")
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -446,19 +450,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.
*/
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
if (numAdditionalExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of additional executor(s) " +
s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
}
logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
logDebug(s"Number of pending executors is now $numPendingExecutors")

numPendingExecutors += numAdditionalExecutors
// Account for executors pending to be added or removed
val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
doRequestTotalExecutors(newTotal)
val response = synchronized {
numPendingExecutors += numAdditionalExecutors
logDebug(s"Number of pending executors is now $numPendingExecutors")

// Account for executors pending to be added or removed
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
}

defaultAskTimeout.awaitResult(response)
}
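
The pattern here is to hold the lock only while updating the bookkeeping and firing the asynchronous request, then block on the returned future outside the synchronized block, so a slow cluster manager does not stall other callers that need the same lock. A minimal standalone sketch of that pattern (the names and the 10-second timeout are illustrative; the backend itself uses defaultAskTimeout.awaitResult):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object LockThenAwaitExample {
  private var pending = 0 // guarded by this

  // Illustrative stand-in for doRequestTotalExecutors: an asynchronous request.
  private def requestFromManager(total: Int): Future[Boolean] = Future { total >= 0 }

  def requestExecutors(numAdditional: Int): Boolean = {
    // Hold the lock only while mutating shared counters and starting the request...
    val response: Future[Boolean] = synchronized {
      pending += numAdditional
      requestFromManager(pending)
    }
    // ...then wait outside the lock, so other callers that also need to
    // synchronize on this are not blocked for the RPC round trip.
    Await.result(response, 10.seconds)
  }

  def main(args: Array[String]): Unit =
    println(requestExecutors(2))
}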

/**
@@ -479,19 +488,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]
): Boolean = synchronized {
): Boolean = {
if (numExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of executor(s) " +
s"$numExecutors from the cluster manager. Please specify a positive number!")
}

this.localityAwareTasks = localityAwareTasks
this.hostToLocalTaskCount = hostToLocalTaskCount
val response = synchronized {
this.localityAwareTasks = localityAwareTasks
this.hostToLocalTaskCount = hostToLocalTaskCount

numPendingExecutors =
math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)

numPendingExecutors =
math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
doRequestTotalExecutors(numExecutors)
doRequestTotalExecutors(numExecutors)
}

defaultAskTimeout.awaitResult(response)
}

/**
@@ -504,16 +518,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* insufficient resources to satisfy the first request. We make the assumption here that the
* cluster manager will eventually fulfill all requests when resources free up.
*
* @return whether the request is acknowledged.
* @return a future whose evaluation indicates whether the request is acknowledged.
*/
protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
Future.successful(false)

/**
* Request that the cluster manager kill the specified executors.
* @return whether the kill request is acknowledged. If the list to kill is empty, it will return
* false.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
final override def killExecutors(executorIds: Seq[String]): Boolean = {
killExecutors(executorIds, replace = false, force = false)
}

@@ -533,39 +548,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Boolean = synchronized {
force: Boolean): Boolean = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
}

// If an executor is already pending to be removed, do not kill it again (SPARK-9795)
// If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }

// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
} else {
numPendingExecutors += knownExecutors.size
val response = synchronized {
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
}

// If an executor is already pending to be removed, do not kill it again (SPARK-9795)
// If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }

// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
val adjustTotalExecutors =
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
} else {
numPendingExecutors += knownExecutors.size
Future.successful(true)
}

val killExecutors: Boolean => Future[Boolean] =
if (!executorsToKill.isEmpty) {
_ => doKillExecutors(executorsToKill)
} else {
_ => Future.successful(false)
}

adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
}

!executorsToKill.isEmpty && doKillExecutors(executorsToKill)
defaultAskTimeout.awaitResult(response)
}
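
The two asynchronous steps, adjusting the executor target and then issuing the kill, are chained with Future.flatMap on a same-thread context, so the kill request is only sent once the adjustment completes and no extra thread pool is involved. A rough standalone sketch of that chaining (the stand-in methods and the inline same-thread ExecutionContext are illustrative):

import scala.concurrent.{ExecutionContext, Future}

object ChainedKillExample {
  // Illustrative same-thread executor, playing the role of ThreadUtils.sameThread.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = throw cause
  }

  def adjustTotalExecutors(newTotal: Int): Future[Boolean] = Future.successful(true)
  def doKillExecutors(ids: Seq[String]): Future[Boolean] = Future.successful(ids.nonEmpty)

  def main(args: Array[String]): Unit = {
    val ids = Seq("1", "2")
    // The kill request is only issued after the target adjustment has completed.
    val acknowledged: Future[Boolean] =
      adjustTotalExecutors(3).flatMap(_ => doKillExecutors(ids))
    println(acknowledged.value) // Some(Success(true)): both futures complete eagerly here
  }
}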

/**
* Kill the given list of executors through the cluster manager.
* @return whether the kill request is acknowledged.
*/
protected def doKillExecutors(executorIds: Seq[String]): Boolean = false

protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
Future.successful(false)
}

private[spark] object CoarseGrainedSchedulerBackend {

@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.cluster

import java.util.concurrent.Semaphore

import scala.concurrent.Future

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
@@ -173,25 +175,25 @@ private[spark] class StandaloneSchedulerBackend(
*
* @return whether the request is acknowledged.
*/
protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
Option(client) match {
case Some(c) => c.requestTotalExecutors(requestedTotal)
case None =>
logWarning("Attempted to request executors before driver fully initialized.")
false
Future.successful(false)
}
}

/**
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
Option(client) match {
case Some(c) => c.killExecutors(executorIds)
case None =>
logWarning("Attempted to kill executors before driver fully initialized.")
false
Future.successful(false)
}
}


@@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{Buffer, HashMap, HashSet}
import scala.concurrent.Future

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}

@@ -577,15 +578,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
super.applicationId
}

override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
// We don't truly know if we can fulfill the full amount of executors
// since at coarse grain it depends on the number of slaves available.
logInfo("Capping the total amount of executors to " + requestedTotal)
executorLimitOption = Some(requestedTotal)
true
}

override def doKillExecutors(executorIds: Seq[String]): Boolean = {
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
if (mesosDriver == null) {
logWarning("Asked to kill executors before the Mesos driver was started.")
false
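
One subtlety worth noting: Future.successful { ... } is not asynchronous; its argument is evaluated eagerly on the calling thread before the already-completed Future is constructed, so wrapping these Mesos overrides this way changes only their return type, not where the work runs. A tiny sketch:

import scala.concurrent.Future

object EagerSuccessfulExample {
  def main(args: Array[String]): Unit = {
    val f: Future[Boolean] = Future.successful {
      println("evaluated eagerly, on the caller's thread")
      true
    }
    println(f.isCompleted) // true immediately
  }
}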

@@ -21,6 +21,7 @@ import java.util.concurrent.{ExecutorService, TimeUnit}

import scala.collection.Map
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps

@@ -270,13 +271,13 @@ private class FakeSchedulerBackend(
clusterManagerEndpoint: RpcEndpointRef)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
clusterManagerEndpoint.askWithRetry[Boolean](
protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}

protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
clusterManagerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
}
}
