docs/running-on-mesos.md (7 additions & 2 deletions)
@@ -506,8 +506,13 @@ See the [configuration page](configuration.html) for information on Spark config
since this configuration is just an upper limit and not a guaranteed amount.
</td>
</tr>


<tr>
<td><code>spark.mesos.fetcherCache.enable</code></td>
<td><code>false</code></td>
<td>
If set to <code>true</code>, all URIs (example: <code>spark.executor.uri</code>, <code>spark.mesos.uris</code>) will be cached by the <a href="http://mesos.apache.org/documentation/latest/fetcher/">Mesos fetcher cache</a>.
</td>
</tr>
</table>
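
For example, a job can opt in to the cache when its executor binary is served over HTTP. A minimal sketch building a `SparkConf`; the master URL and artifact URI below are illustrative:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: cache the executor artifact via the Mesos fetcher,
// so repeated executor launches on an agent skip the download.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")                       // illustrative master URL
  .set("spark.mesos.fetcherCache.enable", "true")
  .set("spark.executor.uri", "http://repo.example.com/spark.tgz") // illustrative artifact
```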

# Troubleshooting and Debugging
MesosClusterScheduler.scala
@@ -129,6 +129,7 @@ private[spark] class MesosClusterScheduler(
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
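// Whether URIs for submitted drivers should be cached by the Mesos fetcher.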
private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)

Reviewer: We really need to factor out all these conf vars like YARN does at some point.

Contributor Author: Good idea, but outside the scope of this PR.
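
For context, the YARN-style factoring the reviewer suggests would hoist each key into a shared config object. A hypothetical sketch using Spark's internal `ConfigBuilder` (the object and constant names are illustrative, not part of this PR):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical mesos config object, mirroring the YARN module's config.scala.
private[spark] object config {
  val FETCHER_CACHE_ENABLE = ConfigBuilder("spark.mesos.fetcherCache.enable")
    .doc("Whether URIs passed to the Mesos fetcher should be cached.")
    .booleanConf
    .createWithDefault(false)
}

// Call sites would then read the flag as:
//   private val useFetcherCache = conf.get(config.FETCHER_CACHE_ENABLE)
```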

private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new Object()
private val finishedDrivers =
@@ -396,7 +397,7 @@ private[spark] class MesosClusterScheduler(
val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")

((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
- CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
+ CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache).build())
}

private def getDriverCommandValue(desc: MesosDriverDescription): String = {
MesosCoarseGrainedSchedulerBackend.scala
@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt

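// Whether URIs fetched for executors should be cached by the Mesos fetcher.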
val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)

val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)

private[this] val shutdownTimeoutMS =
@@ -226,10 +228,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
}

- conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
+ conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache))

command.build()
}
MesosSchedulerUtils.scala
@@ -369,9 +369,11 @@ trait MesosSchedulerUtils extends Logging {
sc.executorMemory
}

- def setupUris(uris: String, builder: CommandInfo.Builder): Unit = {
+ def setupUris(uris: String,
+     builder: CommandInfo.Builder,
+     useFetcherCache: Boolean = false): Unit = {
uris.split(",").foreach { uri =>
- builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()))
+ builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache))
}
}
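
Each URI here becomes a Mesos `CommandInfo.URI` protobuf whose `cache` flag tells the agent-side fetcher whether it may reuse a previously downloaded copy. A self-contained sketch of the message `setupUris` builds for one entry (assuming the Mesos protobuf jar on the classpath; the URL is illustrative):

```scala
import org.apache.mesos.Protos.CommandInfo

object FetcherCacheExample {
  def main(args: Array[String]): Unit = {
    // Build the same protobuf message setupUris produces per URI.
    val uri = CommandInfo.URI.newBuilder()
      .setValue("http://repo.example.com/app.tgz") // illustrative artifact URL
      .setCache(true)                              // let the fetcher reuse a cached copy
      .build()
    println(uri.getValue + " cache=" + uri.getCache)
  }
}
```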

MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -463,6 +463,34 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
}

test("mesos supports setting fetcher cache") {
val url = "spark.spark.spark.com"
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "true",
"spark.executor.uri" -> url
), false)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
val uris = launchedTasks.head.getCommand.getUrisList
assert(uris.size() == 1)
assert(uris.asScala.head.getCache)
}

test("mesos supports disabling fetcher cache") {
val url = "spark.spark.spark.com"
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "false",
"spark.executor.uri" -> url
), false)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
val uris = launchedTasks.head.getCommand.getUrisList
assert(uris.size() == 1)
assert(!uris.asScala.head.getCache)
}

private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)

private def verifyDeclinedOffer(driver: SchedulerDriver,