diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index df21ed37e76b..8ef52fa40fb1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -617,7 +617,8 @@ private[spark] object Utils extends Logging {
       case _ =>
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val path = new Path(uri)
-        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
+        fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
+          Some(filename))
     }
   }
 
@@ -632,20 +633,27 @@ private[spark] object Utils extends Logging {
       fs: FileSystem,
       conf: SparkConf,
       hadoopConf: Configuration,
-      fileOverwrite: Boolean): Unit = {
-    if (!targetDir.mkdir()) {
+      fileOverwrite: Boolean,
+      filename: Option[String] = None): Unit = {
+    if (!targetDir.exists() && !targetDir.mkdir()) {
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
-    fs.listStatus(path).foreach { fileStatus =>
-      val innerPath = fileStatus.getPath
-      if (fileStatus.isDir) {
-        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
-          fileOverwrite)
-      } else {
-        val in = fs.open(innerPath)
-        val targetFile = new File(targetDir, innerPath.getName)
-        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+    if (fs.isDirectory(path)) {
+      fs.listStatus(path).foreach { fileStatus =>
+        val innerPath = fileStatus.getPath
+        if (fileStatus.isDir) {
+          fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
+            fileOverwrite)
+        } else {
+          val in = fs.open(innerPath)
+          val targetFile = new File(targetDir, innerPath.getName)
+          downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+        }
       }
+    } else {
+      val in = fs.open(path)
+      val targetFile = new File(targetDir, filename.getOrElse(path.getName))
+      downloadFile(path.toString, in, targetFile, fileOverwrite)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fe2b64425115..765dbab0c098 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -389,16 +389,30 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     val tempDir = Utils.createTempDir()
     val innerTempDir = Utils.createTempDir(tempDir.getPath)
     val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
+    val tempFile1 = File.createTempFile("someprefix1", "somesuffix1", innerTempDir)
+    val tempFile2 = File.createTempFile("someprefix2", "somesuffix2", innerTempDir)
+    val tempFile3 = File.createTempFile("someprefix3", "somesuffix3", tempDir)
     val targetDir = new File("target-dir")
+    val renameTargetDir = new File("rename-target-dir")
+    val fileTargetDir = new File("file-target-dir")
+    val multiFileTargetDir = new File("multi-file-target-dir")
     Files.write("some text", tempFile, UTF_8)
+    Files.write("some text", tempFile1, UTF_8)
+    Files.write("some text", tempFile2, UTF_8)
 
     try {
       val path = new Path("file://" + tempDir.getAbsolutePath)
       val conf = new Configuration()
       val fs = Utils.getHadoopFileSystem(path.toString, conf)
+      // Testing subdirs are copied across
       Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
       assert(targetDir.exists())
       assert(targetDir.isDirectory())
+      // Testing to make sure it doesn't error if the dir already exists
+      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+      val newTempFile = new File(targetDir, tempFile3.getName)
+      assert(newTempFile.exists())
+      assert(newTempFile.isFile())
       val newInnerDir = new File(targetDir, innerTempDir.getName)
       println("inner temp dir: " + innerTempDir.getName)
       targetDir.listFiles().map(_.getName).foreach(println)
@@ -407,9 +421,47 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
       val newInnerFile = new File(newInnerDir, tempFile.getName)
       assert(newInnerFile.exists())
       assert(newInnerFile.isFile())
+
+      // Testing that you can copy a single file over
+      val filePath = new Path("file://" + tempFile.getAbsolutePath)
+      val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+      Utils.fetchHcfsFile(filePath, fileTargetDir, testFilefs, new SparkConf(),
+        conf, false)
+      val newFile = new File(fileTargetDir, tempFile.getName)
+      assert(newFile.exists())
+      assert(newFile.isFile())
+
+      // Testing that when copying a single file you can rename it
+      val testFileName = "testFName"
+      Utils.fetchHcfsFile(filePath, renameTargetDir, testFilefs,
+        new SparkConf(), conf, false, Some(testFileName))
+      val newFileName = new File(renameTargetDir, testFileName)
+      assert(newFileName.exists())
+      assert(newFileName.isFile())
+
+      // Testing that you can copy a dir with files in it; the filename
+      // option is ignored for directories and the original names are kept
+      val dirPath = new Path("file://" + innerTempDir.getAbsolutePath)
+      val testDirfs = Utils.getHadoopFileSystem(dirPath.toString, conf)
+      Utils.fetchHcfsFile(dirPath, multiFileTargetDir, testDirfs, new SparkConf(),
+        conf, false, Some(testFileName))
+      val newTmpFile1 = new File(multiFileTargetDir, tempFile.getName)
+      assert(newTmpFile1.exists())
+      assert(newTmpFile1.isFile())
+      val newTmpFile2 = new File(multiFileTargetDir, tempFile1.getName)
+      assert(newTmpFile2.exists())
+      assert(newTmpFile2.isFile())
+      val newTmpFile3 = new File(multiFileTargetDir, tempFile2.getName)
+      assert(newTmpFile3.exists())
+      assert(newTmpFile3.isFile())
+      // A combination of files and dirs within a dir is covered by the fetch
+      // of tempDir at the top of this test (tempFile3 plus innerTempDir)
     } finally {
       Utils.deleteRecursively(tempDir)
       Utils.deleteRecursively(targetDir)
+      Utils.deleteRecursively(renameTargetDir)
+      Utils.deleteRecursively(fileTargetDir)
+      Utils.deleteRecursively(multiFileTargetDir)
     }
   }
 }
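
Usage note (illustrative, not part of the patch): a minimal sketch of how the patched fetchHcfsFile behaves, assuming a local file:// filesystem. The paths /tmp/source-dir, /tmp/dest-dir, and data.txt are hypothetical, and the calls only compile from inside the org.apache.spark package since Utils is private[spark].

  import java.io.File
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.spark.SparkConf
  import org.apache.spark.util.Utils

  val hadoopConf = new Configuration()
  val sparkConf = new SparkConf()

  // Directory fetch: the filename argument is ignored and the directory's
  // contents land in the target dir under their original names.
  val dirPath = new Path("file:///tmp/source-dir")
  val dirFs = Utils.getHadoopFileSystem(dirPath.toString, hadoopConf)
  Utils.fetchHcfsFile(dirPath, new File("/tmp/dest-dir"), dirFs, sparkConf,
    hadoopConf, false)

  // Single-file fetch: Some("renamed.txt") renames the file on the way down,
  // so the copy lands at /tmp/dest-dir/renamed.txt rather than data.txt.
  val filePath = new Path("file:///tmp/source-dir/data.txt")
  val fileFs = Utils.getHadoopFileSystem(filePath.toString, hadoopConf)
  Utils.fetchHcfsFile(filePath, new File("/tmp/dest-dir"), fileFs, sparkConf,
    hadoopConf, false, Some("renamed.txt"))

The second fetch also succeeds when /tmp/dest-dir already exists, since the patched method only calls mkdir() after checking exists().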