diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index a07773c1c71e..41b90d76d87c 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -702,7 +702,16 @@ See the [configuration page](configuration.html) for information on Spark config
Set the maximum number of GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found
since this configuration is just an upper limit and not a guaranteed amount.
-
+
+
+ spark.mesos.disk |
+ (none) |
+
+ Set the amount of disk to acquire for this job. You might need to set this value depending on the type of disk isolation set up in Mesos.
+ For instance, setting an amount of disk is required when the XFS isolator is enabled with a hard limit enforced; otherwise the isolator will
+ kill the Mesos executor while it downloads the Spark executor archive.
+ |
+
spark.mesos.network.name |
(none) |
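
For orientation, here is a hedged usage sketch (not part of the patch) showing how an application might request the new setting. The app name and the value 1024 are placeholders, and the amount is assumed to be in MB, matching Mesos' scalar `disk` resource:

```scala
import org.apache.spark.SparkConf

// Hypothetical usage sketch: request disk per executor alongside CPU and memory.
// 1024 is a placeholder value, assumed to be MB like Mesos' "disk" scalar.
// The same effect can be had with `--conf spark.mesos.disk=1024` on spark-submit.
val conf = new SparkConf()
  .setAppName("disk-aware-app") // placeholder name
  .set("spark.mesos.disk", "1024")
```
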
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 79a113792de9..115c1c2d71ae 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -321,6 +321,16 @@ package object config {
.intConf
.createWithDefault(0)
+ private[spark] val EXECUTOR_DISK =
+ ConfigBuilder("spark.mesos.disk")
+ .doc("Set the amount of disk to acquire for this job. You might need to set this value " +
+ "depending on the type of disk isolation set up in Mesos. For instance, setting an " +
+ "amount of disk is required when XFS isolator is enabled with hard limit enforced " +
+ "otherwise the isolator will kill the Mesos executor when downloading the Spark executor " +
+ "archive.")
+ .intConf
+ .createOptional
+
private[spark] val TASK_LABELS =
ConfigBuilder("spark.mesos.task.labels")
.doc("Set the Mesos labels to add to each task. Labels are free-form key-value pairs. " +
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 8bd61c230d8e..193705317ca5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -79,6 +79,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val useFetcherCache = conf.get(ENABLE_FETCHER_CACHE)
private val maxGpus = conf.get(MAX_GPUS)
+ private val diskPerExecutor = conf.get(EXECUTOR_DISK)
private val taskLabels = conf.get(TASK_LABELS)
@@ -399,6 +400,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val offerMem = getResource(offer.getResourcesList, "mem")
val offerCpus = getResource(offer.getResourcesList, "cpus")
val offerPorts = getRangeResource(offer.getResourcesList, "ports")
+ val offerDisk = getResource(offer.getResourcesList, "disk")
val offerReservationInfo = offer
.getResourcesList
.asScala
@@ -411,7 +413,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
offerReservationInfo.map(resInfo =>
s"reservation info: ${resInfo.getReservation.toString}").getOrElse("") +
- s"mem: $offerMem cpu: $offerCpus ports: $offerPorts " +
+ s"mem: $offerMem cpu: $offerCpus ports: $offerPorts disk: $offerDisk " +
s"resources: ${offer.getResourcesList.asScala.mkString(",")}." +
s" Launching ${offerTasks.size} Mesos tasks.")
@@ -419,10 +421,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val taskId = task.getTaskId
val mem = getResource(task.getResourcesList, "mem")
val cpus = getResource(task.getResourcesList, "cpus")
+ val disk = getResource(task.getResourcesList, "disk")
val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
- s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ")
+ s" disk: $disk ports: $ports on slave with slave id: ${task.getSlaveId.getValue} ")
}
driver.launchTasks(
@@ -497,11 +500,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val taskCPUs = executorCores(offerCPUs)
val taskMemory = executorMemory(sc)
+ val taskDisk = diskPerExecutor
slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
val (resourcesLeft, resourcesToUse) =
- partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
+ partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs, taskDisk)
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
@@ -534,7 +538,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
resources: JList[Resource],
taskCPUs: Int,
taskMemory: Int,
- taskGPUs: Int)
+ taskGPUs: Int,
+ taskDisk: Option[Int])
: (List[Resource], List[Resource]) = {
// partition cpus & mem
@@ -550,14 +555,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val (nonPortResources, portResourcesToUse) =
partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterGPUResources)
- (nonPortResources,
+ var (remainingResources, resourcesToUse) = (nonPortResources,
cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
+
+ if (taskDisk.isDefined) {
+ val (afterDiskResources, diskResourcesToUse) =
+ partitionResources(remainingResources.asJava, "disk", taskDisk.get)
+
+ remainingResources = afterDiskResources
+ resourcesToUse ++= diskResourcesToUse
+ }
+ (remainingResources, resourcesToUse)
}
private def canLaunchTask(slaveId: String, offerHostname: String,
resources: JList[Resource]): Boolean = {
val offerMem = getResource(resources, "mem")
val offerCPUs = getResource(resources, "cpus").toInt
+ val offerDisk = getResource(resources, "disk").toInt
val cpus = executorCores(offerCPUs)
val mem = executorMemory(sc)
val ports = getRangeResource(resources, "ports")
@@ -568,6 +583,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors < executorLimit &&
+ diskPerExecutor.fold(true)(_ <= offerDisk) &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
meetsPortRequirements &&
satisfiesLocality(offerHostname)
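
To make the acceptance rule easier to read in isolation, here is a minimal standalone sketch (illustrative only, not part of the patch) of the disk check that `canLaunchTask` now applies: an unset `spark.mesos.disk` never vetoes an offer, while a set value must fit within the offer's `disk` scalar:

```scala
// Standalone restatement of the new disk predicate in canLaunchTask.
// `requested` mirrors diskPerExecutor (Option[Int]); `offeredDisk` mirrors
// getResource(resources, "disk").
def diskFits(requested: Option[Int], offeredDisk: Double): Boolean =
  requested.fold(true)(_ <= offeredDisk)

diskFits(None, 0.0)        // true  -- nothing requested, offer still acceptable
diskFits(Some(40), 100.0)  // true  -- 40 fits into the 100 offered
diskFits(Some(400), 100.0) // false -- offer is declined, as the new test below checks
```
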
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 06993712035f..b871cc246fd5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -180,6 +180,10 @@ trait MesosSchedulerUtils extends Logging {
res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
}
+ def resourceExists(res: JList[Resource], name: String): Boolean = {
+ res.asScala.exists(_.getName == name)
+ }
+
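
This helper exists mainly so the new tests can assert that a launched task carries no `disk` resource when the setting is unset. A hedged sketch of its behaviour, built with the same protobuf builder pattern the test `Utils` below uses:

```scala
import scala.collection.JavaConverters._
import org.apache.mesos.Protos.{Resource, Value}

// A resource list containing only memory, built the same way Utils.createOffer does.
val memOnly = List(
  Resource.newBuilder()
    .setName("mem")
    .setType(Value.Type.SCALAR)
    .setScalar(Value.Scalar.newBuilder().setValue(1024))
    .build()).asJava

// resourceExists(memOnly, "mem")  == true
// resourceExists(memOnly, "disk") == false -- no disk resource was attached
```
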
/**
* Transforms a range resource to a list of ranges
*
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index f26ff04a9a89..661925b7d535 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -314,8 +314,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
Utils.createTextAttribute("c2", "b"))
val offers = List(
Utils.createOffer("o1", "s1", mem, cpu, None, 0),
- Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes),
- Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes))
+ Utils.createOffer("o2", "s2", mem, cpu, None, 0, None, s2Attributes),
+ Utils.createOffer("o3", "s3", mem, cpu, None, 0, None, s3Attributes))
def submitDriver(driverConstraints: String): Unit = {
val response = scheduler.submitDriver(
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 37c0f5f45075..8fa34f19694d 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -136,6 +136,46 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == offerCores)
}
+ test("mesos supports spark.mesos.disk") {
+ val claimedDisk = 40
+ setBackend(Map("spark.mesos.disk" -> claimedDisk.toString))
+
+ val executorMemory = backend.executorMemory(sc)
+ val offers = List(Resources(executorMemory, 1, 0, Some(100)))
+ offerResources(offers)
+
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
+
+ val taskDisk = backend.getResource(taskInfos.head.getResourcesList, "disk")
+ assert(taskDisk == claimedDisk)
+ }
+
+ test("mesos supports unset spark.mesos.disk") {
+ setBackend()
+
+ val executorMemory = backend.executorMemory(sc)
+ val offers = List(Resources(executorMemory, 1, 0, Some(100)))
+ offerResources(offers)
+
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
+
+ val taskDiskExist = backend.resourceExists(taskInfos.head.getResourcesList, "disk")
+ assert(!taskDiskExist)
+ }
+
+ test("mesos declines offer if not enough disk available") {
+ val claimedDisk = 400
+ setBackend(Map("spark.mesos.disk" -> claimedDisk.toString))
+
+ val executorMemory = backend.executorMemory(sc)
+ val offers = List(Resources(executorMemory, 1, 0, Some(100)))
+ offerResources(offers)
+
+ verifyDeclinedOffer(driver, createOfferId("o1"))
+ }
+
test("mesos does not acquire more than spark.cores.max") {
val maxCores = 10
setBackend(Map(CORES_MAX.key -> maxCores.toString))
@@ -686,7 +726,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
verifyTaskLaunched(driver, "o1")
}
- private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
+ private case class Resources(mem: Int, cpus: Int, gpus: Int = 0, disk: Option[Int] = None)
private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
val mockEndpointRef = mock[RpcEndpointRef]
@@ -709,7 +749,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private def offerResources(offers: List[Resources], startId: Int = 1): Unit = {
val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
- createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus)}
+ createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus,
+ offer.disk)}
backend.resourceOffers(driver, mesosOffers.asJava)
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 65e595e3cf2b..9fc7998fe47b 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -50,6 +50,7 @@ object Utils {
cpus: Int,
ports: Option[(Long, Long)] = None,
gpus: Int = 0,
+ disk: Option[Int] = None,
attributes: List[Attribute] = List.empty): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
@@ -73,6 +74,12 @@ object Utils {
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(gpus))
}
+ if (disk.isDefined) {
+ builder.addResourcesBuilder()
+ .setName("disk")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(disk.get))
+ }
builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))