diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8f1425fbb849..a4d76c0a2505 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -1277,7 +1277,12 @@ private[spark] object SparkSubmitUtils {
       settingsFile: String,
       remoteRepos: Option[String],
       ivyPath: Option[String]): IvySettings = {
-    val file = new File(settingsFile)
+    val uri = new URI(settingsFile)
+    val file = Option(uri.getScheme).getOrElse("file") match {
+      case "file" => new File(uri.getPath)
+      case scheme => throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
+        "spark.jars.ivySettings")
+    }
     require(file.exists(), s"Ivy settings file $file does not exist")
     require(file.isFile(), s"Ivy settings file $file is not a normal file")
     val ivySettings: IvySettings = new IvySettings
diff --git a/docs/configuration.md b/docs/configuration.md
index 612d62a96f30..18a230370a30 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -771,7 +771,12 @@ Apart from these, the following properties are also available, and may be useful
     option --repositories or spark.jars.repositories will also be included.
     Useful for allowing Spark to resolve artifacts from behind a firewall e.g. via an in-house
     artifact server like Artifactory. Details on the settings file format can be
-    found at Settings Files
+    found at Settings Files.
+    Only paths with a file:// scheme are supported. Paths without a scheme are assumed to have
+    a file:// scheme.
+
+    When running in YARN cluster mode, this file will also be localized to the remote driver for
+    dependency resolution within SparkContext#addJar.
   2.2.0
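Reviewer note: the scheme rule added to `loadIvySettings` is small enough to restate standalone. The sketch below is illustrative, not part of `SparkSubmitUtils`; it shows exactly what the new `match` implements: a missing scheme defaults to `file`, `file` URIs resolve to a local path, and any other scheme is rejected up front.

```scala
import java.io.File
import java.net.URI

// Hypothetical standalone restatement of the patched scheme handling.
object IvySettingsScheme {
  def resolve(settingsFile: String): File = {
    val uri = new URI(settingsFile)
    Option(uri.getScheme).getOrElse("file") match {
      case "file" => new File(uri.getPath)
      case scheme => throw new IllegalArgumentException(
        s"Scheme $scheme not supported in spark.jars.ivySettings")
    }
  }

  def main(args: Array[String]): Unit = {
    println(resolve("/etc/spark/ivysettings.xml"))        // no scheme: treated as file
    println(resolve("file:///etc/spark/ivysettings.xml")) // explicit file scheme
    // resolve("hdfs:///tmp/ivysettings.xml")             // throws IllegalArgumentException
  }
}
```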
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 618faef2d58b..427202fbe962 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,6 +21,7 @@ import java.io.{FileSystem => _, _}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.nio.file.Files
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}

@@ -30,7 +31,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.util.control.NonFatal

 import com.google.common.base.Objects
-import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
@@ -518,6 +518,32 @@ private[spark] class Client(
       require(localizedPath != null, "Keytab file already distributed.")
     }

+    // If we passed in an ivySettings file, make sure we copy the file to the distributed cache
+    // in cluster mode so that the driver can access it
+    val ivySettings = sparkConf.getOption("spark.jars.ivySettings")
+    val ivySettingsLocalizedPath: Option[String] = ivySettings match {
+      case Some(ivySettingsPath) if isClusterMode =>
+        val uri = new URI(ivySettingsPath)
+        Option(uri.getScheme).getOrElse("file") match {
+          case "file" =>
+            val ivySettingsFile = new File(uri.getPath)
+            require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
+            require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a " +
+              "normal file")
+            // Generate a file name that can be used for the ivySettings file, that does not
+            // conflict with any user file.
+            val localizedFileName = Some(ivySettingsFile.getName() + "-" +
+              UUID.randomUUID().toString)
+            val (_, localizedPath) = distribute(ivySettingsPath, destName = localizedFileName)
+            require(localizedPath != null, "IvySettings file already distributed.")
+            Some(localizedPath)
+          case scheme =>
+            throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
+              "spark.jars.ivySettings")
+        }
+      case _ => None
+    }
+
     /**
      * Add Spark to the cache. There are two settings that control what files to add to the cache:
      * - if a Spark archive is defined, use the archive. The archive is expected to contain
@@ -576,7 +602,7 @@ private[spark] class Client(
       jarsDir.listFiles().foreach { f =>
         if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
           jarsStream.putNextEntry(new ZipEntry(f.getName))
-          Files.copy(f, jarsStream)
+          Files.copy(f.toPath, jarsStream)
           jarsStream.closeEntry()
         }
       }
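Reviewer note: the import hunks above swap Guava's `com.google.common.io.Files` for `java.nio.file.Files`, which is why every `Files.copy(file, stream)` call site becomes `Files.copy(file.toPath, stream)`. A minimal self-contained sketch of the pattern used when zipping the jars directory (object and parameter names here are illustrative, not the `Client` internals):

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files
import java.util.zip.{ZipEntry, ZipOutputStream}

// Same shape as the patched loop in Client: stream each readable .jar in a
// directory into a zip via java.nio.file.Files.copy(Path, OutputStream).
object ZipJarsSketch {
  def zipJars(jarsDir: File, out: File): Unit = {
    val jarsStream = new ZipOutputStream(new FileOutputStream(out))
    try {
      jarsDir.listFiles().foreach { f =>
        if (f.isFile && f.getName.endsWith(".jar") && f.canRead) {
          jarsStream.putNextEntry(new ZipEntry(f.getName))
          Files.copy(f.toPath, jarsStream) // was: Guava Files.copy(f, jarsStream)
          jarsStream.closeEntry()
        }
      }
    } finally {
      jarsStream.close()
    }
  }
}
```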
@@ -672,7 +698,18 @@ private[spark] class Client(
     val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
     cachedResourcesConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())

-    val localConfArchive = new Path(createConfArchive().toURI())
+    val confsToOverride = Map.empty[String, String]
+    // If propagating the keytab to the AM, override the keytab name with the name of the
+    // distributed file.
+    amKeytabFileName.foreach { kt => confsToOverride.put(KEYTAB.key, kt) }
+
+    // If propagating the ivySettings file to the distributed cache, override the ivySettings
+    // file name with the name of the distributed file.
+    ivySettingsLocalizedPath.foreach { path =>
+      confsToOverride.put("spark.jars.ivySettings", path)
+    }
+
+    val localConfArchive = new Path(createConfArchive(confsToOverride).toURI())
     copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
       destName = Some(LOCALIZED_CONF_ARCHIVE))

@@ -701,8 +738,10 @@ private[spark] class Client(
    *
    * The archive also contains some Spark configuration. Namely, it saves the contents of
    * SparkConf in a file to be loaded by the AM process.
+   *
+   * @param confsToOverride configs that should be overridden when creating the final spark conf file
    */
-  private def createConfArchive(): File = {
+  private def createConfArchive(confsToOverride: Map[String, String]): File = {
     val hadoopConfFiles = new HashMap[String, File]()

     // SPARK_CONF_DIR shows up in the classpath before HADOOP_CONF_DIR/YARN_CONF_DIR
@@ -764,7 +803,7 @@ private[spark] class Client(
         if url.getProtocol == "file" } {
       val file = new File(url.getPath())
       confStream.putNextEntry(new ZipEntry(file.getName()))
-      Files.copy(file, confStream)
+      Files.copy(file.toPath, confStream)
       confStream.closeEntry()
     }

@@ -775,7 +814,7 @@ private[spark] class Client(
     hadoopConfFiles.foreach { case (name, file) =>
       if (file.canRead()) {
         confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
-        Files.copy(file, confStream)
+        Files.copy(file.toPath, confStream)
         confStream.closeEntry()
       }
     }
@@ -788,11 +827,7 @@ private[spark] class Client(

     // Save Spark configuration to a file in the archive.
     val props = confToProperties(sparkConf)
-
-    // If propagating the keytab to the AM, override the keytab name with the name of the
-    // distributed file.
-    amKeytabFileName.foreach { kt => props.setProperty(KEYTAB.key, kt) }
-
+    confsToOverride.foreach { case (k, v) => props.setProperty(k, v) }
     writePropertiesToArchive(props, SPARK_CONF_FILE, confStream)

     // Write the distributed cache config to the archive.
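Reviewer note: the override flow above is worth restating. Entries in `confsToOverride` are applied on top of the serialized `SparkConf` before it is written into the conf archive, so the AM/driver sees the localized keytab and ivySettings names instead of the client-side paths. A hedged sketch of just that propagation step (names are illustrative, not the `Client` API):

```scala
import java.util.Properties

object ConfOverrideSketch {
  // Apply overrides last so they win over the client-side values.
  def withOverrides(base: Properties, confsToOverride: Map[String, String]): Properties = {
    confsToOverride.foreach { case (k, v) => base.setProperty(k, v) }
    base
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("spark.jars.ivySettings", "/local/client/path/ivysettings.xml")
    val out = withOverrides(props,
      Map("spark.jars.ivySettings" -> "ivysettings.xml-<uuid>")) // placeholder localized name
    println(out.getProperty("spark.jars.ivySettings"))           // prints the localized name
  }
}
```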
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9bc934d246fe..26ff3bf2971f 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -29,6 +29,7 @@ import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.scalatest.concurrent.Eventually._
+import org.scalatest.exceptions.TestFailedException
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._

@@ -368,6 +369,64 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     )
     checkResult(finalState, result, "true")
   }
+
+  def createEmptyIvySettingsFile: File = {
+    val emptyIvySettings = File.createTempFile("ivy", ".xml")
+    Files.write("", emptyIvySettings, StandardCharsets.UTF_8)
+    emptyIvySettings
+  }
+
+  test("SPARK-34472: ivySettings file with no scheme or file:// scheme should be " +
+    "localized on driver in cluster mode") {
+    val emptyIvySettings = createEmptyIvySettingsFile
+    // For file:// URIs or URIs without a scheme, make sure that the ivySettings conf was
+    // changed to the localized file. So the expected ivySettings path on the driver will
+    // start with the file name followed by a random UUID suffix.
+    testIvySettingsDistribution(clientMode = false, emptyIvySettings.getAbsolutePath,
+      emptyIvySettings.getName, prefixMatch = true)
+    testIvySettingsDistribution(clientMode = false, s"file://${emptyIvySettings.getAbsolutePath}",
+      emptyIvySettings.getName, prefixMatch = true)
+  }
+
+  test("SPARK-34472: ivySettings file with no scheme or file:// scheme should retain " +
+    "user provided path in client mode") {
+    val emptyIvySettings = createEmptyIvySettingsFile
+    // In client mode, the file is present locally on the driver and so does not need to be
+    // distributed. So the user-provided path should be kept as is.
+    testIvySettingsDistribution(clientMode = true, emptyIvySettings.getAbsolutePath,
+      emptyIvySettings.getAbsolutePath)
+    testIvySettingsDistribution(clientMode = true, s"file://${emptyIvySettings.getAbsolutePath}",
+      s"file://${emptyIvySettings.getAbsolutePath}")
+  }
+
+  test("SPARK-34472: ivySettings file with non-file:// schemes should throw an error") {
+    val emptyIvySettings = createEmptyIvySettingsFile
+    val e1 = intercept[TestFailedException] {
+      testIvySettingsDistribution(clientMode = false,
+        s"local://${emptyIvySettings.getAbsolutePath}", "")
+    }
+    assert(e1.getMessage.contains("IllegalArgumentException: " +
+      "Scheme local not supported in spark.jars.ivySettings"))
+    val e2 = intercept[TestFailedException] {
+      testIvySettingsDistribution(clientMode = false,
+        s"hdfs://${emptyIvySettings.getAbsolutePath}", "")
+    }
+    assert(e2.getMessage.contains("IllegalArgumentException: " +
+      "Scheme hdfs not supported in spark.jars.ivySettings"))
+  }
+
+  def testIvySettingsDistribution(clientMode: Boolean, ivySettingsPath: String,
+      expectedIvySettingsPrefixOnDriver: String, prefixMatch: Boolean = false): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val outFile = File.createTempFile("out", null, tempDir)
+    val finalState = runSpark(clientMode = clientMode,
+      mainClassName(YarnAddJarTest.getClass),
+      appArgs = Seq(result.getAbsolutePath, expectedIvySettingsPrefixOnDriver,
+        prefixMatch.toString),
+      extraConf = Map("spark.jars.ivySettings" -> ivySettingsPath),
+      outFile = Option(outFile))
+    checkResult(finalState, result, outFile = Option(outFile))
+  }
 }

 private[spark] class SaveExecutorInfo extends SparkListener {
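Reviewer note: the helper drives both modes through one code path, and the driver-side check that `YarnAddJarTest` performs boils down to a single predicate. Restated standalone for clarity (`checkSettingsPath` is not a real helper in the suite):

```scala
object SettingsPathCheck {
  // prefixMatch = true  -> cluster mode: conf must be rewritten to "<name>-<uuid>"
  // prefixMatch = false -> client mode: conf must be exactly the user-supplied value
  def checkSettingsPath(actual: String, expected: String, prefixMatch: Boolean): Boolean =
    if (prefixMatch) actual != expected && actual.startsWith(expected)
    else actual == expected

  def main(args: Array[String]): Unit = {
    assert(checkSettingsPath("ivy.xml-1234-uuid", "ivy.xml", prefixMatch = true))
    assert(checkSettingsPath("/tmp/ivy.xml", "/tmp/ivy.xml", prefixMatch = false))
  }
}
```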
@@ -583,6 +642,50 @@ private object YarnClasspathTest extends Logging {

 }

+private object YarnAddJarTest extends Logging {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 3) {
+      // scalastyle:off println
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnAddJarTest [result file] [expected ivy settings path] [prefix match]
+        """.stripMargin)
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val resultPath = args(0)
+    val expectedIvySettingsPath = args(1)
+    val prefixMatch = args(2).toBoolean
+    val sc = new SparkContext(new SparkConf())
+
+    var result = "failure"
+    try {
+      val settingsFile = sc.getConf.get("spark.jars.ivySettings")
+      if (prefixMatch) {
+        assert(settingsFile !== expectedIvySettingsPath)
+        assert(settingsFile.startsWith(expectedIvySettingsPath))
+      } else {
+        assert(settingsFile === expectedIvySettingsPath)
+      }
+
+      val caught = intercept[RuntimeException] {
+        sc.addJar("ivy://org.fake-project.test:test:1.0.0")
+      }
+      if (caught.getMessage.contains("unresolved dependency: org.fake-project.test#test")) {
+        // "unresolved dependency" is expected, as the dependency does not exist,
+        // but an exception like "Ivy settings file does not exist" should result in failure
+        result = "success"
+      }
+    } finally {
+      Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
+      sc.stop()
+    }
+  }
+}
+
 private object YarnLauncherTestApp {

   def main(args: Array[String]): Unit = {
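Reviewer note: for readers unfamiliar with the `ivy://` form exercised above, it is the coordinate syntax `SparkContext#addJar` accepts for Maven artifacts, and resolution goes through the settings file when `spark.jars.ivySettings` is set. A hedged usage sketch (the settings path and coordinate are examples, not values from this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object IvyAddJarUsage {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("ivy-addjar-usage")
      .set("spark.jars.ivySettings", "/etc/spark/ivysettings.xml") // assumed local path
    val sc = new SparkContext(conf)
    try {
      // group:artifact:version coordinates, resolved via the configured ivySettings
      sc.addJar("ivy://org.apache.commons:commons-lang3:3.12.0")
    } finally {
      sc.stop()
    }
  }
}
```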