Commit 9c5b562

[SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client mode

With SPARK-10643, Spark supports downloading resources from remote storage in client deploy mode. However, that implementation overrides the variables that represent the added resources (such as `args.jars` and `args.pyFiles`) with local paths, and the YARN client then uses these local paths to re-upload the resources to the distributed cache. This re-upload is unnecessary and breaks the semantics of putting resources on a shared FS, so this change fixes it.

Manually verified with jars and pyFiles in local and remote storage, in both client and cluster mode.

Author: jerryshao <sshao@hortonworks.com>

Closes #18962 from jerryshao/SPARK-21714.

(cherry picked from commit 1813c4a)
Signed-off-by: jerryshao <sshao@hortonworks.com>

Change-Id: Ib2e8cb056707b362bc1c496002bac1472dc78ea7
1 parent 59bb7eb commit 9c5b562
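
To make the behavior change concrete, here is a minimal, self-contained Scala sketch of the idea; the object name, the downloadToLocal helper, and the paths are hypothetical illustrations and not part of this commit. In client mode the remote resources are downloaded for the driver's own use, while the original remote URIs are what end up in the YARN distributed-cache properties, so nothing is re-uploaded.

object ResourceHandlingSketch {
  // Hypothetical stand-in for SparkSubmit's download step: map a remote URI to a local copy.
  def downloadToLocal(uri: String): String =
    "file:/tmp/spark-downloads/" + uri.substring(uri.lastIndexOf('/') + 1)

  def main(args: Array[String]): Unit = {
    val userJars = "s3a://bucket/dep1.jar,s3a://bucket/dep2.jar" // resources already on a shared FS

    // Local copies are needed only by the driver process in client mode.
    val localJars = userJars.split(",").map(downloadToLocal).mkString(",")

    // The remote URIs stay in the YARN properties, so the distributed cache references the
    // shared FS directly instead of re-uploading the freshly downloaded local copies.
    val sysProps = Map(
      "spark.yarn.dist.jars" -> userJars,   // stays remote (the point of this fix)
      "spark.repl.local.jars" -> localJars  // new internal option for the shell's classloader
    )
    sysProps.foreach { case (k, v) => println(s"$k -> $v") }
  }
}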

File tree

5 files changed (+115, -48 lines):

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
core/src/main/scala/org/apache/spark/internal/config/package.scala
core/src/main/scala/org/apache/spark/util/Utils.scala
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 43 additions & 23 deletions
@@ -208,14 +208,20 @@ object SparkSubmit extends CommandLineUtils {
 
   /**
    * Prepare the environment for submitting an application.
-   * This returns a 4-tuple:
-   *   (1) the arguments for the child process,
-   *   (2) a list of classpath entries for the child,
-   *   (3) a map of system properties, and
-   *   (4) the main class for the child
+   *
+   * @param args the parsed SparkSubmitArguments used for environment preparation.
+   * @param conf the Hadoop Configuration, this argument will only be set in unit test.
+   * @return a 4-tuple:
+   *         (1) the arguments for the child process,
+   *         (2) a list of classpath entries for the child,
+   *         (3) a map of system properties, and
+   *         (4) the main class for the child
+   *
    * Exposed for testing.
    */
-  private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
+  private[deploy] def prepareSubmitEnvironment(
+      args: SparkSubmitArguments,
+      conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], Map[String, String], String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
@@ -311,12 +317,16 @@ object SparkSubmit extends CommandLineUtils {
     }
 
     // In client mode, download remote files.
+    var localPrimaryResource: String = null
+    var localJars: String = null
+    var localPyFiles: String = null
+    var localFiles: String = null
     if (deployMode == CLIENT) {
-      val hadoopConf = new HadoopConfiguration()
-      args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
-      args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
-      args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
-      args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
+      val hadoopConf = conf.getOrElse(new HadoopConfiguration())
+      localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
+      localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
+      localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
+      localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
     }
 
     // Require all python files to be local, so we can add them to the PYTHONPATH
@@ -366,7 +376,7 @@ object SparkSubmit extends CommandLineUtils {
       // If a python file is provided, add it to the child arguments and list of files to deploy.
       // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
       args.mainClass = "org.apache.spark.deploy.PythonRunner"
-      args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+      args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
       if (clusterManager != YARN) {
         // The YARN backend distributes the primary file differently, so don't merge it.
         args.files = mergeFileLists(args.files, args.primaryResource)
@@ -376,8 +386,8 @@
         // The YARN backend handles python files differently, so don't merge the lists.
         args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      if (args.pyFiles != null) {
-        sysProps("spark.submit.pyFiles") = args.pyFiles
+      if (localPyFiles != null) {
+        sysProps("spark.submit.pyFiles") = localPyFiles
       }
     }
 
@@ -431,7 +441,7 @@
       // If an R file is provided, add it to the child arguments and list of files to deploy.
      // Usage: RRunner <main R file> [app arguments]
       args.mainClass = "org.apache.spark.deploy.RRunner"
-      args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
+      args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
       args.files = mergeFileLists(args.files, args.primaryResource)
     }
   }
@@ -468,6 +478,7 @@
      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
      OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
        sysProp = "spark.executor.instances"),
+      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -491,15 +502,28 @@
        sysProp = "spark.driver.cores"),
      OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
        sysProp = "spark.driver.supervise"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
+
+      // An internal option used only for spark-shell to add user jars to repl's classloader,
+      // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
+      // remote jars, so adding a new option to only specify local jars for spark-shell internally.
+      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
    )
 
    // In client mode, launch the application main class directly
    // In addition, add the main application jar and any added jars (if any) to the classpath
-    // Also add the main application jar and any added jars to classpath in case YARN client
-    // requires these jars.
-    if (deployMode == CLIENT || isYarnCluster) {
+    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
+      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
+        childClasspath += localPrimaryResource
+      }
+      if (localJars != null) { childClasspath ++= localJars.split(",") }
+    }
+    // Add the main application jar and any added jars to classpath in case YARN client
+    // requires these jars.
+    // This assumes both primaryResource and user jars are local jars, otherwise it will not be
+    // added to the classpath of YARN client.
+    if (isYarnCluster) {
      if (isUserJar(args.primaryResource)) {
        childClasspath += args.primaryResource
      }
@@ -556,10 +580,6 @@
      if (args.isPython) {
        sysProps.put("spark.yarn.isPython", "true")
      }
-
-      if (args.pyFiles != null) {
-        sysProps("spark.submit.pyFiles") = args.pyFiles
-      }
    }
 
    // assure a keytab is available from any place in a JVM

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ package object config {
    .intConf
    .createOptional
 
-  private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
+  private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles")
    .internal()
    .stringConf
    .toSequence

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 15 additions & 10 deletions
@@ -2580,18 +2580,23 @@ private[spark] object Utils extends Logging {
  }
 
  /**
-   * In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the
-   * "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by
-   * only the "spark.jars" property.
+   * Return the jar files pointed by the "spark.jars" property. Spark internally will distribute
+   * these jars through file server. In the YARN mode, it will return an empty list, since YARN
+   * has its own mechanism to distribute jars.
   */
-  def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
+  def getUserJars(conf: SparkConf): Seq[String] = {
    val sparkJars = conf.getOption("spark.jars")
-    if (conf.get("spark.master") == "yarn" && isShell) {
-      val yarnJars = conf.getOption("spark.yarn.dist.jars")
-      unionFileLists(sparkJars, yarnJars).toSeq
-    } else {
-      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
-    }
+    sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+  }
+
+  /**
+   * Return the local jar files which will be added to REPL's classpath. These jar files are
+   * specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by
+   * SparkSubmit at first.
+   */
+  def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = {
+    val localJars = conf.getOption("spark.repl.local.jars")
+    localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
  }
 
  private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
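
As a rough usage sketch of the split introduced above (the configuration values are hypothetical, and in the real code base these helpers live on the private[spark] Utils object, so the parsing is re-stated inline here): "spark.jars" may now carry remote URIs that YARN localizes itself, while "spark.repl.local.jars" carries the already-downloaded copies that the shell's classloader can open directly.

import org.apache.spark.SparkConf

object UserJarsSketch {
  // Same comma-splitting the two Utils helpers perform.
  private def splitCsv(value: Option[String]): Seq[String] =
    value.map(_.split(",").filter(_.nonEmpty).toSeq).getOrElse(Nil)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.jars", "s3a://bucket/app-dep.jar")                    // hypothetical remote jar
      .set("spark.repl.local.jars", "file:/tmp/spark-1234/app-dep.jar") // hypothetical local copy

    // What Utils.getUserJars(conf) would return: the (possibly remote) user jars.
    println(splitCsv(conf.getOption("spark.jars")))
    // What Utils.getLocalUserJarsForShell(conf) would return: the local copies for the REPL.
    println(splitCsv(conf.getOption("spark.repl.local.jars")))
  }
}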

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 55 additions & 13 deletions
@@ -27,7 +27,7 @@ import scala.io.Source
 import com.google.common.io.ByteStreams
 import org.apache.commons.io.{FilenameUtils, FileUtils}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
@@ -738,10 +738,7 @@ class SparkSubmitSuite
 
  test("downloadFile - file doesn't exist") {
    val hadoopConf = new Configuration()
-    // Set s3a implementation to local file system for testing.
-    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
-    // Disable file system impl cache to make sure the test file system is picked up.
-    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    updateConfWithFakeS3Fs(hadoopConf)
    intercept[FileNotFoundException] {
      SparkSubmit.downloadFile("s3a:/no/such/file", hadoopConf)
    }
@@ -759,10 +756,7 @@ class SparkSubmitSuite
    val content = "hello, world"
    FileUtils.write(jarFile, content)
    val hadoopConf = new Configuration()
-    // Set s3a implementation to local file system for testing.
-    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
-    // Disable file system impl cache to make sure the test file system is picked up.
-    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    updateConfWithFakeS3Fs(hadoopConf)
    val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
    val outputPath = SparkSubmit.downloadFile(sourcePath, hadoopConf)
    checkDownloadedFile(sourcePath, outputPath)
@@ -775,10 +769,7 @@ class SparkSubmitSuite
    val content = "hello, world"
    FileUtils.write(jarFile, content)
    val hadoopConf = new Configuration()
-    // Set s3a implementation to local file system for testing.
-    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
-    // Disable file system impl cache to make sure the test file system is picked up.
-    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    updateConfWithFakeS3Fs(hadoopConf)
    val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
    val outputPaths = SparkSubmit.downloadFileList(sourcePaths.mkString(","), hadoopConf).split(",")
 
@@ -789,6 +780,43 @@ class SparkSubmitSuite
    }
  }
 
+  test("Avoid re-upload remote resources in yarn client mode") {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+
+    val tmpDir = Utils.createTempDir()
+    val file = File.createTempFile("tmpFile", "", tmpDir)
+    val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", tmpJarPath,
+      "--files", s"s3a://${file.getAbsolutePath}",
+      "--py-files", s"s3a://${pyFile.getAbsolutePath}",
+      s"s3a://$mainResource"
+    )
+
+    val appArgs = new SparkSubmitArguments(args)
+    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3
+
+    // All the resources should still be remote paths, so that YARN client will not upload again.
+    sysProps("spark.yarn.dist.jars") should be (tmpJarPath)
+    sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
+    sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")
+
+    // Local repl jars should be a local path.
+    sysProps("spark.repl.local.jars") should (startWith("file:"))
+
+    // local py files should not be a URI format.
+    sysProps("spark.submit.pyFiles") should (startWith("/"))
+  }
+
  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
  private def runSparkSubmit(args: Seq[String]): Unit = {
    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -828,6 +856,11 @@ class SparkSubmitSuite
      Utils.deleteRecursively(tmpDir)
    }
  }
+
+  private def updateConfWithFakeS3Fs(conf: Configuration): Unit = {
+    conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
+    conf.set("fs.s3a.impl.disable.cache", "true")
+  }
 }
 
 object JarCreationTest extends Logging {
@@ -897,4 +930,13 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
    // Ignore the scheme for testing.
    super.copyToLocalFile(new Path(src.toUri.getPath), dst)
  }
+
+  override def globStatus(pathPattern: Path): Array[FileStatus] = {
+    val newPath = new Path(pathPattern.toUri.getPath)
+    super.globStatus(newPath).map { status =>
+      val path = s"s3a://${status.getPath.toUri.getPath}"
+      status.setPath(new Path(path))
+      status
+    }
+  }
 }

repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ object Main extends Logging {
  // Visible for testing
  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
    interp = _interp
-    val jars = Utils.getUserJars(conf, isShell = true)
+    val jars = Utils.getLocalUserJarsForShell(conf)
      // Remove file:///, file:// or file:/ scheme if exists for each jar
      .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
      .mkString(File.pathSeparator)
