diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index ae99432f5ce8..124b04c802e3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -29,7 +29,7 @@ private[spark] class ApplicationDescription(
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     val eventLogCodec: Option[String] = None,
     val coresPerExecutor: Option[Int] = None)
-  extends Serializable {
+  extends Description with Serializable {
 
   val user = System.getProperty("user.name", "")
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 3feb7cea593e..eeff00900f0c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -23,7 +23,7 @@ import org.apache.spark.deploy.ExecutorState.ExecutorState
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.RecoveryState.MasterState
-import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.deploy.worker.{DriverRunnerInfo, ExecutorRunnerInfo}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
@@ -188,8 +188,8 @@ private[deploy] object DeployMessages {
   // Worker to WorkerWebUI
 
   case class WorkerStateResponse(host: String, port: Int, workerId: String,
-    executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner],
-    drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
+    executors: List[ExecutorRunnerInfo], finishedExecutors: List[ExecutorRunnerInfo],
+    drivers: List[DriverRunnerInfo], finishedDrivers: List[DriverRunnerInfo], masterUrl: String,
     cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
 
     Utils.checkHost(host, "Required hostname")
diff --git a/core/src/main/scala/org/apache/spark/deploy/Description.scala b/core/src/main/scala/org/apache/spark/deploy/Description.scala
new file mode 100644
index 000000000000..7dabf4c6d3db
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/Description.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+trait Description {
+  def command: Command
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 659fb434a80f..83128ba1393b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -17,21 +17,12 @@
 
 package org.apache.spark.deploy
 
-private[deploy] class DriverDescription(
-    val jarUrl: String,
-    val mem: Int,
-    val cores: Int,
-    val supervise: Boolean,
-    val command: Command)
-  extends Serializable {
-
-  def copy(
-      jarUrl: String = jarUrl,
-      mem: Int = mem,
-      cores: Int = cores,
-      supervise: Boolean = supervise,
-      command: Command = command): DriverDescription =
-    new DriverDescription(jarUrl, mem, cores, supervise, command)
+private[deploy] case class DriverDescription(
+    jarUrl: String,
+    mem: Int,
+    cores: Int,
+    supervise: Boolean,
+    command: Command) extends Description {
 
   override def toString: String = s"DriverDescription (${command.mainClass})"
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
index ec23371b52f9..b7f9b2d1a4a5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
@@ -22,12 +22,11 @@ package org.apache.spark.deploy
  * This state is sufficient for the Master to reconstruct its internal data structures during
  * failover.
  */
-private[deploy] class ExecutorDescription(
-    val appId: String,
-    val execId: Int,
-    val cores: Int,
-    val state: ExecutorState.Value)
-  extends Serializable {
+private[deploy] case class ExecutorDescription(
+    appId: String,
+    execId: Int,
+    cores: Int,
+    state: ExecutorState.Value) {
 
   override def toString: String =
     "ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index ccffb3665298..823e5c4939d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -22,7 +22,7 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.worker.ExecutorRunnerInfo
 
 private[deploy] object JsonProtocol {
   def writeWorkerInfo(obj: WorkerInfo): JObject = {
@@ -60,11 +60,11 @@ private[deploy] object JsonProtocol {
     ("command" -> obj.command.toString)
   }
 
-  def writeExecutorRunner(obj: ExecutorRunner): JObject = {
-    ("id" -> obj.execId) ~
-    ("memory" -> obj.memory) ~
+  def writeExecutorRunner(obj: ExecutorRunnerInfo): JObject = {
+    ("id" -> obj.setup.id) ~
+    ("memory" -> obj.setup.memory) ~
     ("appid" -> obj.appId) ~
-    ("appdesc" -> writeApplicationDescription(obj.appDesc))
+    ("appdesc" -> writeApplicationDescription(obj.setup.description))
   }
 
   def writeDriverInfo(obj: DriverInfo): JObject = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 83ccaadfe744..2b59b3dda133 100644
---
a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -62,7 +62,7 @@ class LocalSparkCluster( /* Start the Workers */ for (workerNum <- 1 to numWorkers) { - val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker, + val (workerEnv, _) = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker, memoryPerWorker, masters, null, Some(workerNum), _conf) workerRpcEnvs += workerEnv } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ChildRunnerFactory.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ChildRunnerFactory.scala new file mode 100644 index 000000000000..60a911d76880 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ChildRunnerFactory.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.worker + +import org.apache.spark.deploy.{DriverDescription, Description, ApplicationDescription, ExecutorState} + +private[deploy] trait ChildRunnerFactory[D <: Description, T <: ChildRunnerInfo[D]] { + def createRunner( + appId: Option[String], + processSetup: ChildProcessCommonSetup[D], + workerSetup: WorkerSetup, + stateChangeListener: StateChangeListener[D, T], + localDirs: Seq[String]): ChildProcessRunner[D, T] +} + +private[deploy] object DriverRunnerFactoryImpl + extends ChildRunnerFactory[DriverDescription, DriverRunnerInfo] { + + override def createRunner( + appId: Option[String], + processSetup: ChildProcessCommonSetup[DriverDescription], + workerSetup: WorkerSetup, + stateChangeListener: StateChangeListener[DriverDescription, DriverRunnerInfo], + localDirs: Seq[String]): DriverRunner = { + + val manager = new DriverRunnerImpl( + processSetup, + workerSetup, + stateChangeListener) + + manager + } +} + +private[deploy] object ExecutorRunnerFactoryImpl + extends ChildRunnerFactory[ApplicationDescription, ExecutorRunnerInfo] { + + override def createRunner( + appId: Option[String], + processSetup: ChildProcessCommonSetup[ApplicationDescription], + workerSetup: WorkerSetup, + stateChangeListener: + StateChangeListener[ApplicationDescription, ExecutorRunnerInfo], + localDirs: Seq[String]): ExecutorRunner = { + + val manager = new ExecutorRunnerImpl( + processSetup, + workerSetup, + stateChangeListener, + appId.get, + localDirs, + ExecutorState.LOADING) + + manager + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunnerImpl.scala similarity index 77% rename from core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala rename to core/src/main/scala/org/apache/spark/deploy/worker/DriverRunnerImpl.scala index 
89159ff5e2b3..9fc53f0db451 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunnerImpl.scala @@ -24,29 +24,28 @@ import scala.collection.JavaConverters._ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.hadoop.fs.Path +import org.apache.spark.Logging -import org.apache.spark.{Logging, SparkConf, SecurityManager} import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} -import org.apache.spark.deploy.DeployMessages.DriverStateChanged import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.util.{Utils, Clock, SystemClock} +import org.apache.spark.util.{Clock, SystemClock, Utils} /** * Manages the execution of one driver, including automatically restarting the driver on failure. * This is currently only used in standalone cluster deploy mode. */ -private[deploy] class DriverRunner( - conf: SparkConf, - val driverId: String, - val workDir: File, - val sparkHome: File, - val driverDesc: DriverDescription, - val worker: RpcEndpointRef, - val workerUrl: String, - val securityManager: SecurityManager) - extends Logging { +private[deploy] class DriverRunnerImpl( + processSetup: ChildProcessCommonSetup[DriverDescription], + workerSetup: WorkerSetup, + stateChangeListener: StateChangeListener[DriverDescription, DriverRunnerInfo]) + extends ChildProcessRunner[DriverDescription, DriverRunnerInfo] + with DriverRunnerInfo with Logging { self => + + override def state: DriverState = finalState.getOrElse(DriverState.RUNNING) + override def info: DriverRunnerImpl = this + override def exception: Option[Exception] = finalException + override def setup: ChildProcessCommonSetup[DriverDescription] = processSetup @volatile private var process: Option[Process] = None @volatile private var killed = false @@ -71,23 +70,26 @@ private[deploy] class DriverRunner( } /** Starts a thread to run and manage the driver. 
*/ - private[worker] def start() = { - new Thread("DriverRunner for " + driverId) { + def start(): Unit = { + new Thread("DriverRunner for " + processSetup.id) { override def run() { try { - val driverDir = createWorkingDirectory() - val localJarFilename = downloadUserJar(driverDir) + val localJarFilename = downloadUserJar(processSetup.workDir) def substituteVariables(argument: String): String = argument match { - case "{{WORKER_URL}}" => workerUrl + case "{{WORKER_URL}}" => workerSetup.workerUri case "{{USER_JAR}}" => localJarFilename case other => other } // TODO: If we add ability to submit multiple jars they should also be added here - val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, - driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) - launchDriver(builder, driverDir, driverDesc.supervise) + val builder = CommandUtils.buildProcessBuilder( + processSetup.description.command, + workerSetup.securityManager, + processSetup.memory, + workerSetup.sparkHome.getAbsolutePath, + substituteVariables) + launchDriver(builder, processSetup.workDir, processSetup.description.supervise) } catch { case e: Exception => finalException = Some(e) @@ -107,39 +109,28 @@ private[deploy] class DriverRunner( finalState = Some(state) - worker.send(DriverStateChanged(driverId, state, finalException)) + stateChangeListener(self, None, + finalException orElse finalExitCode.filter(_ != 0).map(new NonZeroExitCodeException(_))) } }.start() } /** Terminate this driver (or prevent it from ever starting if not yet started) */ - private[worker] def kill() { + def kill(): Unit = { synchronized { process.foreach(p => p.destroy()) killed = true } } - /** - * Creates the working directory for this driver. - * Will throw an exception if there are errors preparing the directory. - */ - private def createWorkingDirectory(): File = { - val driverDir = new File(workDir, driverId) - if (!driverDir.exists() && !driverDir.mkdirs()) { - throw new IOException("Failed to create directory " + driverDir) - } - driverDir - } - /** * Download the user jar into the supplied directory and return its local path. * Will throw an exception if there are errors downloading the jar. */ private def downloadUserJar(driverDir: File): String = { - val jarPath = new Path(driverDesc.jarUrl) + val jarPath = new Path(processSetup.description.jarUrl) - val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(workerSetup.conf) val destPath = new File(driverDir.getAbsolutePath, jarPath.getName) val jarFileName = jarPath.getName val localJarFile = new File(driverDir, jarFileName) @@ -148,10 +139,10 @@ private[deploy] class DriverRunner( if (!localJarFile.exists()) { // May already exist if running multiple workers on one node logInfo(s"Copying user jar $jarPath to $destPath") Utils.fetchFile( - driverDesc.jarUrl, + processSetup.description.jarUrl, driverDir, - conf, - securityManager, + workerSetup.conf, + workerSetup.securityManager, hadoopConf, System.currentTimeMillis(), useCache = false) @@ -180,7 +171,7 @@ private[deploy] class DriverRunner( runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise) } - def runCommandWithRetry( + private[spark] def runCommandWithRetry( command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = { // Time to wait between submission retries. 
var waitSeconds = 1 @@ -232,3 +223,7 @@ private[deploy] object ProcessBuilderLike { override def command: Seq[String] = processBuilder.command().asScala } } + +private[deploy] trait DriverRunnerInfo extends ChildRunnerInfo[DriverDescription] { + def state: DriverState +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunnerImpl.scala similarity index 69% rename from core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala rename to core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunnerImpl.scala index 3aef0515cbf6..7064f9ff4c84 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunnerImpl.scala @@ -24,47 +24,43 @@ import scala.collection.JavaConverters._ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.{SecurityManager, SparkConf, Logging} -import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} -import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged -import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.Logging +import org.apache.spark.deploy.ExecutorState._ +import org.apache.spark.deploy.{ApplicationDescription, ExecutorDescription, ExecutorState} +import org.apache.spark.util.ShutdownHookManager import org.apache.spark.util.logging.FileAppender /** * Manages the execution of one executor process. * This is currently only used in standalone mode. */ -private[deploy] class ExecutorRunner( - val appId: String, - val execId: Int, - val appDesc: ApplicationDescription, - val cores: Int, - val memory: Int, - val worker: RpcEndpointRef, - val workerId: String, - val host: String, - val webUiPort: Int, - val publicAddress: String, - val sparkHome: File, - val executorDir: File, - val workerUrl: String, - conf: SparkConf, - val appLocalDirs: Seq[String], +private[deploy] class ExecutorRunnerImpl( + processSetup: ChildProcessCommonSetup[ApplicationDescription], + workerSetup: WorkerSetup, + stateChangedListener: StateChangeListener[ApplicationDescription, ExecutorRunnerInfo], + override val appId: String, + appLocalDirs: Seq[String], @volatile var state: ExecutorState.Value) - extends Logging { + extends ChildProcessRunner[ApplicationDescription, ExecutorRunnerInfo] + with ExecutorRunnerInfo with Logging { self => - private val fullId = appId + "/" + execId + override def info: ExecutorRunnerImpl = this + override def setup: ChildProcessCommonSetup[ApplicationDescription] = processSetup + + private val fullId = appId + "/" + processSetup.id private var workerThread: Thread = null private var process: Process = null private var stdoutAppender: FileAppender = null private var stderrAppender: FileAppender = null + @volatile var exception: Option[Exception] = None + // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might // make sense to remove this in the future. 
private var shutdownHook: AnyRef = null - private[worker] def start() { + override def start() { + exception = None workerThread = new Thread("ExecutorRunner for " + fullId) { override def run() { fetchAndRunExecutor() } } @@ -92,11 +88,12 @@ private[deploy] class ExecutorRunner( process.destroy() exitCode = Some(process.waitFor()) } - worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode)) + stateChangedListener(this, message, + exitCode.filter(_ != 0).map(new NonZeroExitCodeException(_))) } /** Stop this executor runner, including killing the process it launched */ - private[worker] def kill() { + override def kill() { if (workerThread != null) { // the workerThread will kill the child process when interrupted workerThread.interrupt() @@ -112,10 +109,10 @@ private[deploy] class ExecutorRunner( /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */ private[worker] def substituteVariables(argument: String): String = argument match { - case "{{WORKER_URL}}" => workerUrl - case "{{EXECUTOR_ID}}" => execId.toString - case "{{HOSTNAME}}" => host - case "{{CORES}}" => cores.toString + case "{{WORKER_URL}}" => workerSetup.workerUri + case "{{EXECUTOR_ID}}" => processSetup.id + case "{{HOSTNAME}}" => processSetup.host + case "{{CORES}}" => processSetup.cores.toString case "{{APP_ID}}" => appId case other => other } @@ -126,13 +123,17 @@ private[deploy] class ExecutorRunner( private def fetchAndRunExecutor() { try { // Launch the process - val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf), - memory, sparkHome.getAbsolutePath, substituteVariables) + val builder = CommandUtils.buildProcessBuilder( + processSetup.description.command, + workerSetup.securityManager, + processSetup.memory, + workerSetup.sparkHome.getAbsolutePath, + substituteVariables) val command = builder.command() val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"") logInfo(s"Launch command: $formattedCommand") - builder.directory(executorDir) + builder.directory(processSetup.workDir) builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator)) // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command @@ -140,7 +141,8 @@ private[deploy] class ExecutorRunner( // Add webUI log urls val baseUrl = - s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" + s"http://${processSetup.publicAddress}:${processSetup.webUIPort}" + + s"/logPage/?appId=$appId&executorId=${processSetup.id}&logType=" builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr") builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout") @@ -149,19 +151,20 @@ private[deploy] class ExecutorRunner( formattedCommand, "=" * 40) // Redirect its stdout and stderr to files - val stdout = new File(executorDir, "stdout") - stdoutAppender = FileAppender(process.getInputStream, stdout, conf) + val stdout = new File(processSetup.workDir, "stdout") + stdoutAppender = FileAppender(process.getInputStream, stdout, workerSetup.conf) - val stderr = new File(executorDir, "stderr") + val stderr = new File(processSetup.workDir, "stderr") Files.write(header, stderr, UTF_8) - stderrAppender = FileAppender(process.getErrorStream, stderr, conf) + stderrAppender = FileAppender(process.getErrorStream, stderr, workerSetup.conf) // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown) // or with 
nonzero exit code val exitCode = process.waitFor() state = ExecutorState.EXITED val message = "Command exited with code " + exitCode - worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))) + stateChangedListener(self, Some(message), + Some(exitCode).filter(_ != 0).map(new NonZeroExitCodeException(_))) } catch { case interrupted: InterruptedException => { logInfo("Runner thread for executor " + fullId + " interrupted") @@ -171,8 +174,18 @@ private[deploy] class ExecutorRunner( case e: Exception => { logError("Error running executor", e) state = ExecutorState.FAILED + exception = Some(e) killProcess(Some(e.toString)) } } } } + +private[deploy] trait ExecutorRunnerInfo extends ChildRunnerInfo[ApplicationDescription] { + def appId: String + def state: ExecutorState + + def createExecutorDescription(): ExecutorDescription = { + new ExecutorDescription(appId, setup.id.toInt, setup.cores, state) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index a45867e7680e..029f0902cae1 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -24,15 +24,14 @@ import java.util.{UUID, Date} import java.util.concurrent._ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture} -import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap} +import scala.collection.mutable import scala.concurrent.ExecutionContext import scala.util.{Failure, Random, Success} import scala.util.control.NonFatal import org.apache.spark.{Logging, SecurityManager, SparkConf} -import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState} +import org.apache.spark.deploy._ import org.apache.spark.deploy.DeployMessages._ -import org.apache.spark.deploy.ExternalShuffleService import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.metrics.MetricsSystem @@ -110,16 +109,17 @@ private[deploy] class Worker( assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!") new File(sys.props("spark.test.home")) } else { - new File(sys.env.get("SPARK_HOME").getOrElse(".")) + new File(sys.env.getOrElse("SPARK_HOME", ".")) } var workDir: File = null - val finishedExecutors = new LinkedHashMap[String, ExecutorRunner] - val drivers = new HashMap[String, DriverRunner] - val executors = new HashMap[String, ExecutorRunner] - val finishedDrivers = new LinkedHashMap[String, DriverRunner] - val appDirectories = new HashMap[String, Seq[String]] - val finishedApps = new HashSet[String] + val finishedExecutors = new mutable.LinkedHashMap[String, ExecutorRunner] + val drivers = new mutable.HashMap[String, DriverRunner] + val executors = new mutable.HashMap[String, ExecutorRunner] + val finishedDrivers = new mutable.LinkedHashMap[String, DriverRunner] + val appDirectories = new mutable.HashMap[String, Seq[String]] + val apps = new mutable.HashMap[String, ApplicationDescription] + val finishedApps = new mutable.HashSet[String] val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors", WorkerWebUI.DEFAULT_RETAINED_EXECUTORS) @@ -156,6 +156,36 @@ private[deploy] class Worker( var coresUsed = 0 var memoryUsed = 0 + val workerSetup = new WorkerSetup( + workerUri, + sparkHome, + conf, + securityMgr + ) + + val executorRunnerFactory = childProcessFactory[ApplicationDescription, 
ExecutorRunnerInfo]( + "spark.worker.executorRunnerFactory").getOrElse(ExecutorRunnerFactoryImpl) + + val driverRunnerFactory = childProcessFactory[DriverDescription, DriverRunnerInfo]( + "spark.worker.driverRunnerFactory").getOrElse(DriverRunnerFactoryImpl) + + private def childProcessFactory[D <: Description, T <: ChildRunnerInfo[D]]( + classNameProperty: String): Option[ChildRunnerFactory[D, T]] = { + + for (className <- conf.getOption(classNameProperty); + instance <- try { + val cls = Utils.classForName(className) + val instance = Some(cls.newInstance().asInstanceOf[ChildRunnerFactory[D, T]]) + logInfo(s"Using custom child process factory implementation $className") + instance + } catch { + case ex: Exception => + logError(s"Failed to create a custom child process factory $className. " + + s"Falling back to default implementation.") + None + }) yield instance + } + def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed @@ -394,7 +424,7 @@ private[deploy] class Worker( // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker // rpcEndpoint. // Copy ids so that it can be used in the cleanup thread. - val appIds = executors.values.map(_.appId).toSet + val appIds = executors.values.map(_.info.appId).toSet val cleanupFuture = concurrent.future { val appDirs = workDir.listFiles() if (appDirs == null) { @@ -422,8 +452,7 @@ private[deploy] class Worker( logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) changeMaster(masterRef, masterWebUiUrl) - val execs = executors.values. - map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state)) + val execs = executors.values.map(e => e.info.createExecutorDescription()) masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)) case ReconnectWorker(masterUrl) => @@ -454,27 +483,26 @@ private[deploy] class Worker( }.toSeq } appDirectories(appId) = appLocalDirs - val manager = new ExecutorRunner( - appId, - execId, - appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)), + val processSetup = ChildProcessCommonSetup( + execId.toString, cores_, memory_, - self, - workerId, host, - webUi.boundPort, publicAddress, - sparkHome, + webUi.boundPort, executorDir, - workerUri, - conf, - appLocalDirs, ExecutorState.LOADING) - executors(appId + "/" + execId) = manager - manager.start() - coresUsed += cores_ - memoryUsed += memory_ - sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None)) + appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf))) + + val manager = executorRunnerFactory.createRunner( + Some(appId), + processSetup, + workerSetup, + onExecutorStateChange, + appLocalDirs) + + startRunner(appId + "/" + execId, manager, executors) + apps += appId -> appDesc + sendToMaster(ExecutorStateChanged(appId, execId, manager.info.state, None, None)) } catch { case e: Exception => { logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) @@ -505,25 +533,34 @@ private[deploy] class Worker( } } - case LaunchDriver(driverId, driverDesc) => { + case LaunchDriver(driverId, driverDesc) => logInfo(s"Asked to launch driver $driverId") - val driver = new DriverRunner( - conf, - driverId, - workDir, - sparkHome, - driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), - self, - workerUri, - securityMgr) - drivers(driverId) = driver - driver.start() - - coresUsed += driverDesc.cores - memoryUsed += driverDesc.mem - } - case 
KillDriver(driverId) => { + val driverDir = new File(workDir, driverId) + if (!driverDir.exists() && !driverDir.mkdirs()) { + throw new IOException("Failed to create directory " + driverDir) + } + + val processSetup = new ChildProcessCommonSetup( + driverId, + cores, + memory, + host, + publicAddress, + webUi.boundPort, + driverDir, + driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf))) + + val driver = driverRunnerFactory.createRunner( + None, + processSetup, + workerSetup, + onDriverStateChange, + Nil) + + startRunner(driverId, driver, drivers) + + case KillDriver(driverId) => logInfo(s"Asked to kill driver $driverId") drivers.get(driverId) match { case Some(runner) => @@ -531,11 +568,9 @@ private[deploy] class Worker( case None => logError(s"Asked to kill unknown driver $driverId") } - } - case driverStateChanged @ DriverStateChanged(driverId, state, exception) => { + case driverStateChanged @ DriverStateChanged(driverId, state, exception) => handleDriverStateChanged(driverStateChanged) - } case ReregisterWithMaster => reregisterWithMaster() @@ -545,12 +580,46 @@ private[deploy] class Worker( maybeCleanupApplication(id) } + private def onExecutorStateChange( + runner: ExecutorRunner, + message: Option[String], + exception: Option[Exception]) { + + val exitCode = if (ExecutorState.isFinished(runner.info.state)) { + exception.collectFirst { + case ex: NonZeroExitCodeException => Some(ex.exitCode) + }.getOrElse(Some(0)) + } else None + + self.send(ExecutorStateChanged( + runner.info.appId, runner.info.setup.id.toInt, runner.info.state, message, exitCode)) + } + + private def onDriverStateChange( + runner: DriverRunner, + message: Option[String], + exception: Option[Exception]): Unit = { + + self.send(DriverStateChanged( + runner.info.setup.id, runner.info.state, exception)) + } + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RequestWorkerState => - context.reply(WorkerStateResponse(host, port, workerId, executors.values.toList, - finishedExecutors.values.toList, drivers.values.toList, - finishedDrivers.values.toList, activeMasterUrl, cores, memory, - coresUsed, memoryUsed, activeMasterWebUiUrl)) + context.reply(WorkerStateResponse( + host, + port, + workerId, + executors.values.map(_.info).toList, + finishedExecutors.values.map(_.info).toList, + drivers.values.map(_.info).toList, + finishedDrivers.values.map(_.info).toList, + activeMasterUrl, + cores, + memory, + coresUsed, + memoryUsed, + activeMasterWebUiUrl)) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -567,7 +636,7 @@ private[deploy] class Worker( } private def maybeCleanupApplication(id: String): Unit = { - val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id) + val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.info.appId == id) if (shouldCleanup) { finishedApps -= id appDirectories.remove(id).foreach { dirList => @@ -610,22 +679,15 @@ private[deploy] class Worker( metricsSystem.stop() } - private def trimFinishedExecutorsIfNecessary(): Unit = { - // do not need to protect with locks since both WorkerPage and Restful server get data through - // thread-safe RpcEndPoint - if (finishedExecutors.size > retainedExecutors) { - finishedExecutors.take(math.max(finishedExecutors.size / 10, 1)).foreach { - case (executorId, _) => finishedExecutors.remove(executorId) - } - } - } + private def trimFinishedRunnersIfNecessary[D <: Description, T <: ChildRunnerInfo[D]]( + finishedRunners: 
mutable.Map[String, ChildProcessRunner[D, T]], + retainedRunners: Int): Unit = { - private def trimFinishedDriversIfNecessary(): Unit = { // do not need to protect with locks since both WorkerPage and Restful server get data through // thread-safe RpcEndPoint - if (finishedDrivers.size > retainedDrivers) { - finishedDrivers.take(math.max(finishedDrivers.size / 10, 1)).foreach { - case (driverId, _) => finishedDrivers.remove(driverId) + if (finishedRunners.size > retainedRunners) { + finishedRunners.take(math.max(finishedRunners.size / 10, 1)).foreach { + case (id, _) => finishedRunners.remove(id) } } } @@ -647,11 +709,7 @@ private[deploy] class Worker( logDebug(s"Driver $driverId changed state to $state") } sendToMaster(driverStateChanged) - val driver = drivers.remove(driverId).get - finishedDrivers(driverId) = driver - trimFinishedDriversIfNecessary() - memoryUsed -= driver.driverDesc.mem - coresUsed -= driver.driverDesc.cores + finishRunner(driverId, drivers, finishedDrivers, retainedDrivers) } private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged): @@ -668,11 +726,7 @@ private[deploy] class Worker( logInfo("Executor " + fullId + " finished with state " + state + message.map(" message " + _).getOrElse("") + exitStatus.map(" exitStatus " + _).getOrElse("")) - executors -= fullId - finishedExecutors(fullId) = executor - trimFinishedExecutorsIfNecessary() - coresUsed -= executor.cores - memoryUsed -= executor.memory + finishRunner(fullId, executors, finishedExecutors, retainedExecutors) case None => logInfo("Unknown Executor " + fullId + " finished with state " + state + message.map(" message " + _).getOrElse("") + @@ -681,6 +735,30 @@ private[deploy] class Worker( maybeCleanupApplication(appId) } } + + private def startRunner[D <: Description, T <: ChildRunnerInfo[D]]( + runnerId: String, + runner: ChildProcessRunner[D, T], + runners: mutable.Map[String, ChildProcessRunner[D, T]]): Unit = { + + runners(runnerId) = runner + runner.start() + coresUsed += runner.info.setup.cores + memoryUsed += runner.info.setup.memory + } + + private def finishRunner[D <: Description, T <: ChildRunnerInfo[D]]( + runnerId: String, + runners: mutable.Map[String, ChildProcessRunner[D, T]], + finishedRunners: mutable.Map[String, ChildProcessRunner[D, T]], + retainedRunners: Int): Unit = { + + val runner = runners.remove(runnerId).get + finishedRunners(runnerId) = runner + trimFinishedRunnersIfNecessary(finishedRunners, retainedRunners) + coresUsed -= runner.info.setup.cores + memoryUsed -= runner.info.setup.memory + } } private[deploy] object Worker extends Logging { @@ -691,8 +769,14 @@ private[deploy] object Worker extends Logging { SignalLogger.register(log) val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) - val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, - args.memory, args.masters, args.workDir) + val (rpcEnv, _) = startRpcEnvAndEndpoint( + args.host, + args.port, + args.webUiPort, + args.cores, + args.memory, + args.masters, + args.workDir) rpcEnv.awaitTermination() } @@ -705,16 +789,26 @@ private[deploy] object Worker extends Logging { masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None, - conf: SparkConf = new SparkConf): RpcEnv = { + conf: SparkConf = new SparkConf): (RpcEnv, Worker) = { // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("") val securityMgr = new SecurityManager(conf) val 
rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
-    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
-    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
-      masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
-    rpcEnv
+    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
+    val worker = new Worker(
+      rpcEnv,
+      webUiPort,
+      cores,
+      memory,
+      masterAddresses,
+      systemName,
+      ENDPOINT_NAME,
+      workDir,
+      conf,
+      securityMgr)
+    rpcEnv.setupEndpoint(ENDPOINT_NAME, worker)
+    (rpcEnv, worker)
   }
 
   def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/package.scala b/core/src/main/scala/org/apache/spark/deploy/worker/package.scala
new file mode 100644
index 000000000000..08915fefc588
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/package.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.executor.ExecutorExitCode
+
+package object worker {
+
+  type StateChangeListener[D <: Description, T <: ChildRunnerInfo[D]] =
+    (ChildProcessRunner[D, T], Option[String], Option[Exception]) => Unit
+
+  type ExecutorRunner = ChildProcessRunner[ApplicationDescription, ExecutorRunnerInfo]
+  type DriverRunner = ChildProcessRunner[DriverDescription, DriverRunnerInfo]
+
+  private[deploy] trait ChildRunnerInfo[+T <: Description] {
+    def setup: ChildProcessCommonSetup[T]
+    def state: Enumeration#Value
+    def exception: Option[Exception]
+  }
+
+  private[deploy] trait ChildProcessRunner[D <: Description, T <: ChildRunnerInfo[D]] {
+    def start(): Unit
+
+    def kill(): Unit
+
+    def info: T
+  }
+
+  private[spark] class NonZeroExitCodeException(val exitCode: Int)
+    extends Exception(ExecutorExitCode.explainExitCode(exitCode))
+
+  case class ChildProcessCommonSetup[+T <: Description](
+    id: String,
+    cores: Int,
+    memory: Int,
+    host: String,
+    publicAddress: String,
+    webUIPort: Int,
+    workDir: File,
+    description: T
+  )
+
+  case class WorkerSetup(
+    workerUri: String,
+    sparkHome: File,
+    conf: SparkConf,
+    securityManager: SecurityManager
+  )
+
+
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
index fd905feb97e9..6c3ddf4d7718 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -20,12 +20,11 @@ package org.apache.spark.deploy.worker.ui
 import scala.xml.Node
 
 import javax.servlet.http.HttpServletRequest
+import org.apache.spark.deploy.worker.{DriverRunnerInfo, ExecutorRunnerInfo}
 
 import org.json4s.JValue
 
-import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.{ApplicationDescription, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
-import org.apache.spark.deploy.master.DriverState
-import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
@@ -49,9 +48,9 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
       UIUtils.listingTable(executorHeaders, executorRow, finishedExecutors)
 
     val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes")
-    val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
+    val runningDrivers = workerState.drivers.sortBy(_.setup.id).reverse
     val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
-    val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
+    val finishedDrivers = workerState.finishedDrivers.sortBy(_.setup.id).reverse
     val finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
 
     // For now we only show driver information if the user has submitted drivers to the cluster.
@@ -100,48 +99,48 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
       workerState.host, workerState.port))
   }
 
-  def executorRow(executor: ExecutorRunner): Seq[Node] = {
+  def executorRow(executor: ExecutorRunnerInfo): Seq[Node] = {
 - {executor.execId} - {executor.cores} + {executor.setup.id} + {executor.setup.cores} {executor.state} - - {Utils.megabytesToString(executor.memory)} + + {Utils.megabytesToString(executor.setup.memory)} stdout + .format(executor.appId, executor.setup.id)}>stdout stderr + .format(executor.appId, executor.setup.id)}>stderr }
-  def driverRow(driver: DriverRunner): Seq[Node] = {
+  def driverRow(driver: DriverRunnerInfo): Seq[Node] = {
 - {driver.driverId} - {driver.driverDesc.command.arguments(2)} - {driver.finalState.getOrElse(DriverState.RUNNING)} - - {driver.driverDesc.cores.toString} + {driver.setup.id} + {driver.setup.description.command.mainClass} + {driver.state} + + {driver.setup.cores.toString} - - {Utils.megabytesToString(driver.driverDesc.mem)} + + {Utils.megabytesToString(driver.setup.memory)} - stdout - stderr + stdout + stderr - {driver.finalException.getOrElse("")} + {driver.exception.getOrElse("")} }
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 967aa0976f0c..b6e08dd522af 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.deploy
 
-import java.io.File
+import java.io.{IOException, File}
 import java.util.Date
 
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.deploy.worker._
 import org.apache.spark.{SecurityManager, SparkConf}
 
 private[deploy] object DeployTestUtils {
@@ -55,35 +55,37 @@ private[deploy] object DeployTestUtils {
   }
 
   def createExecutorRunner(execId: Int): ExecutorRunner = {
-    new ExecutorRunner(
-      "appId",
-      execId,
-      createAppDesc(),
-      4,
-      1234,
-      null,
-      "workerId",
-      "host",
-      123,
-      "publicAddress",
-      new File("sparkHome"),
-      new File("workDir"),
File("workDir"), - "akka://worker", - new SparkConf, - Seq("localDir"), - ExecutorState.RUNNING) + val appDesc = createAppDesc() + val processSetup = new ChildProcessCommonSetup( + execId.toString, 4, 1234, "host", "publicAddress", 123, new File("workDir"), appDesc) + val conf = new SparkConf + val workerSetup = new WorkerSetup( + "akka://worker", new File("sparkHome"), conf, new SecurityManager(conf)) + ExecutorRunnerFactoryImpl.createRunner( + Some("appId"), + processSetup, + workerSetup, + (x, y, z) => {}, + Seq("localDir")) } def createDriverRunner(driverId: String): DriverRunner = { val conf = new SparkConf() - new DriverRunner( - conf, - driverId, - new File("workDir"), - new File("sparkHome"), - createDriverDesc(), - null, - "akka://worker", - new SecurityManager(conf)) + val driverDir = new File("workDir", driverId) + if (!driverDir.exists() && !driverDir.mkdirs()) { + throw new IOException("Failed to create directory " + driverDir) + } + val driverDesc = createDriverDesc() + val processSetup = new ChildProcessCommonSetup( + driverId, driverDesc.cores, driverDesc.mem, "host", "publicAddress", 123, + new File("workDir"), driverDesc) + val workerSetup = new WorkerSetup( + "akka://worker", new File("sparkHome"), conf, new SecurityManager(conf)) + DriverRunnerFactoryImpl.createRunner( + None, + processSetup, + workerSetup, + (x, y, z) => {}, + Nil) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 0a9f128a3a6b..cc1ccc3af015 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -20,12 +20,12 @@ package org.apache.spark.deploy import java.util.Date import com.fasterxml.jackson.core.JsonParseException +import org.apache.spark.deploy.worker.ExecutorRunnerInfo import org.json4s._ import org.json4s.jackson.JsonMethods import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState} -import org.apache.spark.deploy.worker.ExecutorRunner import org.apache.spark.{JsonTestUtils, SparkFunSuite} class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils { @@ -51,7 +51,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils { } test("writeExecutorRunner") { - val output = JsonProtocol.writeExecutorRunner(createExecutorRunner(123)) + val output = JsonProtocol.writeExecutorRunner(createExecutorRunner(123).info) assertValidJson(output) assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr)) } @@ -77,11 +77,12 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils { } test("writeWorkerState") { - val executors = List[ExecutorRunner]() - val finishedExecutors = List[ExecutorRunner](createExecutorRunner(123), - createExecutorRunner(123)) - val drivers = List(createDriverRunner("driverId")) - val finishedDrivers = List(createDriverRunner("driverId"), createDriverRunner("driverId")) + val executors = List[ExecutorRunnerInfo]() + val finishedExecutors = List[ExecutorRunnerInfo](createExecutorRunner(123).info, + createExecutorRunner(123).info) + val drivers = List(createDriverRunner("driverId").info) + val finishedDrivers = List(createDriverRunner("driverId").info, + createDriverRunner("driverId").info) val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors, finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 
       finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
     val output = JsonProtocol.writeWorkerState(stateResponse)
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index 6258c18d177f..a5beae677dc1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -33,8 +33,12 @@ class DriverRunnerTest extends SparkFunSuite {
     val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
     val conf = new SparkConf()
-    new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
-      driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
+    val processSetup = ChildProcessCommonSetup(
+      "driverId", driverDescription.cores, driverDescription.mem, "localhost", "localhost", 8081,
+      new File("workDir"), driverDescription)
+    val workerSetup = WorkerSetup("akka://1.2.3.4/worker/", new File("sparkHome"),
+      conf, new SecurityManager(conf))
+    new DriverRunnerImpl(processSetup, workerSetup, (x, y, z) => {})
   }
 
   private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 98664dc1101e..cdca79b94467 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -29,9 +29,11 @@ class ExecutorRunnerTest extends SparkFunSuite {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-      "publicAddr", new File(sparkHome), new File("ooga"), "blah", conf, Seq("localDir"),
-      ExecutorState.RUNNING)
+    val processSetup = ChildProcessCommonSetup("1", 8, 500, "worker123", "publicAddr", 123,
+      new File("ooga"), appDesc)
+    val workerSetup = WorkerSetup("blah", new File(sparkHome), conf, new SecurityManager(conf))
+    val er = new ExecutorRunnerImpl(processSetup, workerSetup, (x, y, z) => {},
+      appId, Seq("localDir"), ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(
       appDesc.command, new SecurityManager(conf), 512, sparkHome, er.substituteVariables)
     val builderCommand = builder.command()