22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -40,6 +40,8 @@ import scala.collection.mutable.HashMap
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

import SparkConf._

/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

@@ -198,7 +200,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
*
* E.g. spark.akka.option.x.y.x = "value"
*/
getAll.filter {case (k, v) => k.startsWith("akka.")}
getAll.filter { case (k, _) => isAkkaConf(k) }

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
@@ -292,3 +294,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
}

private[spark] object SparkConf {
/**
* Return whether the given config is an akka config (e.g. akka.actor.provider).
* Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
*/
def isAkkaConf(name: String): Boolean = name.startsWith("akka.")

/**
* Return whether the given config should be passed to an executor on start-up.
*
* Certain akka and authentication configs are required of the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
}
}
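
A small usage sketch of the two helpers above (illustrative only, not part of this diff; the keys are arbitrary examples, and the object sits under org.apache.spark because the companion object is private[spark]):

```scala
package org.apache.spark

// Illustrative sketch: how the new SparkConf helpers classify config keys.
object ConfClassificationSketch {
  def main(args: Array[String]): Unit = {
    val keys = Seq(
      "akka.actor.provider",   // raw akka config: isAkkaConf = true, isExecutorStartupConf = true
      "spark.akka.frameSize",  // spark-specific akka config: isAkkaConf = false, startup conf = true
      "spark.authenticate",    // auth config: startup conf = true
      "spark.shuffle.spill")   // ordinary spark config: neither

    keys.foreach { k =>
      println(s"$k -> akka=${SparkConf.isAkkaConf(k)}, " +
        s"executorStartup=${SparkConf.isExecutorStartupConf(k)}")
    }
  }
}
```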
21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,8 +17,6 @@

package org.apache.spark.deploy

import scala.collection.JavaConversions._
import scala.collection.mutable.Map
import scala.concurrent._

import akka.actor._
@@ -50,9 +48,6 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
// truncate filesystem paths similar to what YARN does. For now, we just require
// people call `addJar` assuming the jar is in the same directory.
val env = Map[String, String]()
System.getenv().foreach{case (k, v) => env(k) = v}

val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

val classPathConf = "spark.driver.extraClassPath"
@@ -65,10 +60,13 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
cp.split(java.io.File.pathSeparator)
}

val javaOptionsConf = "spark.driver.extraJavaOptions"
val javaOpts = sys.props.get(javaOptionsConf)
val extraJavaOptsConf = "spark.driver.extraJavaOptions"
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)

val driverDescription = new DriverDescription(
driverArgs.jarUrl,
@@ -109,6 +107,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
// Exception, if present
statusResponse.exception.map { e =>
println(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
}
System.exit(0)
@@ -141,8 +140,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
*/
object Client {
def main(args: Array[String]) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}

val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
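
For context, a rough sketch of how the updated Client assembles the driver's java options in standalone cluster mode (not part of this diff; the object name and config values are made up, and it sits under org.apache.spark.deploy because Utils is private[spark]):

```scala
package org.apache.spark.deploy

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Rough sketch: spark.* settings travel to the remote driver as -D flags,
// and the user's extraJavaOptions string is split into individual JVM arguments.
object DriverOptsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.master", "spark://host:7077")
      .set("spark.executor.memory", "2g")

    // Every spark.* entry becomes a -Dkey=value flag for the DriverWrapper JVM.
    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
    // spark.driver.extraJavaOptions is split on whitespace (quotes respected).
    val extraJavaOpts = Utils.splitCommandString("-XX:+PrintGCDetails -XX:+UseCompressedOops")

    // e.g. -Dspark.master=spark://host:7077 -Dspark.executor.memory=2g -XX:+PrintGCDetails ...
    println((sparkJavaOpts ++ extraJavaOpts).mkString(" "))
  }
}
```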
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -25,5 +25,5 @@ private[spark] case class Command(
environment: Map[String, String],
classPathEntries: Seq[String],
libraryPathEntries: Seq[String],
extraJavaOptions: Option[String] = None) {
javaOpts: Seq[String]) {
}
12 changes: 5 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -132,8 +132,6 @@ object SparkSubmit {
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
case (STANDALONE, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
case (_, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -166,9 +164,9 @@ object SparkSubmit {
val options = List[OptionAssigner](

// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),

// Standalone cluster only
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
@@ -199,9 +197,9 @@ object SparkSubmit {
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files")
core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -46,11 +46,11 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
val conf = new SparkConf
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription(
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
Seq()), Some("dummy-spark-home"), "ignored")
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -47,7 +47,6 @@ object CommandUtils extends Logging {
*/
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())

// Exists for backwards compatibility with older Spark versions
val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
@@ -62,7 +61,7 @@ object CommandUtils extends Logging {
val joined = command.libraryPathEntries.mkString(File.pathSeparator)
Seq(s"-Djava.library.path=$joined")
} else {
Seq()
Seq()
}

val permGenOpt = Seq("-XX:MaxPermSize=128m")
@@ -71,11 +70,11 @@
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
Seq(sparkHome + "/bin/compute-classpath" + ext),
extraEnvironment=command.environment)
extraEnvironment = command.environment)
val userClassPath = command.classPathEntries ++ Seq(classPath)

Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
}

/** Spawn a thread that will redirect a given stream to a file */
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState

/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
* This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
val driverId: String,
@@ -81,7 +82,7 @@ private[spark] class DriverRunner(
driverDesc.command.environment,
classPath,
driverDesc.command.libraryPathEntries,
driverDesc.command.extraJavaOptions)
driverDesc.command.javaOpts)
val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
sparkHome.getAbsolutePath)
launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender

/**
* Manages the execution of one executor process.
* This is currently only used in standalone mode.
*/
private[spark] class ExecutorRunner(
val appId: String,
@@ -72,7 +73,7 @@ private[spark] class ExecutorRunner(
}

/**
* kill executor process, wait for exit and notify worker to update resource status
* Kill executor process, wait for exit and notify worker to update resource status.
*
* @param message the exception message which caused the executor's death
*/
@@ -114,10 +115,13 @@
}

def getCommandSeq = {
val command = Command(appDesc.command.mainClass,
appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
appDesc.command.extraJavaOptions)
val command = Command(
appDesc.command.mainClass,
appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
appDesc.command.environment,
appDesc.command.classPathEntries,
appDesc.command.libraryPathEntries,
appDesc.command.javaOpts)
CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
}

core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -17,15 +17,14 @@

package org.apache.spark.deploy.worker.ui

import java.io.File
import javax.servlet.http.HttpServletRequest
Contributor: Are the changes to this file related?

Contributor Author: Yes, otherwise the driver log page doesn't render correctly (relatively minor)

import scala.xml.Node

import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
import org.apache.spark.Logging
import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
import org.apache.spark.util.logging.RollingFileAppender

private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
private val worker = parent.worker
@@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)

val (logDir, params) = (appId, executorId, driverId) match {
val (logDir, params, pageName) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
(s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
(s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e")
case (None, None, Some(d)) =>
(s"${workDir.getPath}/$d/", s"driverId=$d")
(s"${workDir.getPath}/$d/", s"driverId=$d", d)
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
@@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
</div>
</body>
</html>
UIUtils.basicSparkPage(content, logType + " log page for " + appId.getOrElse("unknown app"))
UIUtils.basicSparkPage(content, logType + " log page for " + pageName)
}

/** Get the part of the log files given the offset and desired length of bytes */
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -98,8 +98,13 @@ private[spark] class CoarseGrainedExecutorBackend(
}

private[spark] object CoarseGrainedExecutorBackend extends Logging {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
workerUrl: Option[String]) {

private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
workerUrl: Option[String]) {

SignalLogger.register(log)

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -46,6 +46,7 @@ private[spark] class SparkDeploySchedulerBackend(
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
@@ -54,9 +55,11 @@
cp.split(java.io.File.pathSeparator)
}

val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
classPathEntries, libraryPathEntries, extraJavaOpts)
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
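
An illustrative sketch of what the executor-startup filter above keeps on the executor command line (not part of this diff; the keys and object name are arbitrary examples):

```scala
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Only configs an executor needs to register with the scheduler are baked into its
// launch command; everything else is inherited from the driver after registration.
object ExecutorStartupConfsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.akka.frameSize", "10")    // needed at startup
      .set("spark.authenticate", "true")    // needed at startup
      .set("spark.shuffle.spill", "false")  // fetched from the driver later

    // Only the first two survive the filter, e.g.
    // -Dspark.akka.frameSize=10 -Dspark.authenticate=true
    println(Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf).mkString(" "))
  }
}
```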
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1313,4 +1313,13 @@ private[spark] object Utils extends Logging {
s"$className: $desc\n$st"
}

/**
* Convert all spark properties set in the given SparkConf to a sequence of java options.
*/
def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = {
conf.getAll
.filter { case (k, _) => filterKey(k) }
.map { case (k, v) => s"-D$k=$v" }
}

}
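
A tiny usage sketch for this helper (not part of this diff; the keys and the custom filter are arbitrary examples):

```scala
package org.apache.spark.util

import org.apache.spark.SparkConf

// Every matching spark property is rendered as a -Dkey=value JVM flag.
object SparkJavaOptsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.ui.port", "4040")
      .set("spark.app.name", "demo")

    println(Utils.sparkJavaOpts(conf).mkString(" "))                           // both keys
    println(Utils.sparkJavaOpts(conf, _.startsWith("spark.ui")).mkString(" ")) // only spark.ui.port
  }
}
```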
core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -88,7 +88,7 @@ class JsonProtocolSuite extends FunSuite {
}

def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq())
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
}

@@ -101,7 +101,7 @@

def createDriverCommand() = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)

def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
@@ -170,7 +170,7 @@ object JsonConstants {
"""
|{"name":"name","cores":4,"memoryperslave":1234,
|"user":"%s","sparkhome":"sparkHome",
|"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"}
|"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
""".format(System.getProperty("user.name", "<unknown>")).stripMargin

val executorRunnerJsonStr =
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -200,9 +200,12 @@ class SparkSubmitSuite extends FunSuite with Matchers {
childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
mainClass should be ("org.apache.spark.deploy.Client")
classpath should have size (0)
sysProps should have size (3)
sysProps.keys should contain ("spark.jars")
sysProps should have size (5)
sysProps.keys should contain ("SPARK_SUBMIT")
sysProps.keys should contain ("spark.master")
sysProps.keys should contain ("spark.app.name")
sysProps.keys should contain ("spark.jars")
sysProps.keys should contain ("spark.shuffle.spill")
sysProps("spark.shuffle.spill") should be ("false")
}

core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription}

class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq())
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
null, "akka://1.2.3.4/worker/")
core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -29,7 +29,7 @@ class ExecutorRunnerTest extends FunSuite {
def f(s:String) = new File(s)
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(), Map(), Seq(), Seq()),
Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),