From 99d9855c109233bbeb4a3501041e4ecf4825c278 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 28 Jul 2016 18:36:24 -0700 Subject: [PATCH 1/6] Add failing regression test. --- .../org/apache/spark/SparkContextSuite.scala | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 4fa3cab18184..fba6c11f5db3 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -216,6 +216,29 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { } } + // Regression tests for SPARK-16787 + for ( + schedulingMode <- Seq("local-mode", "non-local-mode"); + method <- Seq("addJar", "addFile") + ) { + val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString + val master = schedulingMode match { + case "local-mode" => "local" + case "non-local-mode" => "local-cluster[1,1,1024]" + } + test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") { + sc = new SparkContext(master, "test") + method match { + case "addJar" => + sc.addJar(jarPath) + sc.addJar(jarPath) + case "addFile" => + sc.addFile(jarPath) + sc.addFile(jarPath) + } + } + } + test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") { try { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) From 3ecdb88f52fa21dc3b8c89b27a82a33e6d2d937d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 28 Jul 2016 18:36:34 -0700 Subject: [PATCH 2/6] Remove catch case which masked error for addJar. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d48e2b420d71..1a50afeaa00d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1705,12 +1705,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli case exc: FileNotFoundException => logError(s"Jar not found at $path") null - case e: Exception => - // For now just log an error but allow to go through so spark examples work. - // The spark examples don't really need the jar distributed since its also - // the app jar. - logError("Error adding jar (" + e + "), was the --addJars option used?") - null } } // A JAR file which exists locally on every worker node From c412f991c145cf02affd7c2d38de016a9b7f548d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 28 Jul 2016 18:56:49 -0700 Subject: [PATCH 3/6] Fix bug. 
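What the bug is: NettyStreamManager.addFile and addJar register each name with putIfAbsent and then require that no entry existed, so a second SparkContext.addFile or addJar call with the very same path fails with IllegalArgumentException ("File ... already registered."). For addFile the error surfaced directly; for addJar it used to be hidden by the catch-all removed in the previous patch. The change below replaces the require with a plain put that logs whenever the file served under a given name changes.

A minimal reproduction sketch of the old failure; the path is a placeholder and any readable local file would do:

import org.apache.spark.SparkContext

// Before this patch the second call threw IllegalArgumentException from the
// require in NettyStreamManager; after it, re-adding the same path is a no-op.
val sc = new SparkContext("local-cluster[1,1,1024]", "test")
sc.addFile("/tmp/some-file.txt")
sc.addFile("/tmp/some-file.txt")
sc.stop()
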
--- .../spark/rpc/netty/NettyStreamManager.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala index afcb023a99da..728c4c74ca87 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala @@ -19,6 +19,7 @@ package org.apache.spark.rpc.netty import java.io.File import java.util.concurrent.ConcurrentHashMap +import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.server.StreamManager import org.apache.spark.rpc.RpcEnvFileServer @@ -37,7 +38,7 @@ import org.apache.spark.util.Utils * Only streaming (openStream) is supported. */ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) - extends StreamManager with RpcEnvFileServer { + extends StreamManager with RpcEnvFileServer with Logging { private val files = new ConcurrentHashMap[String, File]() private val jars = new ConcurrentHashMap[String, File]() @@ -66,15 +67,21 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) } override def addFile(file: File): String = { - require(files.putIfAbsent(file.getName(), file) == null, - s"File ${file.getName()} already registered.") + val oldFile = files.put(file.getName, file) + if (oldFile != file) { + logInfo( + s"File ${file.getName} will now be served from $file (was previously served from $oldFile)") + } s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}" } - override def addJar(file: File): String = { - require(jars.putIfAbsent(file.getName(), file) == null, - s"JAR ${file.getName()} already registered.") - s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}" + override def addJar(jar: File): String = { + val oldJar = jars.put(jar.getName, jar) + if (oldJar != jar) { + logInfo( + s"JAR ${jar.getName} will now be served from $jar (was previously served from $oldJar)") + } + s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(jar.getName())}" } override def addDirectory(baseUri: String, path: File): String = { From b98d1492ec6a6cea4e5a8cca8e1e23fee2c120e9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 28 Jul 2016 19:54:34 -0700 Subject: [PATCH 4/6] Add back require but weaken it to only accept identical paths. --- .../spark/rpc/netty/NettyStreamManager.scala | 25 ++++++++--------- .../org/apache/spark/SparkContextSuite.scala | 28 +++++++++++++++++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala index 728c4c74ca87..780fadd5bda8 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala @@ -19,7 +19,6 @@ package org.apache.spark.rpc.netty import java.io.File import java.util.concurrent.ConcurrentHashMap -import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.server.StreamManager import org.apache.spark.rpc.RpcEnvFileServer @@ -38,7 +37,7 @@ import org.apache.spark.util.Utils * Only streaming (openStream) is supported. 
*/ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) - extends StreamManager with RpcEnvFileServer with Logging { + extends StreamManager with RpcEnvFileServer { private val files = new ConcurrentHashMap[String, File]() private val jars = new ConcurrentHashMap[String, File]() @@ -67,21 +66,19 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) } override def addFile(file: File): String = { - val oldFile = files.put(file.getName, file) - if (oldFile != file) { - logInfo( - s"File ${file.getName} will now be served from $file (was previously served from $oldFile)") - } + val existingPath = files.putIfAbsent(file.getName, file) + require(existingPath == null || existingPath == file, + s"File ${file.getName} was already registered with a different path " + + s"(old path = $existingPath, new path = $file") s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}" } - override def addJar(jar: File): String = { - val oldJar = jars.put(jar.getName, jar) - if (oldJar != jar) { - logInfo( - s"JAR ${jar.getName} will now be served from $jar (was previously served from $oldJar)") - } - s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(jar.getName())}" + override def addJar(file: File): String = { + val existingPath = jars.putIfAbsent(file.getName, file) + require(existingPath == null || existingPath == file, + s"File ${file.getName} was already registered with a different path " + + s"(old path = $existingPath, new path = $file") + s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}" } override def addDirectory(baseUri: String, path: File): String = { diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index fba6c11f5db3..f8d143dc610c 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -216,6 +216,34 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { } } + test("cannot call addFile with different paths that have the same filename") { + val dir = Utils.createTempDir() + try { + val subdir1 = new File(dir, "subdir1") + val subdir2 = new File(dir, "subdir2") + assert(subdir1.mkdir()) + assert(subdir2.mkdir()) + val file1 = new File(subdir1, "file") + val file2 = new File(subdir2, "file") + Files.write("old", file1, StandardCharsets.UTF_8) + Files.write("new", file2, StandardCharsets.UTF_8) + sc = new SparkContext("local-cluster[1,1,1024]", "test") + sc.addFile(file1.getAbsolutePath) + def getAddedFileContents(): String = { + sc.parallelize(Seq(0)).map { _ => + scala.io.Source.fromFile(SparkFiles.get("file")).mkString + }.first() + } + assert(getAddedFileContents() === "old") + intercept[IllegalArgumentException] { + sc.addFile(file2.getAbsolutePath) + } + assert(getAddedFileContents() === "old") + } finally { + Utils.deleteRecursively(dir) + } + } + // Regression tests for SPARK-16787 for ( schedulingMode <- Seq("local-mode", "non-local-mode"); From 0d7dd0d12ace12bf54c3d6b62b802ffa4de800a3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 28 Jul 2016 21:32:03 -0700 Subject: [PATCH 5/6] Improve thread-safety; do not update timestamp for file under assumption of immutability. 
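addedFiles and addedJars become Scala views over ConcurrentHashMap, and registration goes through putIfAbsent, so concurrent or repeated addFile/addJar calls record a key at most once: the first timestamp is kept (added files are assumed immutable) and the log message and environment update are posted only on the first registration. A small standalone sketch of that pattern; register and the spark:// key below are illustrative, not the actual SparkContext fields:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Idempotent registration: putIfAbsent returns None only for the first
// insertion of a key, so the side effects run once and the stored timestamp
// is never overwritten by later calls.
val addedFiles = new ConcurrentHashMap[String, Long]().asScala

def register(key: String): Unit = {
  val timestamp = System.currentTimeMillis
  if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
    println(s"Added $key with timestamp $timestamp")
  }
}

register("spark://host:1234/files/app.jar")  // records the timestamp and logs
register("spark://host:1234/files/app.jar")  // no-op: timestamp unchanged
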
--- .../scala/org/apache/spark/SparkContext.scala | 26 +++++++++---------- .../org/apache/spark/scheduler/Task.scala | 5 ++-- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1a50afeaa00d..1e857347d52d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -21,7 +21,7 @@ import java.io._ import java.lang.reflect.Constructor import java.net.URI import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID} -import java.util.concurrent.ConcurrentMap +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} import scala.collection.JavaConverters._ @@ -262,8 +262,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def env: SparkEnv = _env // Used to store a URL for each static file/jar together with the file's local timestamp - private[spark] val addedFiles = HashMap[String, Long]() - private[spark] val addedJars = HashMap[String, Long]() + private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala + private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala // Keeps track of all persisted RDDs private[spark] val persistentRdds = { @@ -1430,14 +1430,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli schemeCorrectedPath } val timestamp = System.currentTimeMillis - addedFiles(key) = timestamp - - // Fetch the file locally in case a job is executed using DAGScheduler.runLocally(). - Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, - hadoopConfiguration, timestamp, useCache = false) - - logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) - postEnvironmentUpdate() + if (addedFiles.putIfAbsent(key, timestamp).isEmpty) { + logInfo(s"Added file $path at $key with timestamp $timestamp") + postEnvironmentUpdate() + } } /** @@ -1715,11 +1711,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } if (key != null) { - addedJars(key) = System.currentTimeMillis - logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) + val timestamp = System.currentTimeMillis + if (addedJars.putIfAbsent(key, timestamp).isEmpty) { + logInfo(s"Added JAR $path at $key with timestamp $timestamp") + postEnvironmentUpdate() + } } } - postEnvironmentUpdate() } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 15f863b66c6e..35c4dafe9c19 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream} import java.nio.ByteBuffer import java.util.Properties +import scala.collection.mutable import scala.collection.mutable.HashMap import org.apache.spark._ @@ -198,8 +199,8 @@ private[spark] object Task { */ def serializeWithDependencies( task: Task[_], - currentFiles: HashMap[String, Long], - currentJars: HashMap[String, Long], + currentFiles: mutable.Map[String, Long], + currentJars: mutable.Map[String, Long], serializer: SerializerInstance) : ByteBuffer = { From 9aa32b394cd80c776ac567a662c9471351228b93 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 
29 Jul 2016 12:59:47 -0700 Subject: [PATCH 6/6] Attempt to fix PySpark tests by re-adding driver download of added files. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1e857347d52d..48126c255fb8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1432,6 +1432,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val timestamp = System.currentTimeMillis if (addedFiles.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added file $path at $key with timestamp $timestamp") + // Fetch the file locally so that closures which are run on the driver can still use the + // SparkFiles API to access files. + Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, + hadoopConfiguration, timestamp, useCache = false) postEnvironmentUpdate() } }
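
With the driver-side download back in place, an added file is fetched into SparkFiles.getRootDirectory() on the driver as well, so closures that run on the driver (including PySpark, which reads added files there) can keep resolving it through SparkFiles.get. An end-to-end usage sketch of the behaviour after this series; the temp file and app name are made up for the example:

import java.nio.file.Files
import org.apache.spark.{SparkContext, SparkFiles}

// Create a throwaway file to distribute.
val data = Files.createTempFile("spark-16787-demo", ".txt")
Files.write(data, "hello".getBytes("UTF-8"))
val name = data.getFileName.toString

val sc = new SparkContext("local-cluster[1,1,1024]", "test")
sc.addFile(data.toString)
sc.addFile(data.toString)  // calling addFile again with the same path is now a no-op

// Resolves on the driver thanks to the fetchFile call restored above ...
val onDriver = scala.io.Source.fromFile(SparkFiles.get(name)).mkString

// ... and on executors, which download the file from the RPC file server.
val onExecutors = sc.parallelize(Seq(0)).map { _ =>
  scala.io.Source.fromFile(SparkFiles.get(name)).mkString
}.first()

assert(onDriver == "hello" && onExecutors == "hello")
sc.stop()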