Skip to content

Commit e34b4e1

Browse files
drcrallensrowen
authored andcommitted
[SPARK-15994][MESOS] Allow enabling Mesos fetch cache in coarse executor backend
Mesos 0.23.0 introduces a Fetch Cache feature http://mesos.apache.org/documentation/latest/fetcher/ which allows caching of resources specified in command URIs. This patch: - Updates the Mesos shaded protobuf dependency to 0.23.0 - Allows setting `spark.mesos.fetcherCache.enable` to enable the fetch cache for all specified URIs. (URIs must be specified for the setting to have any affect) - Updates documentation for Mesos configuration with the new setting. This patch does NOT: - Allow for per-URI caching configuration. The cache setting is global to ALL URIs for the command. Author: Charles Allen <charles@allen-net.com> Closes #13713 from drcrallen/SPARK15994.
1 parent cb80edc commit e34b4e1

File tree

5 files changed

+45
-7
lines changed

5 files changed

+45
-7
lines changed

docs/running-on-mesos.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,13 @@ See the [configuration page](configuration.html) for information on Spark config
506506
since this configuration is just a upper limit and not a guaranteed amount.
507507
</td>
508508
</tr>
509-
510-
509+
<tr>
510+
<td><code>spark.mesos.fetcherCache.enable</code></td>
511+
<td><code>false</code></td>
512+
<td>
513+
If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the [Mesos fetcher cache](http://mesos.apache.org/documentation/latest/fetcher/)
514+
</td>
515+
</tr>
511516
</table>
512517

513518
# Troubleshooting and Debugging

mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ private[spark] class MesosClusterScheduler(
129129
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
130130
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
131131
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
132+
private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
132133
private val schedulerState = engineFactory.createEngine("scheduler")
133134
private val stateLock = new Object()
134135
private val finishedDrivers =
@@ -396,7 +397,7 @@ private[spark] class MesosClusterScheduler(
396397
val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
397398

398399
((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
399-
CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
400+
CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetchCache).build())
400401
}
401402

402403
private def getDriverCommandValue(desc: MesosDriverDescription): String = {

mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
5959
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
6060
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
6161

62+
val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
63+
6264
val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
6365

6466
private[this] val shutdownTimeoutMS =
@@ -226,10 +228,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
226228
s" --hostname ${offer.getHostname}" +
227229
s" --cores $numCores" +
228230
s" --app-id $appId")
229-
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
231+
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
230232
}
231233

232-
conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
234+
conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache))
233235

234236
command.build()
235237
}

mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,9 +369,11 @@ trait MesosSchedulerUtils extends Logging {
369369
sc.executorMemory
370370
}
371371

372-
def setupUris(uris: String, builder: CommandInfo.Builder): Unit = {
372+
def setupUris(uris: String,
373+
builder: CommandInfo.Builder,
374+
useFetcherCache: Boolean = false): Unit = {
373375
uris.split(",").foreach { uri =>
374-
builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()))
376+
builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache))
375377
}
376378
}
377379

mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,34 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
463463
assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
464464
}
465465

466+
test("mesos supports setting fetcher cache") {
467+
val url = "spark.spark.spark.com"
468+
setBackend(Map(
469+
"spark.mesos.fetcherCache.enable" -> "true",
470+
"spark.executor.uri" -> url
471+
), false)
472+
val offers = List(Resources(backend.executorMemory(sc), 1))
473+
offerResources(offers)
474+
val launchedTasks = verifyTaskLaunched(driver, "o1")
475+
val uris = launchedTasks.head.getCommand.getUrisList
476+
assert(uris.size() == 1)
477+
assert(uris.asScala.head.getCache)
478+
}
479+
480+
test("mesos supports disabling fetcher cache") {
481+
val url = "spark.spark.spark.com"
482+
setBackend(Map(
483+
"spark.mesos.fetcherCache.enable" -> "false",
484+
"spark.executor.uri" -> url
485+
), false)
486+
val offers = List(Resources(backend.executorMemory(sc), 1))
487+
offerResources(offers)
488+
val launchedTasks = verifyTaskLaunched(driver, "o1")
489+
val uris = launchedTasks.head.getCommand.getUrisList
490+
assert(uris.size() == 1)
491+
assert(!uris.asScala.head.getCache)
492+
}
493+
466494
private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
467495

468496
private def verifyDeclinedOffer(driver: SchedulerDriver,

0 commit comments

Comments
 (0)