Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ private[spark] class Client(
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
// Used to keep track of the names of files added to the distributed cache. If files with
// the same name but different paths are added multiple times, YARN will fail to launch
// containers for the app with an internal error.
val distributedNames = new HashSet[String]
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)

Expand All @@ -376,11 +380,16 @@ private[spark] class Client(

/**
 * Registers a URI for the distributed cache, rejecting duplicates.
 *
 * YARN fails to launch containers when the same path is distributed twice, or when two
 * resources with different paths share the same file name, so both cases are tracked and
 * rejected here with a warning.
 *
 * @param uri the resource URI to register
 * @return true if the URI was newly registered; false if it (or a same-named resource)
 *         was already added
 */
def addDistributedUri(uri: URI): Boolean = {
  val uriStr = uri.toString()
  // Name collisions are keyed on the last path segment only.
  val fileName = new File(uri.getPath).getName
  if (distributedUris.contains(uriStr)) {
    logWarning(s"Same path resource $uri added multiple times to distributed cache.")
    false
  } else if (distributedNames.contains(fileName)) {
    logWarning(s"Same name resource $uri added multiple times to distributed cache.")
    false
  } else {
    distributedUris += uriStr
    distributedNames += fileName
    true
  }
}
Expand Down Expand Up @@ -519,8 +528,7 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
flist.foreach { file =>
val (_, localizedPath) = distribute(file, resType = resType)
require(localizedPath != null)
if (addToClasspath) {
if (addToClasspath && localizedPath != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this change about?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewor14 , in the previous code we assumed all the files would be uploaded into the distributed cache, so this localizedPath should never be null. But with my change, some duplicated files are skipped and localizedPath is returned as null for them instead, so I changed the check to this form.

cachedSecondaryJarLinks += localizedPath
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.{File, FileOutputStream}
import java.io.{File, FileInputStream, FileOutputStream}
import java.net.URI
import java.util.Properties

Expand Down Expand Up @@ -285,6 +285,36 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}

test("ignore same name jars") {
  // Build a fake SPARK_HOME containing a jars/ dir and a RELEASE marker file.
  val libs = Utils.createTempDir()
  val jarsDir = new File(libs, "jars")
  assert(jarsDir.mkdir())
  new FileOutputStream(new File(libs, "RELEASE")).close()
  val userLibs = Utils.createTempDir()

  val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
  val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
  // Copy jar2 to a file with the same name as jar1, so the distributed cache
  // sees a name collision with a jar already shipped from SPARK_HOME/jars.
  val jar3 = {
    val target = new File(userLibs, new File(jar1.toURI).getName)
    val input = new FileInputStream(jar2.getPath)
    val output = new FileOutputStream(target)
    Utils.copyStream(input, output, closeStreams = true)
    target.toURI.toURL
  }

  val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
    .set(JARS_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath))

  val client = createClient(sparkConf)
  val tempDir = Utils.createTempDir()
  client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)

  // Only jar2 should be added to SECONDARY_JARS; jar3, which has the same name
  // as jar1, must be ignored.
  sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
}

object Fixtures {

val knownDefYarnAppCP: Seq[String] =
Expand Down