diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 77b06fcf3374..923d8dbebf3d 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -506,8 +506,13 @@ See the [configuration page](configuration.html) for information on Spark config
since this configuration is just an upper limit and not a guaranteed amount.
-
-
+<tr>
+  <td><code>spark.mesos.fetcherCache.enable</code></td>
+  <td><code>false</code></td>
+  <td>
+    If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the [Mesos fetcher cache](http://mesos.apache.org/documentation/latest/fetcher/).
+  </td>
+</tr>
# Troubleshooting and Debugging
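A minimal usage sketch (not part of this patch) of the option documented above; the master URL and executor URI below are hypothetical placeholders:

```scala
import org.apache.spark.SparkConf

// Sketch only: opt in to the Mesos fetcher cache for all fetched URIs.
// Equivalent to passing --conf spark.mesos.fetcherCache.enable=true to spark-submit.
val conf = new SparkConf()
  .setMaster("mesos://zk://host1:2181/mesos")                          // hypothetical master
  .set("spark.mesos.fetcherCache.enable", "true")
  .set("spark.executor.uri", "http://example.com/spark-2.1.0-bin.tgz") // hypothetical URI
```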
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 0b454997772d..635712c00d30 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -129,6 +129,7 @@ private[spark] class MesosClusterScheduler(
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
+  private val useFetchCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new Object()
private val finishedDrivers =
@@ -396,7 +397,7 @@ private[spark] class MesosClusterScheduler(
val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
- CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
+ CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetchCache).build())
}
private def getDriverCommandValue(desc: MesosDriverDescription): String = {
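The one-line change above threads the flag into the Mesos protobuf builder. As a standalone sketch, using the same `org.apache.mesos.Protos.CommandInfo` API the scheduler calls (the jar URL is hypothetical):

```scala
import org.apache.mesos.Protos.CommandInfo

// Sketch: a fetchable URI whose `cache` flag asks the Mesos fetcher to reuse
// a previously downloaded copy instead of re-fetching it for every driver.
val uri: CommandInfo.URI = CommandInfo.URI.newBuilder()
  .setValue("http://example.com/driver-app.jar") // hypothetical driver jar
  .setCache(true)                                // what useFetchCache controls
  .build()
```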
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e67bf3e328f9..5063c1fe988b 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
private[this] val shutdownTimeoutMS =
@@ -226,10 +228,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
}
- conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
+ conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache))
command.build()
}
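Because `getBoolean` falls back to its second argument when the key is unset, the cache stays off by default and existing deployments keep the old re-download behavior. A small illustrative sketch of that lookup:

```scala
import org.apache.spark.SparkConf

// Sketch: the backend reads the flag once at construction time.
val conf = new SparkConf().set("spark.mesos.fetcherCache.enable", "true")
conf.getBoolean("spark.mesos.fetcherCache.enable", false)            // true
new SparkConf().getBoolean("spark.mesos.fetcherCache.enable", false) // false when unset
```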
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 73cc241239c4..9cb60237044a 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -369,9 +369,11 @@ trait MesosSchedulerUtils extends Logging {
sc.executorMemory
}
- def setupUris(uris: String, builder: CommandInfo.Builder): Unit = {
+ def setupUris(uris: String,
+ builder: CommandInfo.Builder,
+ useFetcherCache: Boolean = false): Unit = {
uris.split(",").foreach { uri =>
- builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()))
+ builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache))
}
}
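A hypothetical call site for the extended helper (assuming a class that mixes in `MesosSchedulerUtils`), showing that the comma-separated `spark.mesos.uris` value is split, trimmed, and each entry tagged with the cache flag:

```scala
import org.apache.mesos.Protos.CommandInfo

// Sketch: two hypothetical URIs, both added with cache = true.
val command = CommandInfo.newBuilder()
setupUris("http://example.com/a.jar, http://example.com/b.conf", command, useFetcherCache = true)
assert(command.getUrisList.size() == 2)
assert(command.getUrisList.get(0).getCache && command.getUrisList.get(1).getCache)
```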
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 75ba02e470e2..f73638fda623 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -463,6 +463,34 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
}
+ test("mesos supports setting fetcher cache") {
+ val url = "spark.spark.spark.com"
+ setBackend(Map(
+ "spark.mesos.fetcherCache.enable" -> "true",
+ "spark.executor.uri" -> url
+ ), false)
+ val offers = List(Resources(backend.executorMemory(sc), 1))
+ offerResources(offers)
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+ val uris = launchedTasks.head.getCommand.getUrisList
+ assert(uris.size() == 1)
+ assert(uris.asScala.head.getCache)
+ }
+
+ test("mesos supports disabling fetcher cache") {
+ val url = "spark.spark.spark.com"
+ setBackend(Map(
+ "spark.mesos.fetcherCache.enable" -> "false",
+ "spark.executor.uri" -> url
+ ), false)
+ val offers = List(Resources(backend.executorMemory(sc), 1))
+ offerResources(offers)
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+ val uris = launchedTasks.head.getCommand.getUrisList
+ assert(uris.size() == 1)
+ assert(!uris.asScala.head.getCache)
+ }
+
private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
private def verifyDeclinedOffer(driver: SchedulerDriver,