From 882c4c8bba9793fff7bf4998f81f518bd5af9371 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 26 Jan 2015 12:52:46 -0800 Subject: [PATCH 01/20] added maven dependency download --- bin/resolve-maven-coordinates.sh | 116 ++++++++++++++++++ bin/spark-class | 2 + bin/spark-submit | 4 + bin/utils.sh | 6 +- .../spark/deploy/SparkSubmitArguments.scala | 17 +++ .../scala/org/apache/spark/util/Utils.scala | 2 + 6 files changed, 144 insertions(+), 3 deletions(-) create mode 100644 bin/resolve-maven-coordinates.sh diff --git a/bin/resolve-maven-coordinates.sh b/bin/resolve-maven-coordinates.sh new file mode 100644 index 000000000000..73e6415084e9 --- /dev/null +++ b/bin/resolve-maven-coordinates.sh @@ -0,0 +1,116 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +MAVEN_SETTINGS_DIR=~/.m2/settings.xml +MAVEN_LOCAL_REPO=~/.m2/repository + +# Read user maven configurations if there are any. Update path to local repository if needed. +if [ -e "$MAVEN_SETTINGS_DIR" ]; then + searchAlternateRepo=$(grep "" $MAVEN_SETTINGS_DIR) + if [ ${#searchAlternateRepo[@]} == 1 ]; then + removePrefix=${searchAlternateRepo#} + MAVEN_LOCAL_REPO=${removePrefix%} + fi +fi + +# Given the path for a maven coordinate, returns the artifactId and the version +computeArtifactName() { + coordinatePath=$1 + stripPrefix=${coordinatePath#*/} + echo ${stripPrefix%.*} +} + +# Given a maven coordinate, returns the path to the corresponding jar in the local Maven repo. +# Currently requires the coordinate to be in the form `groupId:artifactId:version` +computeLocalPath() { + coordinate=$1 + split=(${coordinate//:/ }) + if [ ${#split[@]} != 3 ]; then + echo "Provided Maven Coordinates must be in the form 'groupId:artifactId:version'." + echo "The coordinate provided is: coordinate" + exit 1 + fi + groupId=${split[0]//.//} + artifactId=${split[1]} + version=${split[2]} + echo "$MAVEN_LOCAL_REPO/$groupId/$artifactId/$version/$artifactId-$version.jar" +} + +CUR_DIR=$PWD + +# Removes dependency on Spark (if there is one) +removeSparkDependency() { + artifactName=$1 + echo "$artifactName" >> log.txt + # Create empty pom file for the maven plugin to use + > pom.xml + inSpark=false + while read -r line; do + if [[ $line == *"org.apache.spark"* ]]; then + inSpark=true + fi + if [[ $inSpark == true ]] && [[ $line == *""* ]]; then + echo "provided" >> pom.xml + inSpark=false + fi + echo $line >> pom.xml + done < "/$artifactName.pom" + # bash skips the last line for some reason + echo $line >> pom.xml +} + +# Recursive function that gets the first level of dependencies of each maven coordinate. +# We use a recursive function so that if any of the transitive dependencies are Spark, we don't +# include anything related to it in the classpath. 
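+# Illustrative note (assumed example, not from a real run): for a coordinate such as
+# "com.databricks:spark-csv_2.10:0.1", computeLocalPath above would yield
+# "$MAVEN_LOCAL_REPO/com/databricks/spark-csv_2.10/0.1/spark-csv_2.10-0.1.jar",
+# and the recursion below then appends that jar's first-level dependencies to the returned path.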
+addDependenciesToClasspath() { + pathOfArtifact=$1 + if [ ${#pathOfArtifact} -gt 0 ]; then + artifactName=$(computeArtifactName $pathOfArtifact) + cd ${pathOfArtifact%/*} + mavenPath=$pathOfArtifact + > cp.txt + removeSparkDependency $artifactName + mvn dependency:build-classpath -Dmdep.outputFile=cp.txt -DexcludeScope=provided -DexcludeTransitive=true + depClasspath=`cat cp.txt` + depList=(${depClasspath//:/ }) + for dep in "${depList[@]}"; do + mavenPath="$mavenPath:$(addDependenciesToClasspath $dep)" + done + echo $mavenPath + fi +} + +# The path to jars in the local maven repo that will be appended to the classpath +mavenClasspath="" +if [ ! -z "SPARK_SUBMIT_MAVEN_COORDINATES" ]; then + coordinateList=(${SPARK_SUBMIT_MAVEN_COORDINATES//,/ }) + for i in "${coordinateList[@]}"; do + localPath=$(computeLocalPath "$i") + # if jar doesn't exist, download it and all it's dependencies (except Spark) + if [ ! -e "$localPath.jar" ]; then + mvn dependency:get -Dartifact=$i -DremoteRepositories=$SPARK_SUBMIT_MAVEN_REPOS -Dtransitive=false + fi + # add all dependencies of this jar to the classpath + mavenClasspath="$mavenClasspath:$(addDependenciesToClasspath $localPath)" + done +fi + +cd $CUR_DIR + +echo ${mavenClasspath#:} diff --git a/bin/spark-class b/bin/spark-class index 1b945461fabc..e5aa968c66dd 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -147,6 +147,8 @@ else CLASSPATH="$classpath_output" fi + + if [[ "$1" =~ org.apache.spark.tools.* ]]; then if test -z "$SPARK_TOOLS_JAR"; then echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2 diff --git a/bin/spark-submit b/bin/spark-submit index 3e5cbdbb2439..8cd76833e305 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -40,6 +40,10 @@ while (($#)); do export SPARK_SUBMIT_OPTS=$2 elif [ "$1" = "--master" ]; then export MASTER=$2 + elif [ "$1" = "--maven" ]; then + export SPARK_SUBMIT_MAVEN_COORDINATES=$2 + elif [ "$1" = "--maven_repos" ]; then + export SPARK_SUBMIT_MAVEN_REPOS=$2 fi shift done diff --git a/bin/utils.sh b/bin/utils.sh index 22ea2b9a6d58..4ddbe0e53d60 100755 --- a/bin/utils.sh +++ b/bin/utils.sh @@ -26,14 +26,14 @@ function gatherSparkSubmitOpts() { exit 1 fi - # NOTE: If you add or remove spark-sumbmit options, + # NOTE: If you add or remove spark-submit options, # modify NOT ONLY this script but also SparkSubmitArgument.scala SUBMISSION_OPTS=() APPLICATION_OPTS=() while (($#)); do case "$1" in - --master | --deploy-mode | --class | --name | --jars | --py-files | --files | \ - --conf | --properties-file | --driver-memory | --driver-java-options | \ + --master | --deploy-mode | --class | --name | --jars | --maven | --py-files | --files | \ + --conf | --maven_repos | --properties-file | --driver-memory | --driver-java-options | \ --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \ --total-executor-cores | --executor-cores | --queue | --num-executors | --archives) if [[ $# -lt 2 ]]; then diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 81ec08cb6d50..30503a6b5fc9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -50,6 +50,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St var name: String = null var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() var jars: String = null + var 
maven: String = null + var maven_repos: String = null var verbose: Boolean = false var isPython: Boolean = false var pyFiles: String = null @@ -224,6 +226,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | name $name | childArgs [${childArgs.mkString(" ")}] | jars $jars + | maven $maven | verbose $verbose | |Spark properties used, including those specified through @@ -330,6 +333,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St jars = Utils.resolveURIs(value) parse(tail) + case ("--maven") :: value :: tail => + maven = value + parse(tail) + + case ("--maven_repos") :: value :: tail => + maven_repos = value + parse(tail) + case ("--conf" | "-c") :: value :: tail => value.split("=", 2).toSeq match { case Seq(k, v) => sparkProperties(k) = v @@ -380,6 +391,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | --name NAME A name of your application. | --jars JARS Comma-separated list of local jars to include on the driver | and executor classpaths. + | --maven Comma-separated list of maven coordinates of jars to include + | on the driver and executor classpaths. Will search the local + | maven repo, then maven central and any additional remote + | repositories given by --maven_repos. + | --maven_repos Supply additional remote repositories to search for the + | maven coordinates given with --maven. | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place | on the PYTHONPATH for Python apps. | --files FILES Comma-separated list of files to be placed in the working diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2c04e4ddfbcb..02f85018761a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1873,6 +1873,8 @@ private[spark] object Utils extends Logging { } } +private[spark] case class MavenCoordinates(groupId: String, artifactId: String, version: String) + /** * A utility class to redirect the child process's stdout or stderr. */ From 6645af42b65b1e5bc1c37065e8e07bc55c58be1a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 26 Jan 2015 17:07:22 -0800 Subject: [PATCH 02/20] [SPARK-5341] added base implementation --- bin/resolve-maven-coordinates.sh | 116 ------------------ core/pom.xml | 5 + .../org/apache/spark/deploy/SparkSubmit.scala | 94 +++++++++++++- .../scala/org/apache/spark/util/Utils.scala | 2 +- 4 files changed, 99 insertions(+), 118 deletions(-) delete mode 100644 bin/resolve-maven-coordinates.sh diff --git a/bin/resolve-maven-coordinates.sh b/bin/resolve-maven-coordinates.sh deleted file mode 100644 index 73e6415084e9..000000000000 --- a/bin/resolve-maven-coordinates.sh +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# - -MAVEN_SETTINGS_DIR=~/.m2/settings.xml -MAVEN_LOCAL_REPO=~/.m2/repository - -# Read user maven configurations if there are any. Update path to local repository if needed. -if [ -e "$MAVEN_SETTINGS_DIR" ]; then - searchAlternateRepo=$(grep "" $MAVEN_SETTINGS_DIR) - if [ ${#searchAlternateRepo[@]} == 1 ]; then - removePrefix=${searchAlternateRepo#} - MAVEN_LOCAL_REPO=${removePrefix%} - fi -fi - -# Given the path for a maven coordinate, returns the artifactId and the version -computeArtifactName() { - coordinatePath=$1 - stripPrefix=${coordinatePath#*/} - echo ${stripPrefix%.*} -} - -# Given a maven coordinate, returns the path to the corresponding jar in the local Maven repo. -# Currently requires the coordinate to be in the form `groupId:artifactId:version` -computeLocalPath() { - coordinate=$1 - split=(${coordinate//:/ }) - if [ ${#split[@]} != 3 ]; then - echo "Provided Maven Coordinates must be in the form 'groupId:artifactId:version'." - echo "The coordinate provided is: coordinate" - exit 1 - fi - groupId=${split[0]//.//} - artifactId=${split[1]} - version=${split[2]} - echo "$MAVEN_LOCAL_REPO/$groupId/$artifactId/$version/$artifactId-$version.jar" -} - -CUR_DIR=$PWD - -# Removes dependency on Spark (if there is one) -removeSparkDependency() { - artifactName=$1 - echo "$artifactName" >> log.txt - # Create empty pom file for the maven plugin to use - > pom.xml - inSpark=false - while read -r line; do - if [[ $line == *"org.apache.spark"* ]]; then - inSpark=true - fi - if [[ $inSpark == true ]] && [[ $line == *""* ]]; then - echo "provided" >> pom.xml - inSpark=false - fi - echo $line >> pom.xml - done < "/$artifactName.pom" - # bash skips the last line for some reason - echo $line >> pom.xml -} - -# Recursive function that gets the first level of dependencies of each maven coordinate. -# We use a recursive function so that if any of the transitive dependencies are Spark, we don't -# include anything related to it in the classpath. -addDependenciesToClasspath() { - pathOfArtifact=$1 - if [ ${#pathOfArtifact} -gt 0 ]; then - artifactName=$(computeArtifactName $pathOfArtifact) - cd ${pathOfArtifact%/*} - mavenPath=$pathOfArtifact - > cp.txt - removeSparkDependency $artifactName - mvn dependency:build-classpath -Dmdep.outputFile=cp.txt -DexcludeScope=provided -DexcludeTransitive=true - depClasspath=`cat cp.txt` - depList=(${depClasspath//:/ }) - for dep in "${depList[@]}"; do - mavenPath="$mavenPath:$(addDependenciesToClasspath $dep)" - done - echo $mavenPath - fi -} - -# The path to jars in the local maven repo that will be appended to the classpath -mavenClasspath="" -if [ ! -z "SPARK_SUBMIT_MAVEN_COORDINATES" ]; then - coordinateList=(${SPARK_SUBMIT_MAVEN_COORDINATES//,/ }) - for i in "${coordinateList[@]}"; do - localPath=$(computeLocalPath "$i") - # if jar doesn't exist, download it and all it's dependencies (except Spark) - if [ ! 
-e "$localPath.jar" ]; then - mvn dependency:get -Dartifact=$i -DremoteRepositories=$SPARK_SUBMIT_MAVEN_REPOS -Dtransitive=false - fi - # add all dependencies of this jar to the classpath - mavenClasspath="$mavenClasspath:$(addDependenciesToClasspath $localPath)" - done -fi - -cd $CUR_DIR - -echo ${mavenClasspath#:} diff --git a/core/pom.xml b/core/pom.xml index d9a49c9e08af..7bcade380f17 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -224,6 +224,11 @@ derby test + + org.apache.ivy + ivy + 2.4.0 + org.tachyonproject tachyon-client diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 050ba91eb2bc..04601c8754ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -21,10 +21,19 @@ import java.io.{File, PrintStream} import java.lang.reflect.{Modifier, InvocationTargetException} import java.net.URL +import org.apache.ivy.Ivy +import org.apache.ivy.core.module.descriptor.{DefaultDependencyDescriptor, DefaultModuleDescriptor} +import org.apache.ivy.core.module.id.ModuleRevisionId +import org.apache.ivy.core.report.ResolveReport +import org.apache.ivy.core.resolve.ResolveOptions +import org.apache.ivy.core.retrieve.RetrieveOptions +import org.apache.ivy.core.settings.IvySettings +import org.apache.ivy.plugins.resolver.IBiblioResolver + import scala.collection.mutable.{ArrayBuffer, HashMap, Map} import org.apache.spark.executor.ExecutorURLClassLoader -import org.apache.spark.util.Utils +import org.apache.spark.util.{MavenCoordinate, Utils} /** * Main gateway of launching a Spark application. @@ -56,6 +65,12 @@ object SparkSubmit { private val CLASS_NOT_FOUND_EXIT_STATUS = 101 + // Directories for caching downloads through ivy and storing the jars when maven coordinates are + // supplied to spark-submit + // TODO: Take these as arguments? For example, on AWS /mnt/ is a better location. + private val IVY_CACHE = new File("ivy/cache") + private val MAVEN_JARS = new File("ivy/jars") + // Exposed for testing private[spark] var exitFn: () => Unit = () => System.exit(-1) private[spark] var printStream: PrintStream = System.err @@ -168,6 +183,16 @@ object SparkSubmit { // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" + // Resolve maven dependencies if there are any and add classpath to jars + val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.maven_repos) + if (!resolvedMavenCoordinates.trim.isEmpty) { + if (args.jars == null || args.jars.trim.isEmpty) { + args.jars = resolvedMavenCoordinates + } else { + args.jars += s",$resolvedMavenCoordinates" + } + } + // A list of rules to map each argument to system properties or command-line options in // each deploy mode; we iterate through these below val options = List[OptionAssigner]( @@ -429,6 +454,73 @@ object SparkSubmit { .mkString(",") if (merged == "") null else merged } + + private def resolveMavenCoordinates(coordinates: String, remoteRepos: String): String = { + if (coordinates == null || coordinates.trim.isEmpty) { + "" + } else { + val artifacts = coordinates.split(",").map { p => + val splits = p.split(":") + require(splits.length == 3, s"Provided Maven Coordinates must be in the form " + + s"'groupId:artifactId:version'. 
The coordinate provided is: $p") + new MavenCoordinate(splits(0), splits(1), splits(2)) + } + // create an ivy instance + val ivySettings: IvySettings = new IvySettings + ivySettings.setDefaultCache(IVY_CACHE) + + // the biblio resolver resolves POM declared dependencies + val br: IBiblioResolver = new IBiblioResolver + br.setM2compatible(true) + br.setUsepoms(true) + br.setName("central") + ivySettings.addResolver(br) + // add any other remote repositories other than maven central + if (remoteRepos != null && !remoteRepos.trim.isEmpty) { + remoteRepos.split(",").foreach { repo => + val brr: IBiblioResolver = new IBiblioResolver + brr.setM2compatible(true) + brr.setUsepoms(true) + brr.setRoot(repo) + ivySettings.addResolver(brr) + } + } + ivySettings.setDefaultResolver(br.getName) + + val ivy = Ivy.newInstance(ivySettings) + // Set resolve options to download transitive dependencies as well + val ro = new ResolveOptions + ro.setTransitive(true) + ro.setDownload(true) + // A Module descriptor must be specified. Entries are dummy strings + val md = DefaultModuleDescriptor.newDefaultInstance( + ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0")) + + artifacts.foreach { mvn => + val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version) + val dd = new DefaultDependencyDescriptor(ri, false, false) + dd.addDependencyConfiguration("default", "default") + md.addDependency(dd) + } + // resolve dependencies + val rr: ResolveReport = ivy.resolve(md, ro) + if (rr.hasError) { + throw new RuntimeException(rr.getAllProblemMessages.toString) + } + // retrieve all resolved dependencies + val m = rr.getModuleDescriptor + ivy.retrieve(m.getModuleRevisionId, + MAVEN_JARS.getAbsolutePath + "/[artifact](-[classifier]).[ext]", + new RetrieveOptions().setConfs(Array("default"))) + + // output downloaded jars to classpath (will append to jars). The name of the jar is given + // after a `!` by Ivy. + rr.getArtifacts.toArray.map { case artifactInfo => + val artifactString = artifactInfo.toString + MAVEN_JARS.getAbsolutePath + "/" + artifactString.drop(artifactString.lastIndexOf("!") + 1) + }.mkString(",") + } + } } /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 02f85018761a..38bfa0655d65 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1873,7 +1873,7 @@ private[spark] object Utils extends Logging { } } -private[spark] case class MavenCoordinates(groupId: String, artifactId: String, version: String) +private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) /** * A utility class to redirect the child process's stdout or stderr. From a0870af7a16ffce241fac7979e76535f7ef864be Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 26 Jan 2015 17:58:19 -0800 Subject: [PATCH 03/20] add docs. 
remove unnecesary new lines --- bin/spark-class | 2 -- bin/spark-submit | 4 ---- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 7 +++++++ .../org/apache/spark/deploy/SparkSubmitArguments.scala | 1 + core/src/main/scala/org/apache/spark/util/Utils.scala | 6 ++++++ 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/bin/spark-class b/bin/spark-class index e5aa968c66dd..1b945461fabc 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -147,8 +147,6 @@ else CLASSPATH="$classpath_output" fi - - if [[ "$1" =~ org.apache.spark.tools.* ]]; then if test -z "$SPARK_TOOLS_JAR"; then echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2 diff --git a/bin/spark-submit b/bin/spark-submit index 8cd76833e305..3e5cbdbb2439 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -40,10 +40,6 @@ while (($#)); do export SPARK_SUBMIT_OPTS=$2 elif [ "$1" = "--master" ]; then export MASTER=$2 - elif [ "$1" = "--maven" ]; then - export SPARK_SUBMIT_MAVEN_COORDINATES=$2 - elif [ "$1" = "--maven_repos" ]; then - export SPARK_SUBMIT_MAVEN_REPOS=$2 fi shift done diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 04601c8754ac..2d27649299f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -455,6 +455,13 @@ object SparkSubmit { if (merged == "") null else merged } + /** + * Resolves any dependencies that were supplied through maven coordinates + * @param coordinates Comma-delimited string of maven coordinates + * @param remoteRepos Comma-delimited string of remote repositories other than maven central + * @return The comma-delimited path to the jars of the given maven artifacts including their + * transitive dependencies + */ private def resolveMavenCoordinates(coordinates: String, remoteRepos: String): String = { if (coordinates == null || coordinates.trim.isEmpty) { "" diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 30503a6b5fc9..ec9d4cabe1ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -227,6 +227,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | childArgs [${childArgs.mkString(" ")}] | jars $jars | maven $maven + | maven_repos $maven_repos | verbose $verbose | |Spark properties used, including those specified through diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 38bfa0655d65..29b425e97783 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1873,6 +1873,12 @@ private[spark] object Utils extends Logging { } } +/** + * Represents a Maven Coordinate + * @param groupId the groupId of the coordinate + * @param artifactId the artifactId of the coordinate + * @param version the version of the coordinate + */ private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) /** From 2edc9b59df0a8984c9066cac5fe8f66d2df64c88 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 27 Jan 2015 14:38:52 -0800 Subject: [PATCH 04/20] managed to exclude Spark and it's dependencies --- bin/utils.sh | 2 +- 
.../scala/org/apache/spark/SparkContext.scala | 14 ++++++- .../org/apache/spark/deploy/SparkSubmit.scala | 42 +++++++++++++++---- .../spark/deploy/SparkSubmitArguments.scala | 15 +++---- 4 files changed, 55 insertions(+), 18 deletions(-) diff --git a/bin/utils.sh b/bin/utils.sh index 4ddbe0e53d60..9d5277cddd12 100755 --- a/bin/utils.sh +++ b/bin/utils.sh @@ -33,7 +33,7 @@ function gatherSparkSubmitOpts() { while (($#)); do case "$1" in --master | --deploy-mode | --class | --name | --jars | --maven | --py-files | --files | \ - --conf | --maven_repos | --properties-file | --driver-memory | --driver-java-options | \ + --conf | --maven-repos | --properties-file | --driver-memory | --driver-java-options | \ --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \ --total-executor-cores | --executor-cores | --queue | --num-executors | --archives) if [[ $# -lt 2 ]]; then diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6a354ed4d148..8080496732e8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1175,7 +1175,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli null } } else { - env.httpFileServer.addJar(new File(uri.getPath)) + try { + env.httpFileServer.addJar(new File(uri.getPath)) + } catch { + case exc: FileNotFoundException => + logError(s"Jar not found at $path") + null + case e: Exception => + // For now just log an error but allow to go through so spark examples work. + // The spark examples don't really need the jar distributed since its also + // the app jar. + logError("Error adding jar (" + e + "), was the --addJars option used?") + null + } } // A JAR file which exists locally on every worker node case "local" => diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 2d27649299f3..714b3699cff0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -22,13 +22,15 @@ import java.lang.reflect.{Modifier, InvocationTargetException} import java.net.URL import org.apache.ivy.Ivy -import org.apache.ivy.core.module.descriptor.{DefaultDependencyDescriptor, DefaultModuleDescriptor} -import org.apache.ivy.core.module.id.ModuleRevisionId +import org.apache.ivy.ant.IvyDependencyExclude +import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor} +import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId} import org.apache.ivy.core.report.ResolveReport import org.apache.ivy.core.resolve.ResolveOptions import org.apache.ivy.core.retrieve.RetrieveOptions import org.apache.ivy.core.settings.IvySettings -import org.apache.ivy.plugins.resolver.IBiblioResolver +import org.apache.ivy.plugins.matcher.GlobPatternMatcher +import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver} import scala.collection.mutable.{ArrayBuffer, HashMap, Map} @@ -184,7 +186,7 @@ object SparkSubmit { sysProps("SPARK_SUBMIT") = "true" // Resolve maven dependencies if there are any and add classpath to jars - val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.maven_repos) + val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.mavenRepos) if (!resolvedMavenCoordinates.trim.isEmpty) { if (args.jars == 
null || args.jars.trim.isEmpty) { args.jars = resolvedMavenCoordinates @@ -475,24 +477,41 @@ object SparkSubmit { // create an ivy instance val ivySettings: IvySettings = new IvySettings ivySettings.setDefaultCache(IVY_CACHE) + // create a pattern matcher + ivySettings.addMatcher(new GlobPatternMatcher) // the biblio resolver resolves POM declared dependencies val br: IBiblioResolver = new IBiblioResolver br.setM2compatible(true) br.setUsepoms(true) br.setName("central") - ivySettings.addResolver(br) + + // We need a chain resolver if we want to check multiple repositories + val cr = new ChainResolver + cr.setName("list") + cr.add(br) + + // Add an exclusion rule for Spark + val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*") + val sparkDependencyExcludeRule = + new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null) + sparkDependencyExcludeRule.addConfiguration("default") + // add any other remote repositories other than maven central if (remoteRepos != null && !remoteRepos.trim.isEmpty) { + var i = 1 remoteRepos.split(",").foreach { repo => val brr: IBiblioResolver = new IBiblioResolver brr.setM2compatible(true) brr.setUsepoms(true) brr.setRoot(repo) - ivySettings.addResolver(brr) + brr.setName(s"repo-$i") + cr.add(brr) + i += 1 } } - ivySettings.setDefaultResolver(br.getName) + ivySettings.addResolver(cr) + ivySettings.setDefaultResolver(cr.getName) val ivy = Ivy.newInstance(ivySettings) // Set resolve options to download transitive dependencies as well @@ -503,12 +522,15 @@ object SparkSubmit { val md = DefaultModuleDescriptor.newDefaultInstance( ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0")) + md.addExcludeRule(sparkDependencyExcludeRule) + artifacts.foreach { mvn => val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version) val dd = new DefaultDependencyDescriptor(ri, false, false) dd.addDependencyConfiguration("default", "default") md.addDependency(dd) } + // resolve dependencies val rr: ResolveReport = ivy.resolve(md, ro) if (rr.hasError) { @@ -521,10 +543,12 @@ object SparkSubmit { new RetrieveOptions().setConfs(Array("default"))) // output downloaded jars to classpath (will append to jars). The name of the jar is given - // after a `!` by Ivy. + // after a `!` by Ivy. It also sometimes contains (bundle) after .jar. Remove that as well. 
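+      // Illustrative example (artifact-string format assumed, not taken from a real run): a string
+      // like "com.databricks#spark-csv_2.10;0.1!spark-csv_2.10.jar(bundle)" should reduce to
+      // "spark-csv_2.10.jar" below, which is then prefixed with MAVEN_JARS.getAbsolutePath.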
rr.getArtifacts.toArray.map { case artifactInfo => val artifactString = artifactInfo.toString - MAVEN_JARS.getAbsolutePath + "/" + artifactString.drop(artifactString.lastIndexOf("!") + 1) + val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1) + MAVEN_JARS.getAbsolutePath + "/" + + jarName.substring(0, jarName.lastIndexOf(".jar") + 4) }.mkString(",") } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index ec9d4cabe1ac..c400dd938685 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -51,7 +51,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() var jars: String = null var maven: String = null - var maven_repos: String = null + var mavenRepos: String = null var verbose: Boolean = false var isPython: Boolean = false var pyFiles: String = null @@ -227,7 +227,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | childArgs [${childArgs.mkString(" ")}] | jars $jars | maven $maven - | maven_repos $maven_repos + | maven-repos $mavenRepos | verbose $verbose | |Spark properties used, including those specified through @@ -338,8 +338,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St maven = value parse(tail) - case ("--maven_repos") :: value :: tail => - maven_repos = value + case ("--maven-repos") :: value :: tail => + mavenRepos = value parse(tail) case ("--conf" | "-c") :: value :: tail => @@ -395,9 +395,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | --maven Comma-separated list of maven coordinates of jars to include | on the driver and executor classpaths. Will search the local | maven repo, then maven central and any additional remote - | repositories given by --maven_repos. - | --maven_repos Supply additional remote repositories to search for the - | maven coordinates given with --maven. + | repositories given by --maven-repos. The format for the + | coordinates should be groupId:artifactId:version. + | --maven-repos Supply additional remote repositories as a comma-delimited + | list to search for the maven coordinates given with --maven. | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place | on the PYTHONPATH for Python apps. 
| --files FILES Comma-separated list of files to be placed in the working From c04d885469e66ba925d33511adc440fec1e2ab15 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 27 Jan 2015 15:42:28 -0800 Subject: [PATCH 05/20] take path to ivy cache as a conf --- bin/utils.sh | 3 +- bin/windows-utils.cmd | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 66 +++++++++++-------- .../spark/deploy/SparkSubmitArguments.scala | 10 +++ 4 files changed, 52 insertions(+), 29 deletions(-) diff --git a/bin/utils.sh b/bin/utils.sh index 9d5277cddd12..1dd277bfb489 100755 --- a/bin/utils.sh +++ b/bin/utils.sh @@ -35,7 +35,8 @@ function gatherSparkSubmitOpts() { --master | --deploy-mode | --class | --name | --jars | --maven | --py-files | --files | \ --conf | --maven-repos | --properties-file | --driver-memory | --driver-java-options | \ --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \ - --total-executor-cores | --executor-cores | --queue | --num-executors | --archives) + --total-executor-cores | --executor-cores | --queue | --num-executors | \ + --archives | --ivy-repo ) if [[ $# -lt 2 ]]; then "$SUBMIT_USAGE_FUNCTION" exit 1; diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd index 1082a952dac9..2ccfbbfd5f84 100644 --- a/bin/windows-utils.cmd +++ b/bin/windows-utils.cmd @@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>" SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>" SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>" -SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>" +SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--maven\> \<--maven-repos\> \<--ivy-repo\>" echo %1 | findstr %opts% >nul if %ERRORLEVEL% equ 0 ( diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 714b3699cff0..677fb46f37f7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -22,7 +22,6 @@ import java.lang.reflect.{Modifier, InvocationTargetException} import java.net.URL import org.apache.ivy.Ivy -import org.apache.ivy.ant.IvyDependencyExclude import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor} import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId} import org.apache.ivy.core.report.ResolveReport @@ -69,14 +68,15 @@ object SparkSubmit { // Directories for caching downloads through ivy and storing the jars when maven coordinates are // supplied to spark-submit - // TODO: Take these as arguments? For example, on AWS /mnt/ is a better location. 
- private val IVY_CACHE = new File("ivy/cache") - private val MAVEN_JARS = new File("ivy/jars") + private var IVY_CACHE: File = null + private var MAVEN_JARS: File = null // Exposed for testing private[spark] var exitFn: () => Unit = () => System.exit(-1) private[spark] var printStream: PrintStream = System.err + private[spark] def printWarning(str: String) = printStream.println("Warning: " + str) + private[spark] def printErrorAndExit(str: String) = { printStream.println("Error: " + str) printStream.println("Run with --help for usage help or --verbose for debug output") @@ -94,13 +94,13 @@ object SparkSubmit { /** * @return a tuple containing - * (1) the arguments for the child process, - * (2) a list of classpath entries for the child, - * (3) a list of system properties and env vars, and - * (4) the main class for the child + * (1) the arguments for the child process, + * (2) a list of classpath entries for the child, + * (3) a list of system properties and env vars, and + * (4) the main class for the child */ private[spark] def createLaunchEnv(args: SparkSubmitArguments) - : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { + : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { // Values to return val childArgs = new ArrayBuffer[String]() @@ -147,7 +147,7 @@ object SparkSubmit { if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) { printErrorAndExit( "Could not load YARN classes. " + - "This copy of Spark may not have been compiled with YARN support.") + "This copy of Spark may not have been compiled with YARN support.") } } @@ -186,6 +186,8 @@ object SparkSubmit { sysProps("SPARK_SUBMIT") = "true" // Resolve maven dependencies if there are any and add classpath to jars + IVY_CACHE = new File(s"${args.ivyRepoPath}/cache") + MAVEN_JARS = new File(s"${args.ivyRepoPath}/jars") val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.mavenRepos) if (!resolvedMavenCoordinates.trim.isEmpty) { if (args.jars == null || args.jars.trim.isEmpty) { @@ -203,6 +205,7 @@ object SparkSubmit { OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"), OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"), OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"), + OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"), OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.driver.memory"), OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, @@ -214,6 +217,7 @@ object SparkSubmit { // Standalone cluster only OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"), + OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"), OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"), OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"), @@ -252,18 +256,26 @@ object SparkSubmit { if (isUserJar(args.primaryResource)) { childClasspath += args.primaryResource } - if (args.jars != null) { childClasspath ++= args.jars.split(",") } - if (args.childArgs != null) { childArgs ++= args.childArgs } + if (args.jars != null) { + childClasspath ++= args.jars.split(",") + } + if (args.childArgs != null) { + childArgs ++= args.childArgs + } } // Map all arguments to command-line options or system properties for our chosen mode for (opt <- options) { if (opt.value != null && - 
(deployMode & opt.deployMode) != 0 && - (clusterManager & opt.clusterManager) != 0) { - if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) } - if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) } + (deployMode & opt.deployMode) != 0 && + (clusterManager & opt.clusterManager) != 0) { + if (opt.clOption != null) { + childArgs +=(opt.clOption, opt.value) + } + if (opt.sysProp != null) { + sysProps.put(opt.sysProp, opt.value) + } } } @@ -286,7 +298,7 @@ object SparkSubmit { childArgs += "--supervise" } childArgs += "launch" - childArgs += (args.master, args.primaryResource, args.mainClass) + childArgs +=(args.master, args.primaryResource, args.mainClass) if (args.childArgs != null) { childArgs ++= args.childArgs } @@ -296,11 +308,11 @@ object SparkSubmit { if (isYarnCluster) { childMainClass = "org.apache.spark.deploy.yarn.Client" if (args.primaryResource != SPARK_INTERNAL) { - childArgs += ("--jar", args.primaryResource) + childArgs +=("--jar", args.primaryResource) } - childArgs += ("--class", args.mainClass) + childArgs +=("--class", args.mainClass) if (args.childArgs != null) { - args.childArgs.foreach { arg => childArgs += ("--arg", arg) } + args.childArgs.foreach { arg => childArgs +=("--arg", arg)} } } @@ -341,11 +353,11 @@ object SparkSubmit { } private def launch( - childArgs: ArrayBuffer[String], - childClasspath: ArrayBuffer[String], - sysProps: Map[String, String], - childMainClass: String, - verbose: Boolean = false) { + childArgs: ArrayBuffer[String], + childClasspath: ArrayBuffer[String], + sysProps: Map[String, String], + childMainClass: String, + verbose: Boolean = false) { if (verbose) { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") @@ -452,8 +464,8 @@ object SparkSubmit { */ private[spark] def mergeFileLists(lists: String*): String = { val merged = lists.filter(_ != null) - .flatMap(_.split(",")) - .mkString(",") + .flatMap(_.split(",")) + .mkString(",") if (merged == "") null else merged } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index c400dd938685..943c9a17ca7f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -52,6 +52,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St var jars: String = null var maven: String = null var mavenRepos: String = null + var ivyRepoPath: String = null var verbose: Boolean = false var isPython: Boolean = false var pyFiles: String = null @@ -125,6 +126,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St .orNull name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull + ivyRepoPath = Option(ivyRepoPath) + .orElse(sparkProperties.get("spark.jars.ivy")).orElse(Option("ivy/")).get deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull numExecutors = Option(numExecutors) .getOrElse(sparkProperties.get("spark.executor.instances").orNull) @@ -228,6 +231,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | jars $jars | maven $maven | maven-repos $mavenRepos + | ivy-repo $ivyRepoPath | verbose $verbose | |Spark properties used, including those specified through @@ -342,6 +346,10 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, St mavenRepos = value parse(tail) + case ("--ivy-repo") :: value :: tail => + ivyRepoPath = Utils.resolveURIs(value) + parse(tail) + case ("--conf" | "-c") :: value :: tail => value.split("=", 2).toSeq match { case Seq(k, v) => sparkProperties(k) = v @@ -399,6 +407,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | coordinates should be groupId:artifactId:version. | --maven-repos Supply additional remote repositories as a comma-delimited | list to search for the maven coordinates given with --maven. + | --ivy-repo The path to use to cache jars downloaded using maven + | coordinates. The default is $PWD/ivy | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place | on the PYTHONPATH for Python apps. | --files FILES Comma-separated list of files to be placed in the working From 3705907dc2f61fa68f64df14a23622cc40aff9d8 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 27 Jan 2015 17:18:34 -0800 Subject: [PATCH 06/20] remove ivy-repo as a command line argument. Use global ivy cache as default --- .../org/apache/spark/deploy/SparkSubmit.scala | 20 +++++++++++++------ .../spark/deploy/SparkSubmitArguments.scala | 10 +--------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 677fb46f37f7..b8353b13a796 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -186,9 +186,8 @@ object SparkSubmit { sysProps("SPARK_SUBMIT") = "true" // Resolve maven dependencies if there are any and add classpath to jars - IVY_CACHE = new File(s"${args.ivyRepoPath}/cache") - MAVEN_JARS = new File(s"${args.ivyRepoPath}/jars") - val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.mavenRepos) + val resolvedMavenCoordinates = + resolveMavenCoordinates(args.maven, args.mavenRepos, args.ivyRepoPath) if (!resolvedMavenCoordinates.trim.isEmpty) { if (args.jars == null || args.jars.trim.isEmpty) { args.jars = resolvedMavenCoordinates @@ -476,7 +475,10 @@ object SparkSubmit { * @return The comma-delimited path to the jars of the given maven artifacts including their * transitive dependencies */ - private def resolveMavenCoordinates(coordinates: String, remoteRepos: String): String = { + private def resolveMavenCoordinates( + coordinates: String, + remoteRepos: String, + ivyPath: String): String = { if (coordinates == null || coordinates.trim.isEmpty) { "" } else { @@ -486,9 +488,15 @@ object SparkSubmit { s"'groupId:artifactId:version'. 
The coordinate provided is: $p") new MavenCoordinate(splits(0), splits(1), splits(2)) } - // create an ivy instance + // set ivy settings for location of cache val ivySettings: IvySettings = new IvySettings - ivySettings.setDefaultCache(IVY_CACHE) + if (ivyPath == null || ivyPath.trim.length == 0) { + MAVEN_JARS = new File(ivySettings.getDefaultIvyUserDir, "jars") + } else { + ivySettings.setDefaultCache(new File(ivyPath, "cache")) + MAVEN_JARS = new File(ivyPath, "jars") + } + // create a pattern matcher ivySettings.addMatcher(new GlobPatternMatcher) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 943c9a17ca7f..cdd80bb2a958 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -126,8 +126,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St .orNull name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull - ivyRepoPath = Option(ivyRepoPath) - .orElse(sparkProperties.get("spark.jars.ivy")).orElse(Option("ivy/")).get + ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull numExecutors = Option(numExecutors) .getOrElse(sparkProperties.get("spark.executor.instances").orNull) @@ -231,7 +230,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | jars $jars | maven $maven | maven-repos $mavenRepos - | ivy-repo $ivyRepoPath | verbose $verbose | |Spark properties used, including those specified through @@ -346,10 +344,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St mavenRepos = value parse(tail) - case ("--ivy-repo") :: value :: tail => - ivyRepoPath = Utils.resolveURIs(value) - parse(tail) - case ("--conf" | "-c") :: value :: tail => value.split("=", 2).toSeq match { case Seq(k, v) => sparkProperties(k) = v @@ -407,8 +401,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | coordinates should be groupId:artifactId:version. | --maven-repos Supply additional remote repositories as a comma-delimited | list to search for the maven coordinates given with --maven. - | --ivy-repo The path to use to cache jars downloaded using maven - | coordinates. The default is $PWD/ivy | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place | on the PYTHONPATH for Python apps. 
| --files FILES Comma-separated list of files to be placed in the working From 53423e0cb81dc43e0317cc5ff79fa06ff421a9e4 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 28 Jan 2015 13:34:38 -0800 Subject: [PATCH 07/20] tests added --- bin/utils.sh | 7 +- .../org/apache/spark/deploy/SparkSubmit.scala | 109 ++++++++++++------ .../spark/deploy/SparkSubmitArguments.scala | 24 ++-- .../scala/org/apache/spark/util/Utils.scala | 8 -- .../spark/deploy/SparkSubmitSuite.scala | 40 ++++++- .../spark/deploy/SparkSubmitUtilsSuite.scala | 63 ++++++++++ 6 files changed, 193 insertions(+), 58 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala diff --git a/bin/utils.sh b/bin/utils.sh index 1dd277bfb489..30534949fa96 100755 --- a/bin/utils.sh +++ b/bin/utils.sh @@ -32,11 +32,10 @@ function gatherSparkSubmitOpts() { APPLICATION_OPTS=() while (($#)); do case "$1" in - --master | --deploy-mode | --class | --name | --jars | --maven | --py-files | --files | \ - --conf | --maven-repos | --properties-file | --driver-memory | --driver-java-options | \ + --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \ + --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \ --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \ - --total-executor-cores | --executor-cores | --queue | --num-executors | \ - --archives | --ivy-repo ) + --total-executor-cores | --executor-cores | --queue | --num-executors | --archives ) if [[ $# -lt 2 ]]; then "$SUBMIT_USAGE_FUNCTION" exit 1; diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b8353b13a796..11f44e90d6f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -22,19 +22,23 @@ import java.lang.reflect.{Modifier, InvocationTargetException} import java.net.URL import org.apache.ivy.Ivy -import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor} +import org.apache.ivy.ant.AntMessageLogger +import org.apache.ivy.core.LogOptions +import org.apache.ivy.core.module.descriptor._ import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId} import org.apache.ivy.core.report.ResolveReport -import org.apache.ivy.core.resolve.ResolveOptions +import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions} import org.apache.ivy.core.retrieve.RetrieveOptions -import org.apache.ivy.core.settings.IvySettings +import org.apache.ivy.core.settings.{IvyVariableContainerImpl, IvySettings} import org.apache.ivy.plugins.matcher.GlobPatternMatcher import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver} +import org.apache.ivy.util.DefaultMessageLogger import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import org.apache.spark.Logging import org.apache.spark.executor.ExecutorURLClassLoader -import org.apache.spark.util.{MavenCoordinate, Utils} +import org.apache.spark.util.Utils /** * Main gateway of launching a Spark application. 
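A minimal sketch of driving the resolution utility that the next hunk switches to, assuming the SparkSubmitUtils.resolveMavenCoordinates signature introduced in this patch series; the coordinate, repository URL, and resulting paths are illustrative placeholders rather than recorded output:

// Minimal sketch, not part of the patch: resolve one package plus one extra repository.
val resolvedJars: String = SparkSubmitUtils.resolveMavenCoordinates(
  coordinates = "com.databricks:spark-csv_2.10:0.1",  // groupId:artifactId:version
  remoteRepos = "http://repo.example.com/maven",      // what --repositories would carry
  ivyPath = null)                                      // null falls back to the default ivy user dir
// resolvedJars is a comma-delimited list of jar paths (e.g. ".../jars/spark-csv_2.10.jar,..."),
// which SparkSubmit appends to args.jars so it flows into spark.jars as usual.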
@@ -66,11 +70,6 @@ object SparkSubmit { private val CLASS_NOT_FOUND_EXIT_STATUS = 101 - // Directories for caching downloads through ivy and storing the jars when maven coordinates are - // supplied to spark-submit - private var IVY_CACHE: File = null - private var MAVEN_JARS: File = null - // Exposed for testing private[spark] var exitFn: () => Unit = () => System.exit(-1) private[spark] var printStream: PrintStream = System.err @@ -187,7 +186,7 @@ object SparkSubmit { // Resolve maven dependencies if there are any and add classpath to jars val resolvedMavenCoordinates = - resolveMavenCoordinates(args.maven, args.mavenRepos, args.ivyRepoPath) + SparkSubmitUtils.resolveMavenCoordinates(args.packages, args.repositories, args.ivyRepoPath) if (!resolvedMavenCoordinates.trim.isEmpty) { if (args.jars == null || args.jars.trim.isEmpty) { args.jars = resolvedMavenCoordinates @@ -352,11 +351,11 @@ object SparkSubmit { } private def launch( - childArgs: ArrayBuffer[String], - childClasspath: ArrayBuffer[String], - sysProps: Map[String, String], - childMainClass: String, - verbose: Boolean = false) { + childArgs: ArrayBuffer[String], + childClasspath: ArrayBuffer[String], + sysProps: Map[String, String], + childMainClass: String, + verbose: Boolean = false) { if (verbose) { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") @@ -467,18 +466,36 @@ object SparkSubmit { .mkString(",") if (merged == "") null else merged } +} + +/** Provides utility functions to be used inside SparkSubmit. */ +private[spark] object SparkSubmitUtils extends Logging { + + // Directories for caching downloads through ivy and storing the jars when maven coordinates are + // supplied to spark-submit + private var PACKAGES_DIRECTORY: File = null + + /** + * Represents a Maven Coordinate + * @param groupId the groupId of the coordinate + * @param artifactId the artifactId of the coordinate + * @param version the version of the coordinate + */ + private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) /** * Resolves any dependencies that were supplied through maven coordinates * @param coordinates Comma-delimited string of maven coordinates * @param remoteRepos Comma-delimited string of remote repositories other than maven central + * @param ivyPath The path to the local ivy repository * @return The comma-delimited path to the jars of the given maven artifacts including their * transitive dependencies */ - private def resolveMavenCoordinates( + private[spark] def resolveMavenCoordinates( coordinates: String, remoteRepos: String, - ivyPath: String): String = { + ivyPath: String, + isTest: Boolean = false): String = { if (coordinates == null || coordinates.trim.isEmpty) { "" } else { @@ -486,16 +503,26 @@ object SparkSubmit { val splits = p.split(":") require(splits.length == 3, s"Provided Maven Coordinates must be in the form " + s"'groupId:artifactId:version'. The coordinate provided is: $p") + require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " + + s"be whitespace. The groupId provided is: ${splits(0)}") + require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " + + s"be whitespace. The artifactId provided is: ${splits(1)}") + require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " + + s"be whitespace. 
The version provided is: ${splits(2)}") new MavenCoordinate(splits(0), splits(1), splits(2)) } + // Default configuration name for ivy + val conf = "default" // set ivy settings for location of cache val ivySettings: IvySettings = new IvySettings - if (ivyPath == null || ivyPath.trim.length == 0) { - MAVEN_JARS = new File(ivySettings.getDefaultIvyUserDir, "jars") + if (ivyPath == null || ivyPath.trim.isEmpty) { + PACKAGES_DIRECTORY = new File(ivySettings.getDefaultIvyUserDir, "jars") } else { ivySettings.setDefaultCache(new File(ivyPath, "cache")) - MAVEN_JARS = new File(ivyPath, "jars") + PACKAGES_DIRECTORY = new File(ivyPath, "jars") } + logInfo(s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}") + logInfo(s"The jars for the packages stored in: $PACKAGES_DIRECTORY") // create a pattern matcher ivySettings.addMatcher(new GlobPatternMatcher) @@ -515,10 +542,10 @@ object SparkSubmit { val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*") val sparkDependencyExcludeRule = new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null) - sparkDependencyExcludeRule.addConfiguration("default") + sparkDependencyExcludeRule.addConfiguration(conf) // add any other remote repositories other than maven central - if (remoteRepos != null && !remoteRepos.trim.isEmpty) { + if (remoteRepos != null && remoteRepos.trim.nonEmpty) { var i = 1 remoteRepos.split(",").foreach { repo => val brr: IBiblioResolver = new IBiblioResolver @@ -527,47 +554,63 @@ object SparkSubmit { brr.setRoot(repo) brr.setName(s"repo-$i") cr.add(brr) + logInfo(s"$repo added as a remote repository with the name: ${brr.getName}") i += 1 } } ivySettings.addResolver(cr) ivySettings.setDefaultResolver(cr.getName) - val ivy = Ivy.newInstance(ivySettings) // Set resolve options to download transitive dependencies as well - val ro = new ResolveOptions - ro.setTransitive(true) - ro.setDownload(true) + val resolveOptions = new ResolveOptions + resolveOptions.setTransitive(true) + val retrieveOptions = new RetrieveOptions + // Turn downloading and logging off for testing + if (isTest) { + resolveOptions.setDownload(false) + resolveOptions.setLog(LogOptions.LOG_QUIET) + retrieveOptions.setLog(LogOptions.LOG_QUIET) + } else { + resolveOptions.setDownload(true) + } + // A Module descriptor must be specified. 
Entries are dummy strings val md = DefaultModuleDescriptor.newDefaultInstance( ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0")) + md.setDefaultConf(conf) md.addExcludeRule(sparkDependencyExcludeRule) artifacts.foreach { mvn => val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version) val dd = new DefaultDependencyDescriptor(ri, false, false) - dd.addDependencyConfiguration("default", "default") + dd.addDependencyConfiguration(conf, conf) + logInfo(s"${dd.getDependencyId} added as a dependency") md.addDependency(dd) } // resolve dependencies - val rr: ResolveReport = ivy.resolve(md, ro) + val rr: ResolveReport = ivy.resolve(md, resolveOptions) if (rr.hasError) { throw new RuntimeException(rr.getAllProblemMessages.toString) } + // Log the callers for each dependency + rr.getDependencies.toArray.foreach { case dependency: IvyNode => + logInfo(s"$dependency will be retrieved as a dependency for:") + dependency.getAllCallers.foreach (caller => logInfo(s"\t$caller")) + } // retrieve all resolved dependencies val m = rr.getModuleDescriptor ivy.retrieve(m.getModuleRevisionId, - MAVEN_JARS.getAbsolutePath + "/[artifact](-[classifier]).[ext]", - new RetrieveOptions().setConfs(Array("default"))) + PACKAGES_DIRECTORY.getAbsolutePath + "/[artifact](-[classifier]).[ext]", + retrieveOptions.setConfs(Array(conf))) // output downloaded jars to classpath (will append to jars). The name of the jar is given - // after a `!` by Ivy. It also sometimes contains (bundle) after .jar. Remove that as well. - rr.getArtifacts.toArray.map { case artifactInfo => + // after a '!' by Ivy. It also sometimes contains (bundle) after '.jar'. Remove that as well. + rr.getArtifacts.toArray.map { case artifactInfo: MDArtifact => val artifactString = artifactInfo.toString val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1) - MAVEN_JARS.getAbsolutePath + "/" + + PACKAGES_DIRECTORY.getAbsolutePath + "/" + jarName.substring(0, jarName.lastIndexOf(".jar") + 4) }.mkString(",") } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index cdd80bb2a958..08098bfc032d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -50,8 +50,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St var name: String = null var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() var jars: String = null - var maven: String = null - var mavenRepos: String = null + var packages: String = null + var repositories: String = null var ivyRepoPath: String = null var verbose: Boolean = false var isPython: Boolean = false @@ -228,8 +228,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | name $name | childArgs [${childArgs.mkString(" ")}] | jars $jars - | maven $maven - | maven-repos $mavenRepos + | packages $packages + | repositories $repositories | verbose $verbose | |Spark properties used, including those specified through @@ -336,12 +336,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St jars = Utils.resolveURIs(value) parse(tail) - case ("--maven") :: value :: tail => - maven = value + case ("--packages") :: value :: tail => + packages = value parse(tail) - case ("--maven-repos") :: value :: tail => - mavenRepos = value + case ("--repositories") :: value :: tail => + 
repositories = value parse(tail) case ("--conf" | "-c") :: value :: tail => @@ -394,13 +394,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | --name NAME A name of your application. | --jars JARS Comma-separated list of local jars to include on the driver | and executor classpaths. - | --maven Comma-separated list of maven coordinates of jars to include + | --packages Comma-separated list of maven coordinates of jars to include | on the driver and executor classpaths. Will search the local | maven repo, then maven central and any additional remote - | repositories given by --maven-repos. The format for the + | repositories given by --repositories. The format for the | coordinates should be groupId:artifactId:version. - | --maven-repos Supply additional remote repositories as a comma-delimited - | list to search for the maven coordinates given with --maven. + | --repositories Comma-separated list of additional remote repositories to + | search for the maven coordinates given with --packages. | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place | on the PYTHONPATH for Python apps. | --files FILES Comma-separated list of files to be placed in the working diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 29b425e97783..2c04e4ddfbcb 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1873,14 +1873,6 @@ private[spark] object Utils extends Logging { } } -/** - * Represents a Maven Coordinate - * @param groupId the groupId of the coordinate - * @param artifactId the artifactId of the coordinate - * @param version the version of the coordinate - */ -private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) - /** * A utility class to redirect the child process's stdout or stderr. 
*/ diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 065b7534cece..c5bef1047fff 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -310,6 +310,19 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties runSparkSubmit(args) } + test("includes jars passed in through --packages") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val packagesString = "com.databricks:spark-csv_2.10:0.1,com.databricks:spark-avro_2.10:0.1" + val args = Seq( + "--class", MavenArtifactDownloadTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--master", "local-cluster[2,1,512]", + "--packages", packagesString, + "--conf", "spark.ui.enabled=false", + unusedJar.toString) + runSparkSubmit(args) + } + test("resolves command line argument paths correctly") { val jars = "/jar1,/jar2" // --jars val files = "hdfs:/file1,file2" // --files @@ -464,7 +477,32 @@ object JarCreationTest extends Logging { var exception: String = null try { Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader) - Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader) + Class.forName("SparkSubmitClassB", true, Thread.currentThread().getContextClassLoader) + } catch { + case t: Throwable => + exception = t + "\n" + t.getStackTraceString + exception = exception.replaceAll("\n", "\n\t") + } + Option(exception).toSeq.iterator + }.collect() + if (result.nonEmpty) { + throw new Exception("Could not load user class from jar:\n" + result(0)) + } + } +} + +object MavenArtifactDownloadTest extends Logging { + def main(args: Array[String]) { + Utils.configTestLog4j("INFO") + val conf = new SparkConf() + val sc = new SparkContext(conf) + val result = sc.makeRDD(1 to 100, 10).mapPartitions { x => + var exception: String = null + try { + Class.forName("com.databricks.spark.csv.DefaultSource", + true, Thread.currentThread().getContextClassLoader) + Class.forName("com.databricks.spark.avro.DefaultSource", + true, Thread.currentThread().getContextClassLoader) } catch { case t: Throwable => exception = t + "\n" + t.getStackTraceString diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala new file mode 100644 index 000000000000..e0af3372ac07 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy + +import org.apache.spark.util.ResetSystemProperties +import org.scalatest.{Matchers, FunSuite} + +class SparkSubmitUtilsSuite extends FunSuite with Matchers with ResetSystemProperties { + + def beforeAll() { + System.setProperty("spark.testing", "true") + } + + test("incorrect maven coordinate throws error") { + val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a") + for (coordinate <- coordinates) { + intercept[IllegalArgumentException] { + SparkSubmitUtils.resolveMavenCoordinates(coordinate, null, null, true) + } + } + } + + test("dependency not found throws RuntimeException") { + intercept[RuntimeException] { + SparkSubmitUtils.resolveMavenCoordinates("a:b:c", null, null, true) + } + } + + test("neglects Spark and Spark's dependencies") { + val path = SparkSubmitUtils.resolveMavenCoordinates( + "org.apache.spark:spark-core_2.10:1.2.0", null, null, true) + assert(path == "", "should return empty path") + } + + test("search for artifact at other repositories") { + val path = SparkSubmitUtils.resolveMavenCoordinates("com.agimatec:agimatec-validation:0.9.3", + "https://oss.sonatype.org/content/repositories/agimatec/", null, true) + assert(path.indexOf("agimatec-validation") >= 0, "should find package. If it doesn't, check" + + "if package still exists. If it has been removed, replace the example in this test.") + } + + test("ivy path works correctly") { + val ivyPath = "dummy/ivy" + val jarPath = SparkSubmitUtils.resolveMavenCoordinates( + "com.databricks:spark-csv_2.10:0.1", null, ivyPath, true) + assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path") + } +} From 3a23f2198302df9fc1f7f5494a20e5c65acd8679 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 28 Jan 2015 14:07:17 -0800 Subject: [PATCH 08/20] excluded ivy dependencies --- core/pom.xml | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/core/pom.xml b/core/pom.xml index 7bcade380f17..594ae7fe9c8c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -228,6 +228,84 @@ org.apache.ivy ivy 2.4.0 + + + org.apache.ant + ant + + + org.apache.ant + ant-nodeps + + + org.apache.ant + ant-trax + + + commons-httpclient + commons-httpclient + + + oro + oro + + + commons-vfs + commons-vfs + + + com.jcraft + jsch + + + com.jcraft + jsch.agentproxy + + + com.jcraft + jsch.agentproxy.connector-factory + + + com.jcraft + jsch.agentproxy.jsch + + + org.bouncycastle + bcpg-jdk14 + + + org.bouncycastle + bcprov-jdk14 + + + junit + junit + + + commons-lang + commons-lang + + + org.apache.ant + ant-testutil + + + ant + ant-launcher + + + ant-contrib + ant-contrib + + + xerces + xercesImpl + + + xerces + xmlParserAPIs + + org.tachyonproject From dcf5e1358366cb00e4c1998e2c830cce65bc7226 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 28 Jan 2015 14:09:09 -0800 Subject: [PATCH 09/20] fix windows command line flags --- bin/windows-utils.cmd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd index 2ccfbbfd5f84..9e9b70d7fd31 100644 --- a/bin/windows-utils.cmd +++ b/bin/windows-utils.cmd @@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>" SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>" SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> 
\<--executor-cores\> \<--queue\>" -SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--maven\> \<--maven-repos\> \<--ivy-repo\>" +SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\> echo %1 | findstr %opts% >nul if %ERRORLEVEL% equ 0 ( From 9cf077d060bb646749177d31da3ca6ae40ee3b31 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 28 Jan 2015 14:13:37 -0800 Subject: [PATCH 10/20] fix weird IntelliJ formatting --- .../org/apache/spark/deploy/SparkSubmit.scala | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 11f44e90d6f1..d5288fb0e077 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -73,9 +73,7 @@ object SparkSubmit { // Exposed for testing private[spark] var exitFn: () => Unit = () => System.exit(-1) private[spark] var printStream: PrintStream = System.err - private[spark] def printWarning(str: String) = printStream.println("Warning: " + str) - private[spark] def printErrorAndExit(str: String) = { printStream.println("Error: " + str) printStream.println("Run with --help for usage help or --verbose for debug output") @@ -93,13 +91,13 @@ object SparkSubmit { /** * @return a tuple containing - * (1) the arguments for the child process, - * (2) a list of classpath entries for the child, - * (3) a list of system properties and env vars, and - * (4) the main class for the child + * (1) the arguments for the child process, + * (2) a list of classpath entries for the child, + * (3) a list of system properties and env vars, and + * (4) the main class for the child */ private[spark] def createLaunchEnv(args: SparkSubmitArguments) - : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { + : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { // Values to return val childArgs = new ArrayBuffer[String]() @@ -146,7 +144,7 @@ object SparkSubmit { if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) { printErrorAndExit( "Could not load YARN classes. 
" + - "This copy of Spark may not have been compiled with YARN support.") + "This copy of Spark may not have been compiled with YARN support.") } } @@ -296,7 +294,7 @@ object SparkSubmit { childArgs += "--supervise" } childArgs += "launch" - childArgs +=(args.master, args.primaryResource, args.mainClass) + childArgs += (args.master, args.primaryResource, args.mainClass) if (args.childArgs != null) { childArgs ++= args.childArgs } @@ -306,11 +304,11 @@ object SparkSubmit { if (isYarnCluster) { childMainClass = "org.apache.spark.deploy.yarn.Client" if (args.primaryResource != SPARK_INTERNAL) { - childArgs +=("--jar", args.primaryResource) + childArgs += ("--jar", args.primaryResource) } - childArgs +=("--class", args.mainClass) + childArgs += ("--class", args.mainClass) if (args.childArgs != null) { - args.childArgs.foreach { arg => childArgs +=("--arg", arg)} + args.childArgs.foreach { arg => childArgs += ("--arg", arg)} } } @@ -462,8 +460,8 @@ object SparkSubmit { */ private[spark] def mergeFileLists(lists: String*): String = { val merged = lists.filter(_ != null) - .flatMap(_.split(",")) - .mkString(",") + .flatMap(_.split(",")) + .mkString(",") if (merged == "") null else merged } } From 97c4a920d66de65a57f7197ba80ca396e565be89 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 28 Jan 2015 14:16:59 -0800 Subject: [PATCH 11/20] fix more weird IntelliJ formatting --- bin/utils.sh | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 30 +++++++------------ 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/bin/utils.sh b/bin/utils.sh index 30534949fa96..224120008201 100755 --- a/bin/utils.sh +++ b/bin/utils.sh @@ -35,7 +35,7 @@ function gatherSparkSubmitOpts() { --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \ --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \ --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \ - --total-executor-cores | --executor-cores | --queue | --num-executors | --archives ) + --total-executor-cores | --executor-cores | --queue | --num-executors | --archives) if [[ $# -lt 2 ]]; then "$SUBMIT_USAGE_FUNCTION" exit 1; diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d5288fb0e077..b7ab4cbdcafc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -91,10 +91,10 @@ object SparkSubmit { /** * @return a tuple containing - * (1) the arguments for the child process, - * (2) a list of classpath entries for the child, - * (3) a list of system properties and env vars, and - * (4) the main class for the child + * (1) the arguments for the child process, + * (2) a list of classpath entries for the child, + * (3) a list of system properties and env vars, and + * (4) the main class for the child */ private[spark] def createLaunchEnv(args: SparkSubmitArguments) : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { @@ -252,26 +252,18 @@ object SparkSubmit { if (isUserJar(args.primaryResource)) { childClasspath += args.primaryResource } - if (args.jars != null) { - childClasspath ++= args.jars.split(",") - } - if (args.childArgs != null) { - childArgs ++= args.childArgs - } + if (args.jars != null) { childClasspath ++= args.jars.split(",") } + if (args.childArgs != null) { childArgs ++= args.childArgs } } // Map all arguments to command-line options 
or system properties for our chosen mode for (opt <- options) { if (opt.value != null && - (deployMode & opt.deployMode) != 0 && - (clusterManager & opt.clusterManager) != 0) { - if (opt.clOption != null) { - childArgs +=(opt.clOption, opt.value) - } - if (opt.sysProp != null) { - sysProps.put(opt.sysProp, opt.value) - } + (deployMode & opt.deployMode) != 0 && + (clusterManager & opt.clusterManager) != 0) { + if (opt.clOption != null) { childArgs +=(opt.clOption, opt.value) } + if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) } } } @@ -308,7 +300,7 @@ object SparkSubmit { } childArgs += ("--class", args.mainClass) if (args.childArgs != null) { - args.childArgs.foreach { arg => childArgs += ("--arg", arg)} + args.childArgs.foreach { arg => childArgs += ("--arg", arg) } } } From cef0e24fadb19f8f00bc2f5ad3d3d8cd7ff706d0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 28 Jan 2015 14:17:46 -0800 Subject: [PATCH 12/20] IntelliJ is just messing things up --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b7ab4cbdcafc..1c3eeb6810cc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -262,7 +262,7 @@ object SparkSubmit { if (opt.value != null && (deployMode & opt.deployMode) != 0 && (clusterManager & opt.clusterManager) != 0) { - if (opt.clOption != null) { childArgs +=(opt.clOption, opt.value) } + if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) } if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) } } } From ea44ca46af62698129836f9dcd9716d5713e7bf1 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 28 Jan 2015 15:23:26 -0800 Subject: [PATCH 13/20] add oro back to dependencies --- core/pom.xml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 594ae7fe9c8c..5313a5c433f3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -245,10 +245,6 @@ commons-httpclient commons-httpclient - - oro - oro - commons-vfs commons-vfs From 85ec5a30f3e22d90bca09b1d1f721eeb4072abc9 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 28 Jan 2015 17:54:19 -0800 Subject: [PATCH 14/20] added oro as a dependency explicitly --- core/pom.xml | 81 ++++------------------------------------------------ pom.xml | 2 ++ 2 files changed, 8 insertions(+), 75 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 5313a5c433f3..04ca2bf8b6a7 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -227,81 +227,12 @@ org.apache.ivy ivy - 2.4.0 - - - org.apache.ant - ant - - - org.apache.ant - ant-nodeps - - - org.apache.ant - ant-trax - - - commons-httpclient - commons-httpclient - - - commons-vfs - commons-vfs - - - com.jcraft - jsch - - - com.jcraft - jsch.agentproxy - - - com.jcraft - jsch.agentproxy.connector-factory - - - com.jcraft - jsch.agentproxy.jsch - - - org.bouncycastle - bcpg-jdk14 - - - org.bouncycastle - bcprov-jdk14 - - - junit - junit - - - commons-lang - commons-lang - - - org.apache.ant - ant-testutil - - - ant - ant-launcher - - - ant-contrib - ant-contrib - - - xerces - xercesImpl - - - xerces - xmlParserAPIs - - + ${ivy.version} + + + oro + oro + ${oro.version} org.tachyonproject diff --git a/pom.xml b/pom.xml index b993391b1504..b91d017d3d20 100644 --- a/pom.xml +++ b/pom.xml @@ -136,6 +136,8 @@ 1.2.3 8.1.14.v20131031 0.5.0 + 2.4.0 
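As context for the resolver rework that begins in the next patch: coordinate strings passed to --packages are parsed with a strict 'groupId:artifactId:version' rule. A standalone approximation of that rule, for illustration only (Coordinate and parse below are hypothetical names, not the Spark code itself):

// Rough, self-contained sketch of the coordinate validation described in the
// scaladoc and exercised by SparkSubmitUtilsSuite; names here are illustrative.
case class Coordinate(groupId: String, artifactId: String, version: String)

def parse(coordinates: String): Seq[Coordinate] =
  coordinates.split(",").toSeq.map { c =>
    val parts = c.split(":")
    require(parts.length == 3 && parts.forall(_.trim.nonEmpty),
      s"Coordinates must be of the form 'groupId:artifactId:version', got: $c")
    Coordinate(parts(0), parts(1), parts(2))
  }

parse("com.databricks:spark-csv_2.10:0.1,com.databricks:spark-avro_2.10:0.1")
// Seq(Coordinate(com.databricks,spark-csv_2.10,0.1), Coordinate(com.databricks,spark-avro_2.10,0.1))
parse("a:b")  // throws IllegalArgumentException, as the malformed-coordinate test expects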
+ 2.0.8 3.0.0 1.7.6 From 231f72fa048033259eb428cb0f29e09c5905cee9 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 30 Jan 2015 20:12:05 -0800 Subject: [PATCH 15/20] addressed code review --- bin/windows-utils.cmd | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 215 +++++++++++------- .../spark/deploy/SparkSubmitSuite.scala | 36 +-- .../spark/deploy/SparkSubmitUtilsSuite.scala | 77 +++++-- 4 files changed, 192 insertions(+), 138 deletions(-) diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd index 9e9b70d7fd31..567b8733f7f7 100644 --- a/bin/windows-utils.cmd +++ b/bin/windows-utils.cmd @@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>" SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>" SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>" -SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\> +SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>" echo %1 | findstr %opts% >nul if %ERRORLEVEL% equ 0 ( diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 1c3eeb6810cc..dffc7714a257 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -184,7 +184,8 @@ object SparkSubmit { // Resolve maven dependencies if there are any and add classpath to jars val resolvedMavenCoordinates = - SparkSubmitUtils.resolveMavenCoordinates(args.packages, args.repositories, args.ivyRepoPath) + SparkSubmitUtils.resolveMavenCoordinates( + args.packages, Option(args.repositories), Option(args.ivyRepoPath)) if (!resolvedMavenCoordinates.trim.isEmpty) { if (args.jars == null || args.jars.trim.isEmpty) { args.jars = resolvedMavenCoordinates @@ -461,10 +462,6 @@ object SparkSubmit { /** Provides utility functions to be used inside SparkSubmit. */ private[spark] object SparkSubmitUtils extends Logging { - // Directories for caching downloads through ivy and storing the jars when maven coordinates are - // supplied to spark-submit - private var PACKAGES_DIRECTORY: File = null - /** * Represents a Maven Coordinate * @param groupId the groupId of the coordinate @@ -473,6 +470,95 @@ private[spark] object SparkSubmitUtils extends Logging { */ private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) +/** + * Extracts maven coordinates from a comma-delimited string + * @param coordinates Comma-delimited string of maven coordinates + * @return Sequence of Maven coordinates + */ + private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = { + coordinates.split(",").map { p => + val splits = p.split(":") + require(splits.length == 3, s"Provided Maven Coordinates must be in the form " + + s"'groupId:artifactId:version'. The coordinate provided is: $p") + require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " + + s"be whitespace. The groupId provided is: ${splits(0)}") + require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " + + s"be whitespace. 
The artifactId provided is: ${splits(1)}") + require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " + + s"be whitespace. The version provided is: ${splits(2)}") + new MavenCoordinate(splits(0), splits(1), splits(2)) + } + } + + /** + * Extracts maven coordinates from a comma-delimited string + * @param remoteRepos Comma-delimited string of remote repositories + * @return A ChainResolver used by Ivy to search for and resolve dependencies. + */ + private[spark] def createRepoResolvers(remoteRepos: Option[String]): ChainResolver = { + // We need a chain resolver if we want to check multiple repositories + val cr = new ChainResolver + cr.setName("list") + + // the biblio resolver resolves POM declared dependencies + val br: IBiblioResolver = new IBiblioResolver + br.setM2compatible(true) + br.setUsepoms(true) + br.setName("central") + cr.add(br) + + val repositoryList = remoteRepos.getOrElse("") + // add any other remote repositories other than maven central + if (repositoryList.trim.nonEmpty) { + repositoryList.split(",").zipWithIndex.foreach { case (repo, i) => + val brr: IBiblioResolver = new IBiblioResolver + brr.setM2compatible(true) + brr.setUsepoms(true) + brr.setRoot(repo) + brr.setName(s"repo-${i + 1}") + cr.add(brr) + logInfo(s"$repo added as a remote repository with the name: ${brr.getName}") + } + } + cr + } + + /** + * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath + * (will append to jars in SparkSubmit). The name of the jar is given + * after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well. + * @param artifacts Sequence of dependencies that were resolved and retrieved + * @param cacheDirectory directory where jars are cached + * @return a comma-delimited list of paths for the dependencies + */ + private[spark] def resolveDependencyPaths( + artifacts: Array[AnyRef], + cacheDirectory: File): String = { + artifacts.map { case artifactInfo: MDArtifact => + val artifactString = artifactInfo.toString + val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1) + cacheDirectory.getAbsolutePath + "/" + jarName.substring(0, jarName.lastIndexOf(".jar") + 4) + }.mkString(",") + } + + /** Adds the given maven coordinates to Ivy's module descriptor. */ + private[spark] def addDependenciesToIvy( + md: DefaultModuleDescriptor, + artifacts: Seq[MavenCoordinate], + ivyConfName: String): Unit = { + artifacts.foreach { mvn => + val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version) + val dd = new DefaultDependencyDescriptor(ri, false, false) + dd.addDependencyConfiguration(ivyConfName, ivyConfName) + logInfo(s"${dd.getDependencyId} added as a dependency") + md.addDependency(dd) + } + } + + /** A nice function to use in tests as well. Values are dummy strings. 
*/ + private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance( + ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0")) + /** * Resolves any dependencies that were supplied through maven coordinates * @param coordinates Comma-delimited string of maven coordinates @@ -483,73 +569,36 @@ private[spark] object SparkSubmitUtils extends Logging { */ private[spark] def resolveMavenCoordinates( coordinates: String, - remoteRepos: String, - ivyPath: String, + remoteRepos: Option[String], + ivyPath: Option[String], isTest: Boolean = false): String = { if (coordinates == null || coordinates.trim.isEmpty) { "" } else { - val artifacts = coordinates.split(",").map { p => - val splits = p.split(":") - require(splits.length == 3, s"Provided Maven Coordinates must be in the form " + - s"'groupId:artifactId:version'. The coordinate provided is: $p") - require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " + - s"be whitespace. The groupId provided is: ${splits(0)}") - require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " + - s"be whitespace. The artifactId provided is: ${splits(1)}") - require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " + - s"be whitespace. The version provided is: ${splits(2)}") - new MavenCoordinate(splits(0), splits(1), splits(2)) - } + val artifacts = extractMavenCoordinates(coordinates) // Default configuration name for ivy - val conf = "default" + val ivyConfName = "default" // set ivy settings for location of cache val ivySettings: IvySettings = new IvySettings - if (ivyPath == null || ivyPath.trim.isEmpty) { - PACKAGES_DIRECTORY = new File(ivySettings.getDefaultIvyUserDir, "jars") - } else { - ivySettings.setDefaultCache(new File(ivyPath, "cache")) - PACKAGES_DIRECTORY = new File(ivyPath, "jars") - } + // Directories for caching downloads through ivy and storing the jars when maven coordinates are + // supplied to spark-submit + val alternateIvyCache = ivyPath.getOrElse("") + val packagesDirectory: File = + if (alternateIvyCache.trim.isEmpty) { + new File(ivySettings.getDefaultIvyUserDir, "jars") + } else { + ivySettings.setDefaultCache(new File(alternateIvyCache, "cache")) + new File(alternateIvyCache, "jars") + } logInfo(s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}") - logInfo(s"The jars for the packages stored in: $PACKAGES_DIRECTORY") - + logInfo(s"The jars for the packages stored in: $packagesDirectory") // create a pattern matcher ivySettings.addMatcher(new GlobPatternMatcher) + // create the dependency resolvers + val repoResolver = createRepoResolvers(remoteRepos) + ivySettings.addResolver(repoResolver) + ivySettings.setDefaultResolver(repoResolver.getName) - // the biblio resolver resolves POM declared dependencies - val br: IBiblioResolver = new IBiblioResolver - br.setM2compatible(true) - br.setUsepoms(true) - br.setName("central") - - // We need a chain resolver if we want to check multiple repositories - val cr = new ChainResolver - cr.setName("list") - cr.add(br) - - // Add an exclusion rule for Spark - val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*") - val sparkDependencyExcludeRule = - new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null) - sparkDependencyExcludeRule.addConfiguration(conf) - - // add any other remote repositories other than maven central - if (remoteRepos != null && remoteRepos.trim.nonEmpty) { - var 
i = 1 - remoteRepos.split(",").foreach { repo => - val brr: IBiblioResolver = new IBiblioResolver - brr.setM2compatible(true) - brr.setUsepoms(true) - brr.setRoot(repo) - brr.setName(s"repo-$i") - cr.add(brr) - logInfo(s"$repo added as a remote repository with the name: ${brr.getName}") - i += 1 - } - } - ivySettings.addResolver(cr) - ivySettings.setDefaultResolver(cr.getName) val ivy = Ivy.newInstance(ivySettings) // Set resolve options to download transitive dependencies as well val resolveOptions = new ResolveOptions @@ -565,19 +614,18 @@ private[spark] object SparkSubmitUtils extends Logging { } // A Module descriptor must be specified. Entries are dummy strings - val md = DefaultModuleDescriptor.newDefaultInstance( - ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0")) - md.setDefaultConf(conf) + val md = getModuleDescriptor + md.setDefaultConf(ivyConfName) - md.addExcludeRule(sparkDependencyExcludeRule) + // Add an exclusion rule for Spark + val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*") + val sparkDependencyExcludeRule = + new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null) + sparkDependencyExcludeRule.addConfiguration(ivyConfName) - artifacts.foreach { mvn => - val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version) - val dd = new DefaultDependencyDescriptor(ri, false, false) - dd.addDependencyConfiguration(conf, conf) - logInfo(s"${dd.getDependencyId} added as a dependency") - md.addDependency(dd) - } + // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies + md.addExcludeRule(sparkDependencyExcludeRule) + addDependenciesToIvy(md, artifacts, ivyConfName) // resolve dependencies val rr: ResolveReport = ivy.resolve(md, resolveOptions) @@ -586,23 +634,16 @@ private[spark] object SparkSubmitUtils extends Logging { } // Log the callers for each dependency rr.getDependencies.toArray.foreach { case dependency: IvyNode => - logInfo(s"$dependency will be retrieved as a dependency for:") - dependency.getAllCallers.foreach (caller => logInfo(s"\t$caller")) + var logMsg = s"$dependency will be retrieved as a dependency for:" + dependency.getAllCallers.foreach (caller => logMsg += s"\n\t$caller") + logInfo(logMsg) } // retrieve all resolved dependencies - val m = rr.getModuleDescriptor - ivy.retrieve(m.getModuleRevisionId, - PACKAGES_DIRECTORY.getAbsolutePath + "/[artifact](-[classifier]).[ext]", - retrieveOptions.setConfs(Array(conf))) - - // output downloaded jars to classpath (will append to jars). The name of the jar is given - // after a '!' by Ivy. It also sometimes contains (bundle) after '.jar'. Remove that as well. 
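The comment above describes how each classpath entry is recovered from Ivy's artifact string: everything after the last '!' is the jar name, and any '(bundle)' suffix after '.jar' is dropped. A minimal sketch of that trimming, using a hypothetical artifact string that only mimics the shape Ivy produces:

// Illustrative only: the sample string approximates an Ivy artifact's toString output.
val artifactString = "com.databricks#spark-csv_2.10;0.1!spark-csv_2.10.jar(bundle)"
val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)   // "spark-csv_2.10.jar(bundle)"
val trimmed = jarName.substring(0, jarName.lastIndexOf(".jar") + 4)      // "spark-csv_2.10.jar"
// The jars directory path is prepended to each trimmed name, and the entries are joined with ','.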
- rr.getArtifacts.toArray.map { case artifactInfo: MDArtifact => - val artifactString = artifactInfo.toString - val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1) - PACKAGES_DIRECTORY.getAbsolutePath + "/" + - jarName.substring(0, jarName.lastIndexOf(".jar") + 4) - }.mkString(",") + ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, + packagesDirectory.getAbsolutePath + "/[artifact](-[classifier]).[ext]", + retrieveOptions.setConfs(Array(ivyConfName))) + + resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 5eadc7864ec9..3f1355f82893 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -307,7 +307,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "--name", "testApp", "--master", "local-cluster[2,1,512]", "--jars", jarsString, - unusedJar.toString) + unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB") runSparkSubmit(args) } @@ -315,12 +315,13 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties val unusedJar = TestUtils.createJarWithClasses(Seq.empty) val packagesString = "com.databricks:spark-csv_2.10:0.1,com.databricks:spark-avro_2.10:0.1" val args = Seq( - "--class", MavenArtifactDownloadTest.getClass.getName.stripSuffix("$"), + "--class", JarCreationTest.getClass.getName.stripSuffix("$"), "--name", "testApp", "--master", "local-cluster[2,1,512]", "--packages", packagesString, "--conf", "spark.ui.enabled=false", - unusedJar.toString) + unusedJar.toString, + "com.databricks.spark.csv.DefaultSource", "com.databricks.spark.avro.DefaultSource") runSparkSubmit(args) } @@ -480,33 +481,8 @@ object JarCreationTest extends Logging { val result = sc.makeRDD(1 to 100, 10).mapPartitions { x => var exception: String = null try { - Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader) - Class.forName("SparkSubmitClassB", true, Thread.currentThread().getContextClassLoader) - } catch { - case t: Throwable => - exception = t + "\n" + t.getStackTraceString - exception = exception.replaceAll("\n", "\n\t") - } - Option(exception).toSeq.iterator - }.collect() - if (result.nonEmpty) { - throw new Exception("Could not load user class from jar:\n" + result(0)) - } - } -} - -object MavenArtifactDownloadTest extends Logging { - def main(args: Array[String]) { - Utils.configTestLog4j("INFO") - val conf = new SparkConf() - val sc = new SparkContext(conf) - val result = sc.makeRDD(1 to 100, 10).mapPartitions { x => - var exception: String = null - try { - Class.forName("com.databricks.spark.csv.DefaultSource", - true, Thread.currentThread().getContextClassLoader) - Class.forName("com.databricks.spark.avro.DefaultSource", - true, Thread.currentThread().getContextClassLoader) + Class.forName(args(0), true, Thread.currentThread().getContextClassLoader) + Class.forName(args(1), true, Thread.currentThread().getContextClassLoader) } catch { case t: Throwable => exception = t + "\n" + t.getStackTraceString diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index e0af3372ac07..6d8eaa422e82 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -17,47 +17,84 @@ package org.apache.spark.deploy -import org.apache.spark.util.ResetSystemProperties -import org.scalatest.{Matchers, FunSuite} +import java.io.File -class SparkSubmitUtilsSuite extends FunSuite with Matchers with ResetSystemProperties { +import org.apache.ivy.core.module.descriptor.MDArtifact +import org.apache.ivy.plugins.resolver.IBiblioResolver +import org.scalatest.FunSuite - def beforeAll() { - System.setProperty("spark.testing", "true") - } +class SparkSubmitUtilsSuite extends FunSuite { test("incorrect maven coordinate throws error") { val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a") for (coordinate <- coordinates) { intercept[IllegalArgumentException] { - SparkSubmitUtils.resolveMavenCoordinates(coordinate, null, null, true) + SparkSubmitUtils.extractMavenCoordinates(coordinate) } } } - test("dependency not found throws RuntimeException") { - intercept[RuntimeException] { - SparkSubmitUtils.resolveMavenCoordinates("a:b:c", null, null, true) + test("create repo resolvers") { + val resolver1 = SparkSubmitUtils.createRepoResolvers(None) + // should have central by default + assert(resolver1.getResolvers.size() === 1) + assert(resolver1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "central") + + val repos = "a/1,b/2,c/3" + val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos)) + assert(resolver2.getResolvers.size() === 4) + val expected = repos.split(",").map(r => s"$r/") + resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: IBiblioResolver, i) => + if (i == 0) { + assert(resolver.getName === "central") + } else { + assert(resolver.getName === s"repo-$i") + assert(resolver.getRoot === expected(i - 1)) + } } } - test("neglects Spark and Spark's dependencies") { - val path = SparkSubmitUtils.resolveMavenCoordinates( - "org.apache.spark:spark-core_2.10:1.2.0", null, null, true) - assert(path == "", "should return empty path") + test("add dependencies works correctly") { + val md = SparkSubmitUtils.getModuleDescriptor + val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," + + "com.databricks:spark-avro_2.10:0.1") + + SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default") + assert(md.getDependencies.length === 2) + } + + test("ivy path works correctly") { + val ivyPath = "dummy/ivy" + val md = SparkSubmitUtils.getModuleDescriptor + val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar") + var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath)) + for (i <- 0 until 3) { + val index = jPaths.indexOf(ivyPath) + assert(index >= 0) + jPaths = jPaths.substring(index + ivyPath.length) + } + // end to end + val jarPath = SparkSubmitUtils.resolveMavenCoordinates( + "com.databricks:spark-csv_2.10:0.1", None, Option(ivyPath), true) + assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path") } test("search for artifact at other repositories") { val path = SparkSubmitUtils.resolveMavenCoordinates("com.agimatec:agimatec-validation:0.9.3", - "https://oss.sonatype.org/content/repositories/agimatec/", null, true) + Option("https://oss.sonatype.org/content/repositories/agimatec/"), None, true) assert(path.indexOf("agimatec-validation") >= 0, "should find package. If it doesn't, check" + "if package still exists. 
If it has been removed, replace the example in this test.") } - test("ivy path works correctly") { - val ivyPath = "dummy/ivy" - val jarPath = SparkSubmitUtils.resolveMavenCoordinates( - "com.databricks:spark-csv_2.10:0.1", null, ivyPath, true) - assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path") + test("dependency not found throws RuntimeException") { + intercept[RuntimeException] { + SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, true) + } + } + + test("neglects Spark and Spark's dependencies") { + val path = SparkSubmitUtils.resolveMavenCoordinates( + "org.apache.spark:spark-core_2.10:1.2.0", None, None, true) + assert(path === "", "should return empty path") } } From 43c229008bb8da199c74b42dc1cda78333b9a28e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 30 Jan 2015 20:25:42 -0800 Subject: [PATCH 16/20] fixed that ONE line --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index dffc7714a257..2cdfdfd7bb9f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -580,8 +580,8 @@ private[spark] object SparkSubmitUtils extends Logging { val ivyConfName = "default" // set ivy settings for location of cache val ivySettings: IvySettings = new IvySettings - // Directories for caching downloads through ivy and storing the jars when maven coordinates are - // supplied to spark-submit + // Directories for caching downloads through ivy and storing the jars when maven coordinates + // are supplied to spark-submit val alternateIvyCache = ivyPath.getOrElse("") val packagesDirectory: File = if (alternateIvyCache.trim.isEmpty) { From 3ada19ac3d569e4d5af35c309436be36ba211f94 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sun, 1 Feb 2015 22:57:42 -0800 Subject: [PATCH 17/20] fixed Jenkins error (hopefully) and added comment on oro --- core/pom.xml | 1 + core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/core/pom.xml b/core/pom.xml index 8b76a7898a7b..b6f894fd4c99 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -225,6 +225,7 @@ oro + oro ${oro.version} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 2cdfdfd7bb9f..6ee21d21591c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -534,7 +534,7 @@ private[spark] object SparkSubmitUtils extends Logging { private[spark] def resolveDependencyPaths( artifacts: Array[AnyRef], cacheDirectory: File): String = { - artifacts.map { case artifactInfo: MDArtifact => + artifacts.map { artifactInfo => val artifactString = artifactInfo.toString val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1) cacheDirectory.getAbsolutePath + "/" + jarName.substring(0, jarName.lastIndexOf(".jar") + 4) From 9dae87fc8d12d4fb7a2deea9fe1590825893265b Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 2 Feb 2015 22:19:44 -0800 Subject: [PATCH 18/20] file separators changed --- .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 7bc10b12982f..c3b017d109bd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -579,7 +579,8 @@ private[spark] object SparkSubmitUtils extends Logging { artifacts.map { artifactInfo => val artifactString = artifactInfo.toString val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1) - cacheDirectory.getAbsolutePath + "/" + jarName.substring(0, jarName.lastIndexOf(".jar") + 4) + cacheDirectory.getAbsolutePath + File.separator + + jarName.substring(0, jarName.lastIndexOf(".jar") + 4) }.mkString(",") } @@ -682,7 +683,7 @@ private[spark] object SparkSubmitUtils extends Logging { } // retrieve all resolved dependencies ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, - packagesDirectory.getAbsolutePath + "/[artifact](-[classifier]).[ext]", + packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]", retrieveOptions.setConfs(Array(ivyConfName))) resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) From db2a5cc87fda3a54a6b85f52ebaa5773ab70118f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 3 Feb 2015 14:56:12 -0800 Subject: [PATCH 19/20] changed logging to printStream --- .../org/apache/spark/deploy/SparkSubmit.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c3b017d109bd..90fecde1dfa4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -36,7 +36,6 @@ import org.apache.ivy.core.settings.IvySettings import org.apache.ivy.plugins.matcher.GlobPatternMatcher import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver} -import org.apache.spark.Logging import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.util.Utils @@ -410,8 +409,8 @@ object SparkSubmit { case e: ClassNotFoundException => e.printStackTrace(printStream) if (childMainClass.contains("thriftserver")) { - println(s"Failed to load main class $childMainClass.") - println("You need to build Spark with -Phive and -Phive-thriftserver.") + printStream.println(s"Failed to load main class $childMainClass.") + printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.") } System.exit(CLASS_NOT_FOUND_EXIT_STATUS) } @@ -502,7 +501,9 @@ object SparkSubmit { } /** Provides utility functions to be used inside SparkSubmit. 
*/ -private[spark] object SparkSubmitUtils extends Logging { +private[spark] object SparkSubmitUtils { + + private val printStream = SparkSubmit.printStream /** * Represents a Maven Coordinate @@ -559,7 +560,7 @@ private[spark] object SparkSubmitUtils extends Logging { brr.setRoot(repo) brr.setName(s"repo-${i + 1}") cr.add(brr) - logInfo(s"$repo added as a remote repository with the name: ${brr.getName}") + printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}") } } cr @@ -593,7 +594,7 @@ private[spark] object SparkSubmitUtils extends Logging { val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version) val dd = new DefaultDependencyDescriptor(ri, false, false) dd.addDependencyConfiguration(ivyConfName, ivyConfName) - logInfo(s"${dd.getDependencyId} added as a dependency") + printStream.println(s"${dd.getDependencyId} added as a dependency") md.addDependency(dd) } } @@ -633,8 +634,9 @@ private[spark] object SparkSubmitUtils extends Logging { ivySettings.setDefaultCache(new File(alternateIvyCache, "cache")) new File(alternateIvyCache, "jars") } - logInfo(s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}") - logInfo(s"The jars for the packages stored in: $packagesDirectory") + printStream.println( + s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}") + printStream.println(s"The jars for the packages stored in: $packagesDirectory") // create a pattern matcher ivySettings.addMatcher(new GlobPatternMatcher) // create the dependency resolvers @@ -679,7 +681,7 @@ private[spark] object SparkSubmitUtils extends Logging { rr.getDependencies.toArray.foreach { case dependency: IvyNode => var logMsg = s"$dependency will be retrieved as a dependency for:" dependency.getAllCallers.foreach (caller => logMsg += s"\n\t$caller") - logInfo(logMsg) + printStream.println(logMsg) } // retrieve all resolved dependencies ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, From 921585157be8e1eec9419715a5a0aa5614e6e16b Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 3 Feb 2015 15:58:30 -0800 Subject: [PATCH 20/20] ready to merge --- .../org/apache/spark/deploy/SparkSubmit.scala | 11 +++----- .../spark/deploy/SparkSubmitUtilsSuite.scala | 27 ++++++++++++++++--- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 90fecde1dfa4..8bbfcd2997dc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -503,7 +503,8 @@ object SparkSubmit { /** Provides utility functions to be used inside SparkSubmit. */ private[spark] object SparkSubmitUtils { - private val printStream = SparkSubmit.printStream + // Exposed for testing + private[spark] var printStream = SparkSubmit.printStream /** * Represents a Maven Coordinate @@ -601,7 +602,7 @@ private[spark] object SparkSubmitUtils { /** A nice function to use in tests as well. Values are dummy strings. 
*/ private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance( - ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0")) + ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0")) /** * Resolves any dependencies that were supplied through maven coordinates @@ -677,12 +678,6 @@ private[spark] object SparkSubmitUtils { if (rr.hasError) { throw new RuntimeException(rr.getAllProblemMessages.toString) } - // Log the callers for each dependency - rr.getDependencies.toArray.foreach { case dependency: IvyNode => - var logMsg = s"$dependency will be retrieved as a dependency for:" - dependency.getAllCallers.foreach (caller => logMsg += s"\n\t$caller") - printStream.println(logMsg) - } // retrieve all resolved dependencies ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]", diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 6d8eaa422e82..53665350013c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -17,13 +17,34 @@ package org.apache.spark.deploy -import java.io.File +import java.io.{PrintStream, OutputStream, File} + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.ivy.core.module.descriptor.MDArtifact import org.apache.ivy.plugins.resolver.IBiblioResolver -import org.scalatest.FunSuite -class SparkSubmitUtilsSuite extends FunSuite { +class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll { + + private val noOpOutputStream = new OutputStream { + def write(b: Int) = {} + } + + /** Simple PrintStream that reads data into a buffer */ + private class BufferPrintStream extends PrintStream(noOpOutputStream) { + var lineBuffer = ArrayBuffer[String]() + override def println(line: String) { + lineBuffer += line + } + } + + override def beforeAll() { + super.beforeAll() + // We don't want to write logs during testing + SparkSubmitUtils.printStream = new BufferPrintStream + } test("incorrect maven coordinate throws error") { val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a")