MesosClusterScheduler.scala
@@ -152,6 +152,7 @@ private[spark] class MesosClusterScheduler(
// is registered with Mesos master.
@volatile protected var ready = false
private var masterInfo: Option[MasterInfo] = None
private var schedulerDriver: SchedulerDriver = _

def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
val c = new CreateSubmissionResponse
@@ -168,9 +169,8 @@ private[spark] class MesosClusterScheduler(
return c
}
c.submissionId = desc.submissionId
queuedDriversState.persist(desc.submissionId, desc)
queuedDrivers += desc
c.success = true
addDriverToQueue(desc)
}
c
}
@@ -191,7 +191,7 @@ private[spark] class MesosClusterScheduler(
// 4. Check if it has already completed.
if (launchedDrivers.contains(submissionId)) {
val task = launchedDrivers(submissionId)
mesosDriver.killTask(task.taskId)
schedulerDriver.killTask(task.taskId)
k.success = true
k.message = "Killing running driver"
} else if (removeFromQueuedDrivers(submissionId)) {
@@ -324,7 +324,7 @@ private[spark] class MesosClusterScheduler(
ready = false
metricsSystem.report()
metricsSystem.stop()
mesosDriver.stop(true)
schedulerDriver.stop(true)
}

override def registered(
@@ -340,6 +340,8 @@ private[spark] class MesosClusterScheduler(

stateLock.synchronized {
this.masterInfo = Some(masterInfo)
this.schedulerDriver = driver

if (!pendingRecover.isEmpty) {
// Start task reconciliation if we need to recover.
val statuses = pendingRecover.collect {
@@ -506,28 +508,27 @@ private[spark] class MesosClusterScheduler(
}

private class ResourceOffer(
val offerId: OfferID,
val slaveId: SlaveID,
var resources: JList[Resource]) {
val offer: Offer,
var remainingResources: JList[Resource]) {
override def toString(): String = {
s"Offer id: ${offerId}, resources: ${resources}"
s"Offer id: ${offer.getId}, resources: ${remainingResources}"
}
}

private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()

val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.resources, "cpus", desc.cores)
partitionResources(offer.remainingResources, "cpus", desc.cores)
val (finalResources, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", desc.mem)
offer.resources = finalResources.asJava
offer.remainingResources = finalResources.asJava

val appName = desc.conf.get("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setSlaveId(offer.slaveId)
.setSlaveId(offer.offer.getSlaveId)
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
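createTaskInfo now draws cpus and mem from `offer.remainingResources` and writes the leftover list back into the wrapper, so several drivers can be packed onto a single Mesos offer. The accounting underneath operates on Mesos scalar `Resource` protobufs; a minimal sketch of that accounting (object and helper names here are illustrative, not Spark's):

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.mesos.Protos.{Resource, Value}

// Illustrative helpers showing the shape of the scalar resources that
// getResource/partitionResources operate on (names are not Spark's).
object ScalarResources {
  def scalar(name: String, amount: Double): Resource =
    Resource.newBuilder()
      .setName(name)
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder().setValue(amount))
      .build()

  // Sum a named scalar resource across an offer's resource list,
  // which is roughly what getResource(resources, "cpus") computes.
  def total(resources: JList[Resource], name: String): Double =
    resources.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum

  def main(args: Array[String]): Unit = {
    val offerResources = Seq(scalar("cpus", 4.0), scalar("mem", 8192.0)).asJava
    println(total(offerResources, "cpus")) // 4.0
  }
}
```

partitionResources, defined in the shared Mesos scheduler utilities, splits such a list into a consumed part and a remainder, which is what lets the scheduling loop shrink `offer.remainingResources` after each driver it places.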
@@ -549,23 +550,29 @@ private[spark] class MesosClusterScheduler(
val driverCpu = submission.cores
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val offerOption = currentOffers.find { o =>
getResource(o.resources, "cpus") >= driverCpu &&
getResource(o.resources, "mem") >= driverMem
val offerOption = currentOffers.find { offer =>
getResource(offer.remainingResources, "cpus") >= driverCpu &&
getResource(offer.remainingResources, "mem") >= driverMem
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
try {
val task = createTaskInfo(submission, offer)
queuedTasks += task
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
None, new Date(), None, getDriverFrameworkID(submission))
val newState = new MesosClusterSubmissionState(
submission,
task.getTaskId,
offer.offer.getSlaveId,
None,
new Date(),
None,
getDriverFrameworkID(submission))
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
@@ -588,7 +595,7 @@ private[spark] class MesosClusterScheduler(
val currentTime = new Date()

val currentOffers = offers.asScala.map {
o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
offer => new ResourceOffer(offer, offer.getResourcesList)
}.toList

stateLock.synchronized {
@@ -615,8 +622,8 @@ private[spark] class MesosClusterScheduler(
driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
}

for (o <- currentOffers if !tasks.contains(o.offerId)) {
driver.declineOffer(o.offerId)
for (offer <- currentOffers if !tasks.contains(offer.offer.getId)) {
declineOffer(driver, offer.offer, None, Some(getRejectOfferDuration(conf)))
}
}
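The plain `driver.declineOffer(o.offerId)` call is replaced by a shared `declineOffer` helper that also takes an optional reason and an optional refuse duration. Judging from the version removed from MesosCoarseGrainedSchedulerBackend later in this diff, the shared helper presumably has roughly this shape (a sketch; the real location, logging, and the key behind `getRejectOfferDuration(conf)` are not shown here):

```scala
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.{Filters, Offer}

object OfferDeclining {
  // Sketch reconstructed from the helper removed from
  // MesosCoarseGrainedSchedulerBackend below; details may differ.
  // `reason` is only used for logging in the real helper.
  def declineOffer(
      driver: SchedulerDriver,
      offer: Offer,
      reason: Option[String] = None,
      refuseSeconds: Option[Long] = None): Unit = {
    refuseSeconds match {
      case Some(seconds) =>
        // Ask the master not to re-offer these resources for `seconds`.
        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
        driver.declineOffer(offer.getId, filters)
      case _ =>
        driver.declineOffer(offer.getId)
    }
  }
}
```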

@@ -662,6 +669,12 @@ private[spark] class MesosClusterScheduler(

override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
val taskId = status.getTaskId.getValue

logInfo(s"Received status update: taskId=${taskId}" +
s" state=${status.getState}" +
s" message=${status.getMessage}" +
s" reason=${status.getReason}");

stateLock.synchronized {
if (launchedDrivers.contains(taskId)) {
if (status.getReason == Reason.REASON_RECONCILIATION &&
@@ -682,8 +695,7 @@ private[spark] class MesosClusterScheduler(

val newDriverDescription = state.driverDescription.copy(
retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
pendingRetryDrivers += newDriverDescription
pendingRetryDriversState.persist(taskId, newDriverDescription)
addDriverToPending(newDriverDescription, taskId)
} else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
removeFromLaunchedDrivers(taskId)
state.finishDate = Some(new Date())
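The `retries`, `nextRetry`, and `waitTimeSec` values used in the copied retry state are computed just before this hunk and are not shown in the diff. Purely for orientation, an exponential-backoff computation of that shape could look like the following sketch (illustrative only, not code this PR touches):

```scala
import java.util.Date

object RetryBackoff {
  // Illustrative only: double the wait on each failure, cap it, and schedule
  // the next retry relative to the current time.
  def next(previous: Option[(Int, Int)], maxWaitSeconds: Int, now: Date): (Int, Int, Date) = {
    val (retries, waitTimeSec) = previous
      .map { case (r, w) => (r + 1, math.min(maxWaitSeconds, w * 2)) }
      .getOrElse((1, 1))
    (retries, waitTimeSec, new Date(now.getTime + waitTimeSec * 1000L))
  }
}
```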
@@ -746,4 +758,21 @@ private[spark] class MesosClusterScheduler(
def getQueuedDriversSize: Int = queuedDrivers.size
def getLaunchedDriversSize: Int = launchedDrivers.size
def getPendingRetryDriversSize: Int = pendingRetryDrivers.size

private def addDriverToQueue(desc: MesosDriverDescription): Unit = {
queuedDriversState.persist(desc.submissionId, desc)
queuedDrivers += desc
revive()
}

private def addDriverToPending(desc: MesosDriverDescription, taskId: String): Unit = {
pendingRetryDriversState.persist(taskId, desc)
pendingRetryDrivers += desc
revive()
}

private def revive(): Unit = {
logInfo("Reviving Offers.")
schedulerDriver.reviveOffers()
}
}
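The new `revive()` call is what makes longer decline timeouts safe: once an offer is declined with a refuse filter, the master withholds that agent's offers until the filter expires, unless the framework explicitly revives. A minimal sketch of the two calls and how they interact (illustrative, outside Spark):

```scala
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.{Filters, OfferID}

object OfferFlow {
  // Decline with a refuse filter: the master withholds this agent's offers
  // from the framework for `refuseSeconds`.
  def declineForAWhile(driver: SchedulerDriver, offerId: OfferID, refuseSeconds: Double): Unit = {
    val filters = Filters.newBuilder().setRefuseSeconds(refuseSeconds).build()
    driver.declineOffer(offerId, filters)
  }

  // Reviving clears all active filters, so new work (a queued driver or a
  // pending retry) gets offers immediately instead of waiting out the filter.
  def askForOffersAgain(driver: SchedulerDriver): Unit = {
    driver.reviveOffers()
  }
}
```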
MesosCoarseGrainedSchedulerBackend.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
import scala.concurrent.Future

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.network.netty.SparkTransportConf
@@ -119,11 +120,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

// Reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
getRejectOfferDurationForUnmetConstraints(sc)
getRejectOfferDurationForUnmetConstraints(sc.conf)

// Reject offers when we reached the maximum number of cores for this framework
private val rejectOfferDurationForReachedMaxCores =
getRejectOfferDurationForReachedMaxCores(sc)
getRejectOfferDurationForReachedMaxCores(sc.conf)

// A client for talking to the external shuffle service
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
@@ -146,6 +147,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

@volatile var appId: String = _

private var schedulerDriver: SchedulerDriver = _

def newMesosTaskId(): String = {
val id = nextMesosTaskId
nextMesosTaskId += 1
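The two rejectOfferDuration values are now computed from the `SparkConf` directly rather than the whole `SparkContext`. A hedged sketch of what a conf-based helper of this kind can look like (the config key and default below are assumptions based on the existing `spark.mesos.rejectOfferDuration*` settings, not taken from this diff):

```scala
import org.apache.spark.SparkConf

object RejectOfferDurations {
  // Sketch only: the exact config key and default are assumptions,
  // not taken from this diff.
  def forUnmetConstraints(conf: SparkConf): Long =
    conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
}
```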
@@ -252,9 +255,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}

override def registered(
d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
appId = frameworkId.getValue
mesosExternalShuffleClient.foreach(_.init(appId))
driver: org.apache.mesos.SchedulerDriver,
frameworkId: FrameworkID,
masterInfo: MasterInfo) {
this.appId = frameworkId.getValue
this.mesosExternalShuffleClient.foreach(_.init(appId))
this.schedulerDriver = driver
markRegistered()
}

@@ -293,46 +299,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}

private def declineUnmatchedOffers(
d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
offers.foreach { offer =>
declineOffer(d, offer, Some("unmet constraints"),
declineOffer(
driver,
offer,
Some("unmet constraints"),
Some(rejectOfferDurationForUnmetConstraints))
}
}

private def declineOffer(
d: org.apache.mesos.SchedulerDriver,
offer: Offer,
reason: Option[String] = None,
refuseSeconds: Option[Long] = None): Unit = {

val id = offer.getId.getValue
val offerAttributes = toAttributeMap(offer.getAttributesList)
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus")
val ports = getRangeResource(offer.getResourcesList, "ports")

logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
reason.map(r => s" (reason: $r)").getOrElse(""))

refuseSeconds match {
case Some(seconds) =>
val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
d.declineOffer(offer.getId, filters)
case _ => d.declineOffer(offer.getId)
}
}

/**
* Launches executors on accepted offers, and declines unused offers. Executors are launched
* round-robin on offers.
*
* @param d SchedulerDriver
* @param driver SchedulerDriver
* @param offers Mesos offers that match attribute constraints
*/
private def handleMatchedOffers(
d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
val tasks = buildMesosTasks(offers)
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -358,15 +343,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
s" ports: $ports")
}

d.launchTasks(
driver.launchTasks(
Collections.singleton(offer.getId),
offerTasks.asJava)
} else if (totalCoresAcquired >= maxCores) {
// Reject an offer for a configurable amount of time to avoid starving other frameworks
declineOffer(d, offer, Some("reached spark.cores.max"),
declineOffer(driver,
offer,
Some("reached spark.cores.max"),
Some(rejectOfferDurationForReachedMaxCores))
} else {
declineOffer(d, offer)
declineOffer(
driver,
offer)
}
}
}
@@ -582,8 +571,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Close the mesos external shuffle client if used
mesosExternalShuffleClient.foreach(_.close())

if (mesosDriver != null) {
mesosDriver.stop()
if (schedulerDriver != null) {
schedulerDriver.stop()
}
}

@@ -634,13 +623,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}

override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
if (mesosDriver == null) {
if (schedulerDriver == null) {
logWarning("Asked to kill executors before the Mesos driver was started.")
false
} else {
for (executorId <- executorIds) {
val taskId = TaskID.newBuilder().setValue(executorId).build()
mesosDriver.killTask(taskId)
schedulerDriver.killTask(taskId)
}
// no need to adjust `executorLimitOption` since the AllocationManager already communicated
// the desired limit through a call to `doRequestTotalExecutors`.