@@ -1277,7 +1277,12 @@ private[spark] object SparkSubmitUtils {
settingsFile: String,
remoteRepos: Option[String],
ivyPath: Option[String]): IvySettings = {
- val file = new File(settingsFile)
val uri = new URI(settingsFile)
val file = Option(uri.getScheme).getOrElse("file") match {
case "file" => new File(uri.getPath)
case scheme => throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
"spark.jars.ivySettings")
}
require(file.exists(), s"Ivy settings file $file does not exist")
require(file.isFile(), s"Ivy settings file $file is not a normal file")
Member:

@shardulm94, does it work if you set spark.jars.ivySettings to ./ivysettings.xml and pass ivysettings.xml to spark.yarn.files?

Member:

I know you might have to copy ivysettings.xml to the current working directory when spark-submit runs, but I think it might work for your use case.

shardulm94 (Contributor Author), Feb 23, 2021:

Yeah, as you pointed out, it only works if I have the copy in the current working directory. However, we cannot control which directory our users launch spark-submit from, so ideally we would want something that works without user intervention.

What do you think of this? In the YARN Client, we can add the ivySettings file to spark.yarn.dist.files or __spark__conf__.zip, and then modify the spark.jars.ivySettings property to point to ./ivysettings.xml or __spark__conf__/ivysettings.xml.
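A rough sketch of that idea inside the YARN Client (hypothetical; distribute is the Client's existing localization helper, and the destination name shown is illustrative):

// Hypothetical sketch of the proposed approach: ship the ivySettings file through the
// YARN distributed cache and repoint spark.jars.ivySettings at the localized copy.
sparkConf.getOption("spark.jars.ivySettings").foreach { settingsPath =>
  val (_, localizedPath) = distribute(settingsPath, destName = Some("ivysettings.xml"))
  // The driver resolves this name relative to its container's working directory.
  sparkConf.set("spark.jars.ivySettings", localizedPath)
}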

Member:

Adding it into __spark__conf__ sounds okay to me ... but I would prefer to have second opinions from Yarn experts such as @tgravescs, @mridulm or @jerryshao.

Contributor Author:

I tried this out, and it looks like it can be handled pretty cleanly this way targeting just YARN. shardulm94@12709f0

Contributor Author:

I updated the PR with this approach.

val ivySettings: IvySettings = new IvySettings
docs/configuration.md: 6 additions & 1 deletion
@@ -771,7 +771,12 @@ Apart from these, the following properties are also available, and may be useful
option <code>--repositories</code> or <code>spark.jars.repositories</code> will also be included.
Useful for allowing Spark to resolve artifacts from behind a firewall e.g. via an in-house
artifact server like Artifactory. Details on the settings file format can be
- found at <a href="http://ant.apache.org/ivy/history/latest-milestone/settings.html">Settings Files</a>
found at <a href="http://ant.apache.org/ivy/history/latest-milestone/settings.html">Settings Files</a>.
Only paths with <code>file://</code> scheme are supported. Paths without a scheme are assumed to have
a <code>file://</code> scheme.
<p/>
When running in YARN cluster mode, this file will also be localized to the remote driver for dependency
resolution within <code>SparkContext#addJar</code>.
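<p/>
For example (an illustrative invocation; the path is a placeholder):
<code>spark-submit --conf spark.jars.ivySettings=file:///path/to/ivysettings.xml ...</code>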
</td>
<td>2.2.0</td>
</tr>
@@ -21,6 +21,7 @@ import java.io.{FileSystem => _, _}
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}

@@ -30,7 +31,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.util.control.NonFatal

import com.google.common.base.Objects
- import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
@@ -518,6 +518,32 @@ private[spark] class Client(
require(localizedPath != null, "Keytab file already distributed.")
}

// If we passed in an ivySettings file, make sure we copy the file to the distributed cache
// in cluster mode so that the driver can access it
val ivySettings = sparkConf.getOption("spark.jars.ivySettings")
val ivySettingsLocalizedPath: Option[String] = ivySettings match {
case Some(ivySettingsPath) if isClusterMode =>
val uri = new URI(ivySettingsPath)
Option(uri.getScheme).getOrElse("file") match {
case "file" =>
val ivySettingsFile = new File(uri.getPath)
require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a " +
"normal file")
// Generate a file name that can be used for the ivySettings file, that does not
Contributor:

This block of code (and the IllegalArgumentException below) -- convert to URI, check scheme, throw if bad scheme, assert on file existence -- is duplicated between SparkSubmit and Client. Can we put a method like getIvySettingsFile into SparkSubmit which is leveraged in both places?

Not too big of a duplication so if it can't be done cleanly no worries from my end.
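For reference, a minimal sketch of the kind of shared helper being suggested (hypothetical, not part of this PR; assumes java.net.URI and java.io.File are in scope):

// Hypothetical shared helper: validate the URI scheme of spark.jars.ivySettings and
// resolve a local File, mirroring the logic duplicated in SparkSubmit and Client.
private[spark] def getIvySettingsFile(settingsPath: String): File = {
  val uri = new URI(settingsPath)
  Option(uri.getScheme).getOrElse("file") match {
    case "file" => new File(uri.getPath)
    case scheme => throw new IllegalArgumentException(
      s"Scheme $scheme not supported in spark.jars.ivySettings")
  }
}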

shardulm94 (Contributor Author), Apr 14, 2021:

The assertions of file existence are slightly different in the two places. In YarnClient we want to make this assertion only for the file scheme, before we process it; however, in SparkSubmit we want to make the assertion for all schemes, after the appropriate download steps. E.g. for the local scheme too, we would want to validate that the final file that loadIvySettings will process does exist.
Once we add handling for local and hdfs schemes, the functions in the two classes will diverge even more. So I think it is okay to have some duplication here.
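To illustrate the difference with a hypothetical sketch (not PR code; settingsPath and downloadIfNeeded are assumed names):

// YarnClient: assert existence up front, but only for the "file" scheme, before distributing.
val uri = new URI(settingsPath)
if (Option(uri.getScheme).getOrElse("file") == "file") {
  require(new File(uri.getPath).isFile, s"Ivy settings file ${uri.getPath} not found")
}
// SparkSubmit: assert on whatever local file loadIvySettings will actually read,
// i.e. after any download step for non-"file" schemes.
val resolved: File = downloadIfNeeded(uri) // hypothetical helper
require(resolved.isFile, s"Ivy settings file $resolved does not exist")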

Contributor:

SGTM thanks!

// conflict with any user file.
val localizedFileName = Some(ivySettingsFile.getName() + "-" +
UUID.randomUUID().toString)
val (_, localizedPath) = distribute(ivySettingsPath, destName = localizedFileName)
require(localizedPath != null, "IvySettings file already distributed.")
Some(localizedPath)
case scheme =>
throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
"spark.jars.ivySettings")
}
case _ => None
}

/**
* Add Spark to the cache. There are two settings that control what files to add to the cache:
* - if a Spark archive is defined, use the archive. The archive is expected to contain
@@ -576,7 +602,7 @@
jarsDir.listFiles().foreach { f =>
if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
jarsStream.putNextEntry(new ZipEntry(f.getName))
- Files.copy(f, jarsStream)
Files.copy(f.toPath, jarsStream)
jarsStream.closeEntry()
}
}
@@ -672,7 +698,18 @@
val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
cachedResourcesConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())

- val localConfArchive = new Path(createConfArchive().toURI())
val confsToOverride = Map.empty[String, String]
// If propagating the keytab to the AM, override the keytab name with the name of the
// distributed file.
amKeytabFileName.foreach { kt => confsToOverride.put(KEYTAB.key, kt) }

// If propagating the ivySettings file to the distributed cache, override the ivySettings
// file name with the name of the distributed file.
ivySettingsLocalizedPath.foreach { path =>
confsToOverride.put("spark.jars.ivySettings", path)
}

val localConfArchive = new Path(createConfArchive(confsToOverride).toURI())
copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
destName = Some(LOCALIZED_CONF_ARCHIVE))

@@ -701,8 +738,10 @@
*
* The archive also contains some Spark configuration. Namely, it saves the contents of
* SparkConf in a file to be loaded by the AM process.
*
@param confsToOverride configs that should be overridden when creating the final Spark conf file
*/
- private def createConfArchive(): File = {
private def createConfArchive(confsToOverride: Map[String, String]): File = {
val hadoopConfFiles = new HashMap[String, File]()

// SPARK_CONF_DIR shows up in the classpath before HADOOP_CONF_DIR/YARN_CONF_DIR
@@ -764,7 +803,7 @@
if url.getProtocol == "file" } {
val file = new File(url.getPath())
confStream.putNextEntry(new ZipEntry(file.getName()))
- Files.copy(file, confStream)
Files.copy(file.toPath, confStream)
confStream.closeEntry()
}

@@ -775,7 +814,7 @@
hadoopConfFiles.foreach { case (name, file) =>
if (file.canRead()) {
confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
- Files.copy(file, confStream)
Files.copy(file.toPath, confStream)
confStream.closeEntry()
}
}
@@ -788,11 +827,7 @@

// Save Spark configuration to a file in the archive.
val props = confToProperties(sparkConf)

- // If propagating the keytab to the AM, override the keytab name with the name of the
- // distributed file.
- amKeytabFileName.foreach { kt => props.setProperty(KEYTAB.key, kt) }
confsToOverride.foreach { case (k, v) => props.setProperty(k, v) }
writePropertiesToArchive(props, SPARK_CONF_FILE, confStream)

// Write the distributed cache config to the archive.
@@ -29,6 +29,7 @@ import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest.concurrent.Eventually._
import org.scalatest.exceptions.TestFailedException
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._

Expand Down Expand Up @@ -368,6 +369,64 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
)
checkResult(finalState, result, "true")
}

def createEmptyIvySettingsFile: File = {
val emptyIvySettings = File.createTempFile("ivy", ".xml")
Files.write("<ivysettings />", emptyIvySettings, StandardCharsets.UTF_8)
emptyIvySettings
}

test("SPARK-34472: ivySettings file with no scheme or file:// scheme should be " +
"localized on driver in cluster mode") {
val emptyIvySettings = createEmptyIvySettingsFile
// For file:// URIs or URIs without a scheme, make sure that the ivySettings conf was changed
// to the localized file. So the expected ivySettings path on the driver will start with
// the file name, followed by a random UUID suffix.
testIvySettingsDistribution(clientMode = false, emptyIvySettings.getAbsolutePath,
emptyIvySettings.getName, prefixMatch = true)
testIvySettingsDistribution(clientMode = false, s"file://${emptyIvySettings.getAbsolutePath}",
emptyIvySettings.getName, prefixMatch = true)
}

test("SPARK-34472: ivySettings file with no scheme or file:// scheme should retain " +
"user provided path in client mode") {
val emptyIvySettings = createEmptyIvySettingsFile
// In client mode, the file is present locally on the driver and so does not need to be
// distributed. So the user-provided path should be kept as is.
testIvySettingsDistribution(clientMode = true, emptyIvySettings.getAbsolutePath,
emptyIvySettings.getAbsolutePath)
testIvySettingsDistribution(clientMode = true, s"file://${emptyIvySettings.getAbsolutePath}",
s"file://${emptyIvySettings.getAbsolutePath}")
}

test("SPARK-34472: ivySettings file with non-file:// schemes should throw an error") {
val emptyIvySettings = createEmptyIvySettingsFile
val e1 = intercept[TestFailedException] {
testIvySettingsDistribution(clientMode = false,
s"local://${emptyIvySettings.getAbsolutePath}", "")
}
assert(e1.getMessage.contains("IllegalArgumentException: " +
"Scheme local not supported in spark.jars.ivySettings"))
val e2 = intercept[TestFailedException] {
testIvySettingsDistribution(clientMode = false,
s"hdfs://${emptyIvySettings.getAbsolutePath}", "")
}
assert(e2.getMessage.contains("IllegalArgumentException: " +
"Scheme hdfs not supported in spark.jars.ivySettings"))
}

Contributor:

Can we use NIO for these? No need for commons-io now that NIO supports this kind of stuff built-in.

val emptyIvySettings = Files.createTempFile("ivy", ".xml")
Files.write(emptyIvySettings, "<ivysettings />".getBytes(StandardCharsets.UTF_8))

Contributor Author:

I changed this to use Guava's Files, which is used in many places within this file. Can I create a follow-up PR to replace these with NIO?

Contributor:

Just saw this comment, sounds fine to me.

def testIvySettingsDistribution(clientMode: Boolean, ivySettingsPath: String,
expectedIvySettingsPrefixOnDriver: String, prefixMatch: Boolean = false): Unit = {
val result = File.createTempFile("result", null, tempDir)
val outFile = File.createTempFile("out", null, tempDir)
val finalState = runSpark(clientMode = clientMode,
mainClassName(YarnAddJarTest.getClass),
appArgs = Seq(result.getAbsolutePath, expectedIvySettingsPrefixOnDriver,
prefixMatch.toString),
extraConf = Map("spark.jars.ivySettings" -> ivySettingsPath),
outFile = Option(outFile))
checkResult(finalState, result, outFile = Option(outFile))
}
}

private[spark] class SaveExecutorInfo extends SparkListener {
@@ -583,6 +642,50 @@ private object YarnClasspathTest extends Logging {

}

private object YarnAddJarTest extends Logging {
def main(args: Array[String]): Unit = {
if (args.length != 3) {
// scalastyle:off println
System.err.println(
s"""
|Invalid command line: ${args.mkString(" ")}
|
|Usage: YarnAddJarTest [result file] [expected ivy settings path] [prefix match]
""".stripMargin)
// scalastyle:on println
System.exit(1)
}

val resultPath = args(0)
val expectedIvySettingsPath = args(1)
val prefixMatch = args(2).toBoolean
val sc = new SparkContext(new SparkConf())

var result = "failure"
try {
val settingsFile = sc.getConf.get("spark.jars.ivySettings")
if (prefixMatch) {
assert(settingsFile !== expectedIvySettingsPath)
assert(settingsFile.startsWith(expectedIvySettingsPath))
} else {
assert(settingsFile === expectedIvySettingsPath)
}

val caught = intercept[RuntimeException] {
sc.addJar("ivy://org.fake-project.test:test:1.0.0")
}
if (caught.getMessage.contains("unresolved dependency: org.fake-project.test#test")) {
// "unresolved dependency" is expected as the dependency does not exist
// but exception like "Ivy settings file <file> does not exist" should result in failure
result = "success"
}
} finally {
Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
sc.stop()
}
}
}

private object YarnLauncherTestApp {

def main(args: Array[String]): Unit = {