From db3773c8aeb44e69610c17ff5bc169d7085333ee Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 8 Feb 2017 18:27:07 -0500
Subject: [PATCH 1/2] Fix propagation of executor memory in local-cluster mode

---
 .../scala/org/apache/spark/SparkContext.scala | 24 +++++++--------
 .../org/apache/spark/deploy/SparkSubmit.scala |  2 +-
 .../spark/deploy/SparkSubmitSuite.scala       | 29 +++++++++++++++++++
 3 files changed, 41 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 869c5d7094cd..7169be7cf58d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -473,12 +473,16 @@ class SparkContext(config: SparkConf) extends Logging {
       files.foreach(addFile)
     }
 
-    _executorMemory = _conf.getOption("spark.executor.memory")
-      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
-      .orElse(Option(System.getenv("SPARK_MEM"))
-      .map(warnSparkMem))
-      .map(Utils.memoryStringToMb)
-      .getOrElse(1024)
+    // In local-cluster mode, always use the slave memory specified in the master string
+    _executorMemory = master match {
+      case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, em) => em.toInt
+      case _ => _conf.getOption("spark.executor.memory")
+        .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
+        .orElse(Option(System.getenv("SPARK_MEM"))
+        .map(warnSparkMem))
+        .map(Utils.memoryStringToMb)
+        .getOrElse(1024)
+    }
 
     // Convert java options to env vars as a work around
     // since we can't set env vars directly in sbt.
@@ -2681,14 +2685,8 @@ object SparkContext extends Logging {
         (backend, scheduler)
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
-        if (sc.executorMemory > memoryPerSlaveInt) {
-          throw new SparkException(
-            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
-              memoryPerSlaveInt, sc.executorMemory))
-        }
-
+        assert(sc.executorMemory == memoryPerSlaveInt)
         val scheduler = new TaskSchedulerImpl(sc)
         val localCluster = new LocalSparkCluster(
           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5ffdedd1658a..533f07c1447a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -466,7 +466,7 @@ object SparkSubmit extends CommandLineUtils {
       // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
+      OptionAssigner(args.executorMemory, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.cores.max"),
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 9417930d0240..7fc812df5172 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -388,6 +388,20 @@ class SparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("executor memory in local-cluster mode") {
+    val executorMemoryMb = 1888
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", LocalClusterExecutorMemoryTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[2,1,%d]".format(executorMemoryMb),
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      unusedJar.toString,
+      executorMemoryMb.toString)
+    runSparkSubmit(args)
+  }
+
   test("includes jars passed in through --jars") {
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
@@ -720,6 +734,21 @@ object JarCreationTest extends Logging {
   }
 }
 
+object LocalClusterExecutorMemoryTest {
+  def main(args: Array[String]): Unit = {
+    Utils.configTestLog4j("INFO")
+    val sc = new SparkContext
+    if (args.length != 1) {
+      throw new IllegalArgumentException("Expected exactly 1 argument, got " + args.length)
+    }
+    val executorMemory = args.head.toInt
+    if (sc.executorMemory != executorMemory) {
+      throw new SparkException(
+        "Expected executor memory to be %s, was %s".format(executorMemory, sc.executorMemory))
+    }
+  }
+}
+
 object SimpleApplicationTest {
   def main(args: Array[String]) {
     Utils.configTestLog4j("INFO")

From 1a5bdfeca931a624b82933772c57c605e02dc58d Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 17 Feb 2017 11:47:44 -0500
Subject: [PATCH 2/2] Log warning if memory is explicitly set

---
 .../scala/org/apache/spark/SparkContext.scala | 23 +++++++++++++------
 .../spark/deploy/SparkSubmitSuite.scala       |  4 +++-
 2 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7169be7cf58d..773db43edb7f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -473,15 +473,24 @@ class SparkContext(config: SparkConf) extends Logging {
       files.foreach(addFile)
     }
 
-    // In local-cluster mode, always use the slave memory specified in the master string
-    _executorMemory = master match {
-      case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, em) => em.toInt
-      case _ => _conf.getOption("spark.executor.memory")
+    _executorMemory = {
+      val defaultMemory = 1024
+      val configuredMemory = _conf.getOption("spark.executor.memory")
         .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
-        .orElse(Option(System.getenv("SPARK_MEM"))
-        .map(warnSparkMem))
+        .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
         .map(Utils.memoryStringToMb)
-        .getOrElse(1024)
+      // In local-cluster mode, always use the slave memory specified in the master string
+      // In other modes, use the configured memory if it exists
+      master match {
+        case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, em) =>
+          if (configuredMemory.isDefined) {
+            logWarning("Ignoring explicit setting of executor " +
+              s"memory ${configuredMemory.get} MB in local-cluster mode")
+          }
+          em.toInt
+        case _ =>
+          configuredMemory.getOrElse(defaultMemory)
+      }
     }
 
     // Convert java options to env vars as a work around
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 7fc812df5172..ab19d95722bf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -394,9 +394,11 @@ class SparkSubmitSuite
     val args = Seq(
       "--class", LocalClusterExecutorMemoryTest.getClass.getName.stripSuffix("$"),
       "--name", "testApp",
-      "--master", "local-cluster[2,1,%d]".format(executorMemoryMb),
+      "--master", s"local-cluster[2,1,$executorMemoryMb]",
       "--conf", "spark.ui.enabled=false",
       "--conf", "spark.master.rest.enabled=false",
+      "--conf", s"spark.executor.memory=${executorMemoryMb * 2}", // not used
+      "--conf", "spark.testing.reservedMemory=0", // needed to avoid SPARK-12759
       unusedJar.toString,
       executorMemoryMb.toString)
     runSparkSubmit(args)
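
Note: below is a minimal, self-contained sketch (not part of either patch) of the
behavior the two commits establish, patterned on LocalClusterExecutorMemoryTest
above. The object name and the memory values are illustrative only;
sc.executorMemory is private[spark], which is why the sketch, like the test
object, sits inside the org.apache.spark namespace.

    package org.apache.spark.deploy

    import org.apache.spark.{SparkConf, SparkContext}

    object LocalClusterMemoryDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          // local-cluster[2,1,1888]: 2 workers, 1 core each, 1888 MB per worker
          .setMaster("local-cluster[2,1,1888]")
          .setAppName("memory-demo")
          // With [PATCH 2/2] applied, this explicit setting is ignored in
          // local-cluster mode and a warning is logged instead
          .set("spark.executor.memory", "4g")
        val sc = new SparkContext(conf)
        try {
          // With [PATCH 1/2] applied, this prints 1888 (taken from the master
          // string), not 4096 from the conf and not the 1024 MB default
          println(s"executor memory = ${sc.executorMemory} MB")
        } finally {
          sc.stop()
        }
      }
    }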