11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1313,11 +1313,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new URI(path)
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
}
val uri = Utils.resolveURI(path)
val schemeCorrectedPath = uri.toString

val hadoopPath = new Path(schemeCorrectedPath)
val scheme = new URI(schemeCorrectedPath).getScheme
@@ -1346,8 +1343,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
addedFiles(key) = timestamp

// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
hadoopConfiguration, timestamp, useCache = false)
Utils.fetchFile(schemeCorrectedPath, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConfiguration, timestamp, useCache = false)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -92,7 +92,8 @@ private[spark] object TestUtils {
val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())

for (file <- files) {
val jarEntry = new JarEntry(Paths.get(directoryPrefix.getOrElse(""), file.getName).toString)
val jarEntry = new JarEntry(
Paths.get(directoryPrefix.getOrElse(""), file.getName).toString.replace("\\", "/"))
jarStream.putNextEntry(jarEntry)

val in = new FileInputStream(file)
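For context, a small standalone sketch (not part of the patch; the names are made up) of why the entry name is normalized above: on Windows, Paths.get joins segments with backslashes, while jar/zip entry names are expected to use forward slashes.

    import java.nio.file.Paths
    import java.util.jar.JarEntry

    // Illustrative only: "prefix" and "Foo.class" are hypothetical names.
    val rawName = Paths.get("prefix", "Foo.class").toString   // "prefix\Foo.class" on Windows
    val entry   = new JarEntry(rawName.replace("\\", "/"))     // entry name "prefix/Foo.class"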
@@ -232,7 +232,8 @@ private[deploy] object RPackageUtils extends Logging {
try {
filesToBundle.foreach { file =>
// get the relative paths for proper naming in the zip file
val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
val relPath = Utils.normalizePath(file.getAbsolutePath)
.replaceFirst(Utils.normalizePath(dir.getAbsolutePath), "")
val fis = new FileInputStream(file)
val zipEntry = new ZipEntry(relPath)
zipOutputStream.putNextEntry(zipEntry)
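For context, a rough sketch (not part of the patch; the Windows paths are hypothetical) of what normalizing both strings buys: the replaceFirst prefix strip behaves the same on every platform and the resulting zip entry name is '/'-separated. It assumes the normalizePath helper added further down in Utils.scala.

    // Hypothetical Windows paths, run through the new Utils.normalizePath.
    val dirPath  = Utils.normalizePath("C:\\pkg\\R")               // "/C:/pkg/R"
    val filePath = Utils.normalizePath("C:\\pkg\\R\\DESCRIPTION")  // "/C:/pkg/R/DESCRIPTION"
    val relPath  = filePath.replaceFirst(dirPath, "")              // "/DESCRIPTION"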
28 changes: 27 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1824,6 +1824,24 @@ private[spark] object Utils extends Logging {
}
}

/**
* Normalize a local path for Windows: backslashes are replaced with forward slashes, and if
* the path is absolute (starts with a drive label) a leading slash is prepended.
* @param path the path to be normalized
* @return the normalized path
*/
def normalizePath(path: String): String = {
if (isWindows) {
if (path.matches("""^[A-Za-z]:[/\\].*$""")) {
"/" + path.replace("\\", "/")
} else {
path.replace("\\", "/")
}
} else {
path
}
}

/**
* Return a well-formed URI for the file described by a user input string.
*
@@ -1832,7 +1850,15 @@
*/
def resolveURI(path: String): URI = {
try {
val uri = new URI(path)
// Prepend a slash if the path starts with a drive label on Windows.
val escapedPath =
if (Utils.isWindows && path.matches("""^[A-Za-z]:[/\\].*$""")) {
"/" + path // windows path like D:/ will be recognize as schema D
} else {
path
}

val uri = new URI(escapedPath)
if (uri.getScheme() != null) {
return uri
}
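For context, a minimal standalone sketch (not part of the patch; the path is made up) of the java.net.URI behavior the escaping works around:

    import java.net.URI

    // A bare Windows path is mis-parsed: the drive label is taken as the URI scheme.
    val raw = new URI("D:/tmp/data.txt")
    raw.getScheme                     // "D"
    raw.getPath                       // "/tmp/data.txt" -- the drive label is lost

    // With the leading slash prepended above, the drive label stays in the path and
    // getScheme is null, so resolveURI falls through to the file: handling elided in this hunk.
    val escaped = new URI("/D:/tmp/data.txt")
    escaped.getScheme                 // null
    escaped.getPath                   // "/D:/tmp/data.txt"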
3 changes: 2 additions & 1 deletion core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -71,6 +71,7 @@
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.StatCounter;
import org.apache.spark.util.Utils;

// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1071,7 +1072,7 @@ public void wholeTextFiles() throws Exception {
byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8);

String tempDirName = tempDir.getAbsolutePath();
String tempDirName = Utils.normalizePath(tempDir.getAbsolutePath());
Files.write(content1, new File(tempDirName + "/part-00000"));
Files.write(content2, new File(tempDirName + "/part-00001"));

6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -232,7 +232,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test("binary file input as byte array") {
sc = new SparkContext("local", "test")
val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()
val outFileName = Utils.normalizePath(outFile.getAbsolutePath())

// create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
@@ -516,7 +516,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

test("Get input files via old Hadoop API") {
sc = new SparkContext("local", "test")
val outDir = new File(tempDir, "output").getAbsolutePath
val outDir = Utils.normalizePath(new File(tempDir, "output").getAbsolutePath)
sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)

val inputPaths =
@@ -530,7 +530,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

test("Get input files via new Hadoop API") {
sc = new SparkContext("local", "test")
val outDir = new File(tempDir, "output").getAbsolutePath
val outDir = Utils.normalizePath(new File(tempDir, "output").getAbsolutePath)
sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)

val inputPaths =
@@ -142,7 +142,7 @@ private[deploy] object IvyTestUtils {
|}
""".stripMargin
val sourceFile =
new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
new JavaSourceFromString(new File(dir, className).toURI.toString, contents)
createCompiledClass(className, dir, sourceFile, Seq.empty)
}

@@ -61,7 +61,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
pw.close()

// Path to tmpFile
tmpFilePath = "file://" + tmpFile.getAbsolutePath
tmpFilePath = Utils.resolveURI(tmpFile.getAbsolutePath).toString
}

after {
@@ -181,7 +181,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
sc.textFile(tmpFilePath, 4)
.map(key => (key, 1))
.reduceByKey(_ + _)
.saveAsTextFile("file://" + tmpFile.getAbsolutePath)
.saveAsTextFile(Utils.resolveURI(tmpFile.getAbsolutePath).toString)

sc.listenerBus.waitUntilEmpty(500)
assert(inputRead == numRecords)
@@ -197,7 +197,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
val numPartitions = 2
val cartVector = 0 to 9
val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt")
val cartFilePath = "file://" + cartFile.getAbsolutePath
val cartFilePath = Utils.resolveURI(cartFile.getAbsolutePath).toString

// write files to disk so we can read them later.
sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
@@ -265,7 +265,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
// Only supported on newer Hadoop
if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
val file = new File(tmpDir, getClass.getSimpleName)
val filePath = "file://" + file.getAbsolutePath
val filePath = Utils.resolveURI(file.getAbsolutePath).toString

val records = runAndReturnRecordsWritten {
sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
@@ -278,7 +278,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
// Only supported on newer Hadoop
if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
val file = new File(tmpDir, getClass.getSimpleName)
val filePath = "file://" + file.getAbsolutePath
val filePath = Utils.resolveURI(file.getAbsolutePath).toString

val records = runAndReturnRecordsWritten {
sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
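For context, a brief sketch (not part of the patch; the paths are made up and the resolveURI outputs are assumed) of why the string concatenation was replaced: prefixing a Windows absolute path with "file://" yields an invalid URI, while Utils.resolveURI produces a proper file: URI on both platforms.

    // Plain concatenation: fine on *nix, broken on Windows.
    "file://" + "/tmp/metrics.txt"       // file:///tmp/metrics.txt
    "file://" + "C:\\tmp\\metrics.txt"   // file://C:\tmp\metrics.txt -- the drive label lands in the
                                         // authority slot and backslashes are illegal in a URI

    // Assumed resolveURI results (consistent with the EventLoggingListenerSuite expectations below):
    Utils.resolveURI("/tmp/metrics.txt").toString       // file:/tmp/metrics.txt
    Utils.resolveURI("C:\\tmp\\metrics.txt").toString   // file:/C:/tmp/metrics.txt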
@@ -107,20 +107,38 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}

test("Event log name") {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
Utils.resolveURI("/base-dir"), "app1", None))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1", None, Some("lz4")))
if (!Utils.isWindows) {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
Utils.resolveURI("/base-dir"), "app1", None))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1", None, Some("lz4")))
} else {
// on Windows the resolved URI keeps its drive label
// without compression
assert(s"file:/D:/base-dir/app1" === EventLoggingListener.getLogPath(
Utils.resolveURI("D:/base-dir"), "app1", None))
// with compression
assert(s"file:/D:/base-dir/app1.lzf" ===
EventLoggingListener.getLogPath(Utils.resolveURI("D:/base-dir"), "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/D:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("D:/base-dir"),
"a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/D:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("D:/base-dir"),
"a fine:mind$dollar{bills}.1", None, Some("lz4")))
}
}

/* ----------------- *
@@ -171,7 +171,8 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
val tempDir = Utils.createTempDir()
val srcDir = new File(tempDir, "repro/")
srcDir.mkdirs()
val excSource = new JavaSourceFromString(new File(srcDir, "MyException").getAbsolutePath,
val excSource = new JavaSourceFromString(Utils.resolveURI(
new File(srcDir, "MyException").getAbsolutePath).toString,
"""package repro;
|
|public class MyException extends Exception {
@@ -17,6 +17,7 @@

package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.nio.ByteBuffer
import java.util.Arrays
import java.util.Collection
@@ -36,9 +37,9 @@ import org.scalatest.mock.MockitoSugar

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, TaskDescription, TaskSchedulerImpl, WorkerOffer}
Review comment (Member): Please do not remove newlines between imports (see databricks/scala-style-guide#imports).
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils

class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {

@@ -111,7 +112,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi

test("check spark-class location correctly") {
val conf = new SparkConf
conf.set("spark.mesos.executor.home", "/mesos-home")
conf.set("spark.mesos.executor.home",
if (Utils.isWindows) "D:\\mesos-home" else "/mesos-home")

val listenerBus = mock[LiveListenerBus]
listenerBus.post(
@@ -134,9 +136,15 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
mesosSchedulerBackend.createResource("mem", 1024))
// uri is null.
val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
val expectedSparkClass =
if (Utils.isWindows) {
"D:\\mesos-home\\bin\\spark-class"
} else {
"/mesos-home/bin/spark-class"
}
assert(executorInfo.getCommand.getValue ===
s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")

" " + new File(s"$expectedSparkClass ${classOf[MesosExecutorBackend].getName}")
.getAbsolutePath)
// uri exists.
conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -416,8 +416,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5,path to/jar6",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6")
if (Utils.isWindows) {
// It seems "#" is not valid in a local file path, and it is escaped by File.toURI
assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4")
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4")
}
}

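For context, a one-liner (not part of the patch) showing the escaping the updated expectation reflects:

    import java.io.File
    new File("C:\\pi.py#py.pi").toURI.toString   // "file:/C:/pi.py%23py.pi" on Windows -- '#' is percent-encoded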
@@ -598,7 +598,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
data.write.parquet(dir.getCanonicalPath)
val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(input_file_name())
.head.getString(0)
assert(answer.contains(dir.getCanonicalPath))
assert(answer.contains(dir.getCanonicalPath.replace("\\", "/")))

checkAnswer(data.select(input_file_name()).limit(1), Row(""))
}
@@ -64,7 +64,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
data.write.parquet(dir.getCanonicalPath)
sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("test_table")
val answer = sql("select input_file_name() from test_table").head().getString(0)
assert(answer.contains(dir.getCanonicalPath))
assert(answer.contains(dir.getCanonicalPath.replace("\\", "/")))
assert(sql("select input_file_name() from test_table").distinct().collect().length >= 2)
sqlContext.dropTempTable("test_table")
}
@@ -627,7 +627,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
{ fileInputDStream.batchTimeToSelectedFiles.values.flatten }
filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
// On both Windows and *nix the separator is normalized to a slash (/),
// so splitting on File.separator would break on Windows.
filenames.map(_.split("/").last.toInt).toSeq.sorted
}

try {
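For context, a tiny sketch (not part of the patch; the recorded name format is assumed) of the comment above: the selected file names use '/' on every platform, so File.separator ("\" on Windows) never appears in them.

    // A hypothetical recorded file name as kept in batchTimeToSelectedFiles.
    val recorded = "file:/C:/tmp/checkpoint/input/7"
    recorded.split("/").last.toInt    // 7, on Windows and *nix alike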