From 6e9f069b20018fdf0ec95ff7bca0910d73e79480 Mon Sep 17 00:00:00 2001
From: trystanleftwich
Date: Tue, 3 Mar 2015 17:20:35 -0800
Subject: [PATCH 1/2] [SPARK-6144] When in cluster mode, using ADD JAR with an
 hdfs:// sourced jar will fail

---
 .../src/main/scala/org/apache/spark/util/Utils.scala | 10 ++++++----
 .../scala/org/apache/spark/util/UtilsSuite.scala     | 12 ++++++++++++
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index df21ed37e76b..b07d269de501 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -617,7 +617,8 @@ private[spark] object Utils extends Logging {
       case _ =>
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val path = new Path(uri)
-        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
+        fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
+          Some(filename))
     }
   }

@@ -632,8 +633,9 @@ private[spark] object Utils extends Logging {
       fs: FileSystem,
       conf: SparkConf,
       hadoopConf: Configuration,
-      fileOverwrite: Boolean): Unit = {
-    if (!targetDir.mkdir()) {
+      fileOverwrite: Boolean,
+      filename: Option[String] = None): Unit = {
+    if (!targetDir.exists() && !targetDir.mkdir()) {
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
     fs.listStatus(path).foreach { fileStatus =>
@@ -643,7 +645,7 @@ private[spark] object Utils extends Logging {
           fileOverwrite)
       } else {
         val in = fs.open(innerPath)
-        val targetFile = new File(targetDir, innerPath.getName)
+        val targetFile = new File(targetDir, filename.getOrElse(innerPath.getName))
         downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fe2b64425115..aa0970f79934 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -390,6 +390,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     val innerTempDir = Utils.createTempDir(tempDir.getPath)
     val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
     val targetDir = new File("target-dir")
+    val testFileDir = new File("test-filename")
     Files.write("some text", tempFile, UTF_8)

     try {
@@ -399,6 +400,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
       Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
       assert(targetDir.exists())
       assert(targetDir.isDirectory())
+      // Testing to make sure it doesn't error if the dir already exists
+      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
       val newInnerDir = new File(targetDir, innerTempDir.getName)
       println("inner temp dir: " + innerTempDir.getName)
       targetDir.listFiles().map(_.getName).foreach(println)
@@ -407,9 +410,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
       val newInnerFile = new File(newInnerDir, tempFile.getName)
       assert(newInnerFile.exists())
       assert(newInnerFile.isFile())
+      val filePath = new Path("file://" + tempFile.getAbsolutePath)
+      val testFileName = "testFName"
+      val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+      Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
+        conf, false, Some(testFileName))
+      val newFileName = new File(testFileDir, testFileName)
+      assert(newFileName.exists())
+      assert(newFileName.isFile())
     } finally {
       Utils.deleteRecursively(tempDir)
       Utils.deleteRecursively(targetDir)
+      Utils.deleteRecursively(testFileDir)
     }
   }
 }

From e5b7f0a8105d7bf0ac00c7a29396cee761eaa8de Mon Sep 17 00:00:00 2001
From: trystanleftwich
Date: Wed, 4 Mar 2015 10:47:37 -0800
Subject: [PATCH 2/2] [SPARK-6144] When in cluster mode, using ADD JAR with an
 hdfs:// sourced jar will fail

---
 .../scala/org/apache/spark/util/Utils.scala  | 24 ++++++---
 .../org/apache/spark/util/UtilsSuite.scala   | 56 +++++++++++++++++--
 2 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b07d269de501..8ef52fa40fb1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -638,16 +638,22 @@ private[spark] object Utils extends Logging {
     if (!targetDir.exists() && !targetDir.mkdir()) {
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
-    fs.listStatus(path).foreach { fileStatus =>
-      val innerPath = fileStatus.getPath
-      if (fileStatus.isDir) {
-        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
-          fileOverwrite)
-      } else {
-        val in = fs.open(innerPath)
-        val targetFile = new File(targetDir, filename.getOrElse(innerPath.getName))
-        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+    if (fs.isDirectory(path)) {
+      fs.listStatus(path).foreach { fileStatus =>
+        val innerPath = fileStatus.getPath
+        if (fileStatus.isDir) {
+          fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
+            fileOverwrite)
+        } else {
+          val in = fs.open(innerPath)
+          val targetFile = new File(targetDir, innerPath.getName)
+          downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+        }
       }
+    } else {
+      val in = fs.open(path)
+      val targetFile = new File(targetDir, filename.getOrElse(path.getName))
+      downloadFile(path.toString, in, targetFile, fileOverwrite)
     }
   }

diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index aa0970f79934..765dbab0c098 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -389,19 +389,30 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     val tempDir = Utils.createTempDir()
     val innerTempDir = Utils.createTempDir(tempDir.getPath)
     val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
+    val tempFile1 = File.createTempFile("someprefix1", "somesuffix1", innerTempDir)
+    val tempFile2 = File.createTempFile("someprefix2", "somesuffix2", innerTempDir)
+    val tempFile3 = File.createTempFile("someprefix3", "somesuffix3", tempDir)
     val targetDir = new File("target-dir")
-    val testFileDir = new File("test-filename")
+    val renameTargetDir = new File("rename-target-dir")
+    val fileTargetDir = new File("file-target-dir")
+    val multiFileTargetDir = new File("multi-file-target-dir")
     Files.write("some text", tempFile, UTF_8)
+    Files.write("some text", tempFile1, UTF_8)
+    Files.write("some text", tempFile2, UTF_8)

     try {
       val path = new Path("file://" + tempDir.getAbsolutePath)
       val conf = new Configuration()
       val fs = Utils.getHadoopFileSystem(path.toString, conf)
+      // Testing subdirs are copied across
       Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
       assert(targetDir.exists())
       assert(targetDir.isDirectory())
       // Testing to make sure it doesn't error if the dir already exists
       Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+      val newTempFile = new File(targetDir, tempFile3.getName)
+      assert(newTempFile.exists())
+      assert(newTempFile.isFile())
       val newInnerDir = new File(targetDir, innerTempDir.getName)
       println("inner temp dir: " + innerTempDir.getName)
       targetDir.listFiles().map(_.getName).foreach(println)
@@ -410,18 +421,51 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
       val newInnerFile = new File(newInnerDir, tempFile.getName)
       assert(newInnerFile.exists())
       assert(newInnerFile.isFile())
+
+
+
+      // Testing that you can copy a single file over
       val filePath = new Path("file://" + tempFile.getAbsolutePath)
-      val testFileName = "testFName"
       val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
-      Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
-        conf, false, Some(testFileName))
-      val newFileName = new File(testFileDir, testFileName)
+      Utils.fetchHcfsFile(filePath, fileTargetDir, fs, new SparkConf(),
+        conf, false)
+      val newFile = new File(fileTargetDir, tempFile.getName)
+      assert(newFile.exists())
+      assert(newFile.isFile())
+
+      // Testing that when copying a single file you can rename it
+      val testFileName = "testFName"
+      Utils.fetchHcfsFile(filePath, renameTargetDir, testFilefs,
+        new SparkConf(), conf, false, Some(testFileName))
+      val newFileName = new File(renameTargetDir, testFileName)
       assert(newFileName.exists())
       assert(newFileName.isFile())
+
+      // Testing that you can copy a dir with files in it and the filenames
+      // will be correct
+      val dirPath = new Path("file://" + innerTempDir.getAbsolutePath)
+      val testDirfs = Utils.getHadoopFileSystem(dirPath.toString, conf)
+      Utils.fetchHcfsFile(dirPath, multiFileTargetDir, testDirfs, new SparkConf(),
+        conf, false, Some(testFileName))
+      var newTmpFile1 = new File(multiFileTargetDir, tempFile.getName)
+      assert(newTmpFile1.exists())
+      assert(newTmpFile1.isFile())
+      var newTmpFile2 = new File(multiFileTargetDir, tempFile1.getName)
+      assert(newTmpFile2.exists())
+      assert(newTmpFile2.isFile())
+      var newTmpFile3 = new File(multiFileTargetDir, tempFile2.getName)
+      assert(newTmpFile3.exists())
+      assert(newTmpFile3.isFile())
+      // Testing that a combination of files and dirs within a dir copies correctly
+
+
+
     } finally {
       Utils.deleteRecursively(tempDir)
       Utils.deleteRecursively(targetDir)
-      Utils.deleteRecursively(testFileDir)
+      Utils.deleteRecursively(renameTargetDir)
+      Utils.deleteRecursively(fileTargetDir)
+      Utils.deleteRecursively(multiFileTargetDir)
     }
   }
 }
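
Editor's note (not part of the patches): after PATCH 2/2, fetchHcfsFile dispatches on
whether the source path is a directory. A directory source is walked recursively and
every file keeps its own name; only a single-file source honours the optional
`filename` rename, which is what lets ADD JAR fetch an hdfs:// jar under the name the
executor expects. The standalone Scala sketch below approximates that control flow
using only public Hadoop APIs. The object and method names (FetchSketch, copyToLocal)
and the example paths are hypothetical, and FileUtil.copy stands in for Spark's
private downloadFile helper.

import java.io.{File, IOException}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

object FetchSketch {
  // Copy `path` from `fs` into the local `targetDir`, mirroring the patched
  // fetchHcfsFile: recurse into directories, and apply the optional rename
  // only when the source is a single file.
  def copyToLocal(
      path: Path,
      targetDir: File,
      fs: FileSystem,
      filename: Option[String] = None): Unit = {
    // As in the patch, tolerate a pre-existing target directory.
    if (!targetDir.exists() && !targetDir.mkdir()) {
      throw new IOException(s"Failed to create directory ${targetDir.getPath}")
    }
    if (fs.isDirectory(path)) {
      fs.listStatus(path).foreach { status =>
        val inner = status.getPath
        if (status.isDirectory) {
          // Subdirectories are fetched under their own names; no rename applies.
          copyToLocal(inner, new File(targetDir, inner.getName), fs)
        } else {
          FileUtil.copy(fs, inner, new File(targetDir, inner.getName), false, fs.getConf)
        }
      }
    } else {
      // Single file: the only case where `filename` takes effect.
      val target = new File(targetDir, filename.getOrElse(path.getName))
      FileUtil.copy(fs, path, target, false, fs.getConf)
    }
  }

  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val src = new Path("file:///tmp/example.jar") // hypothetical source path
    val fs = src.getFileSystem(hadoopConf)
    // Fetch the single file and rename it locally, as ADD JAR needs to.
    copyToLocal(src, new File("/tmp/fetched"), fs, Some("renamed.jar"))
  }
}

Before the patches, the single-file case fell through fs.listStatus on the parent
listing, so the fetched jar could land under the wrong name and the "Failed to create
directory" check fired on repeat fetches; the tests above exercise both regressions.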