From 4cdeeed3f04f0ec62c6909e43ffe2d9824d863f7 Mon Sep 17 00:00:00 2001
From: Yu Peng
Date: Thu, 4 May 2017 10:06:43 -0700
Subject: [PATCH 1/7] Make spark-submit download remote files to local file
 system for local/standalone client mode

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 51 ++++++++++++-
 .../spark/deploy/SparkSubmitSuite.scala       | 76 ++++++++++++++++++-
 2 files changed, 124 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 77005aa9040b..4ec1a28d177f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,15 +20,18 @@ package org.apache.spark.deploy
 import java.io.{File, IOException}
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
+import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
+import javax.ws.rs.core.UriBuilder
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Properties
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
@@ -308,6 +311,15 @@ object SparkSubmit extends CommandLineUtils {
       RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
     }
 
+    // In client mode of local/standalone cluster, download remotes files.
+    if ((clusterManager == STANDALONE || clusterManager == LOCAL) && deployMode == CLIENT) {
+      val hadoopConf = new HadoopConfiguration()
+      args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
+      args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
+      args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
+      args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
+    }
+
     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local.
     // In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
@@ -825,6 +837,43 @@ object SparkSubmit extends CommandLineUtils {
       .mkString(",")
     if (merged == "") null else merged
   }
+
+  /**
+   * Download a list of remote files to temp local files. If the file is local, the original file
+   * will be returned.
+   * @param fileList A comma separated file list, it cannot be null.
+   * @return A comma separated local files list.
+   */
+  private[deploy] def downloadFileList(
+      fileList: String,
+      hadoopConf: HadoopConfiguration): String = {
+    fileList.split(",").map(downloadFile(_, hadoopConf)).mkString(",")
+  }
+
+  /**
+   * Download remote file to a temporary local file. If the file is local, the original file
+   * will be returned.
+   */
+  private[deploy] def downloadFile(path: String, hadoopConf: HadoopConfiguration): String = {
+    val uri = Utils.resolveURI(path)
+    uri.getScheme match {
+      case "file" | "local" =>
+        path
+
+      case _ =>
+        val fs = FileSystem.get(uri, hadoopConf)
+        val tmpFile = new File(Files.createTempDirectory("tmp").toFile, uri.getPath)
+        // scalastyle:off println
+        printStream.println(s"Downloading ${uri.toString} to ${tmpFile.getAbsolutePath}.")
+        // scalastyle:on println
+        fs.copyToLocalFile(new Path(uri), new Path(tmpFile.getAbsolutePath))
+        UriBuilder
+          .fromPath(tmpFile.getAbsolutePath)
+          .scheme("file")
+          .build()
+          .toString
+    }
+  }
 }
 
 /** Provides utility functions to be used inside SparkSubmit. */
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index a43839a8815f..f2d2e7dd2c9f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -18,12 +18,15 @@
 package org.apache.spark.deploy
 
 import java.io._
+import java.net.URI
 import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
 import com.google.common.io.ByteStreams
+import org.apache.commons.io.{FilenameUtils, FileUtils}
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.scalatest.concurrent.Timeouts
@@ -535,7 +538,7 @@ class SparkSubmitSuite
 
   test("resolves command line argument paths correctly") {
     val jars = "/jar1,/jar2" // --jars
-    val files = "hdfs:/file1,file2" // --files
+    val files = "local:/file1,file2" // --files
     val archives = "file:/archive1,archive2" // --archives
     val pyFiles = "py-file1,py-file2" // --py-files
 
@@ -587,7 +590,7 @@ class SparkSubmitSuite
 
   test("resolves config paths correctly") {
     val jars = "/jar1,/jar2" // spark.jars
-    val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files
+    val files = "local:/file1,file2" // spark.files / spark.yarn.dist.files
     val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
     val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
 
@@ -705,6 +708,68 @@ class SparkSubmitSuite
   }
   // scalastyle:on println
 
+  private def checkDownloadedFile(sourcePath: String, outputPath: String): Unit = {
+    if (sourcePath == outputPath) {
+      return
+    }
+
+    val sourceUri = new URI(sourcePath)
+    val outputUri = new URI(outputPath)
+    assert(outputUri.getScheme === "file")
+
+    // The path and filename are preserved.
+    assert(outputUri.getPath.endsWith(sourceUri.getPath))
+    assert(FileUtils.readFileToString(new File(outputUri.getPath)) ===
+      FileUtils.readFileToString(new File(sourceUri.getPath)))
+  }
+
+  private def deleteTempOutputFile(outputPath: String): Unit = {
+    val outputFile = new File(new URI(outputPath).getPath)
+    if (outputFile.exists) {
+      outputFile.delete()
+    }
+  }
+
+  test("downloadFile does not download local file") {
+    assert(SparkSubmit.downloadFile("/local/file", new Configuration()) === "/local/file")
+  }
+
+  test("download one file to local") {
+    val jarFile = File.createTempFile("test", ".jar")
+    jarFile.deleteOnExit()
+    val content = "hello, world"
+    FileUtils.write(jarFile, content)
+    val hadoopConf = new Configuration()
+    // Set s3a implementation to local file system for testing.
+    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
+    // Disable file system impl cache to make sure the test file system is picked up.
+    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
+    val outputPath = SparkSubmit.downloadFile(sourcePath, hadoopConf)
+    checkDownloadedFile(sourcePath, outputPath)
+    deleteTempOutputFile(outputPath)
+  }
+
+  test("download list of files to local") {
+    val jarFile = File.createTempFile("test", ".jar")
+    jarFile.deleteOnExit()
+    val content = "hello, world"
+    FileUtils.write(jarFile, content)
+    val hadoopConf = new Configuration()
+    // Set s3a implementation to local file system for testing.
+    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
+    // Disable file system impl cache to make sure the test file system is picked up.
+    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
+    val outputPaths = SparkSubmit.downloadFileList(sourcePaths.mkString(","), hadoopConf).split(",")
+
+    assert(outputPaths.length === sourcePaths.length)
+    sourcePaths.zip(outputPaths).foreach { case (sourcePath, outputPath) =>
+      checkDownloadedFile(sourcePath, outputPath)
+      deleteTempOutputFile(outputPath)
+    }
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   private def runSparkSubmit(args: Seq[String]): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -807,3 +872,10 @@ object UserClasspathFirstTest {
     }
   }
 }
+
+class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
+  override def copyToLocalFile(src: Path, dst: Path): Unit = {
+    // Ignore the scheme for testing.
+    super.copyToLocalFile(new Path(src.toUri.getPath), dst)
+  }
+}
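
With patch 1 in place, a client-mode submission against a local or standalone
master can name remote artifacts directly, e.g.
spark-submit --master local[*] --jars s3a://bucket/dep.jar s3a://bucket/app.jar,
and spark-submit fetches them into a temporary directory before launching the
driver.

The suite's TestFileSystem trick works because Hadoop resolves a URI's file
system implementation from the fs.<scheme>.impl configuration key. A minimal
Scala sketch of that resolution, assuming stock Hadoop client behavior (the
URI is illustrative, and TestFileSystem is the test class defined above):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    val conf = new Configuration()
    // Route the s3a scheme to the scheme-stripping local file system above.
    conf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
    // Bypass the per-scheme FileSystem cache so the override takes effect
    // even if an s3a file system was already instantiated elsewhere.
    conf.set("fs.s3a.impl.disable.cache", "true")

    // FileSystem.get consults fs.s3a.impl, so this returns a TestFileSystem
    // whose copyToLocalFile ignores the scheme and reads from local disk.
    val fs = FileSystem.get(new URI("s3a:///tmp/some.jar"), conf)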

From 6e86290a149269b681f3aab3b32f2d829f9d41a1 Mon Sep 17 00:00:00 2001
From: Yu Peng
Date: Tue, 23 May 2017 17:26:26 -0700
Subject: [PATCH 2/7] enable download for yarn and mesos as well

---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4ec1a28d177f..5aefa85d7403 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -311,8 +311,8 @@ object SparkSubmit extends CommandLineUtils {
       RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
     }
 
-    // In client mode of local/standalone cluster, download remotes files.
-    if ((clusterManager == STANDALONE || clusterManager == LOCAL) && deployMode == CLIENT) {
+    // In client mode, download remotes files.
+    if (deployMode == CLIENT) {
       val hadoopConf = new HadoopConfiguration()
       args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
       args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull

From e5171caf0370951265a1a139a91a03532bbda657 Mon Sep 17 00:00:00 2001
From: Yu Peng
Date: Thu, 25 May 2017 14:23:41 -0700
Subject: [PATCH 3/7] address

---
 .../scala/org/apache/spark/deploy/SparkSubmit.scala |  2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala  | 10 ++++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5aefa85d7403..6dc677191add 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -311,7 +311,7 @@ object SparkSubmit extends CommandLineUtils {
       RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
     }
 
-    // In client mode, download remotes files.
+    // In client mode, download remote files.
     if (deployMode == CLIENT) {
       val hadoopConf = new HadoopConfiguration()
       args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index f2d2e7dd2c9f..7af3d11276a0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -730,6 +730,16 @@ class SparkSubmitSuite
     }
   }
 
+  test("downloadFile - invalid url") {
+    intercept[IOException] {
+      SparkSubmit.downloadFile("abc:/local/file", new Configuration())
+    }
+  }
+
+  test("downloadFile does nothing for empty path") {
+    assert(SparkSubmit.downloadFile("", new Configuration()) === "")
+  }
+
   test("downloadFile does not download local file") {
     assert(SparkSubmit.downloadFile("/local/file", new Configuration()) === "/local/file")
   }

From 62e57df1039435c6b98dfc756ab54320dfbb627a Mon Sep 17 00:00:00 2001
From: Yu Peng
Date: Thu, 25 May 2017 14:54:37 -0700
Subject: [PATCH 4/7] add more tests

---
 .../apache/spark/deploy/SparkSubmitSuite.scala | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 7af3d11276a0..6e9721c45931 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -732,15 +732,24 @@ class SparkSubmitSuite
 
   test("downloadFile - invalid url") {
     intercept[IOException] {
-      SparkSubmit.downloadFile("abc:/local/file", new Configuration())
+      SparkSubmit.downloadFile("abc:/my/file", new Configuration())
     }
   }
 
-  test("downloadFile does nothing for empty path") {
-    assert(SparkSubmit.downloadFile("", new Configuration()) === "")
+  test("downloadFile - file doesn't exist") {
+    val hadoopConf = new Configuration()
+    // Set s3a implementation to local file system for testing.
+    hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
+    // Disable file system impl cache to make sure the test file system is picked up.
+    hadoopConf.set("fs.s3a.impl.disable.cache", "true")
+    intercept[FileNotFoundException] {
+      SparkSubmit.downloadFile("s3a:/no/such/file", hadoopConf)
+    }
   }
 
   test("downloadFile does not download local file") {
+    // empty path is considered as local file.
+    assert(SparkSubmit.downloadFile("", new Configuration()) === "")
     assert(SparkSubmit.downloadFile("/local/file", new Configuration()) === "/local/file")
   }
 
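
The two failure-path tests added in patches 3 and 4 lean on standard Hadoop
FileSystem behavior: an unregistered scheme fails inside FileSystem.get
itself, while a registered scheme pointing at a missing source fails during
the copy. A sketch of both paths (assumed stock Hadoop behavior; the paths
and printed messages are illustrative):

    import java.io.{FileNotFoundException, IOException}
    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()

    // 1. Unknown scheme: FileSystem.get throws before any I/O is attempted,
    //    which is why the test only expects a generic IOException.
    try FileSystem.get(new URI("abc:/my/file"), conf)
    catch { case e: IOException => println(s"unknown scheme: ${e.getMessage}") }

    // 2. Known scheme, missing source: the source status lookup during the
    //    copy fails, surfacing as a FileNotFoundException.
    val localFs = FileSystem.getLocal(conf)
    try localFs.copyToLocalFile(new Path("/no/such/file"), new Path("/tmp/out"))
    catch { case e: FileNotFoundException => println(s"missing source: ${e.getMessage}") }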

From 7eb5d1a0f87e22902101f5caeb52c3620c315e18 Mon Sep 17 00:00:00 2001
From: Yu Peng
Date: Thu, 25 May 2017 23:09:54 -0700
Subject: [PATCH 5/7] comments

---
 .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6dc677191add..e433d69335f1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -841,20 +841,22 @@ object SparkSubmit extends CommandLineUtils {
   /**
    * Download a list of remote files to temp local files. If the file is local, the original file
    * will be returned.
-   * @param fileList A comma separated file list, it cannot be null.
+   * @param fileList A comma separated file list.
    * @return A comma separated local files list.
    */
   private[deploy] def downloadFileList(
       fileList: String,
       hadoopConf: HadoopConfiguration): String = {
+    require(fileList != null, "fileList cannot be null.")
     fileList.split(",").map(downloadFile(_, hadoopConf)).mkString(",")
   }
 
   /**
-   * Download remote file to a temporary local file. If the file is local, the original file
-   * will be returned.
+   * Download a file from the remote to a local temporary directory. If the input path points to
+   * a local path, returns it with no operation.
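
The point of the require guards added in patch 5, sketched below: without a
guard, a null argument only surfaces later as a bare NullPointerException
from String.split, whereas require fails fast with a descriptive
IllegalArgumentException. The method names here are illustrative, not part
of the patch:

    def withoutGuard(fileList: String): Array[String] =
      fileList.split(",") // null input => NullPointerException, no context

    def withGuard(fileList: String): Array[String] = {
      // require throws IllegalArgumentException("requirement failed: ...")
      require(fileList != null, "fileList cannot be null.")
      fileList.split(",")
    }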
   */
   private[deploy] def downloadFile(path: String, hadoopConf: HadoopConfiguration): String = {
+    require(path != null, "path cannot be null.")
     val uri = Utils.resolveURI(path)
     uri.getScheme match {
       case "file" | "local" =>

From 3f18b81fd781593525619da54daa0ee3dc9cc6fb Mon Sep 17 00:00:00 2001
From: Yu Peng
Date: Thu, 25 May 2017 23:34:38 -0700
Subject: [PATCH 6/7] remove url builder

---
 .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e433d69335f1..fe1fcccc6bc8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -23,7 +23,6 @@ import java.net.URL
 import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
-import javax.ws.rs.core.UriBuilder
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -869,11 +868,7 @@ object SparkSubmit extends CommandLineUtils {
         printStream.println(s"Downloading ${uri.toString} to ${tmpFile.getAbsolutePath}.")
         // scalastyle:on println
         fs.copyToLocalFile(new Path(uri), new Path(tmpFile.getAbsolutePath))
-        UriBuilder
-          .fromPath(tmpFile.getAbsolutePath)
-          .scheme("file")
-          .build()
-          .toString
+        s"file:${tmpFile.getAbsolutePath}"
       }
     }
   }

From 2d6f2cdafb30a80ec441ef921238844ace2cd283 Mon Sep 17 00:00:00 2001
From: Yu Peng
Date: Fri, 26 May 2017 08:34:29 -0700
Subject: [PATCH 7/7] resolveURI

---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fe1fcccc6bc8..c60a2a1706d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -868,7 +868,7 @@ object SparkSubmit extends CommandLineUtils {
         printStream.println(s"Downloading ${uri.toString} to ${tmpFile.getAbsolutePath}.")
         // scalastyle:on println
         fs.copyToLocalFile(new Path(uri), new Path(tmpFile.getAbsolutePath))
-        s"file:${tmpFile.getAbsolutePath}"
+        Utils.resolveURI(tmpFile.getAbsolutePath).toString
       }
     }
   }
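
Patches 6 and 7 drop the javax.ws.rs UriBuilder dependency, first in favor of
plain "file:" prefixing and finally of Spark's own Utils.resolveURI. A sketch
of why the helper is the safer final choice, assuming resolveURI falls back
to java.io.File.toURI for plain paths (the directory name is illustrative):

    import java.io.File

    val p = "/tmp/dir with space/app.jar"

    // Naive prefixing yields a string that is not a well-formed URI:
    val naive = s"file:$p" // "file:/tmp/dir with space/app.jar"

    // File.toURI percent-encodes the path into a valid file URI:
    val resolved = new File(p).getAbsoluteFile.toURI.toString
    // "file:/tmp/dir%20with%20space/app.jar"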