diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala b/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala new file mode 100644 index 000000000000..118b4605675b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.lang.reflect.Modifier + +import org.apache.spark.SparkConf + +/** + * Entry point for a Spark application. Implementations must provide a no-argument constructor. + */ +private[spark] trait SparkApplication { + + def start(args: Array[String], conf: SparkConf): Unit + +} + +/** + * Implementation of SparkApplication that wraps a standard Java class with a "main" method. + * + * Configuration is propagated to the application via system properties, so running multiple + * of these in the same JVM may lead to undefined behavior due to configuration leaks. + */ +private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication { + + override def start(args: Array[String], conf: SparkConf): Unit = { + val mainMethod = klass.getMethod("main", new Array[String](0).getClass) + if (!Modifier.isStatic(mainMethod.getModifiers)) { + throw new IllegalStateException("The main method in the given main class must be static") + } + + val sysProps = conf.getAll.toMap + sysProps.foreach { case (k, v) => + sys.props(k) = v + } + + mainMethod.invoke(null, args) + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b7e6d0ea021a..73b956ef3e47 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -158,7 +158,7 @@ object SparkSubmit extends CommandLineUtils with Logging { */ @tailrec private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { - val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) + val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) def doRunMain(): Unit = { if (args.proxyUser != null) { @@ -167,7 +167,7 @@ object SparkSubmit extends CommandLineUtils with Logging { try { proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { - runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) + runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose) } }) } catch { @@ -185,7 +185,7 @@ object SparkSubmit extends CommandLineUtils with Logging { } } } else { - runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) + runMain(childArgs, childClasspath, sparkConf, childMainClass, 
args.verbose) } } @@ -235,11 +235,11 @@ object SparkSubmit extends CommandLineUtils with Logging { private[deploy] def prepareSubmitEnvironment( args: SparkSubmitArguments, conf: Option[HadoopConfiguration] = None) - : (Seq[String], Seq[String], Map[String, String], String) = { + : (Seq[String], Seq[String], SparkConf, String) = { // Return values val childArgs = new ArrayBuffer[String]() val childClasspath = new ArrayBuffer[String]() - val sysProps = new HashMap[String, String]() + val sparkConf = new SparkConf() var childMainClass = "" // Set the cluster manager @@ -337,7 +337,6 @@ object SparkSubmit extends CommandLineUtils with Logging { } } - val sparkConf = new SparkConf(false) args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) } val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf)) val targetDir = Utils.createTempDir() @@ -351,8 +350,8 @@ object SparkSubmit extends CommandLineUtils with Logging { // for later use; e.g. in spark sql, the isolated class loader used to talk // to HiveMetastore will use these settings. They will be set as Java system // properties and then loaded by SparkConf - sysProps.put("spark.yarn.keytab", args.keytab) - sysProps.put("spark.yarn.principal", args.principal) + sparkConf.set(KEYTAB, args.keytab) + sparkConf.set(PRINCIPAL, args.principal) UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) } } @@ -364,23 +363,24 @@ object SparkSubmit extends CommandLineUtils with Logging { args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull + // This security manager will not need an auth secret, but set a dummy value in case + // spark.authenticate is enabled, otherwise an exception is thrown. + lazy val downloadConf = sparkConf.clone().set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") + lazy val secMgr = new SecurityManager(downloadConf) + // In client mode, download remote files. var localPrimaryResource: String = null var localJars: String = null var localPyFiles: String = null if (deployMode == CLIENT) { - // This security manager will not need an auth secret, but set a dummy value in case - // spark.authenticate is enabled, otherwise an exception is thrown. 
- sparkConf.set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") - val secMgr = new SecurityManager(sparkConf) localPrimaryResource = Option(args.primaryResource).map { - downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr) + downloadFile(_, targetDir, downloadConf, hadoopConf, secMgr) }.orNull localJars = Option(args.jars).map { - downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) + downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr) }.orNull localPyFiles = Option(args.pyFiles).map { - downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) + downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr) }.orNull } @@ -409,7 +409,7 @@ object SparkSubmit extends CommandLineUtils with Logging { if (file.exists()) { file.toURI.toString } else { - downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr) + downloadFile(resource, targetDir, downloadConf, hadoopConf, secMgr) } case _ => uri.toString } @@ -449,7 +449,7 @@ object SparkSubmit extends CommandLineUtils with Logging { args.files = mergeFileLists(args.files, args.pyFiles) } if (localPyFiles != null) { - sysProps("spark.submit.pyFiles") = localPyFiles + sparkConf.set("spark.submit.pyFiles", localPyFiles) } } @@ -515,69 +515,69 @@ object SparkSubmit extends CommandLineUtils with Logging { } // Special flag to avoid deprecation warnings at the client - sysProps("SPARK_SUBMIT") = "true" + sys.props("SPARK_SUBMIT") = "true" // A list of rules to map each argument to system properties or command-line options in // each deploy mode; we iterate through these below val options = List[OptionAssigner]( // All cluster managers - OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"), + OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"), OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, - sysProp = "spark.submit.deployMode"), - OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"), - OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"), + confKey = "spark.submit.deployMode"), + OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"), + OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"), OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, - sysProp = "spark.driver.memory"), + confKey = "spark.driver.memory"), OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, - sysProp = "spark.driver.extraClassPath"), + confKey = "spark.driver.extraClassPath"), OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, - sysProp = "spark.driver.extraJavaOptions"), + confKey = "spark.driver.extraJavaOptions"), OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, - sysProp = "spark.driver.extraLibraryPath"), + confKey = "spark.driver.extraLibraryPath"), // Propagate attributes for dependency resolution at the driver side - OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages"), + OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"), OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER, - sysProp = "spark.jars.repositories"), - OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.ivy"), + confKey = "spark.jars.repositories"), + OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = 
"spark.jars.ivy"), OptionAssigner(args.packagesExclusions, STANDALONE | MESOS, - CLUSTER, sysProp = "spark.jars.excludes"), + CLUSTER, confKey = "spark.jars.excludes"), // Yarn only - OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"), + OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, - sysProp = "spark.executor.instances"), - OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"), - OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"), - OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"), - OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"), - OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"), - OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"), + confKey = "spark.executor.instances"), + OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"), + OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"), + OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"), + OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"), + OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"), + OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"), // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, - sysProp = "spark.executor.cores"), + confKey = "spark.executor.cores"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, - sysProp = "spark.executor.memory"), + confKey = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, - sysProp = "spark.cores.max"), + confKey = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, - sysProp = "spark.files"), - OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"), - OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"), + confKey = "spark.files"), + OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"), + OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"), OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER, - sysProp = "spark.driver.memory"), + confKey = "spark.driver.memory"), OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER, - sysProp = "spark.driver.cores"), + confKey = "spark.driver.cores"), OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, - sysProp = "spark.driver.supervise"), - OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"), + confKey = "spark.driver.supervise"), + OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"), // An internal option used only for spark-shell to add user jars to repl's classloader, // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to // remote jars, so adding a new option to only specify local jars for spark-shell internally. 
- OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars") + OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars") ) // In client mode, launch the application main class directly @@ -610,24 +610,24 @@ object SparkSubmit extends CommandLineUtils with Logging { (deployMode & opt.deployMode) != 0 && (clusterManager & opt.clusterManager) != 0) { if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) } - if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) } + if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) } } } // In case of shells, spark.ui.showConsoleProgress can be true by default or by user. if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) { - sysProps(UI_SHOW_CONSOLE_PROGRESS.key) = "true" + sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true) } // Add the application jar automatically so the user doesn't have to call sc.addJar // For YARN cluster mode, the jar is already distributed on each node as "app.jar" // For python and R files, the primary resource is already distributed as a regular file if (!isYarnCluster && !args.isPython && !args.isR) { - var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty) + var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty) if (isUserJar(args.primaryResource)) { jars = jars ++ Seq(args.primaryResource) } - sysProps.put("spark.jars", jars.mkString(",")) + sparkConf.set("spark.jars", jars.mkString(",")) } // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+). @@ -653,12 +653,12 @@ object SparkSubmit extends CommandLineUtils with Logging { // Let YARN know it's a pyspark app, so it distributes needed libraries. if (clusterManager == YARN) { if (args.isPython) { - sysProps.put("spark.yarn.isPython", "true") + sparkConf.set("spark.yarn.isPython", "true") } } if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) { - setRMPrincipal(sysProps) + setRMPrincipal(sparkConf) } // In yarn-cluster mode, use yarn.Client as a wrapper around the user class @@ -689,7 +689,7 @@ object SparkSubmit extends CommandLineUtils with Logging { // Second argument is main class childArgs += (args.primaryResource, "") if (args.pyFiles != null) { - sysProps("spark.submit.pyFiles") = args.pyFiles + sparkConf.set("spark.submit.pyFiles", args.pyFiles) } } else if (args.isR) { // Second argument is main class @@ -704,12 +704,12 @@ object SparkSubmit extends CommandLineUtils with Logging { // Load any properties specified through --conf and the default properties file for ((k, v) <- args.sparkProperties) { - sysProps.getOrElseUpdate(k, v) + sparkConf.setIfMissing(k, v) } // Ignore invalid spark.driver.host in cluster modes. if (deployMode == CLUSTER) { - sysProps -= "spark.driver.host" + sparkConf.remove("spark.driver.host") } // Resolve paths in certain spark properties @@ -721,15 +721,15 @@ object SparkSubmit extends CommandLineUtils with Logging { "spark.yarn.dist.jars") pathConfigs.foreach { config => // Replace old URIs with resolved URIs, if they exist - sysProps.get(config).foreach { oldValue => - sysProps(config) = Utils.resolveURIs(oldValue) + sparkConf.getOption(config).foreach { oldValue => + sparkConf.set(config, Utils.resolveURIs(oldValue)) } } // Resolve and format python file paths properly before adding them to the PYTHONPATH. 
// The resolving part is redundant in the case of --py-files, but necessary if the user // explicitly sets `spark.submit.pyFiles` in his/her default properties file. - sysProps.get("spark.submit.pyFiles").foreach { pyFiles => + sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles => val resolvedPyFiles = Utils.resolveURIs(pyFiles) val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) { PythonRunner.formatPaths(resolvedPyFiles).mkString(",") @@ -739,22 +739,22 @@ object SparkSubmit extends CommandLineUtils with Logging { // locally. resolvedPyFiles } - sysProps("spark.submit.pyFiles") = formattedPyFiles + sparkConf.set("spark.submit.pyFiles", formattedPyFiles) } - (childArgs, childClasspath, sysProps, childMainClass) + (childArgs, childClasspath, sparkConf, childMainClass) } // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we // must trick it into thinking we're YARN. - private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = { + private def setRMPrincipal(sparkConf: SparkConf): Unit = { val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}" // scalastyle:off println printStream.println(s"Setting ${key} to ${shortUserName}") // scalastyle:off println - sysProps.put(key, shortUserName) + sparkConf.set(key, shortUserName) } /** @@ -766,7 +766,7 @@ object SparkSubmit extends CommandLineUtils with Logging { private def runMain( childArgs: Seq[String], childClasspath: Seq[String], - sysProps: Map[String, String], + sparkConf: SparkConf, childMainClass: String, verbose: Boolean): Unit = { // scalastyle:off println @@ -774,14 +774,14 @@ object SparkSubmit extends CommandLineUtils with Logging { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") // sysProps may contain sensitive information, so redact before printing - printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}") + printStream.println(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}") printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}") printStream.println("\n") } // scalastyle:on println val loader = - if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) { + if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) { new ChildFirstURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader) } else { @@ -794,10 +794,6 @@ object SparkSubmit extends CommandLineUtils with Logging { addJarToClasspath(jar, loader) } - for ((key, value) <- sysProps) { - System.setProperty(key, value) - } - var mainClass: Class[_] = null try { @@ -823,14 +819,14 @@ object SparkSubmit extends CommandLineUtils with Logging { System.exit(CLASS_NOT_FOUND_EXIT_STATUS) } - // SPARK-4170 - if (classOf[scala.App].isAssignableFrom(mainClass)) { - printWarning("Subclasses of scala.App may not work correctly. 
Use a main() method instead.") - } - - val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) - if (!Modifier.isStatic(mainMethod.getModifiers)) { - throw new IllegalStateException("The main method in the given main class must be static") + val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { + mainClass.newInstance().asInstanceOf[SparkApplication] + } else { + // SPARK-4170 + if (classOf[scala.App].isAssignableFrom(mainClass)) { + printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") + } + new JavaMainApplication(mainClass) } @tailrec @@ -844,7 +840,7 @@ object SparkSubmit extends CommandLineUtils with Logging { } try { - mainMethod.invoke(null, childArgs.toArray) + app.start(childArgs.toArray, sparkConf) } catch { case t: Throwable => findCause(t) match { @@ -1271,4 +1267,4 @@ private case class OptionAssigner( clusterManager: Int, deployMode: Int, clOption: String = null, - sysProp: String = null) + confKey: String = null) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index b52da4c0c8bc..cfbf56fb8c36 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -176,10 +176,10 @@ class SparkSubmitSuite "thejar.jar" ) val appArgs = new SparkSubmitArguments(clArgs) - val (_, _, sysProps, _) = prepareSubmitEnvironment(appArgs) + val (_, _, conf, _) = prepareSubmitEnvironment(appArgs) appArgs.deployMode should be ("client") - sysProps("spark.submit.deployMode") should be ("client") + conf.get("spark.submit.deployMode") should be ("client") // Both cmd line and configuration are specified, cmdline option takes the priority val clArgs1 = Seq( @@ -190,10 +190,10 @@ class SparkSubmitSuite "thejar.jar" ) val appArgs1 = new SparkSubmitArguments(clArgs1) - val (_, _, sysProps1, _) = prepareSubmitEnvironment(appArgs1) + val (_, _, conf1, _) = prepareSubmitEnvironment(appArgs1) appArgs1.deployMode should be ("cluster") - sysProps1("spark.submit.deployMode") should be ("cluster") + conf1.get("spark.submit.deployMode") should be ("cluster") // Neither cmdline nor configuration are specified, client mode is the default choice val clArgs2 = Seq( @@ -204,9 +204,9 @@ class SparkSubmitSuite val appArgs2 = new SparkSubmitArguments(clArgs2) appArgs2.deployMode should be (null) - val (_, _, sysProps2, _) = prepareSubmitEnvironment(appArgs2) + val (_, _, conf2, _) = prepareSubmitEnvironment(appArgs2) appArgs2.deployMode should be ("client") - sysProps2("spark.submit.deployMode") should be ("client") + conf2.get("spark.submit.deployMode") should be ("client") } test("handles YARN cluster mode") { @@ -227,7 +227,7 @@ class SparkSubmitSuite "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) + val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs) val childArgsStr = childArgs.mkString(" ") childArgsStr should include ("--class org.SomeClass") childArgsStr should include ("--arg arg1 --arg arg2") @@ -240,16 +240,16 @@ class SparkSubmitSuite classpath(2) should endWith ("two.jar") classpath(3) should endWith ("three.jar") - sysProps("spark.executor.memory") should be ("5g") - sysProps("spark.driver.memory") should be ("4g") - sysProps("spark.executor.cores") should be ("5") - 
sysProps("spark.yarn.queue") should be ("thequeue") - sysProps("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar") - sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt") - sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt") - sysProps("spark.app.name") should be ("beauty") - sysProps("spark.ui.enabled") should be ("false") - sysProps("SPARK_SUBMIT") should be ("true") + conf.get("spark.executor.memory") should be ("5g") + conf.get("spark.driver.memory") should be ("4g") + conf.get("spark.executor.cores") should be ("5") + conf.get("spark.yarn.queue") should be ("thequeue") + conf.get("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar") + conf.get("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt") + conf.get("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt") + conf.get("spark.app.name") should be ("beauty") + conf.get("spark.ui.enabled") should be ("false") + sys.props("SPARK_SUBMIT") should be ("true") } test("handles YARN client mode") { @@ -270,7 +270,7 @@ class SparkSubmitSuite "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) + val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") mainClass should be ("org.SomeClass") classpath should have length (4) @@ -278,17 +278,17 @@ class SparkSubmitSuite classpath(1) should endWith ("one.jar") classpath(2) should endWith ("two.jar") classpath(3) should endWith ("three.jar") - sysProps("spark.app.name") should be ("trill") - sysProps("spark.executor.memory") should be ("5g") - sysProps("spark.executor.cores") should be ("5") - sysProps("spark.yarn.queue") should be ("thequeue") - sysProps("spark.executor.instances") should be ("6") - sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt") - sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt") - sysProps("spark.yarn.dist.jars") should include + conf.get("spark.app.name") should be ("trill") + conf.get("spark.executor.memory") should be ("5g") + conf.get("spark.executor.cores") should be ("5") + conf.get("spark.yarn.queue") should be ("thequeue") + conf.get("spark.executor.instances") should be ("6") + conf.get("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt") + conf.get("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt") + conf.get("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar") - sysProps("SPARK_SUBMIT") should be ("true") - sysProps("spark.ui.enabled") should be ("false") + conf.get("spark.ui.enabled") should be ("false") + sys.props("SPARK_SUBMIT") should be ("true") } test("handles standalone cluster mode") { @@ -316,7 +316,7 @@ class SparkSubmitSuite "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) appArgs.useRest = useRest - val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) + val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs) val childArgsStr = childArgs.mkString(" ") if (useRest) { childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2") @@ -327,17 +327,18 @@ class SparkSubmitSuite mainClass should be ("org.apache.spark.deploy.Client") } classpath should have size 0 - sysProps 
should have size 9 - sysProps.keys should contain ("SPARK_SUBMIT") - sysProps.keys should contain ("spark.master") - sysProps.keys should contain ("spark.app.name") - sysProps.keys should contain ("spark.jars") - sysProps.keys should contain ("spark.driver.memory") - sysProps.keys should contain ("spark.driver.cores") - sysProps.keys should contain ("spark.driver.supervise") - sysProps.keys should contain ("spark.ui.enabled") - sysProps.keys should contain ("spark.submit.deployMode") - sysProps("spark.ui.enabled") should be ("false") + sys.props("SPARK_SUBMIT") should be ("true") + + val confMap = conf.getAll.toMap + confMap.keys should contain ("spark.master") + confMap.keys should contain ("spark.app.name") + confMap.keys should contain ("spark.jars") + confMap.keys should contain ("spark.driver.memory") + confMap.keys should contain ("spark.driver.cores") + confMap.keys should contain ("spark.driver.supervise") + confMap.keys should contain ("spark.ui.enabled") + confMap.keys should contain ("spark.submit.deployMode") + conf.get("spark.ui.enabled") should be ("false") } test("handles standalone client mode") { @@ -352,14 +353,14 @@ class SparkSubmitSuite "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) + val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") mainClass should be ("org.SomeClass") classpath should have length (1) classpath(0) should endWith ("thejar.jar") - sysProps("spark.executor.memory") should be ("5g") - sysProps("spark.cores.max") should be ("5") - sysProps("spark.ui.enabled") should be ("false") + conf.get("spark.executor.memory") should be ("5g") + conf.get("spark.cores.max") should be ("5") + conf.get("spark.ui.enabled") should be ("false") } test("handles mesos client mode") { @@ -374,14 +375,14 @@ class SparkSubmitSuite "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) + val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") mainClass should be ("org.SomeClass") classpath should have length (1) classpath(0) should endWith ("thejar.jar") - sysProps("spark.executor.memory") should be ("5g") - sysProps("spark.cores.max") should be ("5") - sysProps("spark.ui.enabled") should be ("false") + conf.get("spark.executor.memory") should be ("5g") + conf.get("spark.cores.max") should be ("5") + conf.get("spark.ui.enabled") should be ("false") } test("handles confs with flag equivalents") { @@ -394,23 +395,26 @@ class SparkSubmitSuite "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) - val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) - sysProps("spark.executor.memory") should be ("5g") - sysProps("spark.master") should be ("yarn") - sysProps("spark.submit.deployMode") should be ("cluster") + val (_, _, conf, mainClass) = prepareSubmitEnvironment(appArgs) + conf.get("spark.executor.memory") should be ("5g") + conf.get("spark.master") should be ("yarn") + conf.get("spark.submit.deployMode") should be ("cluster") mainClass should be ("org.apache.spark.deploy.yarn.Client") } test("SPARK-21568 ConsoleProgressBar should be enabled only in shells") { + // Unset from system properties since this config is defined in the root pom's test config. 
+ sys.props -= UI_SHOW_CONSOLE_PROGRESS.key + val clArgs1 = Seq("--class", "org.apache.spark.repl.Main", "spark-shell") val appArgs1 = new SparkSubmitArguments(clArgs1) - val (_, _, sysProps1, _) = prepareSubmitEnvironment(appArgs1) - sysProps1(UI_SHOW_CONSOLE_PROGRESS.key) should be ("true") + val (_, _, conf1, _) = prepareSubmitEnvironment(appArgs1) + conf1.get(UI_SHOW_CONSOLE_PROGRESS) should be (true) val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar") val appArgs2 = new SparkSubmitArguments(clArgs2) - val (_, _, sysProps2, _) = prepareSubmitEnvironment(appArgs2) - sysProps2.keys should not contain UI_SHOW_CONSOLE_PROGRESS.key + val (_, _, conf2, _) = prepareSubmitEnvironment(appArgs2) + assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS)) } test("launch simple application with spark-submit") { @@ -585,11 +589,11 @@ class SparkSubmitSuite "--files", files, "thejar.jar") val appArgs = new SparkSubmitArguments(clArgs) - val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3 + val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs) appArgs.jars should be (Utils.resolveURIs(jars)) appArgs.files should be (Utils.resolveURIs(files)) - sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar")) - sysProps("spark.files") should be (Utils.resolveURIs(files)) + conf.get("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar")) + conf.get("spark.files") should be (Utils.resolveURIs(files)) // Test files and archives (Yarn) val clArgs2 = Seq( @@ -600,11 +604,11 @@ class SparkSubmitSuite "thejar.jar" ) val appArgs2 = new SparkSubmitArguments(clArgs2) - val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3 + val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2) appArgs2.files should be (Utils.resolveURIs(files)) appArgs2.archives should be (Utils.resolveURIs(archives)) - sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files)) - sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives)) + conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files)) + conf2.get("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives)) // Test python files val clArgs3 = Seq( @@ -615,12 +619,12 @@ class SparkSubmitSuite "mister.py" ) val appArgs3 = new SparkSubmitArguments(clArgs3) - val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3 + val (_, _, conf3, _) = SparkSubmit.prepareSubmitEnvironment(appArgs3) appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles)) - sysProps3("spark.submit.pyFiles") should be ( + conf3.get("spark.submit.pyFiles") should be ( PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) - sysProps3(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4") - sysProps3(PYSPARK_PYTHON.key) should be ("python3.5") + conf3.get(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4") + conf3.get(PYSPARK_PYTHON.key) should be ("python3.5") } test("resolves config paths correctly") { @@ -644,9 +648,9 @@ class SparkSubmitSuite "thejar.jar" ) val appArgs = new SparkSubmitArguments(clArgs) - val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3 - sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar")) - sysProps("spark.files") should be(Utils.resolveURIs(files)) + val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs) + conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar")) + conf.get("spark.files") should be(Utils.resolveURIs(files)) // Test files and archives (Yarn) val f2 = 
File.createTempFile("test-submit-files-archives", "", tmpDir) @@ -661,9 +665,9 @@ class SparkSubmitSuite "thejar.jar" ) val appArgs2 = new SparkSubmitArguments(clArgs2) - val sysProps2 = SparkSubmit.prepareSubmitEnvironment(appArgs2)._3 - sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files)) - sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives)) + val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2) + conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files)) + conf2.get("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives)) // Test python files val f3 = File.createTempFile("test-submit-python-files", "", tmpDir) @@ -676,8 +680,8 @@ class SparkSubmitSuite "mister.py" ) val appArgs3 = new SparkSubmitArguments(clArgs3) - val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3 - sysProps3("spark.submit.pyFiles") should be( + val (_, _, conf3, _) = SparkSubmit.prepareSubmitEnvironment(appArgs3) + conf3.get("spark.submit.pyFiles") should be( PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) // Test remote python files @@ -693,11 +697,9 @@ class SparkSubmitSuite "hdfs:///tmp/mister.py" ) val appArgs4 = new SparkSubmitArguments(clArgs4) - val sysProps4 = SparkSubmit.prepareSubmitEnvironment(appArgs4)._3 + val (_, _, conf4, _) = SparkSubmit.prepareSubmitEnvironment(appArgs4) // Should not format python path for yarn cluster mode - sysProps4("spark.submit.pyFiles") should be( - Utils.resolveURIs(remotePyFiles) - ) + conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles)) } test("user classpath first in driver") { @@ -771,14 +773,14 @@ class SparkSubmitSuite jar2.toString) val appArgs = new SparkSubmitArguments(args) - val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3 - sysProps("spark.yarn.dist.jars").split(",").toSet should be + val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs) + conf.get("spark.yarn.dist.jars").split(",").toSet should be (Set(jar1.toURI.toString, jar2.toURI.toString)) - sysProps("spark.yarn.dist.files").split(",").toSet should be + conf.get("spark.yarn.dist.files").split(",").toSet should be (Set(file1.toURI.toString, file2.toURI.toString)) - sysProps("spark.yarn.dist.pyFiles").split(",").toSet should be + conf.get("spark.yarn.dist.pyFiles").split(",").toSet should be (Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath)) - sysProps("spark.yarn.dist.archives").split(",").toSet should be + conf.get("spark.yarn.dist.archives").split(",").toSet should be (Set(archive1.toURI.toString, archive2.toURI.toString)) } @@ -897,18 +899,18 @@ class SparkSubmitSuite ) val appArgs = new SparkSubmitArguments(args) - val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3 + val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf)) // All the resources should still be remote paths, so that YARN client will not upload again. - sysProps("spark.yarn.dist.jars") should be (tmpJarPath) - sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}") - sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}") + conf.get("spark.yarn.dist.jars") should be (tmpJarPath) + conf.get("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}") + conf.get("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}") // Local repl jars should be a local path. 
- sysProps("spark.repl.local.jars") should (startWith("file:")) + conf.get("spark.repl.local.jars") should (startWith("file:")) // local py files should not be a URI format. - sysProps("spark.submit.pyFiles") should (startWith("/")) + conf.get("spark.submit.pyFiles") should (startWith("/")) } test("download remote resource if it is not supported by yarn service") { @@ -955,9 +957,9 @@ class SparkSubmitSuite ) val appArgs = new SparkSubmitArguments(args) - val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3 + val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf)) - val jars = sysProps("spark.yarn.dist.jars").split(",").toSet + val jars = conf.get("spark.yarn.dist.jars").split(",").toSet // The URI of remote S3 resource should still be remote. assert(jars.contains(tmpS3JarPath)) @@ -996,6 +998,21 @@ class SparkSubmitSuite conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName) conf.set("fs.s3a.impl.disable.cache", "true") } + + test("start SparkApplication without modifying system properties") { + val args = Array( + "--class", classOf[TestSparkApplication].getName(), + "--master", "local", + "--conf", "spark.test.hello=world", + "spark-internal", + "hello") + + val exception = intercept[SparkException] { + SparkSubmit.main(args) + } + + assert(exception.getMessage() === "hello") + } } object SparkSubmitSuite extends SparkFunSuite with TimeLimits { @@ -1115,3 +1132,17 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem { override def open(path: Path): FSDataInputStream = super.open(local(path)) } + +class TestSparkApplication extends SparkApplication with Matchers { + + override def start(args: Array[String], conf: SparkConf): Unit = { + assert(args.size === 1) + assert(args(0) === "hello") + assert(conf.get("spark.test.hello") === "world") + assert(sys.props.get("spark.test.hello") === None) + + // This is how the test verifies the application was actually run. + throw new SparkException(args(0)) + } + +} diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 70887dc5dd97..490baf040491 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -445,9 +445,9 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { "--class", mainClass, mainJar) ++ appArgs val args = new SparkSubmitArguments(commandLineArgs) - val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args) + val (_, _, sparkConf, _) = SparkSubmit.prepareSubmitEnvironment(args) new RestSubmissionClient("spark://host:port").constructSubmitRequest( - mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty) + mainJar, mainClass, appArgs, sparkConf.getAll.toMap, Map.empty) } /** Return the response as a submit response, or fail with error otherwise. */