43 commits
89522ef
Add class path isolation support for Yarn cluster mode.
Oct 29, 2014
a853e74
Re-work CoarseGrainedExecutorBackend command line arguments.
Oct 29, 2014
91f7e54
[yarn] Enable executor class path isolation.
Oct 29, 2014
a314f2d
Enable driver class path isolation in SparkSubmit.
Oct 30, 2014
46d8cf2
Update doc with new option, change name to "userClassPathFirst".
Oct 30, 2014
d0394b8
Add "deprecated configs" to SparkConf.
Oct 30, 2014
4a84d87
Fix the child-first class loader.
Nov 1, 2014
0b64d92
Add deprecation warning to yarn option.
Nov 1, 2014
55c88fa
Run all Yarn integration tests via spark-submit.
Nov 5, 2014
20373f5
Fix ClientBaseSuite.
Nov 5, 2014
7f8603c
Fix yarn-cluster mode without userClassPathFirst.
Nov 11, 2014
7d14397
Register user jars in executor up front.
Nov 10, 2014
50afa5f
Fix Yarn executor command line.
Nov 11, 2014
a963ea3
Implement spark.driver.userClassPathFirst for standalone cluster mode.
Nov 11, 2014
89d8072
Cleanups.
Nov 12, 2014
fa1aafa
Remove write check on user jars.
Nov 12, 2014
d1273b2
Add test file to rat exclude.
Nov 12, 2014
54e1a98
Merge branch 'master' into SPARK-2996
Nov 13, 2014
35949c8
Merge branch 'master' into SPARK-2996
Nov 13, 2014
5304d64
Merge branch 'master' into SPARK-2996
Nov 25, 2014
7b57cba
Remove now outdated message.
Nov 25, 2014
44010b6
Merge branch 'master' into SPARK-2996
Dec 9, 2014
f513871
Merge branch 'master' into SPARK-2996
Dec 24, 2014
3730151
Merge branch 'master' into SPARK-2996
Jan 8, 2015
a10f379
Some feedback.
Jan 14, 2015
b6497f9
Merge branch 'master' into SPARK-2996
Jan 14, 2015
2e6c4b7
Mention new setting in documentation.
Jan 20, 2015
fbb8ab5
Add locking in loadClass() to avoid deadlocks.
Jan 22, 2015
3cb6498
Call the right loadClass() method on the parent.
Jan 22, 2015
25d4fed
Merge branch 'master' into SPARK-2996
Jan 29, 2015
fe970a7
Review feedback.
Jan 30, 2015
0e6ef19
Move class loaders around and make names more meaningful.
Jan 30, 2015
0fe7777
Merge branch 'master' into SPARK-2996
Feb 3, 2015
70d4044
Fix pyspark/yarn-cluster test.
Feb 4, 2015
0e6d6be
Merge branch 'master' into SPARK-2996
Feb 4, 2015
2ce3c7a
Merge branch 'master' into SPARK-2996
Feb 6, 2015
3f768e3
Merge branch 'master' into SPARK-2996
Feb 6, 2015
a1b8d7e
Merge branch 'master' into SPARK-2996
Feb 6, 2015
cabf962
Merge branch 'master' into SPARK-2996
Feb 9, 2015
a8c69f1
Review feedback.
Feb 9, 2015
fa7df88
Remove 'test.resource' file, create it dynamically.
Feb 9, 2015
a1499e2
Remove SPARK_HOME propagation.
Feb 9, 2015
9cf9cf1
Merge branch 'master' into SPARK-2996
Feb 10, 2015
83 changes: 77 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
@@ -67,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
settings.put(key, value)
settings.put(translateConfKey(key, warn = true), value)
this
}

@@ -139,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
settings.putIfAbsent(key, value)
settings.putIfAbsent(translateConfKey(key, warn = true), value)
this
}

@@ -175,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key))
Option(settings.get(translateConfKey(key)))
}

/** Get all parameters as a list of pairs */
@@ -228,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getAppId: String = get("spark.app.id")

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.containsKey(key)
def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))

/** Copy this object */
override def clone: SparkConf = {
@@ -285,7 +286,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
// Validate memory fractions
val memoryKeys = Seq(
"spark.storage.memoryFraction",
"spark.shuffle.memoryFraction",
"spark.shuffle.memoryFraction",
"spark.shuffle.safetyFraction",
"spark.storage.unrollFraction",
"spark.storage.safetyFraction")
@@ -351,9 +352,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def toDebugString: String = {
getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}

}

private[spark] object SparkConf {
private[spark] object SparkConf extends Logging {

private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
val configs = Seq(
DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
"1.3"),
DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
"Use spark.{driver,executor}.userClassPathFirst instead."))
configs.map { x => (x.oldName, x) }.toMap
Contributor: Nit: use parentheses instead of curly braces here.

Contributor Author: It seems like both patterns are used in Spark. I kinda prefer the braces; makes it clearer (to me) that it's a closure.

Contributor: Yeah, I actually prefer the style here.
}

/**
* Return whether the given config is an akka config (e.g. akka.actor.provider).
* Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
@@ -380,4 +392,63 @@ private[spark] object SparkConf {
def isSparkPortConf(name: String): Boolean = {
(name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
}

/**
* Translate the configuration key if it is deprecated and has a replacement, otherwise just
* returns the provided key.
*
* @param userKey Configuration key from the user / caller.
* @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
* only once for each key.
*/
def translateConfKey(userKey: String, warn: Boolean = false): String = {
deprecatedConfigs.get(userKey)
.map { deprecatedKey =>
if (warn) {
deprecatedKey.warn()
}
deprecatedKey.newName.getOrElse(userKey)
}.getOrElse(userKey)
}

/**
* Holds information about keys that have been deprecated or renamed.
*
* @param oldName Old configuration key.
* @param newName New configuration key, or `null` if key has no replacement, in which case the
* deprecated key will be used (but the warning message will still be printed).
* @param version Version of Spark where key was deprecated.
* @param deprecationMessage Message to include in the deprecation warning; mandatory when
* `newName` is not provided.
*/
private case class DeprecatedConfig(
oldName: String,
_newName: String,
version: String,
deprecationMessage: String = null) {

private val warned = new AtomicBoolean(false)
val newName = Option(_newName)

if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
throw new IllegalArgumentException("Need new config name or deprecation message.")
}

def warn(): Unit = {
if (warned.compareAndSet(false, true)) {
if (newName != null) {
val message = Option(deprecationMessage).getOrElse(
s"Please use the alternative '$newName' instead.")
logWarning(
s"The configuration option '$oldName' has been replaced as of Spark $version and " +
s"may be removed in the future. $message")
} else {
logWarning(
s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
s"may be removed in the future. $deprecationMessage")
}
}
}

}
}
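For context on the deprecation handling above: once a key appears in deprecatedConfigs, setting the old name stores the value under the new name and logs a one-time warning, so reads through either name resolve to the same entry. A minimal sketch of the expected behavior, assuming the userClassPathFirst keys registered in this diff (the setup itself is illustrative, not part of the PR):

import org.apache.spark.SparkConf

// Illustrative only: setting the deprecated key is translated on write,
// so lookups through either the old or the new name see the same value.
val conf = new SparkConf(loadDefaults = false)
  .set("spark.files.userClassPathFirst", "true")  // logs a one-time deprecation warning

assert(conf.get("spark.executor.userClassPathFirst") == "true")
assert(conf.contains("spark.files.userClassPathFirst"))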
19 changes: 18 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -17,12 +17,13 @@

package org.apache.spark

import java.io.{File, FileInputStream, FileOutputStream}
import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.net.{URI, URL}
import java.util.jar.{JarEntry, JarOutputStream}

import scala.collection.JavaConversions._

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}

@@ -59,6 +60,22 @@ private[spark] object TestUtils {
createJar(files1 ++ files2, jarFile)
}

/**
* Create a jar file containing multiple files. The `files` map contains a mapping of
* file names in the jar file to their contents.
*/
def createJarWithFiles(files: Map[String, String], dir: File = null): URL = {
val tempDir = Option(dir).getOrElse(Utils.createTempDir())
val jarFile = File.createTempFile("testJar", ".jar", tempDir)
val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
files.foreach { case (k, v) =>
val entry = new JarEntry(k)
jarStream.putNextEntry(entry)
ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
}
jarStream.close()
jarFile.toURI.toURL
}

/**
* Create a jar file that contains this set of files. All files will be located at the root
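A quick sketch of how the new createJarWithFiles helper might be exercised in a test (the resource name and contents are made up, and this assumes the caller lives in Spark's own test tree, since TestUtils is private[spark]):

import java.net.URLClassLoader
import org.apache.spark.TestUtils

// Illustrative only: build a jar holding one resource file and verify it is
// visible through a class loader that points at the generated jar.
val jarUrl = TestUtils.createJarWithFiles(Map("test.resource" -> "RESOURCE CONTENTS"))
val loader = new URLClassLoader(Array(jarUrl), null)
assert(loader.getResource("test.resource") != null)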
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -68,8 +68,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)

val driverDescription = new DriverDescription(
driverArgs.jarUrl,
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,7 +37,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import org.apache.spark.deploy.rest._
import org.apache.spark.executor._
import org.apache.spark.util.Utils
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

/**
* Whether to submit, kill, or request the status of an application.
@@ -467,11 +467,11 @@ object SparkSubmit {
}

val loader =
if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) {
new ChildExecutorURLClassLoader(new Array[URL](0),
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Contributor: I think we could just use the existing ChildExecutorURLClassLoader and ExecutorURLClassLoader, so there is less code to change.

Contributor Author: That kinda clashes with Sandy's comment, which I agree with, that calling them "Executor" is misleading since they're used in other contexts. And frankly, compared to the other changes here, changing those class names is a very small one.
Thread.currentThread.getContextClassLoader)
} else {
new ExecutorURLClassLoader(new Array[URL](0),
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
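The ChildFirstURLClassLoader and MutableURLClassLoader used here are defined outside this hunk, so as a rough sketch of the child-first lookup the discussion is about (an illustration of the general technique under assumed names, not the actual Spark implementation):

import java.net.{URL, URLClassLoader}

// Illustrative child-first loader: consult the user's jars before delegating
// to the parent, and fall back to normal parent delegation only on a miss.
class IllustrativeChildFirstLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, null) {

  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    try {
      super.loadClass(name, resolve)      // look in the child (user class path) first
    } catch {
      case _: ClassNotFoundException =>
        parent.loadClass(name)            // then fall back to the parent
    }
  }
}

Per the commit messages above ("Add locking in loadClass() to avoid deadlocks", "Call the right loadClass() method on the parent"), the real implementation also needs care around per-class locking and parent delegation that this sketch omits.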
@@ -196,7 +196,7 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
</td>
<td>{driver.desc.command.arguments(1)}</td>
<td>{driver.desc.command.arguments(2)}</td>
</tr>
}
}
@@ -392,7 +392,7 @@ private class SubmitRequestServlet(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
Seq("{{WORKER_URL}}", mainClass) ++ appArgs, // args to the DriverWrapper
Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
@@ -74,10 +74,15 @@ private[spark] class DriverRunner(
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)

// Make sure user application jar is on the classpath
def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename
case other => other
}

// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
@@ -111,12 +116,6 @@ private[spark] class DriverRunner(
}
}

/** Replace variables in a command argument passed to us */
private def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case other => other
}

/**
* Creates the working directory for this driver.
* Will throw an exception if there are errors preparing the directory.
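For clarity, the placeholder substitution above is a simple per-argument rewrite applied to the driver command before launch; a small sketch with made-up values:

// Illustrative only: example values standing in for what DriverRunner supplies.
val workerUrl = "akka://sparkWorker@host:7078/user/Worker"
val localJarFilename = "/work/driver-0/app.jar"

def substituteVariables(argument: String): String = argument match {
  case "{{WORKER_URL}}" => workerUrl
  case "{{USER_JAR}}" => localJarFilename
  case other => other
}

// DriverWrapper arguments as built by Client / the REST servlet above.
val rawArgs = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.Main", "--verbose")
val expanded = rawArgs.map(substituteVariables)
// expanded == Seq(workerUrl, localJarFilename, "com.example.Main", "--verbose")

This is also why MasterPage switches from arguments(1) to arguments(2): once {{USER_JAR}} is inserted at index 1, the user's main class moves to index 2.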
@@ -17,32 +17,44 @@

package org.apache.spark.deploy.worker

import java.io.File

import akka.actor._

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

/**
* Utility object for launching driver programs such that they share fate with the Worker process.
*/
object DriverWrapper {
def main(args: Array[String]) {
args.toList match {
case workerUrl :: mainClass :: extraArgs =>
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")

val currentLoader = Thread.currentThread.getContextClassLoader
val userJarUrl = new File(userJar).toURI().toURL()
val loader =
if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
} else {
new MutableURLClassLoader(Array(userJarUrl), currentLoader)
}
Thread.currentThread.setContextClassLoader(loader)

// Delegate to supplied main class
val clazz = Class.forName(args(1))
val clazz = Class.forName(mainClass, true, loader)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])

actorSystem.shutdown()

case _ =>
System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
Contributor: We're technically changing the public API here, which is probably fine since there's not really a way around it in this case. @pwendell thoughts?

Contributor Author: Is this object supposed to be public in the first place? It seems like it's only used through SparkSubmit (much like Yarn's Client, although that one is public only for backwards compatibility).

System.exit(-1)
}
}
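Putting the pieces of this PR together, an application that wants user-classpath-first behavior would set the new keys rather than the deprecated ones; a hedged example (values are placeholders):

import org.apache.spark.SparkConf

// Illustrative only: request child-first class loading for both the driver
// (handled by SparkSubmit / DriverWrapper) and the executors.
val conf = new SparkConf()
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")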