diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 77b06fcf3374..f06bf664b8ec 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -428,6 +428,13 @@ See the [configuration page](configuration.html) for information on Spark config
and resource weight sharing.
+
+ spark.mesos.ignoreDefaultRoleResources |
+ false |
+
+ Only if `spark.mesos.role` has been set, ignore mesos resources with the role `*`.
+ |
+
spark.mesos.constraints |
(none) |
diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
index d4c7022f006a..9dfa66171571 100644
--- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
+++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
@@ -41,7 +41,7 @@ private[spark] class MesosDriverDescription(
val cores: Double,
val supervise: Boolean,
val command: Command,
- schedulerProperties: Map[String, String],
+ val schedulerProperties: Map[String, String],
val submissionId: String,
val submissionDate: Date,
val retryState: Option[MesosClusterRetryState] = None)
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 0b454997772d..94a0af6570d5 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -551,12 +551,16 @@ private[spark] class MesosClusterScheduler(
currentOffers: List[ResourceOffer],
tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
for (submission <- candidates) {
+ val acceptedResourceRoles = getAcceptedResourceRoles(submission.schedulerProperties)
val driverCpu = submission.cores
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val offerOption = currentOffers.find { o =>
- getResource(o.resources, "cpus") >= driverCpu &&
- getResource(o.resources, "mem") >= driverMem
+ val acceptableResources = o.resources.asScala
+ .filter((r: Resource) => acceptedResourceRoles(r.getRole))
+ .asJava
+ getResource(acceptableResources, "cpus") >= driverCpu &&
+ getResource(acceptableResources, "mem") >= driverMem
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e67bf3e328f9..b7129f89a85a 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -382,6 +382,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val remainingResources = mutable.Map(offers.map(offer =>
(offer.getId.getValue, offer.getResourcesList)): _*)
+ val acceptedResourceRoles = getAcceptedResourceRoles(sc.conf)
+
var launchTasks = true
// TODO(mgummelt): combine offers for a single slave
@@ -393,15 +395,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
for (offer <- offers) {
val slaveId = offer.getSlaveId.getValue
val offerId = offer.getId.getValue
- val resources = remainingResources(offerId)
+ val resources =
+ remainingResources(offerId).asScala
+ .filter((r: Resource) => acceptedResourceRoles(r.getRole))
+ .asJava
- if (canLaunchTask(slaveId, resources)) {
+ if (canLaunchTask(slaveId, resources, acceptedResourceRoles)) {
// Create a task
launchTasks = true
val taskId = newMesosTaskId()
val offerCPUs = getResource(resources, "cpus").toInt
val taskGPUs = Math.min(
- Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)
+ Math.max(0, maxGpus - totalGpusAcquired),
+ getResource(resources, "gpus").toInt)
val taskCPUs = executorCores(offerCPUs)
val taskMemory = executorMemory(sc)
@@ -466,7 +472,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
}
- private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
+ private def canLaunchTask(
+ slaveId: String,
+ resources: JList[Resource],
+ acceptedResourceRoles: Set[String]): Boolean = {
val offerMem = getResource(resources, "mem")
val offerCPUs = getResource(resources, "cpus").toInt
val cpus = executorCores(offerCPUs)
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 09a252f3c74a..04395146eb7e 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -249,10 +249,15 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
}
+ val acceptedResourceRoles = getAcceptedResourceRoles(sc.conf)
+
// Of the matching constraints, see which ones give us enough memory and cores
val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
- val mem = getResource(o.getResourcesList, "mem")
- val cpus = getResource(o.getResourcesList, "cpus")
+ val acceptableResources = o.getResourcesList.asScala
+ .filter((r: Resource) => acceptedResourceRoles(r.getRole))
+ .asJava
+ val mem = getResource(acceptableResources, "mem")
+ val cpus = getResource(acceptableResources, "cpus")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 73cc241239c4..d8ffa7b563b8 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -49,6 +49,44 @@ trait MesosSchedulerUtils extends Logging {
// Driver for talking to Mesos
protected var mesosDriver: SchedulerDriver = null
+ /**
+ * Returns the configured set of roles that an offer can be selected from
+ * @param conf Spark configuration
+ */
+ protected def getAcceptedResourceRoles(conf: SparkConf): Set[String] = {
+ getAcceptedResourceRoles(
+ conf.getBoolean("spark.mesos.ignoreDefaultRoleResources", false),
+ conf.getOption("spark.mesos.role"))
+ }
+ /**
+ * Returns the configured set of roles that an offer can be selected from
+ * @param props Mesos driver description schedulerProperties map
+ */
+ protected def getAcceptedResourceRoles(props: Map[String, String]): Set[String] = {
+ getAcceptedResourceRoles(
+ props.get("spark.mesos.ignoreDefaultRoleResources") match {
+ case Some(truth) => truth.toBoolean
+ case None => false
+ },
+ props.get("spark.mesos.role"))
+ }
+ /**
+ * Internal version of getAcceptedResourceRoles
+ * @param ignoreDefaultRoleResources user specified property
+ * @param role user specified property
+ */
+ private def getAcceptedResourceRoles(
+ ignoreDefaultRoleResources: Boolean,
+ role: Option[String]) = {
+ val roles = ignoreDefaultRoleResources match {
+ case true if role.isDefined => Set(role)
+ case _ => Set(Some("*"), role)
+ }
+ val acceptedRoles = roles.flatten
+ logDebug(s"Accepting resources from role(s): ${acceptedRoles.mkString(",")}")
+ acceptedRoles
+ }
+
/**
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
*
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 87d9080de569..77ce1987fd42 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -17,20 +17,29 @@
package org.apache.spark.scheduler.cluster.mesos
+import java.nio.ByteBuffer
import java.util.{Collection, Collections, Date}
import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Matchers
+import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
+ TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
@@ -85,68 +94,195 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(state.queuedDrivers.isEmpty)
}
- test("can handle multiple roles") {
- setScheduler()
-
+ test("accept all roles by default") {
val driver = mock[SchedulerDriver]
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
- command,
- Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
- "s1",
- new Date()))
- assert(response.success)
- val offer = Offer.newBuilder()
- .addResources(
- Resource.newBuilder().setRole("*")
- .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
- .addResources(
- Resource.newBuilder().setRole("*")
- .setScalar(Scalar.newBuilder().setValue(1000).build())
- .setName("mem")
- .setType(Type.SCALAR))
- .addResources(
- Resource.newBuilder().setRole("role2")
- .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
- .addResources(
- Resource.newBuilder().setRole("role2")
- .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
- .setId(OfferID.newBuilder().setValue("o1").build())
- .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
- .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
- .setHostname("host1")
- .build()
+ val taskScheduler = mock[TaskSchedulerImpl]
+
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val conf = new SparkConf
+ conf.set("spark.mesos.role", "dev")
+
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/path"))
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.conf).thenReturn(conf)
+ when(sc.listenerBus).thenReturn(listenerBus)
+
+ val id = 1
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(500))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("*")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(1))
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(600))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(2))
+ val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+ .setHostname(s"host${id.toString}").build()
+
+ val mesosOffers = new java.util.ArrayList[Offer]
+ mesosOffers.add(offer)
+
+ val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+ val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+ expectedWorkerOffers.append(new WorkerOffer(
+ mesosOffers.get(0).getSlaveId.getValue,
+ mesosOffers.get(0).getHostname,
+ 2 // Deducting 1 for executor
+ ))
+
+ val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+ when(
+ driver.launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+ ).thenReturn(Status.valueOf(1))
+
+ backend.resourceOffers(driver, mesosOffers)
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+
+ assert(capture.getValue.size() === 1)
+ val taskInfo = capture.getValue.iterator().next()
+ assert(taskInfo.getName.equals("n1"))
+ assert(taskInfo.getResourcesCount === 1)
+ val cpusDev = taskInfo.getResourcesList.get(0)
+ assert(cpusDev.getName.equals("cpus"))
+ assert(cpusDev.getScalar.getValue.equals(1.0))
+ assert(cpusDev.getRole.equals("dev"))
+ val executorResources = taskInfo.getExecutor.getResourcesList.asScala
+ assert(executorResources.exists { r =>
+ r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("*")
+ })
+ assert(executorResources.exists { r =>
+ r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("*")
+ })
+ }
+
+ test("can ignore default role") {
+ val driver = mock[SchedulerDriver]
+ val taskScheduler = mock[TaskSchedulerImpl]
+
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val conf = new SparkConf
+ conf.set("spark.mesos.role", "dev")
+ conf.set("spark.mesos.ignoreDefaultRoleResources", "true")
+
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/path"))
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.conf).thenReturn(conf)
+ when(sc.listenerBus).thenReturn(listenerBus)
+
+ val id = 1
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(500))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("*")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(1))
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(600))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(2))
+ val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+ .setHostname(s"host${id.toString}").build()
+
+ val mesosOffers = new java.util.ArrayList[Offer]
+ mesosOffers.add(offer)
+
+ val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+ val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+ expectedWorkerOffers.append(new WorkerOffer(
+ mesosOffers.get(0).getSlaveId.getValue,
+ mesosOffers.get(0).getHostname,
+ 2 // Deducting 1 for executor
+ ))
+
+ val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+
+ val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
when(
driver.launchTasks(
- Matchers.eq(Collections.singleton(offer.getId)),
- capture.capture())
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
).thenReturn(Status.valueOf(1))
- scheduler.resourceOffers(driver, Collections.singletonList(offer))
-
- val taskInfos = capture.getValue
- assert(taskInfos.size() == 1)
- val taskInfo = taskInfos.iterator().next()
- val resources = taskInfo.getResourcesList
- assert(scheduler.getResource(resources, "cpus") == 1.5)
- assert(scheduler.getResource(resources, "mem") == 1200)
- val resourcesSeq: Seq[Resource] = resources.asScala
- val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
- assert(cpus.size == 2)
- assert(cpus.exists(_.getRole().equals("role2")))
- assert(cpus.exists(_.getRole().equals("*")))
- val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
- assert(mem.size == 2)
- assert(mem.exists(_.getRole().equals("role2")))
- assert(mem.exists(_.getRole().equals("*")))
+ backend.resourceOffers(driver, mesosOffers)
verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(offer.getId)),
- capture.capture()
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
)
+
+ assert(capture.getValue.size() === 1)
+ val taskInfo = capture.getValue.iterator().next()
+ assert(taskInfo.getName.equals("n1"))
+ assert(taskInfo.getResourcesCount === 1)
+ val cpusDev = taskInfo.getResourcesList.get(0)
+ assert(cpusDev.getName.equals("cpus"))
+ assert(cpusDev.getScalar.getValue.equals(1.0))
+ assert(cpusDev.getRole.equals("dev"))
+ val executorResources = taskInfo.getExecutor.getResourcesList.asScala
+ assert(executorResources.exists { r =>
+ r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("*")
+ })
+ assert(executorResources.exists { r =>
+ r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("*")
+ })
}
test("escapes commandline args for the shell") {
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 1d7a86f4b090..a241c80f5111 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -291,7 +291,7 @@ class MesosFineGrainedSchedulerBackendSuite
verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
}
- test("can handle multiple roles") {
+ test("accept all roles by default") {
val driver = mock[SchedulerDriver]
val taskScheduler = mock[TaskSchedulerImpl]
@@ -299,11 +299,14 @@ class MesosFineGrainedSchedulerBackendSuite
listenerBus.post(
SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+ val conf = new SparkConf
+ conf.set("spark.mesos.role", "dev")
+
val sc = mock[SparkContext]
when(sc.executorMemory).thenReturn(100)
when(sc.getSparkHome()).thenReturn(Option("/path"))
when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
- when(sc.conf).thenReturn(new SparkConf)
+ when(sc.conf).thenReturn(conf)
when(sc.listenerBus).thenReturn(listenerBus)
val id = 1
@@ -311,11 +314,11 @@ class MesosFineGrainedSchedulerBackendSuite
builder.addResourcesBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
- .setRole("prod")
+ .setRole("*")
.setScalar(Scalar.newBuilder().setValue(500))
builder.addResourcesBuilder()
.setName("cpus")
- .setRole("prod")
+ .setRole("*")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(1))
builder.addResourcesBuilder()
@@ -339,12 +342,108 @@ class MesosFineGrainedSchedulerBackendSuite
val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
- expectedWorkerOffers += new WorkerOffer(
+ expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
2 // Deducting 1 for executor
+ ))
+
+ val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+
+ val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+ when(
+ driver.launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
+ )
+ ).thenReturn(Status.valueOf(1))
+
+ backend.resourceOffers(driver, mesosOffers)
+
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+ capture.capture(),
+ any(classOf[Filters])
)
+ assert(capture.getValue.size() === 1)
+ val taskInfo = capture.getValue.iterator().next()
+ assert(taskInfo.getName.equals("n1"))
+ assert(taskInfo.getResourcesCount === 1)
+ val cpusDev = taskInfo.getResourcesList.get(0)
+ assert(cpusDev.getName.equals("cpus"))
+ assert(cpusDev.getScalar.getValue.equals(1.0))
+ assert(cpusDev.getRole.equals("dev"))
+ val executorResources = taskInfo.getExecutor.getResourcesList.asScala
+ assert(executorResources.exists { r =>
+ r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("*")
+ })
+ assert(executorResources.exists { r =>
+ r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("*")
+ })
+ }
+
+ test("can ignore default role") {
+ val driver = mock[SchedulerDriver]
+ val taskScheduler = mock[TaskSchedulerImpl]
+
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val conf = new SparkConf
+ conf.set("spark.mesos.role", "dev")
+ conf.set("spark.mesos.ignoreDefaultRoleResources", "true")
+
+ val sc = mock[SparkContext]
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.getSparkHome()).thenReturn(Option("/path"))
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.conf).thenReturn(conf)
+ when(sc.listenerBus).thenReturn(listenerBus)
+
+ val id = 1
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(500))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("*")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(1))
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(600))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setRole("dev")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(2))
+ val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+ .setHostname(s"host${id.toString}").build()
+
+ val mesosOffers = new java.util.ArrayList[Offer]
+ mesosOffers.add(offer)
+
+ val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+ val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+ expectedWorkerOffers.append(new WorkerOffer(
+ mesosOffers.get(0).getSlaveId.getValue,
+ mesosOffers.get(0).getHostname,
+ 2 // Deducting 1 for executor
+ ))
+
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
@@ -376,10 +475,10 @@ class MesosFineGrainedSchedulerBackendSuite
assert(cpusDev.getRole.equals("dev"))
val executorResources = taskInfo.getExecutor.getResourcesList.asScala
assert(executorResources.exists { r =>
- r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod")
+ r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("*")
})
assert(executorResources.exists { r =>
- r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
+ r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("*")
})
}
}