Skip to content

Commit 736674b

Browse files
committed
[SPARK-24137] Mount local directories as empty dir volumes.
Dramatically improves performance and won't cause Spark applications to fail because they write too much data to the Docker image's specific file system. The file system's directories that back emptydir volumes are generally larger and more performant.
1 parent 3121b41 commit 736674b

File tree

7 files changed

+223
-13
lines changed

7 files changed

+223
-13
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
455455
private[spark] def validateSettings() {
456456
if (contains("spark.local.dir")) {
457457
val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
458-
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
458+
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN)."
459459
logWarning(msg)
460460
}
461461

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import java.nio.file.Paths
20+
import java.util.UUID
21+
22+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
23+
24+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
25+
26+
private[spark] class LocalDirsFeatureStep(
27+
conf: KubernetesConf[_ <: KubernetesRoleSpecificConf],
28+
defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}")
29+
extends KubernetesFeatureConfigStep {
30+
31+
// Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
32+
// property - we want to instead default to mounting an emptydir volume that doesn't already
33+
// exist in the image.
34+
// We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
35+
// a bit opinionated about YARN and Mesos.
36+
private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
37+
.orElse(conf.getOption("spark.local.dir"))
38+
.getOrElse(defaultLocalDir)
39+
.split(",")
40+
41+
override def configurePod(pod: SparkPod): SparkPod = {
42+
val localDirVolumes = resolvedLocalDirs
43+
.zipWithIndex
44+
.map {
45+
case (localDir, index) =>
46+
new VolumeBuilder()
47+
.withName(s"spark-local-dir-${index + 1}-${Paths.get(localDir).getFileName.toString}")
48+
.withNewEmptyDir()
49+
.endEmptyDir()
50+
.build()
51+
}
52+
val localDirVolumeMounts = localDirVolumes
53+
.zip(resolvedLocalDirs)
54+
.map {
55+
case (localDirVolume, localDirPath) =>
56+
new VolumeMountBuilder()
57+
.withName(localDirVolume.getName)
58+
.withMountPath(localDirPath)
59+
.build()
60+
}
61+
val podWithLocalDirVolumes = new PodBuilder(pod.pod)
62+
.editSpec()
63+
.addToVolumes(localDirVolumes: _*)
64+
.endSpec()
65+
.build()
66+
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
67+
.addNewEnv()
68+
.withName("SPARK_LOCAL_DIRS")
69+
.withValue(resolvedLocalDirs.mkString(","))
70+
.endEnv()
71+
.addToVolumeMounts(localDirVolumeMounts: _*)
72+
.build()
73+
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
74+
}
75+
76+
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
77+
78+
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
79+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.apache.spark.deploy.k8s.submit
1818

1919
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
20-
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountSecretsFeatureStep}
20+
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
2121

2222
private[spark] class KubernetesDriverBuilder(
2323
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -29,14 +29,18 @@ private[spark] class KubernetesDriverBuilder(
2929
new DriverServiceFeatureStep(_),
3030
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
3131
=> MountSecretsFeatureStep) =
32-
new MountSecretsFeatureStep(_)) {
32+
new MountSecretsFeatureStep(_),
33+
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
34+
=> LocalDirsFeatureStep =
35+
new LocalDirsFeatureStep(_)) {
3336

3437
def buildFromFeatures(
3538
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
3639
val baseFeatures = Seq(
3740
provideBasicStep(kubernetesConf),
3841
provideCredentialsStep(kubernetesConf),
39-
provideServiceStep(kubernetesConf))
42+
provideServiceStep(kubernetesConf),
43+
provideLocalDirsStep(kubernetesConf))
4044
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
4145
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
4246
} else baseFeatures

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@
1717
package org.apache.spark.scheduler.cluster.k8s
1818

1919
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
20-
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, MountSecretsFeatureStep}
20+
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
2121

2222
private[spark] class KubernetesExecutorBuilder(
2323
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
2424
new BasicExecutorFeatureStep(_),
2525
provideSecretsStep:
2626
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
27-
new MountSecretsFeatureStep(_)) {
27+
new MountSecretsFeatureStep(_),
28+
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
29+
=> LocalDirsFeatureStep =
30+
new LocalDirsFeatureStep(_)) {
2831

2932
def buildFromFeatures(
3033
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
31-
val baseFeatures = Seq(provideBasicStep(kubernetesConf))
34+
val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
3235
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
3336
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
3437
} else baseFeatures
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}
20+
import org.mockito.Mockito
21+
import org.scalatest.BeforeAndAfter
22+
23+
import org.apache.spark.{SparkConf, SparkFunSuite}
24+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
25+
26+
class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
27+
private val defaultLocalDir = "/var/data/default-local-dir"
28+
private var sparkConf: SparkConf = _
29+
private var kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf] = _
30+
31+
before {
32+
val realSparkConf = new SparkConf(false)
33+
sparkConf = Mockito.spy(realSparkConf)
34+
kubernetesConf = KubernetesConf(
35+
sparkConf,
36+
KubernetesDriverSpecificConf(
37+
None,
38+
"app-name",
39+
"main",
40+
Seq.empty),
41+
"resource",
42+
"app-id",
43+
Map.empty,
44+
Map.empty,
45+
Map.empty,
46+
Map.empty)
47+
}
48+
49+
test("Resolve to default local dir if neither env nor configuration are set") {
50+
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
51+
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
52+
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
53+
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
54+
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
55+
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
56+
new VolumeBuilder()
57+
.withName(s"spark-local-dir-1-default-local-dir")
58+
.withNewEmptyDir()
59+
.endEmptyDir()
60+
.build())
61+
assert(configuredPod.container.getVolumeMounts.size === 1)
62+
assert(configuredPod.container.getVolumeMounts.get(0) ===
63+
new VolumeMountBuilder()
64+
.withName(s"spark-local-dir-1-default-local-dir")
65+
.withMountPath(defaultLocalDir)
66+
.build())
67+
assert(configuredPod.container.getEnv.size === 1)
68+
assert(configuredPod.container.getEnv.get(0) ===
69+
new EnvVarBuilder()
70+
.withName("SPARK_LOCAL_DIRS")
71+
.withValue(defaultLocalDir)
72+
.build())
73+
}
74+
75+
test("Use configured local dirs split on comma if provided.") {
76+
Mockito.doReturn("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
77+
.when(sparkConf).getenv("SPARK_LOCAL_DIRS")
78+
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
79+
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
80+
assert(configuredPod.pod.getSpec.getVolumes.size === 2)
81+
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
82+
new VolumeBuilder()
83+
.withName(s"spark-local-dir-1-my-local-dir-1")
84+
.withNewEmptyDir()
85+
.endEmptyDir()
86+
.build())
87+
assert(configuredPod.pod.getSpec.getVolumes.get(1) ===
88+
new VolumeBuilder()
89+
.withName(s"spark-local-dir-2-my-local-dir-2")
90+
.withNewEmptyDir()
91+
.endEmptyDir()
92+
.build())
93+
assert(configuredPod.container.getVolumeMounts.size === 2)
94+
assert(configuredPod.container.getVolumeMounts.get(0) ===
95+
new VolumeMountBuilder()
96+
.withName(s"spark-local-dir-1-my-local-dir-1")
97+
.withMountPath("/var/data/my-local-dir-1")
98+
.build())
99+
assert(configuredPod.container.getVolumeMounts.get(1) ===
100+
new VolumeMountBuilder()
101+
.withName(s"spark-local-dir-2-my-local-dir-2")
102+
.withMountPath("/var/data/my-local-dir-2")
103+
.build())
104+
assert(configuredPod.container.getEnv.size === 1)
105+
assert(configuredPod.container.getEnv.get(0) ===
106+
new EnvVarBuilder()
107+
.withName("SPARK_LOCAL_DIRS")
108+
.withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
109+
.build())
110+
}
111+
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ package org.apache.spark.deploy.k8s.submit
1818

1919
import org.apache.spark.{SparkConf, SparkFunSuite}
2020
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
21-
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, MountSecretsFeatureStep}
21+
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
2222

2323
class KubernetesDriverBuilderSuite extends SparkFunSuite {
2424

2525
private val BASIC_STEP_TYPE = "basic"
2626
private val CREDENTIALS_STEP_TYPE = "credentials"
2727
private val SERVICE_STEP_TYPE = "service"
28+
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
2829
private val SECRETS_STEP_TYPE = "mount-secrets"
2930

3031
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -36,6 +37,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
3637
private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3738
SERVICE_STEP_TYPE, classOf[DriverServiceFeatureStep])
3839

40+
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
41+
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
42+
3943
private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
4044
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
4145

@@ -44,7 +48,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
4448
_ => basicFeatureStep,
4549
_ => credentialsStep,
4650
_ => serviceStep,
47-
_ => secretsStep)
51+
_ => secretsStep,
52+
_ => localDirsStep)
4853

4954
test("Apply fundamental steps all the time.") {
5055
val conf = KubernetesConf(
@@ -64,7 +69,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
6469
builderUnderTest.buildFromFeatures(conf),
6570
BASIC_STEP_TYPE,
6671
CREDENTIALS_STEP_TYPE,
67-
SERVICE_STEP_TYPE)
72+
SERVICE_STEP_TYPE,
73+
LOCAL_DIRS_STEP_TYPE)
6874
}
6975

7076
test("Apply secrets step if secrets are present.") {
@@ -86,6 +92,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
8692
BASIC_STEP_TYPE,
8793
CREDENTIALS_STEP_TYPE,
8894
SERVICE_STEP_TYPE,
95+
LOCAL_DIRS_STEP_TYPE,
8996
SECRETS_STEP_TYPE)
9097
}
9198

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,24 @@ import io.fabric8.kubernetes.api.model.PodBuilder
2020

2121
import org.apache.spark.{SparkConf, SparkFunSuite}
2222
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
23-
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, MountSecretsFeatureStep}
23+
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
2424

2525
class KubernetesExecutorBuilderSuite extends SparkFunSuite {
2626
private val BASIC_STEP_TYPE = "basic"
2727
private val SECRETS_STEP_TYPE = "mount-secrets"
28+
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
2829

2930
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3031
BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
3132
private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3233
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
34+
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
35+
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
3336

3437
private val builderUnderTest = new KubernetesExecutorBuilder(
3538
_ => basicFeatureStep,
36-
_ => mountSecretsStep)
39+
_ => mountSecretsStep,
40+
_ => localDirsStep)
3741

3842
test("Basic steps are consistently applied.") {
3943
val conf = KubernetesConf(
@@ -46,7 +50,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
4650
Map.empty,
4751
Map.empty,
4852
Map.empty)
49-
validateStepTypesApplied(builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE)
53+
validateStepTypesApplied(
54+
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
5055
}
5156

5257
test("Apply secrets step if secrets are present.") {
@@ -63,6 +68,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
6368
validateStepTypesApplied(
6469
builderUnderTest.buildFromFeatures(conf),
6570
BASIC_STEP_TYPE,
71+
LOCAL_DIRS_STEP_TYPE,
6672
SECRETS_STEP_TYPE)
6773
}
6874

0 commit comments

Comments
 (0)