Skip to content

Commit ebadaf3

Browse files
committed
Add unit test.
1 parent 934727b commit ebadaf3

File tree

4 files changed

+163
-96
lines changed

4 files changed

+163
-96
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,10 @@ private[spark] class MesosClusterScheduler(
357357
val appJar = CommandInfo.URI.newBuilder()
358358
.setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
359359
val builder = CommandInfo.newBuilder().addUris(appJar)
360-
val entries =
361-
(conf.getOption("spark.executor.extraLibraryPath").toList ++
362-
desc.command.libraryPathEntries)
360+
val entries = conf.getOption("spark.executor.extraLibraryPath")
361+
.map(path => Seq(path) ++ desc.command.libraryPathEntries)
362+
.getOrElse(desc.command.libraryPathEntries)
363+
363364
val prefixEnv = if (!entries.isEmpty) {
364365
Utils.libraryPathEnvPrefix(entries)
365366
} else {
@@ -445,8 +446,7 @@ private[spark] class MesosClusterScheduler(
445446
private class ResourceOffer(
446447
val offerId: OfferID,
447448
val slaveId: SlaveID,
448-
var resources: JList[Resource],
449-
var used: Boolean) {
449+
var resources: JList[Resource]) {
450450
override def toString(): String = {
451451
s"Offer id: ${offerId}, resources: ${resources}"
452452
}
@@ -460,13 +460,13 @@ private[spark] class MesosClusterScheduler(
460460
private def scheduleTasks(
461461
candidates: Seq[MesosDriverDescription],
462462
afterLaunchCallback: (String) => Boolean,
463-
currentOffers: JList[ResourceOffer],
464-
tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): JList[ResourceOffer] = {
463+
currentOffers: List[ResourceOffer],
464+
tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
465465
for (submission <- candidates) {
466466
val driverCpu = submission.cores
467467
val driverMem = submission.mem
468468
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
469-
val offerOption = currentOffers.asScala.find { o =>
469+
val offerOption = currentOffers.find { o =>
470470
getResource(o.resources, "cpus") >= driverCpu &&
471471
getResource(o.resources, "mem") >= driverMem
472472
}
@@ -475,22 +475,21 @@ private[spark] class MesosClusterScheduler(
475475
s"cpu: $driverCpu, mem: $driverMem")
476476
} else {
477477
val offer = offerOption.get
478-
offer.used = true
479478
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
480479
val (remainingResources, cpuResourcesToUse) =
481480
partitionResources(offer.resources, "cpus", driverCpu)
482481
val (finalResources, memResourcesToUse) =
483-
partitionResources(remainingResources, "mem", driverMem)
482+
partitionResources(remainingResources.asJava, "mem", driverMem)
484483
val commandInfo = buildDriverCommand(submission)
485484
val appName = submission.schedulerProperties("spark.app.name")
486485
val taskInfo = TaskInfo.newBuilder()
487486
.setTaskId(taskId)
488487
.setName(s"Driver for $appName")
489488
.setSlaveId(offer.slaveId)
490489
.setCommand(commandInfo)
491-
.addAllResources(cpuResourcesToUse)
492-
.addAllResources(memResourcesToUse)
493-
offer.resources = finalResources
490+
.addAllResources(cpuResourcesToUse.asJava)
491+
.addAllResources(memResourcesToUse.asJava)
492+
offer.resources = finalResources.asJava
494493
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
495494
val container = taskInfo.getContainerBuilder()
496495
val volumes = submission.schedulerProperties
@@ -514,32 +513,32 @@ private[spark] class MesosClusterScheduler(
514513
afterLaunchCallback(submission.submissionId)
515514
}
516515
}
517-
currentOffers
518516
}
519517

520518
override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
521519
logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
522520
val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
523521
val currentTime = new Date()
524522

525-
var currentOffers = offers.asScala.map {
526-
o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList, false)
527-
}.toList.asJava
523+
val currentOffers = offers.asScala.map {
524+
o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
525+
}.toList
526+
528527
stateLock.synchronized {
529528
// We first schedule all the supervised drivers that are ready to retry.
530529
// This list will be empty if none of the drivers are marked as supervise.
531530
val driversToRetry = pendingRetryDrivers.filter { d =>
532531
d.retryState.get.nextRetry.before(currentTime)
533532
}
534533

535-
currentOffers = scheduleTasks(
534+
scheduleTasks(
536535
copyBuffer(driversToRetry),
537536
removeFromPendingRetryDrivers,
538537
currentOffers,
539538
tasks)
540539

541540
// Then we walk through the queued drivers and try to schedule them.
542-
currentOffers = scheduleTasks(
541+
scheduleTasks(
543542
copyBuffer(queuedDrivers),
544543
removeFromQueuedDrivers,
545544
currentOffers,
@@ -548,7 +547,10 @@ private[spark] class MesosClusterScheduler(
548547
tasks.foreach { case (offerId, taskInfos) =>
549548
driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
550549
}
551-
currentOffers.asScala.filter(!_.used).foreach(o => driver.declineOffer(o.offerId))
550+
551+
for (o <- currentOffers if !tasks.contains(o.offerId)) {
552+
driver.declineOffer(o.offerId)
553+
}
552554
}
553555

554556
private def copyBuffer(

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
179179
def partitionResources(
180180
resources: JList[Resource],
181181
resourceName: String,
182-
amountToUse: Double): (JList[Resource], JList[Resource]) = {
182+
amountToUse: Double): (List[Resource], List[Resource]) = {
183183
var remain = amountToUse
184184
var requestedResources = new ArrayBuffer[Resource]
185185
val remainingResources = resources.asScala.map {
@@ -202,7 +202,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
202202
val filteredResources =
203203
remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
204204

205-
(filteredResources.toList.asJava, requestedResources.toList.asJava)
205+
(filteredResources.toList, requestedResources.toList)
206206
}
207207

208208
/** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler.cluster.mesos
19+
20+
import java.util.{Collection, Collections, Date}
21+
22+
import scala.collection.JavaConverters._
23+
24+
import org.apache.mesos.Protos._
25+
import org.apache.mesos.Protos.Value.{Scalar, Type}
26+
import org.apache.mesos.SchedulerDriver
27+
import org.mockito.{ArgumentCaptor, Matchers}
28+
import org.mockito.Mockito._
29+
import org.scalatest.mock.MockitoSugar
30+
31+
import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
32+
import org.apache.spark.deploy.Command
33+
import org.apache.spark.deploy.mesos.MesosDriverDescription
34+
35+
36+
class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {

  // Minimal driver command shared by all tests; env/classpath/library entries are irrelevant here.
  private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
  private var scheduler: MesosClusterScheduler = _

  override def beforeEach(): Unit = {
    val conf = new SparkConf()
      .setMaster("mesos://localhost:5050")
      .setAppName("spark mesos")
    scheduler = new MesosClusterScheduler(
        new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
      // Stub out start() so no connection to a real Mesos master is attempted.
      override def start(): Unit = { ready = true }
    }
    scheduler.start()
  }

  test("can queue drivers") {
    // Submit two drivers and verify they end up queued in submission order.
    val first = scheduler.submitDriver(
      new MesosDriverDescription("d1", "jar", 1000, 1, true,
        command, Map[String, String](), "s1", new Date()))
    assert(first.success)
    val second = scheduler.submitDriver(
      new MesosDriverDescription("d1", "jar", 1000, 1, true,
        command, Map[String, String](), "s2", new Date()))
    assert(second.success)
    val queued = scheduler.getSchedulerState().queuedDrivers.toList
    assert(queued.map(_.submissionId) ==
      List(first.submissionId, second.submissionId))
  }

  test("can kill queued drivers") {
    // A queued (not yet launched) driver must be removable via killDriver.
    val submitted = scheduler.submitDriver(
      new MesosDriverDescription("d1", "jar", 1000, 1, true,
        command, Map[String, String](), "s1", new Date()))
    assert(submitted.success)
    assert(scheduler.killDriver(submitted.submissionId).success)
    assert(scheduler.getSchedulerState().queuedDrivers.isEmpty)
  }

  test("can handle multiple roles") {
    // Builds a SCALAR resource for the given role.
    def scalarResource(name: String, value: Double, role: String): Resource =
      Resource.newBuilder()
        .setRole(role)
        .setName(name)
        .setType(Type.SCALAR)
        .setScalar(Scalar.newBuilder().setValue(value).build())
        .build()

    val driver = mock[SchedulerDriver]
    val response = scheduler.submitDriver(
      new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
        command,
        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
        "s1",
        new Date()))
    assert(response.success)

    // One offer whose cpus/mem are split across the default role and "role2";
    // neither role alone satisfies the driver's 1.5 cpus / 1200 mem request.
    val offer = Offer.newBuilder()
      .addResources(scalarResource("cpus", 1, "*"))
      .addResources(scalarResource("mem", 1000, "*"))
      .addResources(scalarResource("cpus", 1, "role2"))
      .addResources(scalarResource("mem", 500, "role2"))
      .setId(OfferID.newBuilder().setValue("o1").build())
      .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
      .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
      .setHostname("host1")
      .build()

    val taskCaptor = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])

    when(
      driver.launchTasks(
        Matchers.eq(Collections.singleton(offer.getId)),
        taskCaptor.capture())
    ).thenReturn(Status.valueOf(1))

    scheduler.resourceOffers(driver, Collections.singletonList(offer))

    val launched = taskCaptor.getValue
    assert(launched.size() == 1)
    val task = launched.iterator().next()
    val resources = task.getResourcesList

    // The launched task must receive the full request, drawn from both roles.
    assert(scheduler.getResource(resources, "cpus") == 1.5)
    assert(scheduler.getResource(resources, "mem") == 1200)

    val byName = resources.asScala.toList.groupBy(_.getName)
    val cpus = byName("cpus")
    assert(cpus.size == 2)
    assert(cpus.map(_.getRole()).toSet == Set("*", "role2"))
    val mem = byName("mem")
    assert(mem.size == 2)
    assert(mem.map(_.getRole()).toSet == Set("*", "role2"))

    verify(driver, times(1)).launchTasks(
      Matchers.eq(Collections.singleton(offer.getId)),
      taskCaptor.capture()
    )
  }
}

core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 0 additions & 74 deletions
This file was deleted.

0 commit comments

Comments
 (0)