@@ -517,16 +517,10 @@ private[spark] class MesosClusterScheduler(
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
val container = taskInfo.getContainerBuilder()
val volumes = submission.schedulerProperties
.get("spark.mesos.executor.docker.volumes")
.map(MesosSchedulerBackendUtil.parseVolumesSpec)
val portmaps = submission.schedulerProperties
.get("spark.mesos.executor.docker.portmaps")
.map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
MesosSchedulerBackendUtil.addDockerInfo(
container, image, volumes = volumes, portmaps = portmaps)
taskInfo.setContainer(container.build())
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
submission.schedulerProperties.get,
taskInfo.getContainerBuilder())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.addAllResources(memResourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
sc.conf.getOption,
taskBuilder.getContainerBuilder
)
}

tasks(offer.getId) ::= taskBuilder.build()
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setData(ByteString.copyFrom(createExecArg()))

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
sc.conf.getOption,
executorInfo.getContainerBuilder()
)
}

(executorInfo.build(), resourcesAfterMem.asJava)
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

/**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,

Contributor Author: Working on this, I figured that the network parameter is used nowhere. Just wanted to point this out; not sure if there is WIP for adding this feature.

Reviewer: Yea, I'm not sure what happened here. Thanks for pointing it out.

portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {

val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
val docker = ContainerInfo.DockerInfo.newBuilder()
.setImage(image)
.setForcePullImage(forcePullImage)

network.foreach(docker.setNetwork)
portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}

/**
* Setup a docker containerizer
* Setup a docker containerizer from MesosDriverDescription scheduler properties
*/
def setupContainerBuilderDockerInfo(
imageName: String,
conf: SparkConf,
conf: String => Option[String],
builder: ContainerInfo.Builder): Unit = {
val volumes = conf
.getOption("spark.mesos.executor.docker.volumes")
val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
.exists(_.equals("true"))
val volumes = conf("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
val portmaps = conf
.getOption("spark.mesos.executor.docker.portmaps")
val portmaps = conf("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)

addDockerInfo(
builder,
imageName,
forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)
logDebug("setupContainerDockerInfo: using docker image: " + imageName)
@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}

test("docker settings are reflected in created tasks") {
setBackend(Map(
"spark.mesos.executor.docker.image" -> "some_image",
"spark.mesos.executor.docker.forcePullImage" -> "true",
"spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
"spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
))

val (mem, cpu) = (backend.executorMemory(sc), 4)

val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)

val launchedTasks = verifyTaskLaunched("o1").asScala
assert(launchedTasks.size == 1)

val containerInfo = launchedTasks.head.getContainer
assert(containerInfo.getType == ContainerInfo.Type.DOCKER)

val volumes = containerInfo.getVolumesList.asScala
assert(volumes.size == 1)

val volume = volumes.head
assert(volume.getHostPath == "/host_vol")
assert(volume.getContainerPath == "/container_vol")
assert(volume.getMode == Volume.Mode.RO)

val dockerInfo = containerInfo.getDocker

assert(dockerInfo.getImage == "some_image")
assert(dockerInfo.getForcePullImage)

val portMappings = dockerInfo.getPortMappingsList.asScala
assert(portMappings.size == 1)

val portMapping = portMappings.head
assert(portMapping.getHostPort == 8080)
assert(portMapping.getContainerPort == 80)
assert(portMapping.getProtocol == "tcp")
}

test("force-pull-image option is disabled by default") {
setBackend(Map(
"spark.mesos.executor.docker.image" -> "some_image"
))

val (mem, cpu) = (backend.executorMemory(sc), 4)

val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)

val launchedTasks = verifyTaskLaunched("o1").asScala
assert(launchedTasks.size == 1)

val containerInfo = launchedTasks.head.getContainer
assert(containerInfo.getType == ContainerInfo.Type.DOCKER)

val dockerInfo = containerInfo.getDocker

assert(dockerInfo.getImage == "some_image")
assert(!dockerInfo.getForcePullImage)
}

private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite

val conf = new SparkConf()
.set("spark.mesos.executor.docker.image", "spark/mock")
.set("spark.mesos.executor.docker.forcePullImage", "true")

Reviewer: Please add the test to at least one of CoarseGrained and Cluster scheduler. Fine grained is deprecated.

Contributor Author: Did so :)

.set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")

Expand All @@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
assert(portmaps.get(0).getContainerPort.equals(8080))
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar

Reviewer: Are these generated files? What are they used for?

Contributor Author: These files are part of the test suite. The tests will fail if the dependencies in these files don't match the dependencies in the pull request. At least that's my interpretation; what's the usual way of suggesting dependency updates in a pull request?

Reviewer: I dunno. I'll assume this is fine.

Member: We have another PR in flight that will update this to 0.28. Is this going to be OK? Maybe we'll merge that first.

Contributor Author: Yup, that's fine.

metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
2 changes: 1 addition & 1 deletion docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
SPARK_VERSION_SHORT: 2.1.0
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.7"
MESOS_VERSION: 0.21.0
MESOS_VERSION: 0.22.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
12 changes: 12 additions & 0 deletions docs/running-on-mesos.md
@@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods.

Requires Mesos version 0.20.1 or later.

Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
tags, you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.

# Running Alongside Hadoop

You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -334,6 +338,14 @@ See the [configuration page](configuration.html) for information on Spark config
the installed path of the Mesos library can be specified with <code>spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY</code>.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.docker.forcePullImage</code></td>
<td>false</td>
<td>
Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
By default Mesos agents will not pull images they already have cached.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.docker.volumes</code></td>
<td>(none)</td>
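As a usage note for the option documented above, a minimal sketch of enabling force pulling from application code (the master URL, app name, and image tag are placeholders; the same keys can equally be passed through `spark-submit --conf` or `spark-defaults.conf`):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder values throughout; the spark.mesos.executor.docker.* keys are
// the settings documented in running-on-mesos.md, everything else is made up.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")
  .setAppName("force-pull-example")
  .set("spark.mesos.executor.docker.image", "example/spark-executor:latest")
  // Requires Mesos 0.22 or later: agents re-pull the image even if a copy is cached.
  .set("spark.mesos.executor.docker.forcePullImage", "true")

val sc = new SparkContext(conf)
```
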
2 changes: 1 addition & 1 deletion pom.xml
@@ -119,7 +119,7 @@
<java.version>1.7</java.version>
<maven.version>3.3.9</maven.version>
<sbt.project.name>spark</sbt.project.name>
<mesos.version>0.21.1</mesos.version>
<mesos.version>0.22.2</mesos.version>

Reviewer: There's a line in running-on-mesos.md: "Spark 1.6.1 is designed for use with Mesos 0.21.0". You'll need to change that.

Contributor Author: Yup, fixed that.

<mesos.classifier>shaded-protobuf</mesos.classifier>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>