21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -307,25 +307,26 @@ private[spark] class SparkSubmit extends Logging {
packagesTransitive = true, args.packagesExclusions, args.packages,
args.repositories, args.ivyRepoPath, args.ivySettingsPath)

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
if (resolvedMavenCoordinates.nonEmpty) {
// In K8s client mode, when in the driver, add resolved jars early as we might need
// them at the submit time for artifact downloading.
// For example we might use the dependencies for downloading
// files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
// --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
if (isKubernetesClusterModeDriver) {
val loader = getSubmitClassLoader(sparkConf)
for (jar <- resolvedMavenCoordinates.split(",")) {
for (jar <- resolvedMavenCoordinates) {
addJarToClasspath(jar, loader)
}
} else if (isKubernetesCluster) {
// We need this in K8s cluster mode so that we can upload local deps
// via the k8s application, like in cluster mode driver
childClasspath ++= resolvedMavenCoordinates.split(",")
childClasspath ++= resolvedMavenCoordinates
} else {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
args.pyFiles = mergeFileLists(args.pyFiles,
mergeFileLists(resolvedMavenCoordinates: _*))
}
}
}
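Note (illustration only, not part of the diff): `DependencyUtils.mergeFileLists` takes varargs of comma-separated lists and returns a single comma-joined string, which is why the new code wraps the resolved `Seq[String]` in an inner `mergeFileLists(resolvedMavenCoordinates: _*)` call before merging the result with `args.jars`. A minimal sketch of that pattern, using a simplified stand-in for `mergeFileLists` and made-up jar paths:

```scala
// Simplified stand-in for DependencyUtils.mergeFileLists (assumed behavior, not the real implementation):
// join any non-blank comma-separated lists into a single comma-separated string.
def mergeFileLists(lists: String*): String =
  lists.filter(s => s != null && s.nonEmpty).flatMap(_.split(",")).mkString(",")

// Hypothetical values for illustration only.
val resolvedMavenCoordinates: Seq[String] =
  Seq("/cache/aws-java-sdk-1.7.4.jar", "/cache/hadoop-aws-2.7.6.jar")
val existingJars: String = "/app/main.jar"

// The inner call collapses the Seq into one comma-separated list;
// the outer call merges it with the jars that were already configured.
val merged = mergeFileLists(existingJars, mergeFileLists(resolvedMavenCoordinates: _*))
// merged == "/app/main.jar,/cache/aws-java-sdk-1.7.4.jar,/cache/hadoop-aws-2.7.6.jar"
```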
@@ -1201,7 +1202,7 @@ private[spark] object SparkSubmitUtils {
*/
Review comment (Contributor):
@AngersZhuuuu FYI you missed updating the @return Scaladoc tag here, as well as the description which explicitly mentions a comma-delimited list. Would you mind submitting a follow-on to update?
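For illustration, one way a follow-up could rephrase that Scaladoc for `resolveDependencyPaths` (hypothetical wording only, not taken from any actual follow-on change; the elided lines stand for the rest of the existing doc comment):

```scala
// Hypothetical Scaladoc fragment for resolveDependencyPaths after this change (illustrative only):
/**
 * ... (existing description, with the mention of a comma-delimited list dropped) ...
 * @return Seq of paths to the jars of the given maven artifacts, including their
 *         transitive dependencies
 */
```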

def resolveDependencyPaths(
artifacts: Array[AnyRef],
cacheDirectory: File): String = {
cacheDirectory: File): Seq[String] = {
artifacts.map { artifactInfo =>
val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
val extraAttrs = artifactInfo.asInstanceOf[Artifact].getExtraAttributes
@@ -1212,7 +1213,7 @@
}
cacheDirectory.getAbsolutePath + File.separator +
s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar"
}.mkString(",")
}
}

/** Adds the given maven coordinates to Ivy's module descriptor. */
@@ -1362,17 +1363,17 @@
* @param ivySettings An IvySettings containing resolvers to use
* @param transitive Whether resolving transitive dependencies, default is true
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* @return Seq of path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
ivySettings: IvySettings,
transitive: Boolean,
exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
isTest: Boolean = false): Seq[String] = {
if (coordinates == null || coordinates.trim.isEmpty) {
""
Nil
} else {
val sysOut = System.out
// Default configuration name for ivy
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.worker

import java.io.File

import org.apache.commons.lang3.StringUtils

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{config, Logging}
@@ -86,8 +84,9 @@ object DriverWrapper extends Logging {
ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath))
val jars = {
val jarsProp = sys.props.get(config.JARS.key).orNull
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
if (resolvedMavenCoordinates.nonEmpty) {
DependencyUtils.mergeFileLists(jarsProp,
DependencyUtils.mergeFileLists(resolvedMavenCoordinates: _*))
} else {
jarsProp
}
@@ -154,7 +154,7 @@ private[spark] object DependencyUtils extends Logging {
ivyProperties.repositories,
ivyProperties.ivyRepoPath,
Option(ivyProperties.ivySettingsPath)
).split(",")
)
}

def resolveMavenDependencies(
@@ -163,7 +163,7 @@
packages: String,
repositories: String,
ivyRepoPath: String,
ivySettingsPath: Option[String]): String = {
ivySettingsPath: Option[String]): Seq[String] = {
val exclusions: Seq[String] =
if (!StringUtils.isBlank(packagesExclusions)) {
packagesExclusions.split(",")
@@ -123,12 +123,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("ivy path works correctly") {
val md = SparkSubmitUtils.getModuleDescriptor
val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath))
for (i <- 0 until 3) {
val index = jPaths.indexOf(tempIvyPath)
assert(index >= 0)
jPaths = jPaths.substring(index + tempIvyPath.length)
}
val jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath))
assert(jPaths.count(_.startsWith(tempIvyPath)) >= 3)
val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
@@ -137,7 +133,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
SparkSubmitUtils.buildIvySettings(Option(repo), Some(tempIvyPath)),
transitive = true,
isTest = true)
assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should use non-default ivy path")
}
}

@@ -151,8 +147,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
transitive = true,
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact")
assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency")
}
// Local Ivy Repository
val settings = new IvySettings
@@ -163,8 +159,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
transitive = true,
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact")
assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency")
}
// Local ivy repository with modified home
val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
@@ -176,9 +172,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
transitive = true,
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact")
assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should be in new ivy path")
assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency")
}
}

@@ -202,15 +198,15 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
transitive = true,
isTest = true)
assert(path === "", "should return empty path")
assert(path.isEmpty, "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.12", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(
coordinates + "," + main.toString,
SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)),
transitive = true,
isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
assert(files.forall(_.indexOf(main.artifactId) >= 0), "Did not return artifact")
}
}

@@ -224,8 +220,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
exclusions = Seq("my.great.dep:mydep"),
transitive = true,
isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
assert(files.forall(_.indexOf(main.artifactId) >= 0), "Did not return artifact")
assert(files.forall(_.indexOf("my.great.dep") < 0), "Returned excluded artifact")
}
}

@@ -260,9 +256,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
ivySettings = testUtilSettings) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, settings,
transitive = true, isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact")
assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should be in new ivy path")
assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency")
}
}

@@ -118,7 +118,7 @@ private[hive] object IsolatedClientLoader extends Logging {
Seq("com.google.guava:guava:14.0.1",
s"org.apache.hadoop:hadoop-client:$hadoopVersion")

val classpath = quietly {
val classpaths = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
hiveArtifacts.mkString(","),
SparkSubmitUtils.buildIvySettings(
@@ -127,7 +127,7 @@ private[hive] object IsolatedClientLoader extends Logging {
transitive = true,
exclusions = version.exclusions)
}
val allFiles = classpath.split(",").map(new File(_)).toSet
val allFiles = classpaths.map(new File(_)).toSet

// TODO: Remove copy logic.
val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}")