diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 5e7e3be08d0f..9062625319e2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -364,6 +364,10 @@ private[spark] class Client( // multiple times, YARN will fail to launch containers for the app with an internal // error. val distributedUris = new HashSet[String] + // Used to keep track of the file names of URIs added to the distributed cache. If files + // with the same name but different paths are added multiple times, YARN will fail to + // launch containers for the app with an internal error. + val distributedNames = new HashSet[String] YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials) YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials) @@ -376,11 +380,16 @@ private[spark] class Client( def addDistributedUri(uri: URI): Boolean = { val uriStr = uri.toString() + val fileName = new File(uri.getPath).getName if (distributedUris.contains(uriStr)) { - logWarning(s"Resource $uri added multiple times to distributed cache.") + logWarning(s"Same path resource $uri added multiple times to distributed cache.") + false + } else if (distributedNames.contains(fileName)) { + logWarning(s"Same name resource $uri added multiple times to distributed cache") false } else { distributedUris += uriStr + distributedNames += fileName true } } @@ -519,8 +528,7 @@ private[spark] class Client( ).foreach { case (flist, resType, addToClasspath) => flist.foreach { file => val (_, localizedPath) = distribute(file, resType = resType) - require(localizedPath != null) - if (addToClasspath) { + if (addToClasspath && localizedPath != null) { cachedSecondaryJarLinks += localizedPath } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 74e268dc4847..23050e8c1d5c 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import java.io.{File, FileOutputStream} +import java.io.{File, FileInputStream, FileOutputStream} import java.net.URI import java.util.Properties @@ -285,6 +285,36 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) } + test("ignore same name jars") { + val libs = Utils.createTempDir() + val jarsDir = new File(libs, "jars") + assert(jarsDir.mkdir()) + new FileOutputStream(new File(libs, "RELEASE")).close() + val userLibs = Utils.createTempDir() + + val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir) + val jar2 = TestUtils.createJarWithFiles(Map(), userLibs) + // Copy jar2 to jar3, using the same file name as jar1 + val jar3 = { + val target = new File(userLibs, new File(jar1.toURI).getName) + val input = new FileInputStream(jar2.getPath) + val output = new FileOutputStream(target) + Utils.copyStream(input, output, closeStreams = true) + target.toURI.toURL + } + + val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) + .set(JARS_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath)) + + val client = createClient(sparkConf) + val tempDir = Utils.createTempDir() + client.prepareLocalResources(tempDir.getAbsolutePath(), Nil) + + // Only jar2 will be added to SECONDARY_JARS; jar3, which has the same name as jar1, will + // be ignored. + sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName))) + } + object Fixtures { val knownDefYarnAppCP: Seq[String] =