Skip to content

Commit 0d62c5c

Browse files
committed
[SPARK-34472][CORE] Ship ivySettings file to driver in cluster mode
1 parent b26e7b5 commit 0d62c5c

File tree

3 files changed

+94
-4
lines changed

3 files changed

+94
-4
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,15 @@ private[spark] class SparkSubmit extends Logging {
300300
val isKubernetesClusterModeDriver = isKubernetesClient &&
301301
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
302302

303+
// When running in cluster mode, add ivySettings file to files so that the driver can use
304+
// it to resolve ivy packages
305+
if (deployMode == CLUSTER && args.ivySettingsPath.isDefined) {
306+
val ivySettingsFile = new File(args.ivySettingsPath.get)
307+
require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
308+
require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a normal file")
309+
args.files = mergeFileLists(args.files, ivySettingsFile.getAbsolutePath)
310+
}
311+
303312
if (!isMesosCluster && !isStandAloneCluster) {
304313
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
305314
// too for packages that include Python code
@@ -1089,7 +1098,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
10891098
}
10901099

10911100
/** Provides utility functions to be used inside SparkSubmit. */
1092-
private[spark] object SparkSubmitUtils {
1101+
private[spark] object SparkSubmitUtils extends Logging {
10931102

10941103
// Exposed for testing
10951104
var printStream = SparkSubmit.printStream
@@ -1278,11 +1287,22 @@ private[spark] object SparkSubmitUtils {
12781287
remoteRepos: Option[String],
12791288
ivyPath: Option[String]): IvySettings = {
12801289
val file = new File(settingsFile)
1281-
require(file.exists(), s"Ivy settings file $file does not exist")
1282-
require(file.isFile(), s"Ivy settings file $file is not a normal file")
1290+
// When running driver in cluster mode, the settingsFile is localized and so needs to be
1291+
// accessed using just the file name and not the full file path
1292+
val localizedFile = new File(file.getName)
1293+
val resolvedFile = if (file.exists()) {
1294+
Some(file)
1295+
} else if (localizedFile.exists()) {
1296+
Some(localizedFile)
1297+
} else {
1298+
None
1299+
}
1300+
require(resolvedFile.isDefined, s"Ivy settings file $file does not exist")
1301+
require(resolvedFile.get.isFile, s"Ivy settings file $file is not a normal file")
1302+
12831303
val ivySettings: IvySettings = new IvySettings
12841304
try {
1285-
ivySettings.load(file)
1305+
ivySettings.load(resolvedFile.get)
12861306
} catch {
12871307
case e @ (_: IOException | _: ParseException) =>
12881308
throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e)

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,6 +1525,21 @@ class SparkSubmitSuite
15251525
conf.get(k) should be (v)
15261526
}
15271527
}
1528+
1529+
test("SPARK-34472: ship ivySettings file to the driver in cluster mode") {
1530+
val args = Seq(
1531+
"--class", "Foo",
1532+
"--master", "yarn",
1533+
"--deploy-mode", "cluster",
1534+
"--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath}",
1535+
"app.jar"
1536+
)
1537+
1538+
val appArgs = new SparkSubmitArguments(args)
1539+
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
1540+
1541+
conf.get("spark.yarn.dist.files") should be(s"file://${emptyIvySettings.getAbsolutePath}")
1542+
}
15281543
}
15291544

15301545
object SparkSubmitSuite extends SparkFunSuite with TimeLimits {

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import scala.concurrent.duration._
2626
import scala.io.Source
2727

2828
import com.google.common.io.{ByteStreams, Files}
29+
import org.apache.commons.io.FileUtils
2930
import org.apache.hadoop.yarn.conf.YarnConfiguration
3031
import org.apache.hadoop.yarn.util.ConverterUtils
3132
import org.scalatest.concurrent.Eventually._
@@ -368,6 +369,19 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
368369
)
369370
checkResult(finalState, result, "true")
370371
}
372+
373+
test("SPARK-34472: ivySettings file should be localized on driver in cluster mode") {
374+
375+
val emptyIvySettings = File.createTempFile("ivy", ".xml")
376+
FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)
377+
378+
val result = File.createTempFile("result", null, tempDir)
379+
val finalState = runSpark(clientMode = false,
380+
mainClassName(YarnAddJarTest.getClass),
381+
appArgs = Seq(result.getAbsolutePath),
382+
extraConf = Map("spark.jars.ivySettings" -> emptyIvySettings.getAbsolutePath))
383+
checkResult(finalState, result)
384+
}
371385
}
372386

373387
private[spark] class SaveExecutorInfo extends SparkListener {
@@ -583,6 +597,47 @@ private object YarnClasspathTest extends Logging {
583597

584598
}
585599

600+
private object YarnAddJarTest extends Logging {
601+
def main(args: Array[String]): Unit = {
602+
if (args.length != 1) {
603+
// scalastyle:off println
604+
System.err.println(
605+
s"""
606+
|Invalid command line: ${args.mkString(" ")}
607+
|
608+
|Usage: YarnAddJarTest [result file]
609+
""".stripMargin)
610+
// scalastyle:on println
611+
System.exit(1)
612+
}
613+
614+
val resultPath = args(0)
615+
val sc = new SparkContext(new SparkConf())
616+
617+
var result = "failure"
618+
try {
619+
val settingsFile = sc.getConf.get("spark.jars.ivySettings")
620+
// Delete the original ivySettings file, so we ensure that the YARN localized file
621+
// is used by the addJar call
622+
// In a real cluster mode, the original settings file at the absolute path won't be present
623+
// on the driver
624+
new File(settingsFile).delete()
625+
626+
val caught = intercept[Exception] {
627+
sc.addJar("ivy://org.fake-project.test:test:1.0.0")
628+
}
629+
if (caught.getMessage.contains("unresolved dependency: org.fake-project.test#test")) {
630+
// "unresolved dependency" is expected as the dependency does not exist
631+
// but exception like "Ivy settings file <file> does not exist" should result in failure
632+
result = "success"
633+
}
634+
} finally {
635+
Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
636+
sc.stop()
637+
}
638+
}
639+
}
640+
586641
private object YarnLauncherTestApp {
587642

588643
def main(args: Array[String]): Unit = {

0 commit comments

Comments
 (0)