30 changes: 28 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.{MutableURLClassLoader, Utils}

private[deploy] object DependencyUtils {
private[deploy] object DependencyUtils extends Logging {

def resolveMavenDependencies(
packagesExclusions: String,
@@ -75,7 +76,7 @@ private[deploy] object DependencyUtils {
def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
if (jars != null) {
for (jar <- jars.split(",")) {
SparkSubmit.addJarToClasspath(jar, loader)
addJarToClasspath(jar, loader)
}
}
}
@@ -151,6 +152,31 @@ private[deploy] object DependencyUtils {
}.mkString(",")
}

def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
val file = new File(uri.getPath)
if (file.exists()) {
loader.addURL(file.toURI.toURL)
} else {
logWarning(s"Local jar $file does not exist, skipping.")
}
case _ =>
logWarning(s"Skip remote jar $uri.")
}
}
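Note: a hedged usage sketch of the helper moved here, assuming a caller inside the org.apache.spark.deploy package; the loader and jar paths below are invented for illustration, not test code from this PR.

    import java.net.URL
    import org.apache.spark.util.MutableURLClassLoader

    val loader = new MutableURLClassLoader(Array.empty[URL], getClass.getClassLoader)
    // "file" and "local" URIs are added to the classloader when the file exists.
    DependencyUtils.addJarToClasspath("/tmp/my-app.jar", loader)
    // Any other scheme is not fetched here; it is only logged and skipped.
    DependencyUtils.addJarToClasspath("hdfs://namenode:8020/jars/dep.jar", loader)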

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
def mergeFileLists(lists: String*): String = {
val merged = lists.filterNot(StringUtils.isBlank)
.flatMap(Utils.stringToSeq)
if (merged.nonEmpty) merged.mkString(",") else null
}
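Note: a sketch of mergeFileLists on invented inputs — nulls and blanks are dropped, everything else is split on commas and rejoined, and an empty result stays null to preserve the "no files" convention.

    DependencyUtils.mergeFileLists("a.jar,b.jar", null, "", "c.jar")  // "a.jar,b.jar,c.jar"
    DependencyUtils.mergeFileLists(null, " ")                         // null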

private def splitOnFragment(path: String): (URI, Option[String]) = {
val uri = Utils.resolveURI(path)
val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
318 changes: 161 additions & 157 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -29,7 +29,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.Source
import scala.util.Try

import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkSubmitAction._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@@ -40,7 +42,7 @@ import org.apache.spark.util.Utils
* The env argument is used for testing.
*/
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser {
extends SparkSubmitArgumentsParser with Logging {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
@@ -85,8 +87,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
// scalastyle:off println
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
if (verbose) {
logInfo(s"Using properties file: $propertiesFile")
}
Option(propertiesFile).foreach { filename =>
val properties = Utils.getPropertiesFromFile(filename)
properties.foreach { case (k, v) =>
@@ -95,21 +98,16 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Property files may contain sensitive information, so redact before printing
if (verbose) {
Utils.redact(properties).foreach { case (k, v) =>
SparkSubmit.printStream.println(s"Adding default property: $k=$v")
logInfo(s"Adding default property: $k=$v")
}
}
}
// scalastyle:on println
defaultProperties
}
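Note: for context on the redaction step above, a hedged sketch with invented property names; by default, keys whose names match the redaction regex (e.g. containing "secret" or "password") have their values masked before logging (logInfo here assumes the surrounding Logging mixin).

    val properties = Map(
      "spark.app.name" -> "demo",
      "spark.my.password" -> "hunter2")
    Utils.redact(properties).foreach { case (k, v) =>
      logInfo(s"Adding default property: $k=$v")  // the password value is printed masked
    }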

// Set parameters from command line arguments
try {
parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
parse(args.asJava)

// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
@@ -141,7 +139,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
sparkProperties.foreach { case (k, v) =>
if (!k.startsWith("spark.")) {
sparkProperties -= k
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
logWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}
@@ -215,10 +213,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
} catch {
case _: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
error(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
error(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
@@ -248,6 +246,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case SUBMIT => validateSubmitArguments()
case KILL => validateKillArguments()
case REQUEST_STATUS => validateStatusRequestArguments()
case PRINT_VERSION =>
}
}

@@ -256,62 +255,61 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
printUsageAndExit(-1)
}
if (primaryResource == null) {
SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python or R file)")
error("Must specify a primary resource (JAR or Python or R file)")
}
if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
error("No main class set in JAR; please specify one with --class")
}
if (driverMemory != null
&& Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) {
SparkSubmit.printErrorAndExit("Driver Memory must be a positive number")
error("Driver memory must be a positive number")
}
if (executorMemory != null
&& Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
error("Executor memory must be a positive number")
}
if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
error("Executor cores must be a positive number")
}
if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Total executor cores must be a positive number")
error("Total executor cores must be a positive number")
}
if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Number of executors must be a positive number")
error("Number of executors must be a positive number")
}
if (pyFiles != null && !isPython) {
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
error("--py-files given but primary resource is not a Python script")
}

if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
throw new Exception(s"When running with master '$master' " +
error(s"When running with master '$master' " +
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}
Review comment (Contributor): This might be a good candidate to use your new error method instead of throwing the Exception directly. It might happen that a client catches both Exception and SparkException and does very different things, but I guess that is a very unlikely case.


if (proxyUser != null && principal != null) {
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
error("Only one of --proxy-user or --principal can be provided.")
}
}
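Note: the memory checks above rely on JavaUtils.byteStringAsBytes, which parses size suffixes; the sample values below are invented for illustration.

    import scala.util.Try
    import org.apache.spark.network.util.JavaUtils

    JavaUtils.byteStringAsBytes("2g")    // 2147483648
    JavaUtils.byteStringAsBytes("512m")  // 536870912
    // A malformed value throws, so getOrElse(-1L) trips the error(...) path.
    Try(JavaUtils.byteStringAsBytes("oops")).getOrElse(-1L)  // -1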

private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Killing submissions is only supported in standalone or Mesos mode!")
error("Killing submissions is only supported in standalone or Mesos mode!")
}
if (submissionToKill == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
error("Please specify a submission to kill.")
}
}

private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
error(
"Requesting submission statuses is only supported in standalone or Mesos mode!")
}
if (submissionToRequestStatusFor == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
error("Please specify a submission to request status for.")
}
}

@@ -368,7 +366,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S

case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
error("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value

@@ -405,14 +403,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KILL_SUBMISSION =>
submissionToKill = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
error(s"Action cannot be both $action and $KILL.")
}
action = KILL

case STATUS =>
submissionToRequestStatusFor = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
error(s"Action cannot be both $action and $REQUEST_STATUS.")
}
action = REQUEST_STATUS

@@ -444,7 +442,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
repositories = value

case CONF =>
val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)
sparkProperties(confName) = confValue

case PROXY_USER =>
@@ -463,15 +461,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
verbose = true

case VERSION =>
SparkSubmit.printVersionAndExit()
action = SparkSubmitAction.PRINT_VERSION

case USAGE_ERROR =>
printUsageAndExit(1)

case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
error(s"Unexpected argument '$opt'.")
}
true
action != SparkSubmitAction.PRINT_VERSION
}
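Note: handle() now returns false once --version is seen; in the launcher's option parser, a false return stops parsing, so arguments after --version are ignored. A simplified, hypothetical sketch of that contract (parseOneOption is an invented helper, not the real parser loop):

    var i = 0
    var continueParsing = true
    while (continueParsing && i < args.length) {
      val (opt, value) = parseOneOption(args(i))  // hypothetical helper
      continueParsing = handle(opt, value)
      i += 1
    }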

/**
@@ -482,7 +480,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
*/
override protected def handleUnknown(opt: String): Boolean = {
if (opt.startsWith("-")) {
SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
error(s"Unrecognized option '$opt'.")
}

primaryResource =
@@ -501,20 +499,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}

private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
Review comment (Contributor): Consider renaming the method. What about printUsageAndThrowException?

Reply (Author): The intent is to "exit" the submission process (even if there's no "exit" in some cases). A different name would also feel weird given the "exitCode" parameter. So even if not optimal I prefer the current name.

// scalastyle:off println
val outStream = SparkSubmit.printStream
if (unknownParam != null) {
outStream.println("Unknown/unsupported param " + unknownParam)
logInfo("Unknown/unsupported param " + unknownParam)
}
val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
"""Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
|Usage: spark-submit --kill [submission ID] --master [spark://...]
|Usage: spark-submit --status [submission ID] --master [spark://...]
|Usage: spark-submit run-example [options] example-class [example args]""".stripMargin)
outStream.println(command)
logInfo(command)

val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
outStream.println(
logInfo(
s"""
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn,
@@ -596,12 +592,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
)

if (SparkSubmit.isSqlShell(mainClass)) {
outStream.println("CLI options:")
outStream.println(getSqlShellOptions())
logInfo("CLI options:")
logInfo(getSqlShellOptions())
}
// scalastyle:on println

SparkSubmit.exitFn(exitCode)
throw new SparkUserAppException(exitCode)
}

/**
Expand Down Expand Up @@ -655,4 +650,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
System.setErr(currentErr)
}
}

private def error(msg: String): Unit = throw new SparkException(msg)

}
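Note: since error() throws instead of exiting, the process-level decision moves to the caller. A hedged sketch of the calling pattern this enables; the catch site shown is an assumption, not code from this diff.

    val args = Seq("--master", "local", "app.jar")  // sample arguments
    try {
      new SparkSubmitArguments(args)
    } catch {
      case e: SparkException =>
        // Turn a validation failure back into a non-zero exit,
        // as printErrorAndExit used to do.
        System.err.println("Error: " + e.getMessage)
        System.exit(1)
    }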
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -25,7 +25,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
import org.apache.spark.util._

/**
* Utility object for launching driver programs such that they share fate with the Worker process.
@@ -93,7 +93,7 @@ object DriverWrapper extends Logging {
val jars = {
val jarsProp = sys.props.get("spark.jars").orNull
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
} else {
jarsProp
}
18 changes: 4 additions & 14 deletions core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
@@ -33,24 +33,14 @@ private[spark] trait CommandLineUtils {
private[spark] var printStream: PrintStream = System.err

// scalastyle:off println

private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
private[spark] def printMessage(str: String): Unit = printStream.println(str)
// scalastyle:on println

private[spark] def printErrorAndExit(str: String): Unit = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")
printMessage("Error: " + str)
printMessage("Run with --help for usage help or --verbose for debug output")
exitFn(1)
}

// scalastyle:on println

private[spark] def parseSparkConfProperty(pair: String): (String, String) = {
pair.split("=", 2).toSeq match {
case Seq(k, v) => (k, v)
case _ => printErrorAndExit(s"Spark config without '=': $pair")
throw new SparkException(s"Spark config without '=': $pair")
}
}
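Note: illustrative inputs for the split-on-first-'=' behavior kept here (values invented); the split limit of 2 means values may themselves contain '='.

    parseSparkConfProperty("spark.executor.memory=2g")  // ("spark.executor.memory", "2g")
    parseSparkConfProperty("spark.foo=a=b")             // ("spark.foo", "a=b")
    parseSparkConfProperty("spark.bad")                 // throws SparkException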

def main(args: Array[String]): Unit
}