diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e7eabd289699..c28414e05811 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1313,11 +1313,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val uri = new URI(path)
-    val schemeCorrectedPath = uri.getScheme match {
-      case null | "local" => new File(path).getCanonicalFile.toURI.toString
-      case _ => path
-    }
+    val uri = Utils.resolveURI(path)
+    val schemeCorrectedPath = uri.toString
     val hadoopPath = new Path(schemeCorrectedPath)
     val scheme = new URI(schemeCorrectedPath).getScheme
 
@@ -1346,8 +1343,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration, timestamp, useCache = false)
+    Utils.fetchFile(schemeCorrectedPath, new File(SparkFiles.getRootDirectory()), conf,
+      env.securityManager, hadoopConfiguration, timestamp, useCache = false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 43c89b258f2f..b174969df79a 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -92,7 +92,8 @@ private[spark] object TestUtils {
     val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
 
     for (file <- files) {
-      val jarEntry = new JarEntry(Paths.get(directoryPrefix.getOrElse(""), file.getName).toString)
+      val jarEntry = new JarEntry(
+        Paths.get(directoryPrefix.getOrElse(""), file.getName).toString.replace("\\", "/"))
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 3d2cabcdfdd5..1fa7f0dd47c4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -232,7 +232,8 @@ private[deploy] object RPackageUtils extends Logging {
     try {
       filesToBundle.foreach { file =>
         // get the relative paths for proper naming in the zip file
-        val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
+        val relPath = Utils.normalizePath(file.getAbsolutePath)
+          .replaceFirst(Utils.normalizePath(dir.getAbsolutePath), "")
         val fis = new FileInputStream(file)
         val zipEntry = new ZipEntry(relPath)
         zipOutputStream.putNextEntry(zipEntry)
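The TestUtils and RPackageUtils changes above address the same class of bug: jar/zip entry names must use forward slashes regardless of OS, while `Paths.get(...).toString` and `getAbsolutePath` produce backslashes on Windows. A minimal, self-contained sketch of the corrected pattern (the jar and entry names here are hypothetical, not taken from the patch):

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Paths
import java.util.jar.{JarEntry, JarOutputStream}

object JarEntrySeparatorDemo {
  def main(args: Array[String]): Unit = {
    val jarStream = new JarOutputStream(new FileOutputStream(new File("demo.jar")))
    // On Windows, Paths.get("lib", "Dep.class").toString is "lib\Dep.class"; zip/jar entry
    // names are defined to use "/", so the separator is rewritten before the entry is created.
    val entryName = Paths.get("lib", "Dep.class").toString.replace("\\", "/")
    jarStream.putNextEntry(new JarEntry(entryName))
    jarStream.closeEntry()
    jarStream.close()
  }
}
```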
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ea49991493fd..df69544b6290 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1824,6 +1824,24 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Normalize a local path for Windows: if the path is absolute (i.e. starts with a drive
+   * letter), prepend a leading slash and replace all backslashes with forward slashes.
+   * @param path the path to be normalized
+   * @return the normalized path
+   */
+  def normalizePath(path: String): String = {
+    if (isWindows) {
+      if (path.matches("""^[A-Za-z]:[/\\].*$""")) {
+        "/" + path.replace("\\", "/")
+      } else {
+        path.replace("\\", "/")
+      }
+    } else {
+      path
+    }
+  }
+
   /**
    * Return a well-formed URI for the file described by a user input string.
    *
@@ -1832,7 +1850,15 @@ private[spark] object Utils extends Logging {
    */
   def resolveURI(path: String): URI = {
     try {
-      val uri = new URI(path)
+      // Prepend a slash if the path starts with a drive letter on Windows
+      val escapedPath =
+        if (Utils.isWindows && path.matches("""^[A-Za-z]:[/\\].*$""")) {
+          "/" + path // a Windows path like D:/tmp would otherwise parse as URI scheme "D"
+        } else {
+          path
+        }
+
+      val uri = new URI(escapedPath)
       if (uri.getScheme() != null) {
         return uri
       }
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 04f92d60167d..da0b170b228f 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -71,6 +71,7 @@
 import org.apache.spark.serializer.KryoSerializer;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.StatCounter;
+import org.apache.spark.util.Utils;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1071,7 +1072,7 @@ public void wholeTextFiles() throws Exception {
     byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
     byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8);
 
-    String tempDirName = tempDir.getAbsolutePath();
+    String tempDirName = Utils.normalizePath(tempDir.getAbsolutePath());
     Files.write(content1, new File(tempDirName + "/part-00000"));
     Files.write(content2, new File(tempDirName + "/part-00001"));
 
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 993834f8d7d4..895bf030aa33 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -232,7 +232,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test("binary file input as byte array") {
     sc = new SparkContext("local", "test")
     val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
+    val outFileName = Utils.normalizePath(outFile.getAbsolutePath())
 
     // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
@@ -516,7 +516,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Get input files via old Hadoop API") {
     sc = new SparkContext("local", "test")
-    val outDir = new File(tempDir, "output").getAbsolutePath
+    val outDir = Utils.normalizePath(new File(tempDir, "output").getAbsolutePath)
     sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)
 
     val inputPaths =
@@ -530,7 +530,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Get input files via new Hadoop API") {
     sc = new SparkContext("local", "test")
-    val outDir = new File(tempDir, "output").getAbsolutePath
+    val outDir = Utils.normalizePath(new File(tempDir, "output").getAbsolutePath)
     sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)
 
     val inputPaths =
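The resolveURI change above exists because java.net.URI treats a bare drive letter as a URI scheme. A short sketch using only JDK classes shows the effect the added slash guards against (printed values assume a Windows JVM; on other platforms File.toURI would resolve the drive path against the working directory instead):

```scala
object DriveLetterUriDemo {
  def main(args: Array[String]): Unit = {
    // Without the leading slash, the drive letter parses as the URI scheme.
    println(new java.net.URI("D:/base-dir").getScheme)   // D
    // With the leading slash it is an ordinary scheme-less path, so resolveURI falls
    // through to File.toURI and produces the "file:/D:/..." form the tests expect.
    println(new java.net.URI("/D:/base-dir").getScheme)  // null
    println(new java.io.File("D:\\base-dir").toURI)      // file:/D:/base-dir
  }
}
```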
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index 9ecf49b59898..cca862cd58ed 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -142,7 +142,7 @@ private[deploy] object IvyTestUtils {
         |}
       """.stripMargin
     val sourceFile =
-      new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
+      new JavaSourceFromString(new File(dir, className).toURI.toString, contents)
     createCompiledClass(className, dir, sourceFile, Seq.empty)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index f8054f5fd770..b6bf038f8b47 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -61,7 +61,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     pw.close()
 
     // Path to tmpFile
-    tmpFilePath = "file://" + tmpFile.getAbsolutePath
+    tmpFilePath = Utils.resolveURI(tmpFile.getAbsolutePath).toString
   }
 
   after {
@@ -181,7 +181,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     sc.textFile(tmpFilePath, 4)
       .map(key => (key, 1))
      .reduceByKey(_ + _)
-      .saveAsTextFile("file://" + tmpFile.getAbsolutePath)
+      .saveAsTextFile(Utils.resolveURI(tmpFile.getAbsolutePath).toString)
 
     sc.listenerBus.waitUntilEmpty(500)
     assert(inputRead == numRecords)
@@ -197,7 +197,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     val numPartitions = 2
     val cartVector = 0 to 9
     val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt")
-    val cartFilePath = "file://" + cartFile.getAbsolutePath
+    val cartFilePath = Utils.resolveURI(cartFile.getAbsolutePath).toString
 
     // write files to disk so we can read them later.
     sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
@@ -265,7 +265,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     // Only supported on newer Hadoop
     if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
       val file = new File(tmpDir, getClass.getSimpleName)
-      val filePath = "file://" + file.getAbsolutePath
+      val filePath = Utils.resolveURI(file.getAbsolutePath).toString
 
       val records = runAndReturnRecordsWritten {
         sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
@@ -278,7 +278,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     // Only supported on newer Hadoop
     if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
       val file = new File(tmpDir, getClass.getSimpleName)
-      val filePath = "file://" + file.getAbsolutePath
+      val filePath = Utils.resolveURI(file.getAbsolutePath).toString
 
       val records = runAndReturnRecordsWritten {
         sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
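The InputOutputMetricsSuite edits above replace string concatenation with Utils.resolveURI because `"file://" + getAbsolutePath` only happens to work when the absolute path starts with "/". A sketch of the failure mode, using JDK classes only (values assume a Windows JVM):

```scala
object FileUriConcatDemo {
  def main(args: Array[String]): Unit = {
    val winPath = "C:\\tmp\\metrics.txt"
    // The drive letter ends up where a URI authority (host) belongs, and the backslashes
    // are not legal URI characters at all.
    println("file://" + winPath)              // file://C:\tmp\metrics.txt  (malformed)
    // File.toURI -- the fallback inside Utils.resolveURI -- yields a well-formed URI.
    println(new java.io.File(winPath).toURI)  // file:/C:/tmp/metrics.txt
  }
}
```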
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 176d8930aad1..97ea8455e2a4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -107,20 +107,38 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   }
 
   test("Event log name") {
-    // without compression
-    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
-      Utils.resolveURI("/base-dir"), "app1", None))
-    // with compression
-    assert(s"file:/base-dir/app1.lzf" ===
-      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
-    // illegal characters in app ID
-    assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
-      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
-        "a fine:mind$dollar{bills}.1", None))
-    // illegal characters in app ID with compression
-    assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
-      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
-        "a fine:mind$dollar{bills}.1", None, Some("lz4")))
+    if (!Utils.isWindows) {
+      // without compression
+      assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
+        Utils.resolveURI("/base-dir"), "app1", None))
+      // with compression
+      assert(s"file:/base-dir/app1.lzf" ===
+        EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
+      // illegal characters in app ID
+      assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
+        EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+          "a fine:mind$dollar{bills}.1", None))
+      // illegal characters in app ID with compression
+      assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
+        EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+          "a fine:mind$dollar{bills}.1", None, Some("lz4")))
+    } else {
+      // Windows paths need a drive letter
+      // without compression
+      assert(s"file:/D:/base-dir/app1" === EventLoggingListener.getLogPath(
+        Utils.resolveURI("D:/base-dir"), "app1", None))
+      // with compression
+      assert(s"file:/D:/base-dir/app1.lzf" ===
+        EventLoggingListener.getLogPath(Utils.resolveURI("D:/base-dir"), "app1", None, Some("lzf")))
+      // illegal characters in app ID
+      assert(s"file:/D:/base-dir/a-fine-mind_dollar_bills__1" ===
+        EventLoggingListener.getLogPath(Utils.resolveURI("D:/base-dir"),
+          "a fine:mind$dollar{bills}.1", None))
+      // illegal characters in app ID with compression
+      assert(s"file:/D:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
+        EventLoggingListener.getLogPath(Utils.resolveURI("D:/base-dir"),
+          "a fine:mind$dollar{bills}.1", None, Some("lz4")))
+    }
   }
 
   /* ----------------- *
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index b5385c11a926..5a120b12e1fb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -171,7 +171,8 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     val tempDir = Utils.createTempDir()
     val srcDir = new File(tempDir, "repro/")
     srcDir.mkdirs()
-    val excSource = new JavaSourceFromString(new File(srcDir, "MyException").getAbsolutePath,
+    val excSource = new JavaSourceFromString(Utils.resolveURI(
+      new File(srcDir, "MyException").getAbsolutePath).toString,
       """package repro;
         |
         |public class MyException extends Exception {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 7d6b7bde6825..12d969595605 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
+import java.io.File
 import java.nio.ByteBuffer
 import java.util.Arrays
 import java.util.Collection
@@ -36,9 +37,9 @@ import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.executor.MesosExecutorBackend
-import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
-  TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.util.Utils
 
 class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
 
@@ -111,7 +112,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
 
   test("check spark-class location correctly") {
     val conf = new SparkConf
-    conf.set("spark.mesos.executor.home", "/mesos-home")
+    conf.set("spark.mesos.executor.home",
+      if (Utils.isWindows) "D:\\mesos-home" else "/mesos-home")
 
     val listenerBus = mock[LiveListenerBus]
     listenerBus.post(
@@ -134,9 +136,15 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
       mesosSchedulerBackend.createResource("mem", 1024))
     // uri is null.
     val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+    val expectedSparkClass =
+      if (Utils.isWindows) {
+        "D:\\mesos-home\\bin\\spark-class"
+      } else {
+        "/mesos-home/bin/spark-class"
+      }
     assert(executorInfo.getCommand.getValue ===
-      s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
-
+      " " + new File(s"$expectedSparkClass ${classOf[MesosExecutorBackend].getName}")
+        .getAbsolutePath)
     // uri exists.
     conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
     val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
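The Mesos assertion above routes the expected spark-class path through java.io.File so the expectation follows the platform's separator conventions, matching however the backend renders the path. A minimal sketch of java.io.File's separator handling (printed values assume a Windows JVM; the paths are illustrative):

```scala
object FileSeparatorDemo {
  def main(args: Array[String]): Unit = {
    // java.io.File converts "/" to the platform separator when it builds the abstract pathname,
    // so the same expectation string compares cleanly on any OS.
    println(new java.io.File("D:/mesos-home/bin/spark-class").getPath)            // D:\mesos-home\bin\spark-class
    println(new java.io.File("D:\\mesos-home\\bin\\spark-class").getAbsolutePath) // D:\mesos-home\bin\spark-class
  }
}
```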
conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 4aa4854c36f3..36d98f6f125e 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -416,8 +416,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5,path to/jar6", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6") if (Utils.isWindows) { + // It seems "#" is not valid for a local file path, and it will be escaped by the File.toURI assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""", - s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4") + s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 19fe29a202a6..964b124df9ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -598,7 +598,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { data.write.parquet(dir.getCanonicalPath) val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(input_file_name()) .head.getString(0) - assert(answer.contains(dir.getCanonicalPath)) + assert(answer.contains(dir.getCanonicalPath.replace("\\", "/"))) checkAnswer(data.select(input_file_name()).limit(1), Row("")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index ec950332c5f6..82746a64506e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -64,7 +64,7 @@ class UDFSuite extends QueryTest with SharedSQLContext { data.write.parquet(dir.getCanonicalPath) sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("test_table") val answer = sql("select input_file_name() from test_table").head().getString(0) - assert(answer.contains(dir.getCanonicalPath)) + assert(answer.contains(dir.getCanonicalPath.replace("\\", "/"))) assert(sql("select input_file_name() from test_table").distinct().collect().length >= 2) sqlContext.dropTempTable("test_table") } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index bdbac64b9bc7..a5d6912cdd79 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -627,7 +627,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized { fileInputDStream.batchTimeToSelectedFiles.values.flatten } - filenames.map(_.split(File.separator).last.toInt).toSeq.sorted + // on both win and *nix, the separator will be normalized to slash (/) + // Using File.separator will cause problem on windows. 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index bdbac64b9bc7..a5d6912cdd79 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -627,7 +627,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
       ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
     val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
       { fileInputDStream.batchTimeToSelectedFiles.values.flatten }
-    filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
+    // On both Windows and *nix the selected file names use "/" as the separator, so splitting
+    // on File.separator would break on Windows.
+    filenames.map(_.split("/").last.toInt).toSeq.sorted
   }
 
   try {
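The CheckpointSuite change above relies on the selected-file names being recorded as Hadoop-style path strings, which use "/" on every platform, so splitting on File.separator ("\" on Windows) finds nothing to split there. A sketch of the difference (assumes hadoop-common on the classpath; the Windows-specific behavior is illustrative):

```scala
import java.io.File

import org.apache.hadoop.fs.Path

object CheckpointFilenameSplitDemo {
  def main(args: Array[String]): Unit = {
    // FileInputDStream records Hadoop Path strings; on Windows they look like
    // "file:/C:/checkpoint/dir/7" but still use "/" as the separator.
    val recorded = new Path("file:/tmp/checkpoint/dir/7").toString
    println(recorded.split("/").last.toInt)  // 7, on Windows and *nix alike
    // File.separator is "\" on Windows, so splitting on it cannot pick the trailing
    // file name out of a "/"-separated path string.
    println(File.separator)
  }
}
```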