From 542531483312b77ed941c277f3e05c4ef1867534 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 8 Jan 2015 15:36:12 -0800 Subject: [PATCH] Disable UI for tests --- .../scala/org/apache/spark/SparkContext.scala | 12 +++-- .../cluster/SimrSchedulerBackend.scala | 5 +- .../cluster/SparkDeploySchedulerBackend.scala | 4 +- .../scala/org/apache/spark/ui/UISuite.scala | 46 +++++++++++++------ pom.xml | 4 ++ project/SparkBuild.scala | 1 + .../spark/streaming/StreamingContext.scala | 11 ++++- .../spark/streaming/StreamingSource.scala | 2 +- .../spark/streaming/ui/StreamingTab.scala | 25 +++++++--- .../org/apache/spark/streaming/UISuite.scala | 15 ++++-- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- 12 files changed, 94 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 59cd78ecc54e..29c3f15a6c2c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging { new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf) // Initialize the Spark UI, registering all associated listeners - private[spark] val ui = new SparkUI(this) - ui.bind() + private[spark] val ui: Option[SparkUI] = + if (conf.getBoolean("spark.ui.enabled", true)) { + Some(new SparkUI(this)) + } else { + // For tests, do not enable the UI + None + } + ui.foreach(_.bind()) /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration: Configuration = { @@ -979,7 +985,7 @@ class SparkContext(config: SparkConf) extends Logging { /** Shut down the SparkContext. */ def stop() { postApplicationEnd() - ui.stop() + ui.foreach(_.stop()) // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index d99c76117c16..149c9fd1efba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -44,16 +44,17 @@ private[spark] class SimrSchedulerBackend( val conf = new Configuration() val fs = FileSystem.get(conf) + val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") logInfo("Writing to HDFS file: " + driverFilePath) logInfo("Writing Akka address: " + driverUrl) - logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress) + logInfo("Writing Spark UI Address: " + appUIAddress) // Create temporary file to prevent race condition where executors get empty driverUrl file val temp = fs.create(tmpPath, true) temp.writeUTF(driverUrl) temp.writeInt(maxCores) - temp.writeUTF(sc.ui.appUIAddress) + temp.writeUTF(appUIAddress) temp.close() // "Atomic" rename diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9c07b3f7b695..d7d78a994465 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -58,8 +58,10 @@ private[spark] class SparkDeploySchedulerBackend( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts) val sparkHome = sc.getSparkHome() + val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") + val eventLogDir = sc.eventLogger.map(_.logDir) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) + sparkHome, appUIAddress, eventLogDir) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 038746d2eda4..3d241324f6b9 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest import scala.io.Source import scala.language.postfixOps import scala.util.{Failure, Success, Try} +import scala.xml.Node import org.eclipse.jetty.server.Server import org.eclipse.jetty.servlet.ServletContextHandler @@ -32,15 +33,28 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.LocalSparkContext._ -import scala.xml.Node class UISuite extends FunSuite { + /** + * Create a test SparkContext with the SparkUI enabled. + * It is safe to `get` the SparkUI directly from the SparkContext returned here. + */ + private def newSparkContext(): SparkContext = { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set("spark.ui.enabled", "true") + val sc = new SparkContext(conf) + assert(sc.ui.isDefined) + sc + } + ignore("basic ui visibility") { - withSpark(new SparkContext("local", "test")) { sc => + withSpark(newSparkContext()) { sc => // test if the ui is visible, and all the expected tabs are visible eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(sc.ui.appUIAddress).mkString + val html = Source.fromURL(sc.ui.get.appUIAddress).mkString assert(!html.contains("random data that should not be present")) assert(html.toLowerCase.contains("stages")) assert(html.toLowerCase.contains("storage")) @@ -51,7 +65,7 @@ class UISuite extends FunSuite { } ignore("visibility at localhost:4040") { - withSpark(new SparkContext("local", "test")) { sc => + withSpark(newSparkContext()) { sc => // test if visible from http://localhost:4040 eventually(timeout(10 seconds), interval(50 milliseconds)) { val html = Source.fromURL("http://localhost:4040").mkString @@ -61,8 +75,8 @@ class UISuite extends FunSuite { } ignore("attaching a new tab") { - withSpark(new SparkContext("local", "test")) { sc => - val sparkUI = sc.ui + withSpark(newSparkContext()) { sc => + val sparkUI = sc.ui.get val newTab = new WebUITab(sparkUI, "foo") { attachPage(new WebUIPage("") { @@ -73,7 +87,7 @@ class UISuite extends FunSuite { } sparkUI.attachTab(newTab) eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(sc.ui.appUIAddress).mkString + val html = Source.fromURL(sparkUI.appUIAddress).mkString assert(!html.contains("random data that should not be present")) // check whether new page exists @@ -87,7 +101,7 @@ class UISuite extends FunSuite { } eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString + val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString // check whether new page exists assert(html.contains("magic")) } @@ -129,16 +143,20 @@ class UISuite extends FunSuite { } test("verify appUIAddress contains the scheme") { - withSpark(new SparkContext("local", "test")) { sc => - val uiAddress = sc.ui.appUIAddress - assert(uiAddress.equals("http://" + sc.ui.appUIHostPort)) + withSpark(newSparkContext()) { sc => + val ui = sc.ui.get + val uiAddress = ui.appUIAddress + val uiHostPort = ui.appUIHostPort + assert(uiAddress.equals("http://" + uiHostPort)) } } test("verify appUIAddress contains the port") { - withSpark(new SparkContext("local", "test")) { sc => - val splitUIAddress = sc.ui.appUIAddress.split(':') - assert(splitUIAddress(2).toInt == sc.ui.boundPort) + withSpark(newSparkContext()) { sc => + val ui = sc.ui.get + val splitUIAddress = ui.appUIAddress.split(':') + val boundPort = ui.boundPort + assert(splitUIAddress(2).toInt == boundPort) } } } diff --git a/pom.xml b/pom.xml index d400a766a0d9..975e963d1978 100644 --- a/pom.xml +++ b/pom.xml @@ -839,6 +839,10 @@ ${project.build.directory}/SparkTestSuite.txt -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m + + true + false + ${session.executionRootDirectory} 1 diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5328ad3a35ac..11523981438e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -183,6 +183,7 @@ object SparkBuild extends Build { fork := true, javaOptions in Test += "-Dspark.home=" + sparkHome, javaOptions in Test += "-Dspark.testing=1", + javaOptions in Test += "-Dspark.ui.enabled=false", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark").map { case (k,v) => s"-D$k=$v" }.toSeq, javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g".split(" ").toSeq, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 7c6b0524267e..2c59b759b056 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver} import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.ui.StreamingTab +import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.util.MetadataCleaner /** @@ -152,7 +152,14 @@ class StreamingContext private[streaming] ( private[streaming] val waiter = new ContextWaiter - private[streaming] val uiTab = new StreamingTab(this) + private[streaming] val progressListener = new StreamingJobProgressListener(this) + + private[streaming] val uiTab: Option[StreamingTab] = + if (conf.getBoolean("spark.ui.enabled", true)) { + Some(new StreamingTab(this)) + } else { + None + } /** Register streaming source to metrics system */ private val streamingSource = new StreamingSource(this) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index 774adc3c23c2..77f549ff487a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { val metricRegistry = new MetricRegistry val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName) - val streamingListener = ssc.uiTab.listener + val streamingListener = ssc.progressListener private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, defaultValue: T) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index 51448d15c651..7d97d7e1c1c4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -17,20 +17,33 @@ package org.apache.spark.streaming.ui -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkException} import org.apache.spark.streaming.StreamingContext -import org.apache.spark.ui.WebUITab +import org.apache.spark.ui.{SparkUI, WebUITab} -/** Spark Web UI tab that shows statistics of a streaming job */ +import StreamingTab._ + +/** + * Spark Web UI tab that shows statistics of a streaming job. + * This assumes the given SparkContext has enabled its SparkUI. + */ private[spark] class StreamingTab(ssc: StreamingContext) - extends WebUITab(ssc.sc.ui, "streaming") with Logging { + extends WebUITab(getSparkUI(ssc), "streaming") with Logging { - val parent = ssc.sc.ui + val parent = getSparkUI(ssc) val appName = parent.appName val basePath = parent.basePath - val listener = new StreamingJobProgressListener(ssc) + val listener = ssc.progressListener ssc.addStreamingListener(listener) attachPage(new StreamingPage(this)) parent.attachTab(this) } + +private object StreamingTab { + def getSparkUI(ssc: StreamingContext): SparkUI = { + ssc.sc.ui.getOrElse { + throw new SparkException("Parent SparkUI to attach this tab to not found!") + } + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala index 2a0db7564915..99aa03e96b20 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -24,13 +24,21 @@ import org.scalatest.FunSuite import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ +import org.apache.spark.SparkConf + class UISuite extends FunSuite { // Ignored: See SPARK-1530 ignore("streaming tab in spark UI") { - val ssc = new StreamingContext("local", "test", Seconds(1)) + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set("spark.ui.enabled", "true") + val ssc = new StreamingContext(conf, Seconds(1)) + assert(ssc.sc.ui.isDefined, "Spark UI is not started!") + val ui = ssc.sc.ui.get eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString + val html = Source.fromURL(ui.appUIAddress).mkString assert(!html.contains("random data that should not be present")) // test if streaming tab exist assert(html.toLowerCase.contains("streaming")) @@ -39,8 +47,7 @@ class UISuite extends FunSuite { } eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL( - ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString + val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString assert(html.toLowerCase.contains("batch")) assert(html.toLowerCase.contains("network")) } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8f0ecb855718..ca18b33f82ea 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -235,7 +235,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, assert(sparkContext != null || count >= numTries) if (null != sparkContext) { - uiAddress = sparkContext.ui.appUIHostPort + uiAddress = sparkContext.ui.map(_.appUIHostPort).getOrElse("") this.yarnAllocator = YarnAllocationHandler.newAllocator( yarnConf, resourceManager, diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 33a60d978c58..53d52283c7da 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -222,7 +222,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, assert(sparkContext != null || numTries >= maxNumTries) if (sparkContext != null) { - uiAddress = sparkContext.ui.appUIHostPort + uiAddress = sparkContext.ui.map(_.appUIHostPort).getOrElse("") this.yarnAllocator = YarnAllocationHandler.newAllocator( yarnConf, amClient,