From 6139631bc6b0a2bf773ae2a3e90899dfa45fd6e2 Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Mon, 10 Jul 2017 13:03:20 +0300
Subject: [PATCH] fix --packages for mesos

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 89 +++++++++++--------
 1 file changed, 50 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d13fb4193970..111f1e590fa8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -269,6 +269,25 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
+    // Fail fast; the following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (STANDALONE, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on standalone clusters.")
+      case (STANDALONE, CLUSTER) if args.isR =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
+          "applications on standalone clusters.")
+      case (LOCAL, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
+      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
+      case _ =>
+    }
+
     // Update args.deployMode if it is null. It will be passed down as a Spark property later.
     (args.deployMode, deployMode) match {
       case (null, CLIENT) => args.deployMode = "client"
@@ -278,36 +297,40 @@ object SparkSubmit extends CommandLineUtils {
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
 
-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val exclusions: Seq[String] =
+    if (!isMesosCluster) {
+      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
+      // too for packages that include Python code
+      val exclusions: Seq[String] =
       if (!StringUtils.isBlank(args.packagesExclusions)) {
         args.packagesExclusions.split(",")
       } else {
         Nil
       }
 
-    // Create the IvySettings, either load from file or build defaults
-    val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-        Option(args.ivyRepoPath))
-    }.getOrElse {
-      SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-    }
+      // Create the IvySettings, either load from file or build defaults
+      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
+        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
+          Option(args.ivyRepoPath))
+      }.getOrElse {
+        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
+      }
 
-    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-      ivySettings, exclusions = exclusions)
-    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-      if (args.isPython) {
-        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
+        ivySettings, exclusions = exclusions)
+
+
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        if (args.isPython) {
+          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+        }
       }
-    }
 
-    // install any R packages that may have been passed through --jars or --packages.
-    // Spark Packages may contain R source code inside the jar.
-    if (args.isR && !StringUtils.isBlank(args.jars)) {
-      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      // install any R packages that may have been passed through --jars or --packages.
+      // Spark Packages may contain R source code inside the jar.
+      if (args.isR && !StringUtils.isBlank(args.jars)) {
+        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      }
     }
 
     // In client mode, download remote files.
@@ -339,24 +362,6 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
-    // The following modes are not supported or applicable
-    (clusterManager, deployMode) match {
-      case (STANDALONE, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
-          "applications on standalone clusters.")
-      case (STANDALONE, CLUSTER) if args.isR =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
-          "applications on standalone clusters.")
-      case (LOCAL, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
-      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
-      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
-      case _ =>
-    }
 
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
@@ -464,6 +469,12 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
+      // Mesos only - propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
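
---

Note for readers: the behaviour change is spread across two hunks, so here is a
minimal, self-contained sketch of the routing rule the patch introduces. The
names below (PackagesRoutingSketch, route, Master, DeployMode) are illustrative
stand-ins, not Spark's internal API; the grounded facts are only those visible
in the diff above: when isMesosCluster is true, spark-submit skips local Maven
resolution and instead forwards --packages, --repositories, the Ivy repo path,
and the exclusion list as spark.jars.* properties, so that the driver (launched
by the Mesos dispatcher, typically on a different machine than the submitter)
can resolve the dependencies itself.

    // Hypothetical sketch, not part of the patch or of Spark's API.
    object PackagesRoutingSketch {

      sealed trait Master
      case object Yarn extends Master
      case object Mesos extends Master
      case object Standalone extends Master

      sealed trait DeployMode
      case object Client extends DeployMode
      case object Cluster extends DeployMode

      /** Either the spark.jars.* properties to forward to the driver (Mesos
       *  cluster mode), or None, meaning spark-submit resolves the coordinates
       *  locally before launching the application. */
      def route(
          master: Master,
          mode: DeployMode,
          packages: String,
          repositories: Option[String],
          ivyPath: Option[String],
          excludes: Option[String]): Option[Map[String, String]] = {
        (master, mode) match {
          case (Mesos, Cluster) =>
            // Defer resolution: jars resolved into the submitter's local Ivy
            // cache would not be visible to a dispatcher-launched driver.
            Some(Map("spark.jars.packages" -> packages) ++
              repositories.map("spark.jars.repositories" -> _) ++
              ivyPath.map("spark.jars.ivy" -> _) ++
              excludes.map("spark.jars.excludes" -> _))
          case _ =>
            None // resolve locally; resolved jars are then merged into --jars
        }
      }

      def main(args: Array[String]): Unit = {
        val mesosCluster = route(Mesos, Cluster, "org.foo:bar:1.0", None, None, None)
        assert(mesosCluster.exists(_.contains("spark.jars.packages")))
        assert(route(Mesos, Client, "org.foo:bar:1.0", None, None, None).isEmpty)
        assert(route(Yarn, Cluster, "org.foo:bar:1.0", None, None, None).isEmpty)
        println(s"mesos cluster mode forwards: $mesosCluster")
      }
    }

In every other master/deploy-mode combination the pre-existing path is kept:
coordinates are resolved on the submitting machine and the resolved jars are
merged into --jars (and into --py-files for Python applications).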