1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1016,6 +1016,7 @@ class SparkContext(config: SparkConf) extends Logging {
* or the spark.home Java property, or the SPARK_HOME environment variable
* (in that order of preference). If neither of these is set, return None.
*/
@deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0")
private[spark] def getSparkHome(): Option[String] = {
conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME")))
}
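
Note: the deprecation message points callers at per-role settings. A minimal sketch of the executor-side lookup this patch introduces in the Mesos backends below (resolution order: spark.executor.home, then the deprecated spark.home); this is an illustration, not code from the patch:

// Sketch only: the executor-side resolution the updated Mesos backends perform,
// preferring spark.executor.home and falling back to the deprecated spark.home.
import org.apache.spark.{SparkConf, SparkException}

def executorSparkHome(conf: SparkConf): String =
  conf.getOption("spark.executor.home")
    .orElse(conf.getOption("spark.home")) // deprecated fallback
    .getOrElse(throw new SparkException(
      "Executor Spark home is not set; set it through spark.executor.home"))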
@@ -480,6 +480,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* or the spark.home Java property, or the SPARK_HOME environment variable
* (in that order of preference). If neither of these is set, return None.
*/
@deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0")
def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())

/**
@@ -22,7 +22,6 @@ private[spark] class ApplicationDescription(
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
val sparkHome: Option[String],
var appUiUrl: String,
val eventLogDir: Option[String] = None)
extends Serializable {
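For reference, call sites now drop the sparkHome argument; a hedged sketch of the new constructor shape (values illustrative, eventLogDir keeps its default):

// Sketch: constructing an ApplicationDescription after this change (no sparkHome argument).
val appDesc = new ApplicationDescription(
  "example-app",        // name
  Some(4),              // maxCores
  1024,                 // memoryPerSlave (MB)
  command,              // an org.apache.spark.deploy.Command, assumed built elsewhere
  "http://driver:4040"  // appUiUrl
)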
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -25,5 +25,4 @@ private[spark] case class Command(
environment: Map[String, String],
classPathEntries: Seq[String],
libraryPathEntries: Seq[String],
javaOpts: Seq[String]) {
}
javaOpts: Seq[String])
@@ -22,7 +22,8 @@ private[spark] class DriverDescription(
val mem: Int,
val cores: Int,
val supervise: Boolean,
val command: Command)
val command: Command,
val driverSparkHome: Option[String] = None)
extends Serializable {

override def toString: String = s"DriverDescription (${command.mainClass})"
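
Because driverSparkHome defaults to None, existing callers compile unchanged; a sketch of passing it explicitly (the jarUrl field name and all values are assumptions for illustration):

// Sketch: a DriverDescription that pins the driver-side Spark home on the worker.
val driverDesc = new DriverDescription(
  "hdfs:///apps/example.jar", // jarUrl (assumed field)
  512,                        // mem (MB)
  1,                          // cores
  false,                      // supervise
  driverCommand,              // an org.apache.spark.deploy.Command, assumed built elsewhere
  Some("/opt/spark")          // driverSparkHome
)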
@@ -56,7 +56,6 @@ private[spark] object JsonProtocol {
("cores" -> obj.maxCores) ~
("memoryperslave" -> obj.memoryPerSlave) ~
("user" -> obj.user) ~
("sparkhome" -> obj.sparkHome) ~
("command" -> obj.command.toString)
}

@@ -48,9 +48,8 @@ private[spark] object TestClient {
val conf = new SparkConf
val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription(
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
val desc = new ApplicationDescription("TestClient", Some(1), 512,
Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
15 changes: 7 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -111,7 +111,7 @@ private[spark] class Worker(
// This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
// So attempting to create and then check if directory was created or not.
workDir.mkdirs()
if ( !workDir.exists() || !workDir.isDirectory) {
if (!workDir.exists() || !workDir.isDirectory) {
logError("Failed to create work directory " + workDir)
System.exit(1)
}
@@ -232,10 +232,8 @@ private[spark] class Worker(
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host,
appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
workDir, akkaUrl, conf, ExecutorState.RUNNING)
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self,
workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -264,7 +262,7 @@ private[spark] class Worker(
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
case Some(executor) =>
case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -382,7 +380,8 @@ private[spark] object Worker extends Logging {
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
workDir: String,
workerNumber: Option[Int] = None): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val conf = new SparkConf
@@ -392,7 +391,7 @@ private[spark] object Worker extends Logging {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf, securityManager = securityMgr)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}

@@ -60,9 +60,8 @@ private[spark] class SparkDeploySchedulerBackend(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
@@ -71,9 +71,11 @@ private[spark] class CoarseMesosSchedulerBackend(
val taskIdToSlaveId = new HashMap[Int, String]
val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed

val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
private val sparkHome = sc.conf.getOption("spark.executor.home")
.orElse(sc.conf.getOption("spark.home")) // deprecated
.getOrElse {
throw new SparkException("Executor Spark home is not set; set it through spark.executor.home")
}

val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)

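Since there is no Worker-side fallback on Mesos, an application must now carry the executor home in its own configuration; a minimal sketch of satisfying the new check (master URL and path are placeholders):

// Sketch: configuring a Mesos application so the new check passes.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://host:5050")            // placeholder master URL
  .setAppName("example")
  .set("spark.executor.home", "/opt/spark")  // Spark install path on the Mesos slaves
val sc = new SparkContext(conf)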
@@ -62,6 +62,12 @@ private[spark] class MesosSchedulerBackend(

var classLoader: ClassLoader = null

private val sparkHome = sc.conf.getOption("spark.executor.home")
.orElse(sc.conf.getOption("spark.home")) // deprecated
.getOrElse {
throw new SparkException("Executor Spark home is not set; set it through spark.executor.home")
}

override def start() {
synchronized {
classLoader = Thread.currentThread.getContextClassLoader
@@ -86,9 +92,6 @@
}

def createExecutorInfo(execId: String): ExecutorInfo = {
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ import scala.language.postfixOps
class DriverSuite extends FunSuite with Timeouts {

test("driver should exit after finishing") {
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
val sparkHome = sys.props("spark.test.home")
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
2 changes: 0 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -45,14 +45,12 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {

conf.setMaster("local[3]")
conf.setAppName("My app")
conf.setSparkHome("/path")
conf.setJars(Seq("a.jar", "b.jar"))
conf.setExecutorEnv("VAR1", "value1")
conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3")))

assert(conf.get("spark.master") === "local[3]")
assert(conf.get("spark.app.name") === "My app")
assert(conf.get("spark.home") === "/path")
assert(conf.get("spark.jars") === "a.jar,b.jar")
assert(conf.get("spark.executorEnv.VAR1") === "value1")
assert(conf.get("spark.executorEnv.VAR2") === "value2")
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {

def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
}

def createAppInfo() : ApplicationInfo = {
@@ -169,8 +169,7 @@ object JsonConstants {
val appDescJsonStr =
"""
|{"name":"name","cores":4,"memoryperslave":1234,
|"user":"%s","sparkhome":"sparkHome",
|"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
|"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
""".format(System.getProperty("user.name", "<unknown>")).stripMargin

val executorRunnerJsonStr =
@@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
def runSparkSubmit(args: Seq[String]): String = {
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
val sparkHome = sys.props("spark.test.home")
Utils.executeAndGetOutput(
Seq("./bin/spark-submit") ++ args,
new File(sparkHome),
@@ -27,12 +27,11 @@ import org.apache.spark.SparkConf
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
val sparkHome = sys.props("spark.test.home")
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
sparkHome, "appUiUrl")
Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)

assert(er.getCommandSeq.last === appId)
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -188,6 +188,15 @@ Apart from these, the following properties are also available, and may be useful
Set a special library path to use when launching executor JVM's.
</td>
</tr>
<tr>
<td><code>spark.executor.home</code></td>
<td>(none)</td>
<td>
Home directory of Spark installation to use when launching executors on the worker machines.
In standalone mode, the Worker's current working directory is used if this is not set. This
is not used in yarn mode.
</td>
</tr>
<tr>
<td><code>spark.files.userClassPathFirst</code></td>
<td>false</td>
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
@@ -328,7 +328,8 @@ object TestSettings {
lazy val settings = Seq (
// Fork new JVMs for tests and set Java options for those
fork := true,
javaOptions in Test += "-Dspark.home=" + sparkHome,
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
javaOptions in Test += "-Dspark.executor.home=" + sparkHome, // For local-cluster mode
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
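Downstream, the suites read the path straight from system properties, as DriverSuite and ExecutorRunnerTest above now do; a sketch of the pattern, assuming the build always sets the property as this change arranges (suite name and assertion are illustrative):

import org.scalatest.FunSuite

// Sketch: consuming the build-provided property inside a test suite.
class SparkTestHomeSuite extends FunSuite {
  test("spark.test.home points at a checkout") {
    // SparkBuild forks test JVMs with -Dspark.test.home=<source checkout>.
    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set"))
    assert(new java.io.File(sparkHome).isDirectory)
  }
}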
11 changes: 2 additions & 9 deletions python/pyspark/conf.py
@@ -30,14 +30,9 @@
u'local'
>>> sc.appName
u'My app'
>>> sc.sparkHome is None
>>> sc.sparkHome is not None
True

>>> conf = SparkConf(loadDefaults=False)
>>> conf.setSparkHome("/path")
<pyspark.conf.SparkConf object at ...>
>>> conf.get("spark.home")
u'/path'
>>> conf.setExecutorEnv("VAR1", "value1")
<pyspark.conf.SparkConf object at ...>
>>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")])
@@ -48,10 +43,8 @@
spark.executorEnv.VAR1=value1
spark.executorEnv.VAR3=value3
spark.executorEnv.VAR4=value4
spark.home=/path
>>> sorted(conf.getAll(), key=lambda p: p[0])
[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), \
(u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4')]
"""


6 changes: 2 additions & 4 deletions python/pyspark/context.py
@@ -84,7 +84,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
@param serializer: The serializer for RDDs.
@param conf: A L{SparkConf} object setting Spark properties.
@param gateway: Use an existing gateway and JVM, otherwise a new JVM
will be instatiated.
will be instantiated.


>>> from pyspark.context import SparkContext
@@ -126,8 +126,6 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self._conf.setMaster(master)
if appName:
self._conf.setAppName(appName)
if sparkHome:
self._conf.setSparkHome(sparkHome)
if environment:
for key, value in environment.iteritems():
self._conf.setExecutorEnv(key, value)
@@ -144,7 +142,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
# the classpath or an external config file
self.master = self._conf.get("spark.master")
self.appName = self._conf.get("spark.app.name")
self.sparkHome = self._conf.get("spark.home", None)
self.sparkHome = os.environ.get("SPARK_HOME")
for (k, v) in self._conf.getAll():
if k.startswith("spark.executorEnv."):
varName = k[len("spark.executorEnv."):]
3 changes: 0 additions & 3 deletions repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -969,9 +969,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
if (execUri != null) {
conf.set("spark.executor.uri", execUri)
}
if (System.getenv("SPARK_HOME") != null) {
conf.setSparkHome(System.getenv("SPARK_HOME"))
}
sparkContext = new SparkContext(conf)
logInfo("Created spark context..")
sparkContext
@@ -35,7 +35,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
@@ -59,11 +59,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
}

test("from no conf + spark home") {
ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
assert(ssc.conf.get("spark.home") === sparkHome)
}

test("from no conf + spark home + env") {
ssc = new StreamingContext(master, appName, batchDuration,
sparkHome, Nil, Map(envPair))