Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,16 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
flist.foreach { file =>
val (_, localizedPath) = distribute(file, resType = resType)
if (addToClasspath && localizedPath != null) {
cachedSecondaryJarLinks += localizedPath
// If addToClassPath, we ignore adding jar multiple times to distitrbuted cache.
if (addToClasspath) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment here explaining what exactly thi sis doing to help explain and keep from breaking in future.

Also can you add another unit test to cover this case.

if (localizedPath != null) {
cachedSecondaryJarLinks += localizedPath
}
} else {
if (localizedPath != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess here is localizedPath == null ?

throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
" to the distributed cache.")
}
}
}
}
Expand Down
42 changes: 42 additions & 0 deletions yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,48 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
}

test("distribute archive multiple times") {
val libs = Utils.createTempDir()
// Create jars dir and RELEASE file to avoid IllegalStateException.
val jarsDir = new File(libs, "jars")
assert(jarsDir.mkdir())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see jarsDir being used anywhere either

new FileOutputStream(new File(libs, "RELEASE")).close()

val userLib1 = Utils.createTempDir()
val testJar = TestUtils.createJarWithFiles(Map(), userLib1)

// Case 1: FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
.set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath))
.set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath))

val client = createClient(sparkConf)
val tempDir = Utils.createTempDir()
intercept[IllegalArgumentException] {
client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
}

// Case 2: FILES_TO_DISTRIBUTE can't have duplicate files.
val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
.set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))

val clientFiles = createClient(sparkConfFiles)
val tempDirForFiles = Utils.createTempDir()
intercept[IllegalArgumentException] {
clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil)
}

// Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files.
val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
.set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))

val clientArchives = createClient(sparkConfArchives)
val tempDirForArchives = Utils.createTempDir()
intercept[IllegalArgumentException] {
clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil)
}
}

test("distribute local spark jars") {
val temp = Utils.createTempDir()
val jarsDir = new File(temp, "jars")
Expand Down