[SPARK-23153][K8s] Support client dependencies with a Hadoop Compatible File System #23546

Closed · wants to merge 1 commit
91 changes: 71 additions & 20 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.{Properties, Try}

import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -222,7 +223,7 @@ private[spark] class SparkSubmit extends Logging {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = new SparkConf()
val sparkConf = args.toSparkConf()
var childMainClass = ""

// Set the cluster manager
@@ -313,6 +314,9 @@ private[spark] class SparkSubmit extends Logging {
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
val isMesosClient = clusterManager == MESOS && deployMode == CLIENT

if (!isMesosCluster && !isStandAloneCluster) {
@@ -323,9 +327,25 @@
args.ivySettingsPath)

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
// In K8s client mode, when in the driver, add resolved jars early as we might need
// them at submit time for artifact downloading.
// For example, we might use the dependencies for downloading
// files from a Hadoop compatible fs, e.g. S3. In this case the user might pass:
// --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
if (isKubernetesClusterModeDriver) {
val loader = getSubmitClassLoader(sparkConf)
for (jar <- resolvedMavenCoordinates.split(",")) {
addJarToClasspath(jar, loader)
}
} else if (isKubernetesCluster) {
// We need this in K8s cluster mode so that we can upload local deps
// via the k8s application, like in cluster mode driver
childClasspath ++= resolvedMavenCoordinates.split(",")
} else {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
}
}
}

@@ -380,6 +400,17 @@ private[spark] class SparkSubmit extends Logging {
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull

if (isKubernetesClusterModeDriver) {
// Replace with the downloaded local jar path to avoid propagating Hadoop compatible URIs.
// Executors will get the jars from the Spark file server.
// Explicitly download the related files here
args.jars = renameResourcesToLocalFS(args.jars, localJars)
val localFiles = Option(args.files).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
args.files = renameResourcesToLocalFS(args.files, localFiles)
}
}

// When running in YARN, for some remote resources with scheme:
@@ -535,11 +566,13 @@ private[spark] class SparkSubmit extends Logging {
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
@skonto (Contributor, Author) commented on Jan 15, 2019:
Note: resolving packages in containers may be slow if your network is slow, since the Ivy cache will be empty. In practice, users should build their dependencies into the image or use a pre-populated cache.

Contributor:
This point might be a good addition to the docs.

Contributor (Author):
I will add it.

CLUSTER, confKey = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.excludes"),

// Yarn only
@@ -777,6 +810,21 @@ private[spark] class SparkSubmit extends Logging {
(childArgs, childClasspath, sparkConf, childMainClass)
}

private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
if (resources != null && localResources != null) {
val localResourcesSeq = Utils.stringToSeq(localResources)
Utils.stringToSeq(resources).map { resource =>
val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
localResourcesSeq.find { localUri =>
val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
filenameRemote == filenameLocal
}.getOrElse(resource)
}.mkString(",")
} else {
resources
}
}

// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
// mode, we must trick it into thinking we're YARN.
@@ -787,6 +835,19 @@
sparkConf.set(key, shortUserName)
}

private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
loader
}

/**
* Run the main method of the child class using the submit arguments.
*
@@ -814,17 +875,7 @@ private[spark] class SparkSubmit extends Logging {
logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
logInfo("\n")
}

val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)

val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
@@ -1325,12 +1325,12 @@ class SparkSubmitSuite
"--class", "Foo",
"app.jar")
val conf = new SparkSubmitArguments(clArgs).toSparkConf()
Seq(
testConf,
masterConf
).foreach { case (k, v) =>
conf.get(k) should be (v)
}
Seq(
testConf,
masterConf
).foreach { case (k, v) =>
conf.get(k) should be (v)
}
}
}

37 changes: 34 additions & 3 deletions docs/running-on-kubernetes.md
@@ -208,8 +208,31 @@ If your application's dependencies are all hosted in remote locations like HDFS
by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images.
Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to
dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
client's local file system is currently not yet supported.
dependencies in custom-built Docker images in `spark-submit`. We support dependencies from the submission
client's local file system using the `file://` scheme or without a scheme (using a full path), where the destination should be a Hadoop-compatible filesystem.
A typical example of this, using S3, is passing the following options:

```
...
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
--conf spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path
--conf spark.hadoop.fs.s3a.access.key=...
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
--conf spark.hadoop.fs.s3a.fast.upload=true
--conf spark.hadoop.fs.s3a.secret.key=....
--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
file:///full/path/to/app.jar
```
Contributor:
How does the submission client know that the user's intention is to upload to S3 instead of, say, an HDFS cluster? I don't think this can be determined with 100% certainty based only on the presence of those s3a options.

Contributor:
I saw you have spark.kubernetes.file.upload.path below, which should also be added here as an example.

Contributor (Author):
The code is agnostic of the protocol. I am just using S3 as an example in the docs. If they don't set the properties, submit will fail.

Contributor (Author):
ok

The app jar file will be uploaded to S3 and then, when the driver is launched, it will be downloaded
to the driver pod and added to its classpath. Spark will generate a subdirectory under the upload path with a random name
to avoid conflicts with Spark apps running in parallel. Users can manage the created subdirectories according to their needs.
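
For illustration (the bucket, path and generated UUID below are placeholders), an uploaded application jar ends up under a path of roughly this shape:

```
s3a://<s3-bucket>/path/spark-upload-<random-uuid>/app.jar
```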

The client scheme is supported for the application jar and for dependencies specified by the properties `spark.jars` and `spark.files`.
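
For example, assuming purely illustrative paths, client-local dependencies can be passed via the usual options, which map to `spark.jars` and `spark.files`:

```
--conf spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path
--jars file:///full/path/to/dep.jar
--files /full/path/to/settings.properties
```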

Important: all client-side dependencies will be uploaded to the given path with a flat directory structure, so
file names must be unique, otherwise files will be overwritten. Also make sure that the default Ivy directory in the
derived k8s image has the required access rights, or modify the settings as above. The latter is also important if you
use `--packages` in cluster mode.
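
As a sketch of the latter (the directory locations are illustrative, not mandated by Spark), `--packages` in cluster mode can be combined with Ivy directories that are writable, or pre-populated when the image is built:

```
--packages org.apache.hadoop:hadoop-aws:2.7.6
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/opt/ivy/cache -Divy.home=/opt/ivy"
```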

## Secret Management
Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
@@ -455,7 +478,6 @@ There are several Spark on Kubernetes features that are currently being worked on
Some of these include:

* Dynamic Resource Allocation and External Shuffle Service
* Local File Dependency Management
Contributor:
Does the fact that this is a Hadoop (compatible) FS based solution imply there are use cases for local deps that aren't served by this PR?

@skonto (Contributor, Author) commented on Jan 15, 2019:
Yes, there might be cases like the RSS server where users want to upload to a file server within the cluster. I am covering the cases mentioned in the design document, which provide an API to use out of the box. The RSS implementation AFAIK needs improvements, so it's open for now, but we can work on it next. I could add a note there instead of removing that part of the doc, saying "partially done", but since it is a working solution I thought I could remove it from future work.

Contributor:
Out of curiosity, does any S3 object store fit the HCFS category?

@skonto (Contributor, Author) commented on Jan 15, 2019:
A quick example is MinIO; another is Swift.
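
As a rough sketch (the endpoint, bucket and credentials below are placeholders, not something this PR prescribes), such a store can be reached through the S3A connector:

```
--conf spark.kubernetes.file.upload.path=s3a://<bucket>/spark-uploads
--conf spark.hadoop.fs.s3a.endpoint=http://<minio-host>:9000
--conf spark.hadoop.fs.s3a.path.style.access=true
--conf spark.hadoop.fs.s3a.access.key=...
--conf spark.hadoop.fs.s3a.secret.key=...
```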

Contributor:
I'd say that's broadly applicable enough to call it "done".

Member:
We have environments where there is nothing remotely HDFS-like available, and the systems are typically air-gapped, so using external services like S3 isn't an option either. Primary storage is usually a high-performance parallel file system (Lustre or IBM Spectrum Scale), which is just a POSIX-compliant file system mounted to all nodes over the system interconnect.

Using hostPath volume mounts isn't a realistic option either, because these environments have strict security requirements.

@skonto (Contributor, Author) commented on Jan 22, 2019:
In my opinion we need both options: a) upload to some dfs/object store service, b) a file server. We cannot possibly support all systems out there if they have their own client libs.

* Job Queues and Resource Management

# Configuration
Expand Down Expand Up @@ -1069,6 +1091,15 @@ See the [configuration page](configuration.html) for information on Spark config
Specify the grace period in seconds when deleting a Spark application using spark-submit.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.file.upload.path</code></td>
<td>(none)</td>
<td>
Path to store files at the spark-submit side in cluster mode. For example:
<code>spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path</code>
Files should be specified as <code>file://path/to/file</code> or with an absolute path.
</td>
</tr>
</table>

#### Pod template properties
@@ -332,6 +332,13 @@ private[spark] object Config extends Logging {
.timeConf(TimeUnit.SECONDS)
.createOptional

val KUBERNETES_FILE_UPLOAD_PATH =
ConfigBuilder("spark.kubernetes.file.upload.path")
.doc("Hadoop compatible file system path where files from the local file system " +
"will be uploded to in cluster mode.")
.stringConf
.createOptional

val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
@@ -16,18 +16,25 @@
*/
package org.apache.spark.deploy.k8s

import java.io.File
import java.io.{File, IOException}
import java.net.URI
import java.security.SecureRandom
import java.util.UUID

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.Utils.getHadoopFileSystem

private[spark] object KubernetesUtils extends Logging {
Member:
I wonder if this can be private[k8s], but no big deal.

@skonto (Contributor, Author) commented on Apr 17, 2019:
I just followed the existing convention. All classes in this module use spark; making this consistent would require refactoring the whole thing, maybe in another PR?


@@ -209,4 +216,77 @@ private[spark] object KubernetesUtils extends Logging {
Hex.encodeHexString(random) + time
}

/**
* Upload files and modify their URIs.
*/
def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
: Iterable[String] = {
fileUris.map { uri =>
uploadFileUri(uri, conf)
}
}

private def isLocalDependency(uri: URI): Boolean = {
uri.getScheme match {
case null | "file" => true
case _ => false
}
}

def isLocalAndResolvable(resource: String): Boolean = {
resource != SparkLauncher.NO_RESOURCE &&
isLocalDependency(Utils.resolveURI(resource))
}

def renameMainAppResource(resource: String, conf: SparkConf): String = {
if (isLocalAndResolvable(resource)) {
SparkLauncher.NO_RESOURCE
} else {
resource
}
}

def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
conf match {
case Some(sConf) =>
if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
val fileUri = Utils.resolveURI(uri)
try {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf)
val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf)
val randomDirName = s"spark-upload-${UUID.randomUUID()}"
fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}"))
val targetUri = s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}"
log.info(s"Uploading file: ${fileUri.getPath} to dest: $targetUri...")
uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new Path(targetUri), fs)
targetUri
} catch {
case e: Exception =>
throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e)
}
} else {
throw new SparkException("Please specify " +
"spark.kubernetes.file.upload.path property.")
}
case _ => throw new SparkException("Spark configuration is missing...")
}
}

/**
* Upload a file to a Hadoop-compatible filesystem.
*/
private def uploadFileToHadoopCompatibleFS(
src: Path,
dest: Path,
fs: FileSystem,
delSrc: Boolean = false,
overwrite: Boolean = true): Unit = {
try {
fs.copyFromLocalFile(delSrc, overwrite, src, dest)
} catch {
case e: IOException =>
throw new SparkException(s"Error uploading file ${src.getName}", e)
}
}
}
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

@@ -153,6 +152,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
// Try to upload local, resolvable files to a Hadoop compatible file system.
Seq(JARS, FILES).foreach { key =>
val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
if (resolved.nonEmpty) {
additionalProps.put(key.key, resolved.mkString(","))
}
}
additionalProps.toMap
}
}

Loading