@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
         .addAllResources(memResourcesToUse.asJava)
       offer.resources = finalResources.asJava
       submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
-        val container = taskInfo.getContainerBuilder()
-        val volumes = submission.schedulerProperties
-          .get("spark.mesos.executor.docker.volumes")
-          .map(MesosSchedulerBackendUtil.parseVolumesSpec)
-        val portmaps = submission.schedulerProperties
-          .get("spark.mesos.executor.docker.portmaps")
-          .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
-        MesosSchedulerBackendUtil.addDockerInfo(
-          container, image, volumes = volumes, portmaps = portmaps)
-        taskInfo.setContainer(container.build())
+        MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+          image,
+          submission.schedulerProperties.get,
+          taskInfo.getContainerBuilder())
       }
       val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
       queuedTasks += taskInfo.build()
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .addAllResources(memResourcesToUse.asJava)

         sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-          MesosSchedulerBackendUtil
-            .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+          MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+            image,
+            sc.conf.getOption,
+            taskBuilder.getContainerBuilder
+          )
         }

         tasks(offer.getId) ::= taskBuilder.build()
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setData(ByteString.copyFrom(createExecArg()))

     sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil
-        .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+        image,
+        sc.conf.getOption,
+        executorInfo.getContainerBuilder()
+      )
     }

     (executorInfo.build(), resourcesAfterMem.asJava)
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
 import org.apache.mesos.Protos.{ContainerInfo, Volume}
 import org.apache.mesos.Protos.ContainerInfo.DockerInfo

-import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging

 /**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   def addDockerInfo(
       container: ContainerInfo.Builder,
       image: String,
+      forcePullImage: Boolean = false,
       volumes: Option[List[Volume]] = None,
       network: Option[ContainerInfo.DockerInfo.Network] = None,
       portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {

-    val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+    val docker = ContainerInfo.DockerInfo.newBuilder()
+      .setImage(image)
+      .setForcePullImage(forcePullImage)

     network.foreach(docker.setNetwork)
     portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   }

   /**
-   * Setup a docker containerizer
+   * Setup a docker containerizer from MesosDriverDescription scheduler properties
    */
   def setupContainerBuilderDockerInfo(
       imageName: String,
-      conf: SparkConf,
+      conf: String => Option[String],
       builder: ContainerInfo.Builder): Unit = {
-    val volumes = conf
-      .getOption("spark.mesos.executor.docker.volumes")
+    val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+      .exists(_.equals("true"))
+    val volumes = conf("spark.mesos.executor.docker.volumes")
       .map(parseVolumesSpec)
-    val portmaps = conf
-      .getOption("spark.mesos.executor.docker.portmaps")
+    val portmaps = conf("spark.mesos.executor.docker.portmaps")
       .map(parsePortMappingsSpec)

     addDockerInfo(
       builder,
       imageName,
+      forcePullImage = forcePullImage,
       volumes = volumes,
       portmaps = portmaps)
     logDebug("setupContainerDockerInfo: using docker image: " + imageName)
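The key move in this file is replacing the hard `SparkConf` dependency with a plain `String => Option[String]` lookup, so the same helper serves both the cluster-mode dispatcher (which reads `MesosDriverDescription` scheduler properties) and the driver backends (which read `SparkConf`). A minimal sketch of why all three call sites type-check — the names below are simplified stand-ins, not Spark's actual classes:

```scala
// Both SparkConf.getOption and schedulerProperties.get conform to
// String => Option[String], so either can be passed to the helper.
object ConfLookupSketch {
  // Mirrors the forcePullImage read in setupContainerBuilderDockerInfo.
  def readForcePull(conf: String => Option[String]): Boolean =
    conf("spark.mesos.executor.docker.forcePullImage").exists(_.equals("true"))

  def main(args: Array[String]): Unit = {
    val schedulerProperties = Map("spark.mesos.executor.docker.forcePullImage" -> "true")
    val sparkConfLike = Map.empty[String, String] // flag unset

    assert(readForcePull(schedulerProperties.get)) // Map.get eta-expands to the lookup type
    assert(!readForcePull(sparkConfLike.get))      // absent key -> defaults to false
  }
}
```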
@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.start()
   }

+  test("docker settings are reflected in created tasks") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image",
+      "spark.mesos.executor.docker.forcePullImage" -> "true",
+      "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+      "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val volumes = containerInfo.getVolumesList.asScala
+    assert(volumes.size == 1)
+
+    val volume = volumes.head
+    assert(volume.getHostPath == "/host_vol")
+    assert(volume.getContainerPath == "/container_vol")
+    assert(volume.getMode == Volume.Mode.RO)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(dockerInfo.getForcePullImage)
+
+    val portMappings = dockerInfo.getPortMappingsList.asScala
+    assert(portMappings.size == 1)
+
+    val portMapping = portMappings.head
+    assert(portMapping.getHostPort == 8080)
+    assert(portMapping.getContainerPort == 80)
+    assert(portMapping.getProtocol == "tcp")
+  }
+
+  test("force-pull-image option is disabled by default") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(!dockerInfo.getForcePullImage)
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
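As background for the compact spec strings these tests exercise, the parsing helpers referenced in the main diff (`parseVolumesSpec`, `parsePortMappingsSpec`) turn them into Mesos protobuf values. A rough sketch of the expected mapping, inferred from the assertions above — note `MesosSchedulerBackendUtil` is `private[mesos]`, so this only compiles inside the `org.apache.spark.scheduler.cluster.mesos` package:

```scala
import org.apache.mesos.Protos.Volume

// "/host_vol:/container_vol:ro" -> one Volume: host path, container path, read-only mode.
val volumes = MesosSchedulerBackendUtil.parseVolumesSpec("/host_vol:/container_vol:ro")
assert(volumes.head.getHostPath == "/host_vol")
assert(volumes.head.getContainerPath == "/container_vol")
assert(volumes.head.getMode == Volume.Mode.RO)

// "8080:80:tcp" -> one PortMapping: host port 8080, container port 80, protocol "tcp".
val portmaps = MesosSchedulerBackendUtil.parsePortMappingsSpec("8080:80:tcp")
assert(portmaps.head.getHostPort == 8080)
assert(portmaps.head.getContainerPort == 80)
assert(portmaps.head.getProtocol == "tcp")
```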
@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite

     val conf = new SparkConf()
       .set("spark.mesos.executor.docker.image", "spark/mock")
+      .set("spark.mesos.executor.docker.forcePullImage", "true")
       .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
       .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")

@@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
     val (execInfo, _) = backend.createExecutorInfo(
       Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
     assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+    assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
     val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
     assert(portmaps.get(0).getHostPort.equals(80))
     assert(portmaps.get(0).getContainerPort.equals(8080))
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
 libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
 SPARK_VERSION_SHORT: 2.1.0
 SCALA_BINARY_VERSION: "2.11"
 SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.21.0
+MESOS_VERSION: 0.22.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark
12 changes: 12 additions & 0 deletions docs/running-on-mesos.md
@@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods.

 Requires Mesos version 0.20.1 or later.

+Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
+tags, you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
+image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
+
 # Running Alongside Hadoop

 You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -334,6 +338,14 @@ See the [configuration page](configuration.html) for information on Spark config
   the installed path of the Mesos library can be specified with <code>spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.executor.docker.forcePullImage</code></td>
+  <td>false</td>
+  <td>
+    Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
+    By default Mesos agents will not pull images they already have cached.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.executor.docker.volumes</code></td>
   <td>(none)</td>
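To make the new option concrete, an application could enable it alongside the image setting. A minimal sketch — the image name, master URL, and app name below are illustrative placeholders, not values from this PR:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver configuration showing the new flag in context.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")
  .setAppName("ForcePullDemo")
  .set("spark.mesos.executor.docker.image", "example/spark-executor:latest")
  // Re-pull the (mutable) tag before each executor launch; needs Mesos 0.22+.
  .set("spark.mesos.executor.docker.forcePullImage", "true")
val sc = new SparkContext(conf)
```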
2 changes: 1 addition & 1 deletion pom.xml
@@ -119,7 +119,7 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>0.21.1</mesos.version>
+    <mesos.version>0.22.2</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>