6 changes: 3 additions & 3 deletions bin/utils.sh
@@ -26,14 +26,14 @@ function gatherSparkSubmitOpts() {
exit 1
fi

- # NOTE: If you add or remove spark-sumbmit options,
+ # NOTE: If you add or remove spark-submit options,
# modify NOT ONLY this script but also SparkSubmitArgument.scala
SUBMISSION_OPTS=()
APPLICATION_OPTS=()
while (($#)); do
case "$1" in
- --master | --deploy-mode | --class | --name | --jars | --py-files | --files | \
- --conf | --properties-file | --driver-memory | --driver-java-options | \
+ --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
+ --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
--driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
--total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
if [[ $# -lt 2 ]]; then
2 changes: 1 addition & 1 deletion bin/windows-utils.cmd
@@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p
SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"

echo %1 | findstr %opts% >nul
if %ERRORLEVEL% equ 0 (
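
These two shell-side changes do one thing: they add --packages and --repositories to the set of flags that the launcher scripts treat as spark-submit options (each consuming a value) rather than as application arguments. The sketch below is a minimal, self-contained stand-in for that splitting logic, not the actual bin/utils.sh (whose remaining body is outside this diff); it assumes the same flag list shown above.

#!/usr/bin/env bash
# Simplified stand-in for gatherSparkSubmitOpts: flags that take a value are
# collected into SUBMISSION_OPTS, everything else into APPLICATION_OPTS.
SUBMISSION_OPTS=()
APPLICATION_OPTS=()
while (($#)); do
  case "$1" in
    --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
    --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
    --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
    --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
      if [[ $# -lt 2 ]]; then
        echo "Error: $1 requires an argument." 1>&2
        exit 1
      fi
      SUBMISSION_OPTS+=("$1" "$2")
      shift 2
      ;;
    *)
      APPLICATION_OPTS+=("$1")
      shift
      ;;
  esac
done
echo "submission opts:  ${SUBMISSION_OPTS[*]}"
echo "application opts: ${APPLICATION_OPTS[*]}"

Invoked as ./sketch.sh --packages com.example:lib:1.0 app.py 10, this puts --packages and its value into SUBMISSION_OPTS and leaves app.py 10 in APPLICATION_OPTS.
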
11 changes: 11 additions & 0 deletions core/pom.xml
@@ -231,6 +231,17 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
<version>${ivy.version}</version>
</dependency>
<dependency>
<groupId>oro</groupId>

Contributor:
Is oro used anywhere?

Contributor (author):
Ivy's pattern matcher uses it. Jenkins returned a ClassNotFoundException when that wasn't in.

Contributor:
@brkyvz add a comment here:

<!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->

<!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
<artifactId>oro</artifactId>
<version>${oro.version}</version>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1219,7 +1219,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
null
}
} else {
- env.httpFileServer.addJar(new File(uri.getPath))
+ try {
+   env.httpFileServer.addJar(new File(uri.getPath))
+ } catch {
+   case exc: FileNotFoundException =>
+     logError(s"Jar not found at $path")
+     null
+   case e: Exception =>
+     // For now just log an error but allow to go through so spark examples work.
+     // The spark examples don't really need the jar distributed since its also
+     // the app jar.
+     logError("Error adding jar (" + e + "), was the --addJars option used?")
+     null
+ }
}
// A JAR file which exists locally on every worker node
case "local" =>
217 changes: 215 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -25,6 +25,17 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.hadoop.fs.Path

import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils

@@ -194,6 +205,18 @@ object SparkSubmit {
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

// Resolve maven dependencies if there are any and add classpath to jars
val resolvedMavenCoordinates =
SparkSubmitUtils.resolveMavenCoordinates(
args.packages, Option(args.repositories), Option(args.ivyRepoPath))
if (!resolvedMavenCoordinates.trim.isEmpty) {
if (args.jars == null || args.jars.trim.isEmpty) {
args.jars = resolvedMavenCoordinates
} else {
args.jars += s",$resolvedMavenCoordinates"
}
}

// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
@@ -202,6 +225,7 @@
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
Expand All @@ -213,6 +237,7 @@ object SparkSubmit {

// Standalone cluster only
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

@@ -384,8 +409,8 @@ object SparkSubmit {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
println(s"Failed to load main class $childMainClass.")
println("You need to build Spark with -Phive and -Phive-thriftserver.")
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
@@ -475,6 +500,194 @@ object SparkSubmit {
}
}

/** Provides utility functions to be used inside SparkSubmit. */
private[spark] object SparkSubmitUtils {

// Exposed for testing
private[spark] var printStream = SparkSubmit.printStream

/**
* Represents a Maven Coordinate
* @param groupId the groupId of the coordinate
* @param artifactId the artifactId of the coordinate
* @param version the version of the coordinate
*/
private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)

/**
* Extracts maven coordinates from a comma-delimited string
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
val splits = p.split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
s"be whitespace. The groupId provided is: ${splits(0)}")
require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
s"be whitespace. The artifactId provided is: ${splits(1)}")
require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
s"be whitespace. The version provided is: ${splits(2)}")
new MavenCoordinate(splits(0), splits(1), splits(2))
}
}

/**
* Creates the chain of repository resolvers (Maven Central plus any additional remote repositories) that Ivy uses to locate dependencies
* @param remoteRepos Comma-delimited string of remote repositories
* @return A ChainResolver used by Ivy to search for and resolve dependencies.
*/
private[spark] def createRepoResolvers(remoteRepos: Option[String]): ChainResolver = {
// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
cr.setName("list")

// the biblio resolver resolves POM declared dependencies
val br: IBiblioResolver = new IBiblioResolver
br.setM2compatible(true)
br.setUsepoms(true)
br.setName("central")
cr.add(br)

val repositoryList = remoteRepos.getOrElse("")
// add any other remote repositories other than maven central
if (repositoryList.trim.nonEmpty) {
repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
brr.setName(s"repo-${i + 1}")
cr.add(brr)
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
}
}
cr
}

/**
* Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
* (will append to jars in SparkSubmit). The name of the jar is given
* after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
* @param artifacts Sequence of dependencies that were resolved and retrieved
* @param cacheDirectory directory where jars are cached
* @return a comma-delimited list of paths for the dependencies
*/
private[spark] def resolveDependencyPaths(
artifacts: Array[AnyRef],
cacheDirectory: File): String = {
artifacts.map { artifactInfo =>
val artifactString = artifactInfo.toString
val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
cacheDirectory.getAbsolutePath + File.separator +
jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
}.mkString(",")
}

/** Adds the given maven coordinates to Ivy's module descriptor. */
private[spark] def addDependenciesToIvy(
md: DefaultModuleDescriptor,
artifacts: Seq[MavenCoordinate],
ivyConfName: String): Unit = {
artifacts.foreach { mvn =>
val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
val dd = new DefaultDependencyDescriptor(ri, false, false)
dd.addDependencyConfiguration(ivyConfName, ivyConfName)
printStream.println(s"${dd.getDependencyId} added as a dependency")
md.addDependency(dd)
}
}

/** A nice function to use in tests as well. Values are dummy strings. */
private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))

/**
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
private[spark] def resolveMavenCoordinates(
coordinates: String,
remoteRepos: Option[String],
ivyPath: Option[String],
isTest: Boolean = false): String = {

Contributor:
This function is really long and has a bunch of different responsibilities, so I wonder whether it makes sense to split it into a few smaller helper functions (this could simplify testing as well). I'm not sure what's the best way to do this, but one starting point might be to extract the coordinates -> MavenCoordinate function into its own function.

Contributor:
Similarly, maybe the configuration of the ChainResolver could be done in its own helper method that takes the comma-separated list of remoteRepos and returns a resolver.

Contributor:
Also, what do you think about returning a Seq of paths and leaving it up to the caller to join them with commas?

if (coordinates == null || coordinates.trim.isEmpty) {
""
} else {
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
// set ivy settings for location of cache
val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val alternateIvyCache = ivyPath.getOrElse("")
val packagesDirectory: File =
if (alternateIvyCache.trim.isEmpty) {
new File(ivySettings.getDefaultIvyUserDir, "jars")
} else {
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
}
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(true)
val retrieveOptions = new RetrieveOptions
// Turn downloading and logging off for testing
if (isTest) {
resolveOptions.setDownload(false)
resolveOptions.setLog(LogOptions.LOG_QUIET)
retrieveOptions.setLog(LogOptions.LOG_QUIET)
} else {
resolveOptions.setDownload(true)
}

// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)

// Add an exclusion rule for Spark
val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)

// Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
md.addExcludeRule(sparkDependencyExcludeRule)
addDependenciesToIvy(md, artifacts, ivyConfName)

// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
}
// retrieve all resolved dependencies
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))

resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
}
}
}

/**
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
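
To make the new resolution path concrete: per the code above, --packages coordinates are resolved through Ivy, cached under the default Ivy user directory (or under the directory supplied via the spark.jars.ivy property, which maps to args.ivyRepoPath), and the retrieved jar paths are appended to spark.jars. A hedged, hypothetical invocation (the class, coordinate, and paths below are placeholders, not real artifacts):

# Hypothetical example: override the Ivy directory through spark.jars.ivy.
# Based on resolveMavenCoordinates above, the Ivy cache would live under
# /tmp/spark-ivy/cache, the retrieved jars under /tmp/spark-ivy/jars, and
# those jar paths would then be appended to spark.jars.
./bin/spark-submit \
  --class com.example.MyApp \
  --master local[4] \
  --conf spark.jars.ivy=/tmp/spark-ivy \
  --packages com.example:my-lib_2.10:1.0.0 \
  my-app.jar
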
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -50,6 +50,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
var name: String = null
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var packages: String = null
var repositories: String = null
var ivyRepoPath: String = null
var verbose: Boolean = false
var isPython: Boolean = false
var pyFiles: String = null
@@ -123,6 +126,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
.orNull
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull

Contributor:
I guess we're not adding a command-line option for overriding the Ivy repo because we don't expect it to be changed on the fly?

Contributor (author):
Actually in order to not expose it to users in spark-submit. I still wanted to have it as a configuration just for the flexibility.

deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
numExecutors = Option(numExecutors)
.getOrElse(sparkProperties.get("spark.executor.instances").orNull)
@@ -212,6 +216,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| name $name
| childArgs [${childArgs.mkString(" ")}]
| jars $jars
| packages $packages
| repositories $repositories
| verbose $verbose
|
|Spark properties used, including those specified through
@@ -318,6 +324,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
jars = Utils.resolveURIs(value)
parse(tail)

case ("--packages") :: value :: tail =>
packages = value
parse(tail)

case ("--repositories") :: value :: tail =>
repositories = value
parse(tail)

case ("--conf" | "-c") :: value :: tail =>
value.split("=", 2).toSeq match {
case Seq(k, v) => sparkProperties(k) = v
@@ -368,6 +382,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --name NAME A name of your application.
| --jars JARS Comma-separated list of local jars to include on the driver
| and executor classpaths.
| --packages Comma-separated list of maven coordinates of jars to include
| on the driver and executor classpaths. Will search the local
| maven repo, then maven central and any additional remote
| repositories given by --repositories. The format for the
| coordinates should be groupId:artifactId:version.
| --repositories Comma-separated list of additional remote repositories to
| search for the maven coordinates given with --packages.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
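
Putting the new options together, here is a hedged usage sketch matching the help text above; the coordinates and repository URL are placeholders, not real artifacts:

# Hypothetical invocation: two comma-separated Maven coordinates in
# groupId:artifactId:version form, plus one additional repository searched
# along with the local Maven repo and Maven Central.
./bin/spark-submit \
  --class com.example.Main \
  --master local[2] \
  --packages com.example:lib-a_2.10:0.1.0,com.example:lib-b_2.10:2.3.1 \
  --repositories https://repo.example.com/maven \
  my-app.jar arg1 arg2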