From 4c692d96b632488125de7920752c001a1a09eac9 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 6 Jun 2014 16:06:47 -0700
Subject: [PATCH 1/2] Yarn: report HS URL in client mode, correct user in cluster mode.

Yarn client mode was not setting the app's tracking URL to the
History Server's URL when configured by the user. Now client mode
behaves the same as cluster mode.

In SparkContext.scala, the "user.name" system property had precedence
over the SPARK_USER environment variable. This means that SPARK_USER
was never used, since "user.name" is always set by the JVM. In Yarn
cluster mode, this means the application always reported itself as
being run by user "yarn" (or whatever user was running the Yarn NM).
One could argue that the correct fix would be to use
UGI.getCurrentUser() here, but at least for Yarn that will match what
SPARK_USER is set to.
---
 core/src/main/scala/org/apache/spark/SparkContext.scala  | 2 +-
 .../org/apache/spark/deploy/yarn/ExecutorLauncher.scala  | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d941aea9d7eb2..0de0217505880 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -297,7 +297,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
   // Set SPARK_USER for user who is running SparkContext.
   val sparkUser = Option {
-    Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
+    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
   }.getOrElse {
     SparkContext.SPARK_UNKNOWN_USER
   }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a4ce8766d347c..51e141fdb042f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -115,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val interval = math.min(timeoutInterval / 2, schedulerInterval)
 
     reporterThread = launchReporterThread(interval)
-    
+
     // Wait for the reporter thread to Finish.
     reporterThread.join()
 
@@ -134,12 +134,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .orElse(Option(System.getenv("LOCAL_DIRS")))
-    
+
     localDirs match {
       case None => throw new Exception("Yarn Local dirs can't be empty")
       case Some(l) => l
    }
-  } 
+  }
 
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
@@ -247,7 +247,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
   def finishApplicationMaster(status: FinalApplicationStatus) {
     logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+    val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
   }
 
 }
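
[Note, not part of the patches: a minimal, standalone sketch of the lookup
order that patch 1 establishes in SparkContext.scala. The object name,
resolveUser(), and the "<unknown>" placeholder are illustrative only; the
real code uses SparkContext.SPARK_UNKNOWN_USER.]

    // Environment variable first, JVM property second, so that YARN cluster
    // mode reports the submitting user instead of the NM's "yarn" user.
    object SparkUserPrecedence {
      private val UnknownUser = "<unknown>"  // stand-in for SparkContext.SPARK_UNKNOWN_USER

      def resolveUser(): String =
        Option(System.getenv("SPARK_USER"))                 // set by YARN to the submitting user
          .orElse(Option(System.getProperty("user.name")))  // JVM user, always set
          .getOrElse(UnknownUser)

      def main(args: Array[String]): Unit =
        println(s"Effective Spark user: ${resolveUser()}")
    }
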
From 4046e042a8c52f8cc719d7796e5e66b9f68e217c Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 10 Jun 2014 13:18:38 -0700
Subject: [PATCH 2/2] Set HS link in yarn-alpha also.

---
 .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a3bd91590fc25..b6ecae1e652fe 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -271,6 +271,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
     finishReq.setFinishApplicationStatus(status)
+    finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
     resourceManager.finishApplicationMaster(finishReq)
   }
 
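
[Note, not part of the patches: a small sketch of how the tracking URL is
derived from the configuration once both patches are applied, assuming
spark-core on the classpath. The object name and the example address
"historyserver.example.com:18080" are hypothetical; only the key
"spark.yarn.historyServer.address" comes from the patches.]

    import org.apache.spark.SparkConf

    object TrackingUrlSketch {
      def main(args: Array[String]): Unit = {
        // When the History Server address is configured, the AM passes it to
        // the RM as the tracking URL when it unregisters / finishes.
        val configured = new SparkConf(false)
          .set("spark.yarn.historyServer.address", "historyserver.example.com:18080")
        println("tracking URL: " + configured.get("spark.yarn.historyServer.address", ""))

        // When it is not configured, the AM falls back to an empty string,
        // matching the previous behavior.
        val unset = new SparkConf(false)
        println("tracking URL: '" + unset.get("spark.yarn.historyServer.address", "") + "'")
      }
    }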