diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 368835a867493..e294ea67891d1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1016,6 +1016,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * or the spark.home Java property, or the SPARK_HOME environment variable
    * (in that order of preference). If neither of these is set, return None.
    */
+  @deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0")
   private[spark] def getSparkHome(): Option[String] = {
     conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME")))
   }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index d9d1c5955ca99..c4007e256fc24 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -480,6 +480,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * or the spark.home Java property, or the SPARK_HOME environment variable
    * (in that order of preference). If neither of these is set, return None.
    */
+  @deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0")
  def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())

  /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 86305d2ea8a09..65a1a8fd7e929 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,6 @@ private[spark] class ApplicationDescription(
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
-    val sparkHome: Option[String],
     var appUiUrl: String,
     val eventLogDir: Option[String] = None)
   extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala
index a2b263544c6a2..b771c0491ffd2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Command.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -25,5 +25,4 @@ private[spark] case class Command(
     environment: Map[String, String],
     classPathEntries: Seq[String],
     libraryPathEntries: Seq[String],
-    javaOpts: Seq[String]) {
-}
+    javaOpts: Seq[String])
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 58c95dc4f9116..e06fbdcefa236 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -22,7 +22,8 @@ private[spark] class DriverDescription(
     val mem: Int,
     val cores: Int,
     val supervise: Boolean,
-    val command: Command)
+    val command: Command,
+    val driverSparkHome: Option[String] = None)
   extends Serializable {

   override def toString: String = s"DriverDescription (${command.mainClass})"
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index c4f5e294a393e..696f32a6f5730 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -56,7 +56,6 @@ private[spark] object JsonProtocol {
     ("cores" -> obj.maxCores) ~
     ("memoryperslave" -> obj.memoryPerSlave) ~
     ("user" -> obj.user) ~
-    ("sparkhome" -> obj.sparkHome) ~
     ("command" -> obj.command.toString)
   }

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index b8ffa9afb69cb..88a0862b96afe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,9 +48,8 @@ private[spark] object TestClient {
     val conf = new SparkConf
     val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
-    val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
-      Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
+    val desc = new ApplicationDescription("TestClient", Some(1), 512,
+      Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()
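Aside: with sparkHome removed, the deploy-side descriptors are built as sketched below. This is only an illustration of the new constructor shapes (both classes are private[spark] internals, and every value here is a placeholder); it mirrors the TestClient change above rather than introducing any new API.

    // Sketch assuming the post-patch signatures shown in the hunks above.
    val cmd = Command(
      "org.example.MyExecutorBackend",  // mainClass (hypothetical)
      Seq("arg1", "arg2"),              // arguments
      Map("SPARK_LOG_DIR" -> "/tmp"),   // environment
      Seq(),                            // classPathEntries
      Seq(),                            // libraryPathEntries
      Seq())                            // javaOpts
    // ApplicationDescription no longer carries a sparkHome field; the Worker
    // now decides which Spark installation executors are launched from.
    val appDesc = new ApplicationDescription(
      "example-app",                    // name
      Some(2),                          // maxCores
      512,                              // memoryPerSlave (MB)
      cmd,                              // command
      "http://driver:4040")             // appUiUrl (eventLogDir defaults to None)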
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index fb5252da96519..519aaaf5e3f46 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -111,7 +111,7 @@ private[spark] class Worker(
       // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
       // So attempting to create and then check if directory was created or not.
       workDir.mkdirs()
-      if ( !workDir.exists() || !workDir.isDirectory) {
+      if (!workDir.exists() || !workDir.isDirectory) {
         logError("Failed to create work directory " + workDir)
         System.exit(1)
       }
@@ -232,10 +232,8 @@ private[spark] class Worker(
     } else {
       try {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-        val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host,
-          appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
-          workDir, akkaUrl, conf, ExecutorState.RUNNING)
+        val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self,
+          workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_
@@ -264,7 +262,7 @@ private[spark] class Worker(
     val fullId = appId + "/" + execId
     if (ExecutorState.isFinished(state)) {
       executors.get(fullId) match {
-        case Some(executor) => 
+        case Some(executor) =>
           logInfo("Executor " + fullId + " finished with state " + state +
             message.map(" message " + _).getOrElse("") +
             exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -382,7 +380,8 @@ private[spark] object Worker extends Logging {
       cores: Int,
       memory: Int,
       masterUrls: Array[String],
-      workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workDir: String,
+      workerNumber: Option[Int] = None): (ActorSystem, Int) = {

     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val conf = new SparkConf
@@ -392,7 +391,7 @@ private[spark] object Worker extends Logging {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
       conf = conf, securityManager = securityMgr)
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
-      masterUrls, systemName, actorName,  workDir, conf, securityMgr), name = actorName)
+      masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
     (actorSystem, boundPort)
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 48aaaa54bdb35..a28446f6c8a6b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -60,9 +60,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
-    val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 9f45400bcf852..c7400d2afebc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -71,9 +71,11 @@ private[spark] class CoarseMesosSchedulerBackend(
   val taskIdToSlaveId = new HashMap[Int, String]
   val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed

-  val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
-    "Spark home is not set; set it through the spark.home system " +
-    "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+  private val sparkHome = sc.conf.getOption("spark.executor.home")
+    .orElse(sc.conf.getOption("spark.home")) // deprecated
+    .getOrElse {
+      throw new SparkException("Executor Spark home is not set; set it through spark.executor.home")
+    }

   val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index c717e7c621a8f..386b24b88eee0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -62,6 +62,12 @@ private[spark] class MesosSchedulerBackend(

   var classLoader: ClassLoader = null

+  private val sparkHome = sc.conf.getOption("spark.executor.home")
+    .orElse(sc.conf.getOption("spark.home")) // deprecated
+    .getOrElse {
+      throw new SparkException("Executor Spark home is not set; set it through spark.executor.home")
+    }
+
   override def start() {
     synchronized {
       classLoader = Thread.currentThread.getContextClassLoader
@@ -86,9 +92,6 @@ private[spark] class MesosSchedulerBackend(
   }

   def createExecutorInfo(execId: String): ExecutorInfo = {
-    val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
-      "Spark home is not set; set it through the spark.home system " +
-      "property, the SPARK_HOME environment variable or the SparkContext constructor"))
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
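Aside: both Mesos backends now duplicate the same lookup, so the fallback chain can be read as one small function: prefer spark.executor.home, fall back to the deprecated spark.home, and fail fast otherwise. A minimal sketch under that assumption (the helper name is invented, not part of this patch):

    import org.apache.spark.{SparkConf, SparkException}

    // Resolve the executor-side Spark home the same way both backends above do.
    def resolveExecutorSparkHome(conf: SparkConf): String = {
      conf.getOption("spark.executor.home")
        .orElse(conf.getOption("spark.home")) // deprecated fallback
        .getOrElse {
          throw new SparkException(
            "Executor Spark home is not set; set it through spark.executor.home")
        }
    }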
"/path") assert(conf.get("spark.jars") === "a.jar,b.jar") assert(conf.get("spark.executorEnv.VAR1") === "value1") assert(conf.get("spark.executorEnv.VAR2") === "value2") diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 093394ad6d142..31aa7ec837f43 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc(): ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq()) - new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl") + new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl") } def createAppInfo() : ApplicationInfo = { @@ -169,8 +169,7 @@ object JsonConstants { val appDescJsonStr = """ |{"name":"name","cores":4,"memoryperslave":1234, - |"user":"%s","sparkhome":"sparkHome", - |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"} + |"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"} """.format(System.getProperty("user.name", "")).stripMargin val executorRunnerJsonStr = diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 9190b05e2dba2..8126ef1bb23aa 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers { // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. 
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index ca4d987619c91..149a2b3d95b86 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -27,12 +27,11 @@ import org.apache.spark.SparkConf
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
+    val sparkHome = sys.props("spark.test.home")
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
-      sparkHome, "appUiUrl")
+      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
       f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)

     assert(er.getCommandSeq.last === appId)
diff --git a/docs/configuration.md b/docs/configuration.md
index 2a71d7b820e5f..c966236a654bd 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -188,6 +188,15 @@ Apart from these, the following properties are also available, and may be useful
     Set a special library path to use when launching executor JVM's.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.home</code></td>
+  <td>(none)</td>
+  <td>
+    Home directory of Spark installation to use when launching executors on the worker machines.
+    In standalone mode, the Worker's current working directory is used if this is not set. This
+    is not used in yarn mode.
+  </td>
+</tr>
 <tr>
   <td><code>spark.files.userClassPathFirst</code></td>
   <td>false</td>
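Aside: the property documented above is set like any other Spark configuration key. A hedged example of pointing executors at an installation on the worker machines ("/opt/spark" and the master URL are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.executor.home tells remote executors where Spark lives on their host;
    // in standalone mode the Worker's working directory is used when it is unset.
    val conf = new SparkConf()
      .setMaster("mesos://master:5050")
      .setAppName("ExecutorHomeExample")
      .set("spark.executor.home", "/opt/spark")
    val sc = new SparkContext(conf)

The same key can also be supplied at submit time (for example with spark-submit's --conf flag, where available) or in spark-defaults.conf.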
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a8bbd55861954..f116433a5cca1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -328,7 +328,8 @@ object TestSettings {
   lazy val settings = Seq (
     // Fork new JVMs for tests and set Java options for those
     fork := true,
-    javaOptions in Test += "-Dspark.home=" + sparkHome,
+    javaOptions in Test += "-Dspark.test.home=" + sparkHome,
+    javaOptions in Test += "-Dspark.executor.home=" + sparkHome, // For local-cluster mode
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index b4c82f519bd53..c88fbec1fe9a8 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -30,14 +30,9 @@
 u'local'
 >>> sc.appName
 u'My app'
->>> sc.sparkHome is None
+>>> sc.sparkHome is not None
 True
-
 >>> conf = SparkConf(loadDefaults=False)
->>> conf.setSparkHome("/path")
-<pyspark.conf.SparkConf object at ...>
->>> conf.get("spark.home")
-u'/path'
 >>> conf.setExecutorEnv("VAR1", "value1")
 <pyspark.conf.SparkConf object at ...>
 >>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")])
@@ -48,10 +43,8 @@
 spark.executorEnv.VAR1=value1
 spark.executorEnv.VAR3=value3
 spark.executorEnv.VAR4=value4
-spark.home=/path
 >>> sorted(conf.getAll(), key=lambda p: p[0])
-[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), \
-(u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
+[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4')]
 """
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 7b0f8d83aedc5..5ee0683fef6dd 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -84,7 +84,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         @param serializer: The serializer for RDDs.
         @param conf: A L{SparkConf} object setting Spark properties.
         @param gateway: Use an existing gateway and JVM, otherwise a new JVM
-               will be instatiated.
+               will be instantiated.

         >>> from pyspark.context import SparkContext
@@ -126,8 +126,6 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
             self._conf.setMaster(master)
         if appName:
             self._conf.setAppName(appName)
-        if sparkHome:
-            self._conf.setSparkHome(sparkHome)
         if environment:
             for key, value in environment.iteritems():
                 self._conf.setExecutorEnv(key, value)
@@ -144,7 +142,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         # the classpath or an external config file
         self.master = self._conf.get("spark.master")
         self.appName = self._conf.get("spark.app.name")
-        self.sparkHome = self._conf.get("spark.home", None)
+        self.sparkHome = os.environ.get("SPARK_HOME")
         for (k, v) in self._conf.getAll():
             if k.startswith("spark.executorEnv."):
                 varName = k[len("spark.executorEnv."):]
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 42c7e511dc3f5..65788f4646d91 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -969,9 +969,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"))
-    }
     sparkContext = new SparkContext(conf)
     logInfo("Created spark context..")
     sparkContext
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index ac56ff709c1c4..b780282bdac37 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -35,7 +35,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   extends Logging with Serializable {
   val master = ssc.sc.master
   val framework = ssc.sc.appName
-  val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
   val jars = ssc.sc.jars
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 7b33d3b235466..f0fefc4830039 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -59,11 +59,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
   }

-  test("from no conf + spark home") {
-    ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
-    assert(ssc.conf.get("spark.home") === sparkHome)
-  }
-
   test("from no conf + spark home + env") {
     ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil, Map(envPair))