diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 77005aa9040b5..d6f6abf3424cf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,6 +22,7 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
import java.net.URL
import java.security.PrivilegedExceptionAction
import java.text.ParseException
+import java.util.Collections
import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -834,33 +835,56 @@ private[spark] object SparkSubmitUtils {
var printStream = SparkSubmit.printStream
/**
- * Represents a Maven Coordinate
+ * Represents a Maven Coordinate. Refer to https://maven.apache.org/pom.html#Maven_Coordinates
+ * for more information. Standard ordering for a full coordinate is
+ * `groupId:artifactId:packaging:classifier:version`, though packaging and classifier
+ * are optional.
+ *
* @param groupId the groupId of the coordinate
* @param artifactId the artifactId of the coordinate
+ * @param packaging Maven packaging type (e.g. jar), if any
+ * @param classifier Maven classifier, if any
* @param version the version of the coordinate
*/
- private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
- override def toString: String = s"$groupId:$artifactId:$version"
+ private[deploy] case class MavenCoordinate(
+ groupId: String,
+ artifactId: String,
+ packaging: Option[String],
+ classifier: Option[String],
+ version: String) {
+
+ def this(groupId: String, artifactId: String, version: String) =
+ this(groupId, artifactId, None, None, version)
+
+ override def toString: String = {
+ (Seq(groupId, artifactId) ++ packaging ++ classifier ++ Seq(version)).mkString(":")
+ }
}
/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
- * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
- * @param coordinates Comma-delimited string of maven coordinates
+ * in the format `groupId:artifactId:version`, `groupId:artifactId:packaging:version`, or
+ * `groupId:artifactId:packaging:classifier:version`. '/' can be used as a separator instead
+ * of ':'.
+ *
+ * @param coordinates Comma-delimited string of Maven coordinates
* @return Sequence of Maven coordinates
*/
def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
- coordinates.split(",").map { p =>
- val splits = p.replace("/", ":").split(":")
- require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
- s"'groupId:artifactId:version'. The coordinate provided is: $p")
- require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
- s"be whitespace. The groupId provided is: ${splits(0)}")
- require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
- s"be whitespace. The artifactId provided is: ${splits(1)}")
- require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
- s"be whitespace. The version provided is: ${splits(2)}")
- new MavenCoordinate(splits(0), splits(1), splits(2))
+ coordinates.split(",").map { coordinate =>
+ val splits = coordinate.split("[:/]")
+ require(splits.forall(split => split != null && split.trim.nonEmpty),
+ s"All elements of coordinate must be non-null and not whitespace; got $coordinate")
+ splits match {
+ case Array(groupId, artifactId, version) =>
+ new MavenCoordinate(groupId, artifactId, version)
+ case Array(groupId, artifactId, packaging, version) =>
+ new MavenCoordinate(groupId, artifactId, Some(packaging), None, version)
+ case Array(groupId, artifactId, packaging, classifier, version) =>
+ new MavenCoordinate(groupId, artifactId, Some(packaging), Some(classifier), version)
+ case _ => throw new IllegalArgumentException("Coordinates must be of form " +
+ s"groupId:artifactId[:packaging[:classifier]]:version; got $coordinate")
+ }
}
}
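With this change, `--packages` accepts classifier-bearing coordinates such as `org.apache.avro:avro:jar:tests:1.8.2`. A quick round-trip of the parsing rules above, as a sketch (runs from `org.apache.spark.deploy` test scope, since these helpers are `private[deploy]`; the coordinates are illustrative):

```scala
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate

val Seq(plain, full) = SparkSubmitUtils.extractMavenCoordinates(
  "my.great.lib:mylib:0.1,org.apache.avro:avro:jar:tests:1.8.2")
assert(plain == new MavenCoordinate("my.great.lib", "mylib", "0.1"))
assert(full == MavenCoordinate("org.apache.avro", "avro",
  Some("jar"), Some("tests"), "1.8.2"))
// toString reproduces the canonical form, so coordinates survive a round trip.
assert(full.toString == "org.apache.avro:avro:jar:tests:1.8.2")
```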
@@ -928,12 +952,14 @@ private[spark] object SparkSubmitUtils {
* @return a comma-delimited list of paths for the dependencies
*/
def resolveDependencyPaths(
- artifacts: Array[AnyRef],
+ artifacts: Array[Artifact],
cacheDirectory: File): String = {
artifacts.map { artifactInfo =>
- val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
+ val artifact = artifactInfo.getModuleRevisionId
+ val classifier =
+ Option(artifactInfo.getExtraAttribute("classifier")).map("-" + _).getOrElse("")
cacheDirectory.getAbsolutePath + File.separator +
- s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
+ s"${artifact.getOrganisation}_${artifact.getName}${classifier}-${artifact.getRevision}.jar"
}.mkString(",")
}
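The cache-file naming rule above, isolated from Ivy as a minimal runnable sketch (the coordinates are made up):

```scala
// Mirrors the string built above: organisation_name[-classifier]-revision.jar
def cachedJarName(org: String, name: String, rev: String,
    classifier: Option[String]): String =
  s"${org}_$name${classifier.map("-" + _).getOrElse("")}-$rev.jar"

assert(cachedJarName("g", "a", "1.0", None) == "g_a-1.0.jar")
assert(cachedJarName("g", "a", "1.0", Some("tests")) == "g_a-tests-1.0.jar")
```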
@@ -950,6 +976,12 @@ private[spark] object SparkSubmitUtils {
printStream.println(s"${dd.getDependencyId} added as a dependency")
// scalastyle:on println
md.addDependency(dd)
+ if (mvn.classifier.isDefined) {
+ val typeExt = mvn.packaging.getOrElse("jar")
+ dd.addDependencyArtifact(ivyConfName, new DefaultDependencyArtifactDescriptor(
+ dd, mvn.artifactId, typeExt, typeExt, null,
+ Collections.singletonMap("classifier", mvn.classifier.get)))
+ }
}
}
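`DefaultDependencyArtifactDescriptor` with a `"classifier"` extra attribute is Ivy's programmatic equivalent of an explicit `<artifact ... m:classifier="..."/>` element in an ivy.xml dependency. A standalone sketch of the calls above (module names are illustrative, not from this patch):

```scala
import java.util.Collections

import org.apache.ivy.core.module.descriptor.{DefaultDependencyArtifactDescriptor, DefaultDependencyDescriptor, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.ModuleRevisionId

val md = DefaultModuleDescriptor.newDefaultInstance(
  ModuleRevisionId.newInstance("caller", "caller", "working"))
val dd = new DefaultDependencyDescriptor(
  md, ModuleRevisionId.newInstance("my.group", "lib", "1.0"),
  /* force = */ false, /* changing = */ false, /* transitive = */ true)
// Without an explicit dependency artifact, Ivy resolves only the default
// (classifier-less) jar for this module.
dd.addDependencyArtifact("default", new DefaultDependencyArtifactDescriptor(
  dd, "lib", "jar", "jar", null, Collections.singletonMap("classifier", "tests")))
md.addDependency(dd)
```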
@@ -1133,9 +1165,10 @@ private[spark] object SparkSubmitUtils {
// retrieve all resolved dependencies
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator +
- "[organization]_[artifact]-[revision].[ext]",
+ "[organization]_[artifact](-[classifier])-[revision].[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))
- resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+ resolveDependencyPaths(
+ rr.getArtifacts.toArray.map(_.asInstanceOf[Artifact]), packagesDirectory)
} finally {
System.setOut(sysOut)
}
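Note the `(-[classifier])` group in the retrieve pattern: Ivy expands a parenthesized chunk only when every token inside it resolves, so artifacts without a classifier keep the previous `[organization]_[artifact]-[revision].[ext]` file names and the cache layout stays backward compatible.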
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index f50cb38311db2..a1d5c45d80539 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -34,13 +34,13 @@ import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
private[deploy] object IvyTestUtils {
/**
- * Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
- * or `pom`.
+ * Create the path for the jar and pom from the maven coordinate. `extOverride`, if
+ * defined, should be `jar`, `pom`, or `ivy`; otherwise the artifact's packaging is used.
*/
private[deploy] def pathFromCoordinate(
artifact: MavenCoordinate,
prefix: File,
- ext: String,
+ extOverride: Option[String],
useIvyLayout: Boolean): File = {
val groupDirs = artifact.groupId.replace(".", File.separator)
val artifactDirs = artifact.artifactId
@@ -48,6 +48,7 @@ private[deploy] object IvyTestUtils {
if (!useIvyLayout) {
Seq(groupDirs, artifactDirs, artifact.version).mkString(File.separator)
} else {
+ val ext = extOverride.getOrElse(artifact.packaging.getOrElse("jar"))
Seq(artifact.groupId, artifactDirs, artifact.version, ext + "s").mkString(File.separator)
}
new File(prefix, artifactPath)
@@ -57,9 +58,11 @@ private[deploy] object IvyTestUtils {
private[deploy] def artifactName(
artifact: MavenCoordinate,
useIvyLayout: Boolean,
- ext: String = ".jar"): String = {
+ extOverride: Option[String]): String = {
+ val classifier = artifact.classifier.map("-" + _).getOrElse("")
+ val ext = extOverride.getOrElse("." + artifact.packaging.getOrElse("jar"))
if (!useIvyLayout) {
- s"${artifact.artifactId}-${artifact.version}$ext"
+ s"${artifact.artifactId}-${artifact.version}$classifier$ext"
} else {
s"${artifact.artifactId}$ext"
}
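A spot-check of the names the updated helper should now produce, as a sketch from `deploy`-package test scope (coordinates are hypothetical):

```scala
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate

assert(IvyTestUtils.artifactName(
  new MavenCoordinate("g", "a", "1.0"), useIvyLayout = false, None) == "a-1.0.jar")
assert(IvyTestUtils.artifactName(
  MavenCoordinate("g", "a", None, Some("tests"), "1.0"),
  useIvyLayout = false, None) == "a-1.0-tests.jar")
assert(IvyTestUtils.artifactName(
  MavenCoordinate("g", "a", Some("pom"), None, "1.0"),
  useIvyLayout = false, None) == "a-1.0.pom")
```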
@@ -152,11 +155,11 @@ private[deploy] object IvyTestUtils {
dependencies: Option[Seq[MavenCoordinate]],
useIvyLayout: Boolean): File = {
if (useIvyLayout) {
- val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
+ val ivyXmlPath = pathFromCoordinate(artifact, tempPath, Some("ivy"), useIvyLayout)
Files.createParentDirs(new File(ivyXmlPath, "dummy"))
createIvyDescriptor(ivyXmlPath, artifact, dependencies)
} else {
- val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
+ val pomPath = pathFromCoordinate(artifact, tempPath, Some("pom"), useIvyLayout)
Files.createParentDirs(new File(pomPath, "dummy"))
createPom(pomPath, artifact, dependencies)
}
@@ -167,6 +170,12 @@ private[deploy] object IvyTestUtils {
var result = "\n" + " " * tabCount + s"<groupId>${artifact.groupId}</groupId>"
result += "\n" + " " * tabCount + s"<artifactId>${artifact.artifactId}</artifactId>"
result += "\n" + " " * tabCount + s"<version>${artifact.version}</version>"
+ if (artifact.classifier.isDefined) {
+   result += "\n" + " " * tabCount + s"<classifier>${artifact.classifier.get}</classifier>"
+ }
+ if (artifact.packaging.isDefined) {
+   result += "\n" + " " * tabCount + s"<packaging>${artifact.packaging.get}</packaging>"
+ }
result
}
@@ -191,7 +200,7 @@ private[deploy] object IvyTestUtils {
"\n \n" + inside + "\n "
}.getOrElse("")
content += "\n"
- writeFile(dir, artifactName(artifact, false, ".pom"), content.trim)
+ writeFile(dir, artifactName(artifact, false, Some(".pom")), content.trim)
}
/** Helper method to write artifact information in the ivy.xml. */
@@ -206,6 +215,8 @@ private[deploy] object IvyTestUtils {
dir: File,
artifact: MavenCoordinate,
dependencies: Option[Seq[MavenCoordinate]]): File = {
+ val typeExt = artifact.packaging.getOrElse("jar")
+ val classifier = artifact.classifier.map(c => s"""m:classifier="$c"""").getOrElse("")
var content = s"""
|<?xml version="1.0" encoding="UTF-8"?>
|<ivy-module version="2.0" xmlns:m="http://ant.apache.org/ivy/maven">
@@ -221,7 +232,8 @@ private[deploy] object IvyTestUtils {
|    <conf name="pom" visibility="public" description=""/>
|  </configurations>
|  <publications>
- |     <artifact name="${artifactName(artifact, false, "")}" type="jar" ext="jar"
- |               conf="master"/>
+ |     <artifact name="${artifactName(artifact, useIvyLayout = false, Some(""))}"
+ |               type="$typeExt" ext="$typeExt"
+ |               conf="master" $classifier/>
|  </publications>
""".stripMargin.trim
@@ -241,7 +253,7 @@ private[deploy] object IvyTestUtils {
useIvyLayout: Boolean,
withR: Boolean,
withManifest: Option[Manifest] = None): File = {
- val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
+ val jarFile = new File(dir, artifactName(artifact, useIvyLayout, None))
val jarFileStream = new FileOutputStream(jarFile)
val manifest = withManifest.getOrElse {
val mani = new Manifest()
@@ -295,7 +307,7 @@ private[deploy] object IvyTestUtils {
val root = new File(tempPath, tempPath.hashCode().toString)
Files.createParentDirs(new File(root, "dummy"))
try {
- val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout)
+ val jarPath = pathFromCoordinate(artifact, tempPath, None, useIvyLayout)
Files.createParentDirs(new File(jarPath, "dummy"))
val className = "MyLib"
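A hypothetical end-to-end check built on these helpers: publish a local repository for a classifier-bearing coordinate, then resolve it and look for the classifier-suffixed cache name (coordinate values invented for illustration; assumes the patched build):

```scala
val main = new MavenCoordinate("my.great.lib", "mylib", Some("jar"), Some("tests"), "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
  val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
    main.toString,
    SparkSubmitUtils.buildIvySettings(Some(repo), None),
    isTest = true)
  // resolveDependencyPaths should emit the classifier-suffixed file name.
  assert(jarPath.contains("mylib-tests-0.1.jar"))
}
```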
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 005587051b6ad..2fa473cfab279 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -40,13 +40,13 @@ class RPackageUtilsSuite
with BeforeAndAfterEach
with ResetSystemProperties {
- private val main = MavenCoordinate("a", "b", "c")
- private val dep1 = MavenCoordinate("a", "dep1", "c")
- private val dep2 = MavenCoordinate("a", "dep2", "d")
+ private val main = new MavenCoordinate("a", "b", "c")
+ private val dep1 = new MavenCoordinate("a", "dep1", "c")
+ private val dep2 = new MavenCoordinate("a", "dep2", "d")
private def getJarPath(coord: MavenCoordinate, repo: File): File = {
- new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false),
- IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar"))
+ new File(IvyTestUtils.pathFromCoordinate(coord, repo, None, useIvyLayout = false),
+ IvyTestUtils.artifactName(coord, useIvyLayout = false, None))
}
private val lineBuffer = ArrayBuffer[String]()
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 7c2ec01a03d04..5d6b91ba5c773 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -423,8 +423,8 @@ class SparkSubmitSuite
// SPARK-7287
test("includes jars passed in through --packages") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
- val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
- val dep = MavenCoordinate("my.great.dep", "mylib", "0.1")
+ val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
+ val dep = new MavenCoordinate("my.great.dep", "mylib", "0.1")
IvyTestUtils.withRepository(main, Some(dep.toString), None) { repo =>
val args = Seq(
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
@@ -446,7 +446,7 @@ class SparkSubmitSuite
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
// Check if the SparkR package is installed
assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
- val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
+ val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val rScriptDir =
Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 266c9d33b5a96..2356acf24518d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -58,7 +58,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
}
test("incorrect maven coordinate throws error") {
- val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a")
+ val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a",
+ "a:b:c:d:e:f")
for (coordinate <- coordinates) {
intercept[IllegalArgumentException] {
SparkSubmitUtils.extractMavenCoordinates(coordinate)
@@ -91,6 +92,15 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}
+ test("extractMavenCoordinates parses correctly") {
+ for (s <- Seq("g:a:v,g:a:p:v,g:a:p:c:v", "g/a/v,g/a/p/v,g/a/p/c/v")) {
+ val Seq(gav, gapv, gapcv) = SparkSubmitUtils.extractMavenCoordinates(s)
+ assert(new MavenCoordinate("g", "a", "v") === gav)
+ assert(new MavenCoordinate("g", "a", Some("p"), None, "v") === gapv)
+ assert(new MavenCoordinate("g", "a", Some("p"), Some("c"), "v") === gapcv)
+ }
+ }
+
test("add dependencies works correctly") {
val md = SparkSubmitUtils.getModuleDescriptor
val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
@@ -115,7 +125,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(rule2.getOrganisation === "c")
assert(rule2.getName === "d")
intercept[IllegalArgumentException] {
- SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
+ SparkSubmitUtils.createExclusion("e:f:g:h:i:j", new IvySettings, "default")
}
}
@@ -128,7 +138,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(index >= 0)
jPaths = jPaths.substring(index + tempIvyPath.length)
}
- val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
+ val main = new MavenCoordinate("my.awesome.lib", "mylib", "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
@@ -199,7 +209,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(path === "", "should return empty path")
- val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
+ val main = new MavenCoordinate(
+ "org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(
coordinates + "," + main.toString,