From b8a3eed0d4a88be8eb0a6ca85ec086bb8846828d Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 25 Dec 2020 11:12:31 +0800 Subject: [PATCH 1/3] [SPARK-33908][CORE] Refact SparkSubmitUtils.resolveMavenCoordinates return parameter --- .../org/apache/spark/deploy/SparkSubmit.scala | 19 +++++----- .../spark/deploy/worker/DriverWrapper.scala | 7 ++-- .../apache/spark/util/DependencyUtils.scala | 4 +- .../spark/deploy/SparkSubmitUtilsSuite.scala | 38 +++++++++---------- .../hive/client/IsolatedClientLoader.scala | 4 +- 5 files changed, 34 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index ad95b18ecaeb..d9f2d6c22f43 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -307,7 +307,7 @@ private[spark] class SparkSubmit extends Logging { packagesTransitive = true, args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath, args.ivySettingsPath) - if (!StringUtils.isBlank(resolvedMavenCoordinates)) { + if (resolvedMavenCoordinates.nonEmpty) { // In K8s client mode, when in the driver, add resolved jars early as we might need // them at the submit time for artifact downloading. // For example we might use the dependencies for downloading @@ -315,17 +315,18 @@ private[spark] class SparkSubmit extends Logging { // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6 if (isKubernetesClusterModeDriver) { val loader = getSubmitClassLoader(sparkConf) - for (jar <- resolvedMavenCoordinates.split(",")) { + for (jar <- resolvedMavenCoordinates) { addJarToClasspath(jar, loader) } } else if (isKubernetesCluster) { // We need this in K8s cluster mode so that we can upload local deps // via the k8s application, like in cluster mode driver - childClasspath ++= resolvedMavenCoordinates.split(",") + childClasspath ++= resolvedMavenCoordinates } else { - args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates) + args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*)) if (args.isPython || isInternal(args.primaryResource)) { - args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates) + args.pyFiles = mergeFileLists(args.pyFiles, + mergeFileLists(resolvedMavenCoordinates: _*)) } } } @@ -1201,7 +1202,7 @@ private[spark] object SparkSubmitUtils { */ def resolveDependencyPaths( artifacts: Array[AnyRef], - cacheDirectory: File): String = { + cacheDirectory: File): Seq[String] = { artifacts.map { artifactInfo => val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId val extraAttrs = artifactInfo.asInstanceOf[Artifact].getExtraAttributes @@ -1212,7 +1213,7 @@ private[spark] object SparkSubmitUtils { } cacheDirectory.getAbsolutePath + File.separator + s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar" - }.mkString(",") + } } /** Adds the given maven coordinates to Ivy's module descriptor. 
*/ @@ -1370,9 +1371,9 @@ private[spark] object SparkSubmitUtils { ivySettings: IvySettings, transitive: Boolean, exclusions: Seq[String] = Nil, - isTest: Boolean = false): String = { + isTest: Boolean = false): Seq[String] = { if (coordinates == null || coordinates.trim.isEmpty) { - "" + Seq.empty[String] } else { val sysOut = System.out // Default configuration name for ivy diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index c1288d64c53f..7cf961f42112 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -19,8 +19,6 @@ package org.apache.spark.deploy.worker import java.io.File -import org.apache.commons.lang3.StringUtils - import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{config, Logging} @@ -86,8 +84,9 @@ object DriverWrapper extends Logging { ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath)) val jars = { val jarsProp = sys.props.get(config.JARS.key).orNull - if (!StringUtils.isBlank(resolvedMavenCoordinates)) { - DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates) + if (resolvedMavenCoordinates.nonEmpty) { + DependencyUtils.mergeFileLists(jarsProp, + DependencyUtils.mergeFileLists(resolvedMavenCoordinates: _*)) } else { jarsProp } diff --git a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala index 9956ccedf584..0d78af2dafc9 100644 --- a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala @@ -154,7 +154,7 @@ private[spark] object DependencyUtils extends Logging { ivyProperties.repositories, ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath) - ).split(",") + ) } def resolveMavenDependencies( @@ -163,7 +163,7 @@ private[spark] object DependencyUtils extends Logging { packages: String, repositories: String, ivyRepoPath: String, - ivySettingsPath: Option[String]): String = { + ivySettingsPath: Option[String]): Seq[String] = { val exclusions: Seq[String] = if (!StringUtils.isBlank(packagesExclusions)) { packagesExclusions.split(",") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index eaa06ce2aa05..2bd24f483f68 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -123,12 +123,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { test("ivy path works correctly") { val md = SparkSubmitUtils.getModuleDescriptor val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar") - var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath)) - for (i <- 0 until 3) { - val index = jPaths.indexOf(tempIvyPath) - assert(index >= 0) - jPaths = jPaths.substring(index + tempIvyPath.length) - } + val jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath)) + assert(jPaths.count(_.startsWith(tempIvyPath)) >= 3) val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1") IvyTestUtils.withRepository(main, None, None) { repo => // end to end @@ -137,7 +133,7 @@ class 
SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(Option(repo), Some(tempIvyPath)), transitive = true, isTest = true) - assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path") + assert(jarPath.forall(_.indexOf(tempIvyPath) >=0), "should use non-default ivy path") } } @@ -151,8 +147,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) - assert(jarPath.indexOf("mylib") >= 0, "should find artifact") - assert(jarPath.indexOf("mydep") >= 0, "should find dependency") + assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact") + assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency") } // Local Ivy Repository val settings = new IvySettings @@ -163,8 +159,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) - assert(jarPath.indexOf("mylib") >= 0, "should find artifact") - assert(jarPath.indexOf("mydep") >= 0, "should find dependency") + assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact") + assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency") } // Local ivy repository with modified home val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator) @@ -176,9 +172,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) - assert(jarPath.indexOf("mylib") >= 0, "should find artifact") - assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path") - assert(jarPath.indexOf("mydep") >= 0, "should find dependency") + assert(jarPath.exists(_.indexOf("mylib") >=0), "should find artifact") + assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should be in new ivy path") + assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency") } } @@ -202,7 +198,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) - assert(path === "", "should return empty path") + assert(path.isEmpty, "should return empty path") val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.12", "1.2.0") IvyTestUtils.withRepository(main, None, None) { repo => val files = SparkSubmitUtils.resolveMavenCoordinates( @@ -210,7 +206,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)), transitive = true, isTest = true) - assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact") + assert(files.forall(_.indexOf(main.artifactId) >= 0), "Did not return artifact") } } @@ -224,8 +220,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { exclusions = Seq("my.great.dep:mydep"), transitive = true, isTest = true) - assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact") - assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact") + assert(files.forall(_.indexOf(main.artifactId) >= 0), "Did not return artifact") + assert(files.forall(_.indexOf("my.great.dep") < 0), "Returned excluded artifact") } } @@ -260,9 +256,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { ivySettings = testUtilSettings) { repo => 
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, settings, transitive = true, isTest = true) - assert(jarPath.indexOf("mylib") >= 0, "should find artifact") - assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path") - assert(jarPath.indexOf("mydep") >= 0, "should find dependency") + assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact") + assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should be in new ivy path") + assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 97e685efd27d..02bf86533c89 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -118,7 +118,7 @@ private[hive] object IsolatedClientLoader extends Logging { Seq("com.google.guava:guava:14.0.1", s"org.apache.hadoop:hadoop-client:$hadoopVersion") - val classpath = quietly { + val classpaths = quietly { SparkSubmitUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), SparkSubmitUtils.buildIvySettings( @@ -127,7 +127,7 @@ private[hive] object IsolatedClientLoader extends Logging { transitive = true, exclusions = version.exclusions) } - val allFiles = classpath.split(",").map(new File(_)).toSet + val allFiles = classpaths.map(new File(_)).toSet // TODO: Remove copy logic. val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}") From c93f5855af0f50e5885fc70c95c7b69bba0051d6 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 25 Dec 2020 11:22:02 +0800 Subject: [PATCH 2/3] Update SparkSubmit.scala --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d9f2d6c22f43..3206844976b8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1363,7 +1363,7 @@ private[spark] object SparkSubmitUtils { * @param ivySettings An IvySettings containing resolvers to use * @param transitive Whether resolving transitive dependencies, default is true * @param exclusions Exclusions to apply when resolving transitive dependencies - * @return The comma-delimited path to the jars of the given maven artifacts including their + * @return Seq of path to the jars of the given maven artifacts including their * transitive dependencies */ def resolveMavenCoordinates( From 4d8849644ae25ce94e5afbcc9b7270fe909ae869 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 25 Dec 2020 15:08:40 +0800 Subject: [PATCH 3/3] Update --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 3206844976b8..7b7d9dd72a34 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1373,7 +1373,7 @@ private[spark] object SparkSubmitUtils { exclusions: Seq[String] = Nil, isTest: Boolean = false): Seq[String] = { if 
(coordinates == null || coordinates.trim.isEmpty) { - Seq.empty[String] + Nil } else { val sysOut = System.out // Default configuration name for ivy diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 2bd24f483f68..7819b3aef4d6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -133,7 +133,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(Option(repo), Some(tempIvyPath)), transitive = true, isTest = true) - assert(jarPath.forall(_.indexOf(tempIvyPath) >=0), "should use non-default ivy path") + assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should use non-default ivy path") } } @@ -172,7 +172,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) - assert(jarPath.exists(_.indexOf("mylib") >=0), "should find artifact") + assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact") assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should be in new ivy path") assert(jarPath.exists(_.indexOf("mydep") >= 0), "should find dependency") }
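Note (illustrative sketch, not part of the patches): across these three commits, SparkSubmitUtils.resolveMavenCoordinates and resolveDependencyPaths change from returning a comma-delimited String to returning Seq[String], and call sites in SparkSubmit, DriverWrapper, DependencyUtils and IsolatedClientLoader drop their StringUtils.isBlank checks and split(",") calls accordingly. The self-contained Scala sketch below shows that caller-side change under simplified assumptions: the resolver and mergeFileLists here are hypothetical stand-ins for the real Ivy-backed resolver and DependencyUtils.mergeFileLists, and the cache directory, coordinates and jar paths are made up for illustration.

import java.io.File

// Illustrative sketch only: mirrors the return-type change in this series, not Spark's real resolver.
object ResolveMavenCoordinatesSketch {

  // Old shape: a comma-delimited String, "" when nothing resolves.
  def resolveOld(coordinates: String, cacheDir: File): String =
    resolveNew(coordinates, cacheDir).mkString(",")

  // New shape (patches 1/3 and 3/3): Seq[String], Nil when nothing resolves.
  def resolveNew(coordinates: String, cacheDir: File): Seq[String] =
    if (coordinates == null || coordinates.trim.isEmpty) Nil
    else coordinates.split(",").toSeq.map { coord =>
      // Hypothetical jar naming, patterned after resolveDependencyPaths: org_name-rev.jar
      val Array(org, name, rev) = coord.split(":")
      new File(cacheDir, s"${org}_$name-$rev.jar").getAbsolutePath
    }

  // Simplified stand-in for DependencyUtils.mergeFileLists: joins the non-blank comma lists.
  def mergeFileLists(lists: String*): String =
    lists.filter(l => l != null && l.trim.nonEmpty).mkString(",")

  def main(args: Array[String]): Unit = {
    val cacheDir = new File("/tmp/ivy/jars") // hypothetical cache location
    val coords = "my.great.lib:mylib:0.1,my.great.dep:mydep:0.5"

    // Before: every caller blank-checked the string and then split it on ",".
    val oldResult = resolveOld(coords, cacheDir)
    val oldJars = if (oldResult.nonEmpty) oldResult.split(",").toSeq else Nil

    // After: the Seq is consumed directly; a comma-delimited list is rebuilt only where one
    // is still needed, as SparkSubmit and DriverWrapper now do via mergeFileLists(resolved: _*).
    val newJars = resolveNew(coords, cacheDir)
    val merged = mergeFileLists("app.jar", mergeFileLists(newJars: _*))

    assert(oldJars == newJars) // both shapes describe the same resolved jars
    println(merged)
  }
}

Returning a Seq keeps joining and splitting explicit at the call sites: callers that need a classpath iterate the sequence directly, while callers that still need a comma-delimited list rebuild it deliberately, which is why patch 1/3 wraps the resolved sequence in an extra mergeFileLists(resolvedMavenCoordinates: _*) call in SparkSubmit and DriverWrapper.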