From 75923697a08e035c8e46b53b67a9d98938212915 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 17 Jul 2014 11:42:18 -0700 Subject: [PATCH 1/9] Allow applications to specify their executor/driver spark homes This allows the worker to launch a driver or an executor from a different installation of Spark on the same machine. To do so, the user needs to set "spark.executor.home" and/or "spark.driver.home". Note that this was already possible for the executors even before this commit. However, it used to rely on "spark.home", which was also used for 20 other things. The next step is to remove all usages of "spark.home", which was confusing to many users (myself included). --- .../spark/deploy/ApplicationDescription.scala | 2 +- .../org/apache/spark/deploy/Client.scala | 4 +++- .../spark/deploy/DriverDescription.scala | 3 ++- .../apache/spark/deploy/JsonProtocol.scala | 2 +- .../spark/deploy/client/TestClient.scala | 5 ++--- .../spark/deploy/worker/DriverRunner.scala | 1 + .../spark/deploy/worker/ExecutorRunner.scala | 1 + .../apache/spark/deploy/worker/Worker.scala | 20 +++++++++---------- .../cluster/SparkDeploySchedulerBackend.scala | 4 ++-- .../spark/deploy/JsonProtocolSuite.scala | 4 ++-- .../deploy/worker/ExecutorRunnerTest.scala | 3 +-- 11 files changed, 26 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 86305d2ea8a09..5a466912dbeb0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -22,8 +22,8 @@ private[spark] class ApplicationDescription( val maxCores: Option[Int], val memoryPerSlave: Int, val command: Command, - val sparkHome: Option[String], var appUiUrl: String, + val executorSparkHome: Option[String] = None, val eventLogDir: Option[String] = None) extends Serializable { diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index c371dc3a51c73..c19b6082dc748 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -69,13 +69,15 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends val javaOpts = sys.props.get(javaOptionsConf) val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts) + val driverSparkHome = conf.getOption("spark.driver.home") val driverDescription = new DriverDescription( driverArgs.jarUrl, driverArgs.memory, driverArgs.cores, driverArgs.supervise, - command) + command, + driverSparkHome) masterActor ! 
RequestSubmitDriver(driverDescription) diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index 58c95dc4f9116..e06fbdcefa236 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -22,7 +22,8 @@ private[spark] class DriverDescription( val mem: Int, val cores: Int, val supervise: Boolean, - val command: Command) + val command: Command, + val driverSparkHome: Option[String] = None) extends Serializable { override def toString: String = s"DriverDescription (${command.mainClass})" diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index c4f5e294a393e..f3eaa47a5e6f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -56,7 +56,7 @@ private[spark] object JsonProtocol { ("cores" -> obj.maxCores) ~ ("memoryperslave" -> obj.memoryPerSlave) ~ ("user" -> obj.user) ~ - ("sparkhome" -> obj.sparkHome) ~ + ("executorsparkhome" -> obj.executorSparkHome) ~ ("command" -> obj.command.toString) } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index e15a87bd38fda..e07ae07d0f5bd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -48,9 +48,8 @@ private[spark] object TestClient { val conf = new SparkConf val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) - val desc = new ApplicationDescription( - "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), - Seq()), Some("dummy-spark-home"), "ignored") + val desc = new ApplicationDescription("TestClient", Some(1), 512, + Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq()), "ignored") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 662d37871e7a6..15a1b42a5aced 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState /** * Manages the execution of one driver, including automatically restarting the driver on failure. + * This is currently only used by the standalone Worker in cluster deploy mode. */ private[spark] class DriverRunner( val driverId: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 467317dd9b44c..70bdffcf4cbb1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender /** * Manages the execution of one executor process. + * This is currently used only by the standalone Worker. 
*/ private[spark] class ExecutorRunner( val appId: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ce425443051b0..d182fe745052f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -81,7 +81,7 @@ private[spark] class Worker( @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() - val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) + val workerSparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) var workDir: File = null val executors = new HashMap[String, ExecutorRunner] val finishedExecutors = new HashMap[String, ExecutorRunner] @@ -106,12 +106,12 @@ private[spark] class Worker( def memoryFree: Int = memory - memoryUsed def createWorkDir() { - workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work")) + workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(workerSparkHome, "work")) try { // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs() // So attempting to create and then check if directory was created or not. workDir.mkdirs() - if ( !workDir.exists() || !workDir.isDirectory) { + if (!workDir.exists() || !workDir.isDirectory) { logError("Failed to create work directory " + workDir) System.exit(1) } @@ -127,7 +127,7 @@ private[spark] class Worker( assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( host, port, cores, Utils.megabytesToString(memory))) - logInfo("Spark home: " + sparkHome) + logInfo("Worker Spark home: " + workerSparkHome) createWorkDir() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) @@ -232,10 +232,9 @@ private[spark] class Worker( } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) - val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, - appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), - workDir, akkaUrl, conf, ExecutorState.RUNNING) + val execSparkHome = appDesc.executorSparkHome.map(new File(_)).getOrElse(workerSparkHome) + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, + workerId, host, execSparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -264,7 +263,7 @@ private[spark] class Worker( val fullId = appId + "/" + execId if (ExecutorState.isFinished(state)) { executors.get(fullId) match { - case Some(executor) => + case Some(executor) => logInfo("Executor " + fullId + " finished with state " + state + message.map(" message " + _).getOrElse("") + exitStatus.map(" exitStatus " + _).getOrElse("")) @@ -295,7 +294,8 @@ private[spark] class Worker( case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") - val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl) + val driverSparkHome = driverDesc.driverSparkHome.map(new File(_)).getOrElse(workerSparkHome) + val driver = new DriverRunner(driverId, workDir, driverSparkHome, driverDesc, self, akkaUrl) drivers(driverId) = driver driver.start() diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index bf2dc88e29048..5b69c5eede632 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -57,9 +57,9 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts) - val sparkHome = sc.getSparkHome() + val executorSparkHome = conf.getOption("spark.executor.home") val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) + sc.ui.appUIAddress, executorSparkHome, sc.eventLogger.map(_.logDir)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 01ab2d549325c..0aa458f31151b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc(): ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq()) - new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl") + new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl", Some("executorSparkHome")) } def createAppInfo() : ApplicationInfo = { @@ -169,7 +169,7 @@ object JsonConstants { val appDescJsonStr = """ |{"name":"name","cores":4,"memoryperslave":1234, - |"user":"%s","sparkhome":"sparkHome", + |"user":"%s","executorsparkhome":"executorSparkHome", |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"} """.format(System.getProperty("user.name", "")).stripMargin diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index e5f748d55500d..8127ae33fa977 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -29,8 +29,7 @@ class ExecutorRunnerTest extends FunSuite { def f(s:String) = new File(s) val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")) val appDesc = new ApplicationDescription("app name", Some(8), 500, - Command("foo", Seq(), Map(), Seq(), Seq()), - sparkHome, "appUiUrl") + Command("foo", Seq(), Map(), Seq(), Seq()), "appUiUrl") val appId = "12345-worker321-9876" val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")), f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING) From b90444d65744174ba6105da23459218e90788644 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 17 Jul 2014 14:33:44 -0700 Subject: [PATCH 2/9] Remove / deprecate all occurrences of spark.home This involves replacing spark.home with spark.test.home in tests. Looks like python still uses spark.home, however. The next commit will fix this. 
--- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 +++- core/src/main/scala/org/apache/spark/SparkContext.scala | 7 ++----- .../org/apache/spark/api/java/JavaSparkContext.scala | 1 + .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 8 +++++--- .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 9 ++++++--- core/src/test/scala/org/apache/spark/DriverSuite.scala | 2 +- .../src/test/scala/org/apache/spark/SparkConfSuite.scala | 2 -- .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +- .../apache/spark/deploy/worker/ExecutorRunnerTest.scala | 4 ++-- project/SparkBuild.scala | 2 +- .../main/scala/org/apache/spark/repl/SparkILoop.scala | 3 --- .../scala/org/apache/spark/streaming/Checkpoint.scala | 1 - .../org/apache/spark/streaming/StreamingContext.scala | 8 +++----- .../apache/spark/streaming/StreamingContextSuite.scala | 5 ----- 14 files changed, 25 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 8ce4b91cae8ae..eb0bbb03873e6 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -121,7 +121,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * Set the location where Spark is installed on worker nodes. */ def setSparkHome(home: String): SparkConf = { - set("spark.home", home) + set("spark.home", home) // deprecated + set("spark.driver.home", home) + set("spark.executor.home", home) } /** Set multiple parameters together */ diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8052499ab7526..6176d137aa0f5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -114,7 +114,7 @@ class SparkContext(config: SparkConf) extends Logging { environment: Map[String, String] = Map(), preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = { - this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment)) + this(SparkContext.updatedConf(new SparkConf, master, appName, jars, environment)) this.preferredNodeLocationData = preferredNodeLocationData } @@ -1013,6 +1013,7 @@ class SparkContext(config: SparkConf) extends Logging { * or the spark.home Java property, or the SPARK_HOME environment variable * (in that order of preference). If neither of these is set, return None. 
*/ + @deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0") private[spark] def getSparkHome(): Option[String] = { conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME"))) } @@ -1431,16 +1432,12 @@ object SparkContext extends Logging { conf: SparkConf, master: String, appName: String, - sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map()): SparkConf = { val res = conf.clone() res.setMaster(master) res.setAppName(appName) - if (sparkHome != null) { - res.setSparkHome(sparkHome) - } if (jars != null && !jars.isEmpty) { res.setJars(jars) } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 1e0493c4855e0..38cb7ce79ec79 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -465,6 +465,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * or the spark.home Java property, or the SPARK_HOME environment variable * (in that order of preference). If neither of these is set, return None. */ + @deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0") def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome()) /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 9f45400bcf852..c7400d2afebc5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -71,9 +71,11 @@ private[spark] class CoarseMesosSchedulerBackend( val taskIdToSlaveId = new HashMap[Int, String] val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed - val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( - "Spark home is not set; set it through the spark.home system " + - "property, the SPARK_HOME environment variable or the SparkContext constructor")) + private val sparkHome = sc.conf.getOption("spark.executor.home") + .orElse(sc.conf.getOption("spark.home")) // deprecated + .getOrElse { + throw new SparkException("Executor Spark home is not set; set it through spark.executor.home") + } val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c717e7c621a8f..386b24b88eee0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -62,6 +62,12 @@ private[spark] class MesosSchedulerBackend( var classLoader: ClassLoader = null + private val sparkHome = sc.conf.getOption("spark.executor.home") + .orElse(sc.conf.getOption("spark.home")) // deprecated + .getOrElse { + throw new SparkException("Executor Spark home is not set; set it through spark.executor.home") + } + override def start() { synchronized { classLoader = Thread.currentThread.getContextClassLoader @@ -86,9 +92,6 @@ private[spark] class MesosSchedulerBackend( } def 
createExecutorInfo(execId: String): ExecutorInfo = { - val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( - "Spark home is not set; set it through the spark.home system " + - "property, the SPARK_HOME environment variable or the SparkContext constructor")) val environment = Environment.newBuilder() sc.executorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index de4bd90c8f7e5..e36902ec81e08 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala @@ -34,7 +34,7 @@ import scala.language.postfixOps class DriverSuite extends FunSuite with Timeouts { test("driver should exit after finishing") { - val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get + val sparkHome = sys.props("spark.test.home") // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing" val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]")) forAll(masters) { (master: String) => diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 87e9012622456..1b91d25ebdf3c 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -45,14 +45,12 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { conf.setMaster("local[3]") conf.setAppName("My app") - conf.setSparkHome("/path") conf.setJars(Seq("a.jar", "b.jar")) conf.setExecutorEnv("VAR1", "value1") conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3"))) assert(conf.get("spark.master") === "local[3]") assert(conf.get("spark.app.name") === "My app") - assert(conf.get("spark.home") === "/path") assert(conf.get("spark.jars") === "a.jar,b.jar") assert(conf.get("spark.executorEnv.VAR1") === "value1") assert(conf.get("spark.executorEnv.VAR2") === "value2") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 565c53e9529ff..d29516c03568d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -266,7 +266,7 @@ class SparkSubmitSuite extends FunSuite with Matchers { // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. 
def runSparkSubmit(args: Seq[String]): String = { - val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get + val sparkHome = sys.props("spark.test.home") Utils.executeAndGetOutput( Seq("./bin/spark-submit") ++ args, new File(sparkHome), diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 8127ae33fa977..b28b02abc1c43 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -27,11 +27,11 @@ import org.apache.spark.SparkConf class ExecutorRunnerTest extends FunSuite { test("command includes appId") { def f(s:String) = new File(s) - val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")) + val sparkHome = sys.props("spark.test.home") val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(), Map(), Seq(), Seq()), "appUiUrl") val appId = "12345-worker321-9876" - val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")), + val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome), f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING) assert(er.getCommandSeq.last === appId) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5461d25d72d7e..c3fdc2816ac57 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -312,7 +312,7 @@ object TestSettings { lazy val settings = Seq ( // Fork new JVMs for tests and set Java options for those fork := true, - javaOptions in Test += "-Dspark.home=" + sparkHome, + javaOptions in Test += "-Dspark.test.home=" + sparkHome, javaOptions in Test += "-Dspark.testing=1", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark") diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index e1db4d5395ab9..3d61615e2317e 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -951,9 +951,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, if (execUri != null) { conf.set("spark.executor.uri", execUri) } - if (System.getenv("SPARK_HOME") != null) { - conf.setSparkHome(System.getenv("SPARK_HOME")) - } sparkContext = new SparkContext(conf) logInfo("Created spark context..") sparkContext diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index ac56ff709c1c4..b780282bdac37 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -35,7 +35,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) extends Logging with Serializable { val master = ssc.sc.master val framework = ssc.sc.appName - val sparkHome = ssc.sc.getSparkHome.getOrElse(null) val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index e0677b795cb94..c156dbc7b074f 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -85,10 +85,10 @@ class StreamingContext private[streaming] ( master: String, appName: String, batchDuration: Duration, - sparkHome: String = null, + sparkHome: String = null, // Not used, but kept for backwards compatibility jars: Seq[String] = Nil, environment: Map[String, String] = Map()) = { - this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment), + this(StreamingContext.createNewSparkContext(master, appName, jars, environment), null, batchDuration) } @@ -552,12 +552,10 @@ object StreamingContext extends Logging { private[streaming] def createNewSparkContext( master: String, appName: String, - sparkHome: String, jars: Seq[String], environment: Map[String, String] ): SparkContext = { - val conf = SparkContext.updatedConf( - new SparkConf(), master, appName, sparkHome, jars, environment) + val conf = SparkContext.updatedConf(new SparkConf, master, appName, jars, environment) createNewSparkContext(conf) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 7b33d3b235466..f0fefc4830039 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -59,11 +59,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(ssc.sparkContext.conf.get("spark.app.name") === appName) } - test("from no conf + spark home") { - ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil) - assert(ssc.conf.get("spark.home") === sparkHome) - } - test("from no conf + spark home + env") { ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil, Map(envPair)) From 2a64cfcc63023a7ded58421f094421e9a1067e10 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 17 Jul 2014 14:52:48 -0700 Subject: [PATCH 3/9] Remove usages of spark.home in python --- python/pyspark/conf.py | 10 ++-------- python/pyspark/context.py | 6 ++---- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index 60fc6ba7c52c2..eb752cb2affd8 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -30,14 +30,9 @@ u'local' >>> sc.appName u'My app' ->>> sc.sparkHome is None +>>> sc.sparkHome is not None True - >>> conf = SparkConf(loadDefaults=False) ->>> conf.setSparkHome("/path") - ->>> conf.get("spark.home") -u'/path' >>> conf.setExecutorEnv("VAR1", "value1") >>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")]) @@ -48,9 +43,8 @@ spark.executorEnv.VAR1=value1 spark.executorEnv.VAR3=value3 spark.executorEnv.VAR4=value4 -spark.home=/path >>> sorted(conf.getAll(), key=lambda p: p[0]) -[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')] +[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4')] """ diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 95c54e7a5ad63..4bcc82c42ac38 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -75,7 +75,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, @param serializer: The serializer for RDDs. 
@param conf: A L{SparkConf} object setting Spark properties. @param gateway: Use an existing gateway and JVM, otherwise a new JVM - will be instatiated. + will be instantiated. >>> from pyspark.context import SparkContext @@ -108,8 +108,6 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, self._conf.setMaster(master) if appName: self._conf.setAppName(appName) - if sparkHome: - self._conf.setSparkHome(sparkHome) if environment: for key, value in environment.iteritems(): self._conf.setExecutorEnv(key, value) @@ -124,7 +122,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, # the classpath or an external config file self.master = self._conf.get("spark.master") self.appName = self._conf.get("spark.app.name") - self.sparkHome = self._conf.get("spark.home", None) + self.sparkHome = os.environ.get("SPARK_HOME") for (k, v) in self._conf.getAll(): if k.startswith("spark.executorEnv."): varName = k[len("spark.executorEnv."):] From 81710627925ee6cbd2099215efd17c3173b7bed8 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 17 Jul 2014 15:02:58 -0700 Subject: [PATCH 4/9] Add back *SparkContext functionality to setSparkHome This is because we cannot deprecate these constructors easily... --- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++++-- .../org/apache/spark/streaming/StreamingContext.scala | 8 +++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6176d137aa0f5..f24e649eb3c80 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -114,7 +114,7 @@ class SparkContext(config: SparkConf) extends Logging { environment: Map[String, String] = Map(), preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = { - this(SparkContext.updatedConf(new SparkConf, master, appName, jars, environment)) + this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment)) this.preferredNodeLocationData = preferredNodeLocationData } @@ -1013,7 +1013,7 @@ class SparkContext(config: SparkConf) extends Logging { * or the spark.home Java property, or the SPARK_HOME environment variable * (in that order of preference). If neither of these is set, return None. 
*/ - @deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0") + @deprecated("spark.home is deprecated; use spark.executor/driver.home and/or spark.driver.home instead", "1.1.0") private[spark] def getSparkHome(): Option[String] = { conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME"))) } @@ -1432,12 +1432,16 @@ object SparkContext extends Logging { conf: SparkConf, master: String, appName: String, + sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map()): SparkConf = { val res = conf.clone() res.setMaster(master) res.setAppName(appName) + if (sparkHome != null) { + res.setSparkHome(sparkHome) + } if (jars != null && !jars.isEmpty) { res.setJars(jars) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index c156dbc7b074f..e0677b795cb94 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -85,10 +85,10 @@ class StreamingContext private[streaming] ( master: String, appName: String, batchDuration: Duration, - sparkHome: String = null, // Not used, but kept for backwards compatibility + sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map()) = { - this(StreamingContext.createNewSparkContext(master, appName, jars, environment), + this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment), null, batchDuration) } @@ -552,10 +552,12 @@ object StreamingContext extends Logging { private[streaming] def createNewSparkContext( master: String, appName: String, + sparkHome: String, jars: Seq[String], environment: Map[String, String] ): SparkContext = { - val conf = SparkContext.updatedConf(new SparkConf, master, appName, jars, environment) + val conf = SparkContext.updatedConf( + new SparkConf(), master, appName, sparkHome, jars, environment) createNewSparkContext(conf) } From 2333c0ecb8ccd16a2c9dbf1a97ae58d7c6e708eb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 17 Jul 2014 15:04:46 -0700 Subject: [PATCH 5/9] Minor deprecation message change --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f24e649eb3c80..c51f3604af55d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1013,7 +1013,7 @@ class SparkContext(config: SparkConf) extends Logging { * or the spark.home Java property, or the SPARK_HOME environment variable * (in that order of preference). If neither of these is set, return None. */ - @deprecated("spark.home is deprecated; use spark.executor/driver.home and/or spark.driver.home instead", "1.1.0") + @deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0") private[spark] def getSparkHome(): Option[String] = { conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME"))) } From b94020e13917ae59b1f3d8954cdecc7089c77141 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 17 Jul 2014 15:46:58 -0700 Subject: [PATCH 6/9] Document spark.executor.home (but not spark.driver.home) ... because the only mode that uses spark.driver.home right now is standalone-cluster, which is broken (SPARK-2260). 
It makes little sense to document that this feature exists on a mode that is broken. --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 1 + docs/configuration.md | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index c19b6082dc748..6865970a72905 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -69,6 +69,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends val javaOpts = sys.props.get(javaOptionsConf) val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts) + // TODO: document this once standalone-cluster mode is fixed (SPARK-2260) val driverSparkHome = conf.getOption("spark.driver.home") val driverDescription = new DriverDescription( diff --git a/docs/configuration.md b/docs/configuration.md index a70007c165442..01d3c8aac6547 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -186,6 +186,15 @@ Apart from these, the following properties are also available, and may be useful Set a special library path to use when launching executor JVM's. + + spark.executor.home + (none) + + Home directory of Spark installation to use when launching executors on the worker machines. + In standalone mode, the Worker's current working directory is used if this is not set. This + is not used in yarn mode. + + spark.files.userClassPathFirst false From 00147646ec8594caa8915c9a3fb329fcbe0042a4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 21 Jul 2014 18:28:18 -0700 Subject: [PATCH 7/9] Fix tests that use local-cluster mode --- project/SparkBuild.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c3fdc2816ac57..29045f13a41fd 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -313,6 +313,7 @@ object TestSettings { // Fork new JVMs for tests and set Java options for those fork := true, javaOptions in Test += "-Dspark.test.home=" + sparkHome, + javaOptions in Test += "-Dspark.executor.home=" + sparkHome, // For local-cluster mode javaOptions in Test += "-Dspark.testing=1", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark") From ecdfa92fd33f19fc57e041e4269405c011a43261 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 21 Jul 2014 18:28:32 -0700 Subject: [PATCH 8/9] Formatting changes (minor) --- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index d182fe745052f..e782171980b75 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -381,7 +381,8 @@ private[spark] object Worker extends Logging { cores: Int, memory: Int, masterUrls: Array[String], - workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + workDir: String, + workerNumber: Option[Int] = None): (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val conf = new SparkConf @@ -391,7 +392,7 @@ private[spark] object Worker extends Logging { 
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, - masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) + masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) (actorSystem, boundPort) } From c6533bc3520b5907584f9f010c75f303046b3e7b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 1 Aug 2014 19:56:52 -0700 Subject: [PATCH 9/9] Do not ship any spark home to workers --- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 +--- .../apache/spark/deploy/ApplicationDescription.scala | 1 - .../main/scala/org/apache/spark/deploy/Client.scala | 5 +---- .../scala/org/apache/spark/deploy/JsonProtocol.scala | 1 - .../org/apache/spark/deploy/client/TestClient.scala | 3 +-- .../org/apache/spark/deploy/worker/Worker.scala | 12 +++++------- .../cluster/SparkDeploySchedulerBackend.scala | 3 +-- .../org/apache/spark/deploy/JsonProtocolSuite.scala | 5 ++--- .../spark/deploy/worker/ExecutorRunnerTest.scala | 2 +- 9 files changed, 12 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 14c1247a38451..38700847c80f4 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -123,9 +123,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * Set the location where Spark is installed on worker nodes. */ def setSparkHome(home: String): SparkConf = { - set("spark.home", home) // deprecated - set("spark.driver.home", home) - set("spark.executor.home", home) + set("spark.home", home) } /** Set multiple parameters together */ diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 5a466912dbeb0..65a1a8fd7e929 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -23,7 +23,6 @@ private[spark] class ApplicationDescription( val memoryPerSlave: Int, val command: Command, var appUiUrl: String, - val executorSparkHome: Option[String] = None, val eventLogDir: Option[String] = None) extends Serializable { diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 94e5430980af8..17c507af2652d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -67,16 +67,13 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts) - // TODO: document this once standalone-cluster mode is fixed (SPARK-2260) - val driverSparkHome = conf.getOption("spark.driver.home") val driverDescription = new DriverDescription( driverArgs.jarUrl, driverArgs.memory, driverArgs.cores, driverArgs.supervise, - command, - driverSparkHome) + command) masterActor ! 
RequestSubmitDriver(driverDescription) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index f3eaa47a5e6f3..696f32a6f5730 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -56,7 +56,6 @@ private[spark] object JsonProtocol { ("cores" -> obj.maxCores) ~ ("memoryperslave" -> obj.memoryPerSlave) ~ ("user" -> obj.user) ~ - ("executorsparkhome" -> obj.executorSparkHome) ~ ("command" -> obj.command.toString) } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index 3c53f4f5fac89..88a0862b96afe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -49,8 +49,7 @@ private[spark] object TestClient { val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription("TestClient", Some(1), 512, - Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), - "ignored", Some("dummy-spark-home")) + Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index d1d0264348655..519aaaf5e3f46 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -81,7 +81,7 @@ private[spark] class Worker( @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() - val workerSparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) + val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) var workDir: File = null val executors = new HashMap[String, ExecutorRunner] val finishedExecutors = new HashMap[String, ExecutorRunner] @@ -106,7 +106,7 @@ private[spark] class Worker( def memoryFree: Int = memory - memoryUsed def createWorkDir() { - workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(workerSparkHome, "work")) + workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work")) try { // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs() // So attempting to create and then check if directory was created or not. 
@@ -127,7 +127,7 @@ private[spark] class Worker( assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( host, port, cores, Utils.megabytesToString(memory))) - logInfo("Worker Spark home: " + workerSparkHome) + logInfo("Spark home: " + sparkHome) createWorkDir() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) @@ -232,9 +232,8 @@ private[spark] class Worker( } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) - val execSparkHome = appDesc.executorSparkHome.map(new File(_)).getOrElse(workerSparkHome) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, - workerId, host, execSparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING) + workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -294,8 +293,7 @@ private[spark] class Worker( case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") - val driverSparkHome = driverDesc.driverSparkHome.map(new File(_)).getOrElse(workerSparkHome) - val driver = new DriverRunner(driverId, workDir, driverSparkHome, driverDesc, self, akkaUrl) + val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl) drivers(driverId) = driver driver.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index fcc0193cf4c5a..a28446f6c8a6b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -60,9 +60,8 @@ private[spark] class SparkDeploySchedulerBackend( val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) - val executorSparkHome = conf.getOption("spark.executor.home") val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sc.ui.appUIAddress, executorSparkHome, sc.eventLogger.map(_.logDir)) + sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 12358b19a116c..31aa7ec837f43 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc(): ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq()) - new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl", Some("executorSparkHome")) + new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl") } def createAppInfo() : ApplicationInfo = { @@ -169,8 +169,7 @@ object JsonConstants { val appDescJsonStr = """ |{"name":"name","cores":4,"memoryperslave":1234, - |"user":"%s","executorsparkhome":"executorSparkHome", - |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"} + |"user":"%s","command":"Command(mainClass,List(arg1, 
arg2),Map(),List(),List(),List())"} """.format(System.getProperty("user.name", "")).stripMargin val executorRunnerJsonStr = diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index deef27448c83b..149a2b3d95b86 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -29,7 +29,7 @@ class ExecutorRunnerTest extends FunSuite { def f(s:String) = new File(s) val sparkHome = sys.props("spark.test.home") val appDesc = new ApplicationDescription("app name", Some(8), 500, - Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl", Option(sparkHome)) + Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl") val appId = "12345-worker321-9876" val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome), f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)