core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (89 changes: 50 additions & 39 deletions)
@@ -269,6 +269,25 @@ object SparkSubmit extends CommandLineUtils {
}
}

// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
}

Contributor Author:
This is for failing faster: there is no need to resolve dependencies if we are going to fail anyway.
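A minimal, self-contained Scala sketch of the ordering this change moves toward; validateDeployMode and resolvePackages are hypothetical stand-ins rather than SparkSubmit APIs, and the "expensive" step only stands in for SparkSubmitUtils.resolveMavenCoordinates:

object FailFastSketch {
  sealed trait Manager
  case object Standalone extends Manager
  case object Local extends Manager

  sealed trait Mode
  case object Client extends Mode
  case object Cluster extends Mode

  // Cheap precondition check: runs before any dependency resolution.
  def validateDeployMode(manager: Manager, mode: Mode, isPython: Boolean): Either[String, Unit] =
    (manager, mode) match {
      case (Standalone, Cluster) if isPython =>
        Left("Cluster deploy mode is currently not supported for python applications " +
          "on standalone clusters.")
      case (Local, Cluster) =>
        Left("Cluster deploy mode is not compatible with master \"local\"")
      case _ =>
        Right(())
    }

  // Expensive step, standing in for SparkSubmitUtils.resolveMavenCoordinates.
  def resolvePackages(packages: Seq[String]): Seq[String] =
    packages.map(coordinate => s"/path/to/ivy/cache/$coordinate.jar")

  def main(args: Array[String]): Unit =
    validateDeployMode(Standalone, Cluster, isPython = true) match {
      case Left(error) =>
        System.err.println(error)
        sys.exit(1)
      case Right(_) =>
        println(resolvePackages(Seq("org.example:demo_2.11:1.0")))
    }
}

Running the sketch with the unsupported (Standalone, Cluster, Python) combination exits before resolvePackages is ever called, which is the behaviour the relocated match block gives spark-submit.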

// Update args.deployMode if it is null. It will be passed down as a Spark property later.
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
@@ -278,36 +297,40 @@ object SparkSubmit extends CommandLineUtils {
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val exclusions: Seq[String] =
if (!isMesosCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val exclusions: Seq[String] =
if (!StringUtils.isBlank(args.packagesExclusions)) {
args.packagesExclusions.split(",")
} else {
Nil
}

// Create the IvySettings, either load from file or build defaults
val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
Option(args.ivyRepoPath))
}.getOrElse {
SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
}
// Create the IvySettings, either load from file or build defaults
val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
Option(args.ivyRepoPath))
}.getOrElse {
SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
}

val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
ivySettings, exclusions = exclusions)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
ivySettings, exclusions = exclusions)


if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
}
}
}

// install any R packages that may have been passed through --jars or --packages.
// Spark Packages may contain R source code inside the jar.
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
// install any R packages that may have been passed through --jars or --packages.
// Spark Packages may contain R source code inside the jar.
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
}
}

// In client mode, download remote files.
@@ -339,24 +362,6 @@ object SparkSubmit extends CommandLineUtils {
}
}

// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
}

// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
@@ -464,6 +469,12 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),

// Mesos only - propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),

// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
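The four Mesos-only OptionAssigner entries above hand the client-side package arguments to the driver as Spark properties, so that in Mesos cluster mode the driver can perform the resolution the client now skips. A rough, illustrative sketch of what the driver would then see (the property keys are real Spark configuration keys; the values and the scenario are made up):

import org.apache.spark.SparkConf

object MesosDriverConfSketch {
  def main(args: Array[String]): Unit = {
    // In Mesos cluster mode the submitting client no longer resolves --packages itself;
    // the values travel to the driver as these properties instead (values made up).
    val conf = new SparkConf(false)
      .set("spark.jars.packages", "org.example:example-lib_2.11:1.0.0")  // from --packages
      .set("spark.jars.repositories", "https://repo.example.com/maven")  // from --repositories
      .set("spark.jars.excludes", "org.slf4j:slf4j-log4j12")             // from --exclude-packages
      .set("spark.jars.ivy", "/tmp/.ivy2")                               // from the ivy repo path

    // A driver-side resolver would read these keys and run the Ivy resolution there.
    println(conf.get("spark.jars.packages"))
  }
}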