From 04c26b15ef06cb7751d1c7853db3708188e5e512 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 24 Sep 2015 14:59:33 -0700 Subject: [PATCH 1/8] Supply Yarn or regular hadoop libs as required --- .../apache/spark/deploy/SparkHadoopUtil.scala | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index a0b7365df900a..fb2a45c75d2b2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -385,20 +385,13 @@ class SparkHadoopUtil extends Logging { object SparkHadoopUtil { - private val hadoop = { - val yarnMode = java.lang.Boolean.valueOf( - System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if (yarnMode) { - try { - Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") - .newInstance() - .asInstanceOf[SparkHadoopUtil] - } catch { - case e: Exception => throw new SparkException("Unable to load YARN support", e) - } - } else { - new SparkHadoopUtil - } + private lazy val hadoop = new SparkHadoopUtil + private lazy val yarn = try{ + Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") + .newInstance() + .asInstanceOf[SparkHadoopUtil] + } catch { + case e: Exception => throw new SparkException("Unable to load YARN support", e) } val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp" @@ -406,6 +399,13 @@ object SparkHadoopUtil { val SPARK_YARN_CREDS_COUNTER_DELIM = "-" def get: SparkHadoopUtil = { - hadoop + // Check each time to support changing to/from YARN + val yarnMode = java.lang.Boolean.valueOf( + System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) + if (yarnMode) { + yarn + } else { + hadoop + } } } From 1915e7dfb95994986d3adac57f9f7c5ce9790dd9 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 24 Sep 2015 14:59:59 -0700 Subject: 
[PATCH 2/8] Check to make sure changing the system property changes the hadoop utils as expected --- .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 49bee0866dd43..53b7aa40ba062 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -233,4 +233,17 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") } + + test("check different hadoop utils based on env variable") { + import org.apache.spark.deploy.SparkHadoopUtil + System.setProperty("SPARK_YARN_MODE", "true") + SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil] + System.setProperty("SPARK_YARN_MODE", "false") + val caught = intercept[java.lang.ClassCastException] { + SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil] + } + assert(caught.getMessage === "org.apache.spark.deploy.SparkHadoopUtil cannot be cast to " + + "org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") + System.clearProperty("SPARK_YARN_MODE") + } } From 664162b4239503c1390f15420dc458b5ac6ec4e7 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 25 Sep 2015 12:06:53 -0700 Subject: [PATCH 3/8] Fix spacing --- .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index fb2a45c75d2b2..d606b80c03c98 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -386,7 +386,7 @@ class 
SparkHadoopUtil extends Logging { object SparkHadoopUtil { private lazy val hadoop = new SparkHadoopUtil - private lazy val yarn = try{ + private lazy val yarn = try { Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") .newInstance() .asInstanceOf[SparkHadoopUtil] From f97ec06c7063254ec02472e1962e88b86c8fc165 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 25 Sep 2015 12:07:18 -0700 Subject: [PATCH 4/8] Make the tests simpler (thanks srowen :)). --- .../spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 53b7aa40ba062..c0f9708bd3698 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -30,6 +30,7 @@ import org.scalatest.Matchers import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -235,15 +236,10 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } test("check different hadoop utils based on env variable") { - import org.apache.spark.deploy.SparkHadoopUtil System.setProperty("SPARK_YARN_MODE", "true") - SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil] + assert(SparkHadoopUtil.get.getClass.getSimpleName === "YarnSparkHadoopUtil") System.setProperty("SPARK_YARN_MODE", "false") - val caught = intercept[java.lang.ClassCastException] { - SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil] - } - assert(caught.getMessage === "org.apache.spark.deploy.SparkHadoopUtil cannot be cast to " + - "org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") + 
assert(SparkHadoopUtil.get.getClass.getSimpleName === "SparkHadoopUtil") System.clearProperty("SPARK_YARN_MODE") } } From 3c20e5d723412924955169ab2c02e3996479ff0a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 25 Sep 2015 14:33:12 -0700 Subject: [PATCH 5/8] Clear SPARK_YARN_MODE property on context shutdown --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bf3aeb488d597..220092381ed32 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1757,6 +1757,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli SparkEnv.set(null) } SparkContext.clearActiveContext() + // Unset YARN mode system env variable, to allow switching between cluster types. + System.clearProperty("SPARK_YARN_MODE") logInfo("Successfully stopped SparkContext") } From e0388995dfdc7bb1115db8e554aceb366f9332e5 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 25 Sep 2015 14:34:07 -0700 Subject: [PATCH 6/8] Clear Spark property during yarn client stop as well --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a2c4bc2f5480b..8c53c24a79c48 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -86,7 +86,11 @@ private[spark] class Client( private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) - def stop(): Unit = yarnClient.stop() + def stop(): Unit = { + yarnClient.stop() + // Unset YARN mode system env variable, to allow switching between cluster 
types. + System.clearProperty("SPARK_YARN_MODE") + } /** * Submit an application running our ApplicationMaster to the ResourceManager. From fb0de1be079240dfb83efd25c6996b3269bf0a7b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 25 Sep 2015 16:43:13 -0700 Subject: [PATCH 7/8] Clear the SPARK_YARN_MODE property earlier --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 220092381ed32..0c72adfb9505b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1756,9 +1756,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } SparkEnv.set(null) } - SparkContext.clearActiveContext() // Unset YARN mode system env variable, to allow switching between cluster types. System.clearProperty("SPARK_YARN_MODE") + SparkContext.clearActiveContext() logInfo("Successfully stopped SparkContext") } From d9ca9256a4c4e88989e6c803760a88d0ff3908e9 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 25 Sep 2015 16:43:42 -0700 Subject: [PATCH 8/8] Compare classes rather than strings, clear property in a final block --- .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index c0f9708bd3698..e1c67db76571f 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -236,10 +236,13 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } test("check different hadoop utils based on env variable") { - 
System.setProperty("SPARK_YARN_MODE", "true") - assert(SparkHadoopUtil.get.getClass.getSimpleName === "YarnSparkHadoopUtil") - System.setProperty("SPARK_YARN_MODE", "false") - assert(SparkHadoopUtil.get.getClass.getSimpleName === "SparkHadoopUtil") - System.clearProperty("SPARK_YARN_MODE") + try { + System.setProperty("SPARK_YARN_MODE", "true") + assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil]) + System.setProperty("SPARK_YARN_MODE", "false") + assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil]) + } finally { + System.clearProperty("SPARK_YARN_MODE") + } } }