36 changes: 16 additions & 20 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

import scala.collection.JavaConverters._
@@ -262,8 +262,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def env: SparkEnv = _env

// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
@@ -1430,14 +1430,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
schemeCorrectedPath
}
val timestamp = System.currentTimeMillis
addedFiles(key) = timestamp

// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
hadoopConfiguration, timestamp, useCache = false)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
Contributor: Is putIfAbsent correct here? It won't update the timestamp when you call addFile for the same file again.

Contributor: Nevermind, I see you're actually changing the behavior from 1.x instead of restoring it.

Contributor (author): Yeah, this is actually intentional. I experimented with implementing the 1.x behavior and started writing some tests to verify that newer versions of files took precedence over old ones, but then discovered that Spark executors will crash with exceptions if they've downloaded a file with a given name and the new file's contents don't match the old file. Given this behavior, it seems that updating the timestamp will work only if the new file has the same contents as the old file (in which case it doesn't matter what we do with the timestamp) or if all executors are replaced before running any further tasks (which seems like an obscure use case that we don't want to optimize for).
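
For readers skimming the diff, here is a minimal, self-contained sketch of the putIfAbsent pattern the new code relies on (the object name and the key are made up for illustration; this is not the PR's code): the first registration of a key wins, and later calls leave the original timestamp untouched.

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object IdempotentRegistrySketch {
  // Thread-safe map exposed through the Scala collection API, as in the patched SparkContext.
  private val addedFiles = new ConcurrentHashMap[String, Long]().asScala

  // Returns true only on the first registration of `key`; later calls are no-ops.
  def register(key: String): Boolean = {
    val timestamp = System.currentTimeMillis
    val isNew = addedFiles.putIfAbsent(key, timestamp).isEmpty
    if (isNew) println(s"Added file $key with timestamp $timestamp")
    isNew
  }

  def main(args: Array[String]): Unit = {
    println(register("hdfs://example/data.txt")) // true: key inserted, timestamp recorded
    println(register("hdfs://example/data.txt")) // false: existing timestamp is kept
  }
}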

logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
// SparkFiles API to access files.
Contributor (author): I believe that this line is unnecessary now that runLocally was removed a few releases ago. However, I suppose that we might need it in order to handle the somewhat obscure corner case where a user's reduce function / closure executes on the driver and accesses the SparkFiles object.

Contributor (author): Yep, turns out this is needed in the Python tests so I added it back.
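
As a rough illustration of the corner case discussed above (the file path, app name, and object name are hypothetical; this is only a sketch, not code from the PR): under a local master the task closure runs in the driver JVM, and SparkFiles.get can resolve the file there only because addFile also fetched a copy into the driver's SparkFiles root directory.

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object DriverSideSparkFilesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sparkfiles-sketch"))
    sc.addFile("/tmp/lookup.txt") // hypothetical path; the file must exist locally
    // With master = "local" this closure executes in the driver process, yet SparkFiles.get
    // still finds the file because addFile fetched a copy into the driver's SparkFiles directory.
    val firstLine = sc.parallelize(Seq(0)).map { _ =>
      scala.io.Source.fromFile(SparkFiles.get("lookup.txt")).getLines().next()
    }.first()
    println(firstLine)
    sc.stop()
  }
}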

Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
hadoopConfiguration, timestamp, useCache = false)
postEnvironmentUpdate()
}
}

/**
@@ -1705,12 +1705,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
case exc: FileNotFoundException =>
logError(s"Jar not found at $path")
null
case e: Exception =>
// For now just log an error but allow to go through so spark examples work.
// The spark examples don't really need the jar distributed since its also
// the app jar.
logError("Error adding jar (" + e + "), was the --addJars option used?")
null
}
}
// A JAR file which exists locally on every worker node
@@ -1721,11 +1715,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
if (key != null) {
addedJars(key) = System.currentTimeMillis
logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
val timestamp = System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
postEnvironmentUpdate()
}
}
}
postEnvironmentUpdate()
}

/**
core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -66,14 +66,18 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
}

override def addFile(file: File): String = {
require(files.putIfAbsent(file.getName(), file) == null,
s"File ${file.getName()} already registered.")
val existingPath = files.putIfAbsent(file.getName, file)
require(existingPath == null || existingPath == file,
s"File ${file.getName} was already registered with a different path " +
s"(old path = $existingPath, new path = $file)")
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}

override def addJar(file: File): String = {
require(jars.putIfAbsent(file.getName(), file) == null,
s"JAR ${file.getName()} already registered.")
val existingPath = jars.putIfAbsent(file.getName, file)
require(existingPath == null || existingPath == file,
s"File ${file.getName} was already registered with a different path " +
s"(old path = $existingPath, new path = $file)")
s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}

5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
import java.util.Properties

import scala.collection.mutable
import scala.collection.mutable.HashMap

import org.apache.spark._
@@ -198,8 +199,8 @@ private[spark] object Task {
*/
def serializeWithDependencies(
task: Task[_],
currentFiles: HashMap[String, Long],
currentJars: HashMap[String, Long],
currentFiles: mutable.Map[String, Long],
currentJars: mutable.Map[String, Long],
serializer: SerializerInstance)
: ByteBuffer = {

51 changes: 51 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -216,6 +216,57 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("cannot call addFile with different paths that have the same filename") {
val dir = Utils.createTempDir()
try {
val subdir1 = new File(dir, "subdir1")
val subdir2 = new File(dir, "subdir2")
assert(subdir1.mkdir())
assert(subdir2.mkdir())
val file1 = new File(subdir1, "file")
val file2 = new File(subdir2, "file")
Files.write("old", file1, StandardCharsets.UTF_8)
Files.write("new", file2, StandardCharsets.UTF_8)
sc = new SparkContext("local-cluster[1,1,1024]", "test")
sc.addFile(file1.getAbsolutePath)
def getAddedFileContents(): String = {
sc.parallelize(Seq(0)).map { _ =>
scala.io.Source.fromFile(SparkFiles.get("file")).mkString
}.first()
}
assert(getAddedFileContents() === "old")
intercept[IllegalArgumentException] {
sc.addFile(file2.getAbsolutePath)
}
assert(getAddedFileContents() === "old")
} finally {
Utils.deleteRecursively(dir)
}
}

// Regression tests for SPARK-16787
for (
schedulingMode <- Seq("local-mode", "non-local-mode");
method <- Seq("addJar", "addFile")
) {
val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString
val master = schedulingMode match {
case "local-mode" => "local"
case "non-local-mode" => "local-cluster[1,1,1024]"
}
test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") {
sc = new SparkContext(master, "test")
method match {
case "addJar" =>
sc.addJar(jarPath)
sc.addJar(jarPath)
case "addFile" =>
sc.addFile(jarPath)
sc.addFile(jarPath)
}
}
}

test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))