From 24de26347c4421d54393dc60263e462a2112a10b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 6 Oct 2014 21:02:14 -0700 Subject: [PATCH 01/18] Create UI listeners in SparkContext instead of in Tabs: This change means that the listeners will always be registered, even if the web UI is disabled. The motivation for this change is to allow these listeners to be used when implementing a stable pull-based status / progress API for 1.2. --- .../scala/org/apache/spark/SparkContext.scala | 22 +++++++- .../deploy/history/FsHistoryProvider.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 4 +- .../scala/org/apache/spark/ui/SparkUI.scala | 54 ++++++++++--------- .../apache/spark/ui/env/EnvironmentTab.scala | 4 +- .../apache/spark/ui/exec/ExecutorsTab.scala | 3 +- .../apache/spark/ui/jobs/JobProgressTab.scala | 3 +- .../apache/spark/ui/storage/StorageTab.scala | 3 +- 8 files changed, 55 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 396cdd1247e0..03bc4654a5a0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -50,6 +50,10 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.env.EnvironmentListener +import org.apache.spark.ui.exec.ExecutorsListener +import org.apache.spark.ui.jobs.JobProgressListener +import org.apache.spark.ui.storage.StorageListener import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils} /** @@ -229,10 +233,24 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf) - // Initialize the Spark UI, registering all associated listeners + + private[spark] val environmentListener = new EnvironmentListener + private[spark] val storageStatusListener = new StorageStatusListener + private[spark] val executorsListener = new ExecutorsListener(storageStatusListener) + private[spark] val jobProgressListener = new JobProgressListener(conf) + private[spark] val storageListener = new StorageListener(storageStatusListener) + + listenerBus.addListener(environmentListener) + listenerBus.addListener(storageStatusListener) + listenerBus.addListener(executorsListener) + listenerBus.addListener(jobProgressListener) + listenerBus.addListener(storageListener) + + // Initialize the Spark UI: private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(new SparkUI(this)) + Some(new SparkUI(this, conf, env.securityManager, environmentListener, storageStatusListener, + executorsListener, jobProgressListener, storageListener, appName)) } else { // For tests, do not enable the UI None diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 481f6c93c6a8..d7a86d8037d9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -112,7 +112,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val ui = { val conf = this.conf.clone() val appSecManager = 
new SecurityManager(conf) - new SparkUI(conf, appSecManager, replayBus, appId, + SparkUI.create(conf, replayBus, appSecManager, appId, s"${HistoryServer.UI_PATH_PREFIX}/$appId") // Do not call ui.bind() to avoid creating a new server for each application } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f98b531316a3..b4d4012f49e2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -714,8 +714,8 @@ private[spark] class Master( try { val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", - HistoryServer.UI_PATH_PREFIX + s"/${app.id}") + val ui = SparkUI.create(new SparkConf, replayBus, new SecurityManager(conf), + appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}") replayBus.replay() appIdToUI(app.id) = ui webUi.attachSparkUI(ui) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index cccd59d122a9..c77eddfad943 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -21,10 +21,10 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.env.EnvironmentTab -import org.apache.spark.ui.exec.ExecutorsTab -import org.apache.spark.ui.jobs.JobProgressTab -import org.apache.spark.ui.storage.StorageTab +import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} +import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} +import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab} +import org.apache.spark.ui.storage.{StorageListener, StorageTab} /** * Top level user interface for a Spark application. @@ -33,35 +33,23 @@ private[spark] class SparkUI( val sc: SparkContext, val conf: SparkConf, val securityManager: SecurityManager, - val listenerBus: SparkListenerBus, + val environmentListener: EnvironmentListener, + val storageStatusListener: StorageStatusListener, + val executorsListener: ExecutorsListener, + val jobProgressListener: JobProgressListener, + val storageListener: StorageListener, var appName: String, val basePath: String = "") extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") with Logging { - def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName) - def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = - this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath) - - def this( - conf: SparkConf, - securityManager: SecurityManager, - listenerBus: SparkListenerBus, - appName: String, - basePath: String) = - this(null, conf, securityManager, listenerBus, appName, basePath) - // If SparkContext is not provided, assume the associated application is not live val live = sc != null - // Maintain executor storage status through Spark events - val storageStatusListener = new StorageStatusListener - initialize() /** Initialize all components of the server. 
*/ def initialize() { - listenerBus.addListener(storageStatusListener) val jobProgressTab = new JobProgressTab(this) attachTab(jobProgressTab) attachTab(new StorageTab(this)) @@ -83,11 +71,6 @@ private[spark] class SparkUI( appName = name } - /** Register the given listener with the listener bus. */ - def registerListener(listener: SparkListener) { - listenerBus.addListener(listener) - } - /** Stop the server behind this web interface. Only valid after bind(). */ override def stop() { super.stop() @@ -116,4 +99,23 @@ private[spark] object SparkUI { def getUIPort(conf: SparkConf): Int = { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } + + // Called by HistoryServer and Master when reconstituting a SparkUI from event logs: + def create(conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, + appName: String, basePath: String): SparkUI = { + val environmentListener = new EnvironmentListener + val storageStatusListener = new StorageStatusListener + val executorsListener = new ExecutorsListener(storageStatusListener) + val jobProgressListener = new JobProgressListener(conf) + val storageListener = new StorageListener(storageStatusListener) + + listenerBus.addListener(environmentListener) + listenerBus.addListener(storageStatusListener) + listenerBus.addListener(executorsListener) + listenerBus.addListener(jobProgressListener) + listenerBus.addListener(storageListener) + + new SparkUI(null, conf, securityManager, environmentListener, storageStatusListener, + executorsListener, jobProgressListener, storageListener, appName) + } } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala index 0d158fbe638d..f62260c6f6e1 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala @@ -22,10 +22,8 @@ import org.apache.spark.scheduler._ import org.apache.spark.ui._ private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") { - val listener = new EnvironmentListener - + val listener = parent.environmentListener attachPage(new EnvironmentPage(this)) - parent.registerListener(listener) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 61eb111cd910..689cf02b25b7 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -26,10 +26,9 @@ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { - val listener = new ExecutorsListener(parent.storageStatusListener) + val listener = parent.executorsListener attachPage(new ExecutorsPage(this)) - parent.registerListener(listener) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index c16542c9db30..547c394b55cb 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -29,12 +29,11 @@ private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "st val sc = parent.sc val conf = if (live) sc.conf else new SparkConf val killEnabled = conf.getBoolean("spark.ui.killEnabled", true) - val 
listener = new JobProgressListener(conf)
+  val listener = parent.jobProgressListener

   attachPage(new JobProgressPage(this))
   attachPage(new StagePage(this))
   attachPage(new PoolPage(this))
-  parent.registerListener(listener)

   def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)

diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 76097f1c51f8..a81291d50558 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -26,11 +26,10 @@ import org.apache.spark.storage._

 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storage") {
-  val listener = new StorageListener(parent.storageStatusListener)
+  val listener = parent.storageListener

   attachPage(new StoragePage(this))
   attachPage(new RDDPage(this))
-  parent.registerListener(listener)
 }

 /**

From ac2d13aae4fc226c9e625ae6a9e6bba0f71d2dd6 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 6 Oct 2014 23:23:56 -0700
Subject: [PATCH 02/18] Add jobId->stage, stageId->stage mappings in JobProgressListener

---
 .../spark/ui/jobs/JobProgressListener.scala | 45 ++++++++++++++++---
 .../org/apache/spark/ui/jobs/UIData.scala   |  5 +++
 2 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index eaeb861f59e5..b3e9b3a5f982 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,17 +40,25 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

   import JobProgressListener._

+  type JobId = Int
+  type StageId = Int
+  type StageAttemptId = Int
+
   // How many stages to remember
   val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
+  // How many jobs to remember
+  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)

-  // Map from stageId to StageInfo
-  val activeStages = new HashMap[Int, StageInfo]
-
-  // Map from (stageId, attemptId) to StageUIData
-  val stageIdToData = new HashMap[(Int, Int), StageUIData]
+  val activeJobs = new HashMap[JobId, JobUIData]
+  val completedJobs = ListBuffer[JobUIData]()
+  val failedJobs = ListBuffer[JobUIData]()
+  val jobIdToData = new HashMap[JobId, JobUIData]
+  val activeStages = new HashMap[StageId, StageInfo]
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
+  val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
+  val stageIdToInfo = new HashMap[StageId, StageInfo]

   // Map from pool name to a hash map (map from stage id to StageInfo).
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]() @@ -61,8 +69,28 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { def blockManagerIds = executorIdToBlockManagerId.values.toSeq + override def onJobStart(jobStart: SparkListenerJobStart) = synchronized { + val jobData: JobUIData = JobUIData(jobStart.jobId, jobStart.stageIds, "running") + jobIdToData(jobStart.jobId) = jobData + activeJobs(jobStart.jobId) = jobData + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized { + val jobData: JobUIData = activeJobs(jobEnd.jobId) + activeJobs.remove(jobEnd.jobId) + jobEnd.jobResult match { + case JobSucceeded => + completedJobs += jobData + jobData.status = "completed" + case JobFailed(exception) => + failedJobs += jobData + jobData.status = "failed" + } + } + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { val stage = stageCompleted.stageInfo + stageIdToInfo(stage.stageId) = stage val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), { logWarning("Stage completed for unknown stage " + stage.stageId) new StageUIData @@ -89,7 +117,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { val toRemove = math.max(retainedStages / 10, 1) - stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) } + stages.take(toRemove).foreach { s => + stageIdToData.remove((s.stageId, s.attemptId)) + stageIdToInfo.remove(s.stageId) + } stages.trimStart(toRemove) } } @@ -103,6 +134,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) + stageIdToInfo(stage.stageId) = stage val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData) stageData.schedulingPool = poolName @@ -277,4 +309,5 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { private object JobProgressListener { val DEFAULT_POOL_NAME = "default" val DEFAULT_RETAINED_STAGES = 1000 + val DEFAULT_RETAINED_JOBS = 1000 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index a336bf7e1ed0..513420e59958 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -65,4 +65,9 @@ private[jobs] object UIData { var taskInfo: TaskInfo, var taskMetrics: Option[TaskMetrics] = None, var errorMessage: Option[String] = None) + + case class JobUIData( + jobId: Int, + stageIds: Seq[Int], + var status: String /* one of "running", "completed", or "failed" */) } From 08cbec9e7f5b38e40c7ab7c41782fef67db74024 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 6 Oct 2014 23:24:35 -0700 Subject: [PATCH 03/18] Begin to sketch the interfaces for a stable, public status API. Some design goals here: - Hide constructors and implementations from users; only expose interfaces. - Return only immutable objects. - Ensure API will be usable from Java (e.g. only expose Array collections, since they're nicely usable from Scala and Java without having to do any implicit conversions on the Scala side or wrapping into Java-friendly types on the Java side). 
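For illustration, a minimal sketch of the intended call pattern (not part of this patch; it assumes a SparkContext named sc, a jobId obtained elsewhere, and the accessors added below):

    // Option-returning lookup; the returned object is immutable.
    val jobInfo: Option[SparkJobInfo] = sc.getJobInfo(jobId)
    jobInfo.foreach { job =>
      // Arrays come back directly usable from both Scala and Java, no conversions needed:
      println(s"Job ${job.getJobId}: ${job.getStatus} (stages ${job.getStageIds.mkString(", ")})")
    }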
--- .../java/org/apache/spark/SparkJobInfo.java | 25 ++++++++ .../java/org/apache/spark/SparkStageInfo.java | 23 +++++++ .../scala/org/apache/spark/SparkContext.scala | 6 ++ .../org/apache/spark/StatusAPIImpl.scala | 60 +++++++++++++++++++ 4 files changed, 114 insertions(+) create mode 100644 core/src/main/java/org/apache/spark/SparkJobInfo.java create mode 100644 core/src/main/java/org/apache/spark/SparkStageInfo.java create mode 100644 core/src/main/scala/org/apache/spark/StatusAPIImpl.scala diff --git a/core/src/main/java/org/apache/spark/SparkJobInfo.java b/core/src/main/java/org/apache/spark/SparkJobInfo.java new file mode 100644 index 000000000000..e87924bd3ca1 --- /dev/null +++ b/core/src/main/java/org/apache/spark/SparkJobInfo.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +public interface SparkJobInfo { + int getJobId(); + int[] getStageIds(); + SparkStageInfo[] getStages(); + String getStatus(); +} diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java new file mode 100644 index 000000000000..473785abe514 --- /dev/null +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +public interface SparkStageInfo { + int getStageId(); + String getName(); +} diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 03bc4654a5a0..ca084de16b01 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -246,6 +246,8 @@ class SparkContext(config: SparkConf) extends Logging { listenerBus.addListener(jobProgressListener) listenerBus.addListener(storageListener) + private val statusApi = new StatusAPIImpl(this) + // Initialize the Spark UI: private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { @@ -871,6 +873,10 @@ class SparkContext(config: SparkConf) extends Logging { /** The version of Spark on which this application is running. */ def version = SPARK_VERSION + def getJobInfo(jobId: Int): Option[SparkJobInfo] = statusApi.newJobInfo(jobId) + + def getStageInfo(stageId: Int): Option[SparkStageInfo] = statusApi.newStageInfo(stageId) + /** * Return a map from the slave to the max memory available for caching and the remaining * memory available for caching. diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala new file mode 100644 index 000000000000..a958f4777a7c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +private[spark] class StatusAPIImpl(sc: SparkContext) { + def newJobInfo(jobId: Int): Option[SparkJobInfo] = { + sc.jobProgressListener.synchronized { + sc.jobProgressListener.jobIdToData.get(jobId).map { data => + val stageInfos = data.stageIds.flatMap(newStageInfo) + new SparkJobInfoImpl(jobId, data.stageIds.toArray, stageInfos.toArray, data.status) + } + } + } + + def newStageInfo(stageId: Int): Option[SparkStageInfo] = { + sc.jobProgressListener.synchronized { + sc.jobProgressListener.stageIdToInfo.get(stageId).map { info => + new SparkStageInfoImpl(stageId, info.name) + } + } + } +} + +private class SparkJobInfoImpl ( + jobId: Int, + stageIds: Array[Int], + stageInfos: Array[SparkStageInfo], + status: String) + extends SparkJobInfo { + + def getJobId: Int = jobId + def getStageIds: Array[Int] = stageIds + def getStages: Array[SparkStageInfo] = stageInfos + def getStatus: String = status +} + +private class SparkStageInfoImpl( + stageId: Int, + name: String) + extends SparkStageInfo { + + def getStageId: Int = stageId + def getName: String = name +} + From 6e840d4bace946ea4e36e372ce175b0a0dfc4db2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 8 Oct 2014 16:47:18 -0700 Subject: [PATCH 04/18] Remove getter-style names and "consistent snapshot" semantics: - Java-style getX() method naming doesn't make sense for immutable objects that will never have any setters. - The "consistent snapshot of the entire job -> stage -> task mapping" semantics might be very expensive to implement for large jobs, so I've decided to remove chaining between SparkJobInfo and SparkStageInfo interfaces. Concretely, this means that you can't write something like job.stages()(0).name to get the name of the first stage in a job. Instead, you have to explicitly get the stage's ID from the job and then look up that stage using sc.getStageInfo(). This isn't to say that we can't implement methods like "getNumActiveStages" that reflect consistent state; the goal is mainly to avoid spending lots of time / memory to construct huge object graphs. 
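A minimal sketch of the two-step lookup this change requires (illustrative only, not part of the patch; assumes a SparkContext named sc and a known jobId):

    // Look up the job, then explicitly resolve one of its stages by ID.
    val firstStageName: Option[String] =
      for {
        job     <- sc.getJobInfo(jobId)
        stageId <- job.stageIds().headOption
        stage   <- sc.getStageInfo(stageId)
      } yield stage.name()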
--- .../java/org/apache/spark/SparkJobInfo.java | 7 ++--- .../java/org/apache/spark/SparkStageInfo.java | 4 +-- .../org/apache/spark/StatusAPIImpl.scala | 29 +++++-------------- 3 files changed, 13 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/spark/SparkJobInfo.java b/core/src/main/java/org/apache/spark/SparkJobInfo.java index e87924bd3ca1..934dcad3ee71 100644 --- a/core/src/main/java/org/apache/spark/SparkJobInfo.java +++ b/core/src/main/java/org/apache/spark/SparkJobInfo.java @@ -18,8 +18,7 @@ package org.apache.spark; public interface SparkJobInfo { - int getJobId(); - int[] getStageIds(); - SparkStageInfo[] getStages(); - String getStatus(); + int jobId(); + int[] stageIds(); + String status(); } diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java index 473785abe514..97a915d5f4dc 100644 --- a/core/src/main/java/org/apache/spark/SparkStageInfo.java +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -18,6 +18,6 @@ package org.apache.spark; public interface SparkStageInfo { - int getStageId(); - String getName(); + int stageId(); + String name(); } diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index a958f4777a7c..e1dacf700643 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -21,8 +21,7 @@ private[spark] class StatusAPIImpl(sc: SparkContext) { def newJobInfo(jobId: Int): Option[SparkJobInfo] = { sc.jobProgressListener.synchronized { sc.jobProgressListener.jobIdToData.get(jobId).map { data => - val stageInfos = data.stageIds.flatMap(newStageInfo) - new SparkJobInfoImpl(jobId, data.stageIds.toArray, stageInfos.toArray, data.status) + new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status) } } } @@ -37,24 +36,12 @@ private[spark] class StatusAPIImpl(sc: SparkContext) { } private class SparkJobInfoImpl ( - jobId: Int, - stageIds: Array[Int], - stageInfos: Array[SparkStageInfo], - status: String) - extends SparkJobInfo { - - def getJobId: Int = jobId - def getStageIds: Array[Int] = stageIds - def getStages: Array[SparkStageInfo] = stageInfos - def getStatus: String = status -} + val jobId: Int, + val stageIds: Array[Int], + val status: String) + extends SparkJobInfo private class SparkStageInfoImpl( - stageId: Int, - name: String) - extends SparkStageInfo { - - def getStageId: Int = stageId - def getName: String = name -} - + val stageId: Int, + val name: String) + extends SparkStageInfo \ No newline at end of file From cc568e57d6a1ec70b61d1c0d78a1902bd88aeefa Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 8 Oct 2014 18:06:30 -0700 Subject: [PATCH 05/18] Add note explaining that interfaces should not be implemented outside of Spark. 
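To see why, consider a hypothetical outside implementation (not from this patch): it compiles today, but adding any method to SparkJobInfo in a future release would break it.

    import org.apache.spark.SparkJobInfo

    // Hypothetical third-party class; fragile against new interface methods.
    class MyJobInfo extends SparkJobInfo {
      def jobId(): Int = 0
      def stageIds(): Array[Int] = Array.empty[Int]
      def status(): String = "running"
    }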
--- core/src/main/java/org/apache/spark/SparkJobInfo.java | 6 ++++++ core/src/main/java/org/apache/spark/SparkStageInfo.java | 6 ++++++ .../java/org/apache/spark/examples/JavaStatusAPITest.java | 8 ++++++++ 3 files changed, 20 insertions(+) create mode 100644 examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java diff --git a/core/src/main/java/org/apache/spark/SparkJobInfo.java b/core/src/main/java/org/apache/spark/SparkJobInfo.java index 934dcad3ee71..c4f09cb10b95 100644 --- a/core/src/main/java/org/apache/spark/SparkJobInfo.java +++ b/core/src/main/java/org/apache/spark/SparkJobInfo.java @@ -17,6 +17,12 @@ package org.apache.spark; +/** + * Exposes information about Spark Jobs. + * + * This interface is not designed to be implemented outside of Spark. We may add additional methods + * which may break binary compatibility with outside implementations. + */ public interface SparkJobInfo { int jobId(); int[] stageIds(); diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java index 97a915d5f4dc..f8225f1c496d 100644 --- a/core/src/main/java/org/apache/spark/SparkStageInfo.java +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -17,6 +17,12 @@ package org.apache.spark; +/** + * Exposes information about Spark Stages. + * + * This interface is not designed to be implemented outside of Spark. We may add additional methods + * which may break binary compatibility with outside implementations. + */ public interface SparkStageInfo { int stageId(); String name(); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java new file mode 100644 index 000000000000..f0f61826d9c1 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java @@ -0,0 +1,8 @@ +package org.apache.spark.examples; + +/** + * Created by joshrosen on 10/8/14. + */ +public class JavaStatusAPITest { + public static +} From 7319ffdaf013782bf4f0a2ee4d123d42934fb54e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 8 Oct 2014 18:07:40 -0700 Subject: [PATCH 06/18] Add getJobIdsForGroup() and num*Tasks() methods. --- .../java/org/apache/spark/SparkStageInfo.java | 4 +++ .../scala/org/apache/spark/SparkContext.scala | 2 ++ .../org/apache/spark/StatusAPIImpl.scala | 27 ++++++++++++++++--- .../spark/ui/jobs/JobProgressListener.scala | 3 ++- .../org/apache/spark/ui/jobs/UIData.scala | 1 + 5 files changed, 33 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java index f8225f1c496d..8387dbebe1fc 100644 --- a/core/src/main/java/org/apache/spark/SparkStageInfo.java +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -26,4 +26,8 @@ public interface SparkStageInfo { int stageId(); String name(); + int numTasks(); + int numActiveTasks(); + int numCompleteTasks(); + int numFailedTasks(); } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ca084de16b01..ef838162f7c3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -873,6 +873,8 @@ class SparkContext(config: SparkConf) extends Logging { /** The version of Spark on which this application is running. 
*/ def version = SPARK_VERSION + def getJobsIdsForGroup(jobGroup: String): Array[Int] = statusApi.jobIdsForGroup(jobGroup) + def getJobInfo(jobId: Int): Option[SparkJobInfo] = statusApi.newJobInfo(jobId) def getStageInfo(stageId: Int): Option[SparkStageInfo] = statusApi.newStageInfo(stageId) diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index e1dacf700643..cfcf9d70dc13 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -18,6 +18,14 @@ package org.apache.spark private[spark] class StatusAPIImpl(sc: SparkContext) { + + def jobIdsForGroup(jobGroup: String): Array[Int] = { + sc.jobProgressListener.synchronized { + val jobData = sc.jobProgressListener.jobIdToData.valuesIterator + jobData.filter(_.jobGroup == Some(jobGroup)).map(_.jobId).toArray + } + } + def newJobInfo(jobId: Int): Option[SparkJobInfo] = { sc.jobProgressListener.synchronized { sc.jobProgressListener.jobIdToData.get(jobId).map { data => @@ -28,8 +36,17 @@ private[spark] class StatusAPIImpl(sc: SparkContext) { def newStageInfo(stageId: Int): Option[SparkStageInfo] = { sc.jobProgressListener.synchronized { - sc.jobProgressListener.stageIdToInfo.get(stageId).map { info => - new SparkStageInfoImpl(stageId, info.name) + for ( + info <- sc.jobProgressListener.stageIdToInfo.get(stageId); + data <- sc.jobProgressListener.stageIdToData.get((stageId, info.attemptId)) + ) yield { + new SparkStageInfoImpl( + stageId, + info.name, + numTasks = info.numTasks, + numActiveTasks = data.numActiveTasks, + numCompleteTasks = data.numCompleteTasks, + numFailedTasks = data.numFailedTasks) } } } @@ -43,5 +60,9 @@ private class SparkJobInfoImpl ( private class SparkStageInfoImpl( val stageId: Int, - val name: String) + val name: String, + val numTasks: Int, + val numActiveTasks: Int, + val numCompleteTasks: Int, + val numFailedTasks: Int) extends SparkStageInfo \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b3e9b3a5f982..6f2fc1bf0ca4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -70,7 +70,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { def blockManagerIds = executorIdToBlockManagerId.values.toSeq override def onJobStart(jobStart: SparkListenerJobStart) = synchronized { - val jobData: JobUIData = JobUIData(jobStart.jobId, jobStart.stageIds, "running") + val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) + val jobData: JobUIData = JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, "running") jobIdToData(jobStart.jobId) = jobData activeJobs(jobStart.jobId) = jobData } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 513420e59958..49a292c4601c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -69,5 +69,6 @@ private[jobs] object UIData { case class JobUIData( jobId: Int, stageIds: Seq[Int], + jobGroup: Option[String], var status: String /* one of "running", "completed", or "failed" */) } From da5648e7472a44a86f8887b4b413fb88ec1d96ba Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: 
Wed, 8 Oct 2014 18:08:40 -0700 Subject: [PATCH 07/18] Add example of basic progress reporting in Java. This is a very rough WIP example; things should be considerably simpler once I port AsyncRDDActions to Java. --- .../spark/api/java/JavaSparkContext.scala | 6 ++ .../spark/examples/JavaStatusAPITest.java | 89 ++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 791d853a015a..cbda7f5fb79b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -132,6 +132,12 @@ class JavaSparkContext(val sc: SparkContext) /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions + def getJobsIdsForGroup(jobGroup: String): Array[Int] = sc.getJobsIdsForGroup(jobGroup) + + def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull + + def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull + /** Distribute a local Scala collection to form an RDD. */ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java index f0f61826d9c1..6ab130eb47b0 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java @@ -1,8 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.examples; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.SparkStageInfo; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + /** - * Created by joshrosen on 10/8/14. + * Example of using Spark's status APIs from Java. 
*/
+public final class JavaStatusAPITest {
+
+  public static final String APP_NAME = "JavaStatusAPITest";
+
+  public static final class IdentityWithDelay<T> implements Function<T, T> {
+    @Override
+    public T call(T x) throws Exception {
+      Thread.sleep(2 * 1000); // 2 seconds
+      return x;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
+    final JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+    // Example of implementing a simple progress reporter for a single-action job.
+    // TODO: refactor this to use collectAsync() once we've implemented AsyncRDDActions in Java.
+
+    // Submit a single-stage job asynchronously:
+    ExecutorService pool = Executors.newFixedThreadPool(1);
+    Future<List<Integer>> jobFuture = pool.submit(new Callable<List<Integer>>() {
+      @Override
+      public List<Integer> call() {
+        try {
+          sc.setJobGroup(APP_NAME, "[Job Group Description]");
+          return sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
+              new IdentityWithDelay<Integer>()).collect();
+        } catch (Exception e) {
+          System.out.println("Got exception while submitting job: ");
+          e.printStackTrace();
+          throw e;
+        }
+      }
+    });
+
+    // Monitor the progress of our job
+    while (sc.getJobsIdsForGroup(APP_NAME).length == 0) {
+      System.out.println("Waiting for job to be submitted to scheduler");
+      Thread.sleep(1000);
+    }
+    int jobId = sc.getJobsIdsForGroup(APP_NAME)[0];
+    System.out.println("Job was submitted with id " + jobId);
+    while (!jobFuture.isDone()) {
+      Thread.sleep(1000); // 1 second
+      SparkJobInfo jobInfo = sc.getJobInfo(jobId);
+      SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
+      System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
+          " active, " + stageInfo.numCompleteTasks() + " complete");
+    }
+
+    System.out.println("Job results are: " + jobFuture.get());
+    pool.shutdown();
+    sc.stop();
+  }
+}

From 249ca16c1a9b6f101121b821718b360309463356 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Oct 2014 17:12:50 -0700
Subject: [PATCH 08/18] Address several review comments:

- Use enumeration for job status.
- Remove StatusAPIImpl class; implement methods directly in SparkContext.
- Fix typo in getJobIdsForGroup method name.
- Add Javadoc and comment on nullability.
- Rename numCompleteTasks -> numCompletedTasks
- Perform more efficient filtering in jobIdsForGroup

---
 .../org/apache/spark/ExecutionStatus.java     | 24 ++++++++++
 .../java/org/apache/spark/SparkJobInfo.java   |  2 +-
 .../java/org/apache/spark/SparkStageInfo.java |  2 +-
 .../scala/org/apache/spark/SparkContext.scala | 45 ++++++++++++++++---
 .../org/apache/spark/StatusAPIImpl.scala      | 41 ++---------------
 .../spark/api/java/JavaSparkContext.scala     | 14 +++++-
 .../spark/ui/jobs/JobProgressListener.scala   |  7 +--
 .../org/apache/spark/ui/jobs/UIData.scala     |  3 +-
 .../spark/examples/JavaStatusAPITest.java     |  6 +--
 9 files changed, 91 insertions(+), 53 deletions(-)
 create mode 100644 core/src/main/java/org/apache/spark/ExecutionStatus.java

diff --git a/core/src/main/java/org/apache/spark/ExecutionStatus.java b/core/src/main/java/org/apache/spark/ExecutionStatus.java
new file mode 100644
index 000000000000..8e80561b6056
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/ExecutionStatus.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +public enum ExecutionStatus { + RUNNING, + SUCCEEDED, + FAILED +} diff --git a/core/src/main/java/org/apache/spark/SparkJobInfo.java b/core/src/main/java/org/apache/spark/SparkJobInfo.java index c4f09cb10b95..40bfd096b582 100644 --- a/core/src/main/java/org/apache/spark/SparkJobInfo.java +++ b/core/src/main/java/org/apache/spark/SparkJobInfo.java @@ -26,5 +26,5 @@ public interface SparkJobInfo { int jobId(); int[] stageIds(); - String status(); + ExecutionStatus status(); } diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java index 8387dbebe1fc..186431eb3900 100644 --- a/core/src/main/java/org/apache/spark/SparkStageInfo.java +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -28,6 +28,6 @@ public interface SparkStageInfo { String name(); int numTasks(); int numActiveTasks(); - int numCompleteTasks(); + int numCompletedTasks(); int numFailedTasks(); } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ef838162f7c3..d763afc9fd06 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -246,8 +246,6 @@ class SparkContext(config: SparkConf) extends Logging { listenerBus.addListener(jobProgressListener) listenerBus.addListener(storageListener) - private val statusApi = new StatusAPIImpl(this) - // Initialize the Spark UI: private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { @@ -873,11 +871,48 @@ class SparkContext(config: SparkConf) extends Logging { /** The version of Spark on which this application is running. */ def version = SPARK_VERSION - def getJobsIdsForGroup(jobGroup: String): Array[Int] = statusApi.jobIdsForGroup(jobGroup) + /** + * Return a list of all known jobs in a particular job group. The returned list may contain + * running, failed, and completed jobs, and may vary across invocations of this method. + */ + def getJobIdsForGroup(jobGroup: String): Array[Int] = { + jobProgressListener.synchronized { + val jobData = jobProgressListener.jobIdToData.valuesIterator + jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray + } + } - def getJobInfo(jobId: Int): Option[SparkJobInfo] = statusApi.newJobInfo(jobId) + /** + * Returns job information, or None if the job info could not be found or was garbage-collected. + */ + def getJobInfo(jobId: Int): Option[SparkJobInfo] = { + jobProgressListener.synchronized { + jobProgressListener.jobIdToData.get(jobId).map { data => + new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status) + } + } + } - def getStageInfo(stageId: Int): Option[SparkStageInfo] = statusApi.newStageInfo(stageId) + /** + * Returns stage information, or None if the stage info could not be found or was + * garbage-collected. 
+ */ + def getStageInfo(stageId: Int): Option[SparkStageInfo] = { + jobProgressListener.synchronized { + for ( + info <- jobProgressListener.stageIdToInfo.get(stageId); + data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId)) + ) yield { + new SparkStageInfoImpl( + stageId, + info.name, + info.numTasks, + data.numActiveTasks, + data.numCompleteTasks, + data.numFailedTasks) + } + } + } /** * Return a map from the slave to the max memory available for caching and the remaining diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index cfcf9d70dc13..7948402f5c29 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -17,45 +17,10 @@ package org.apache.spark -private[spark] class StatusAPIImpl(sc: SparkContext) { - - def jobIdsForGroup(jobGroup: String): Array[Int] = { - sc.jobProgressListener.synchronized { - val jobData = sc.jobProgressListener.jobIdToData.valuesIterator - jobData.filter(_.jobGroup == Some(jobGroup)).map(_.jobId).toArray - } - } - - def newJobInfo(jobId: Int): Option[SparkJobInfo] = { - sc.jobProgressListener.synchronized { - sc.jobProgressListener.jobIdToData.get(jobId).map { data => - new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status) - } - } - } - - def newStageInfo(stageId: Int): Option[SparkStageInfo] = { - sc.jobProgressListener.synchronized { - for ( - info <- sc.jobProgressListener.stageIdToInfo.get(stageId); - data <- sc.jobProgressListener.stageIdToData.get((stageId, info.attemptId)) - ) yield { - new SparkStageInfoImpl( - stageId, - info.name, - numTasks = info.numTasks, - numActiveTasks = data.numActiveTasks, - numCompleteTasks = data.numCompleteTasks, - numFailedTasks = data.numFailedTasks) - } - } - } -} - private class SparkJobInfoImpl ( val jobId: Int, val stageIds: Array[Int], - val status: String) + val status: ExecutionStatus) extends SparkJobInfo private class SparkStageInfoImpl( @@ -63,6 +28,6 @@ private class SparkStageInfoImpl( val name: String, val numTasks: Int, val numActiveTasks: Int, - val numCompleteTasks: Int, + val numCompletedTasks: Int, val numFailedTasks: Int) - extends SparkStageInfo \ No newline at end of file + extends SparkStageInfo diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index cbda7f5fb79b..8c481d76e19b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -132,10 +132,22 @@ class JavaSparkContext(val sc: SparkContext) /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions - def getJobsIdsForGroup(jobGroup: String): Array[Int] = sc.getJobsIdsForGroup(jobGroup) + /** + * Return a list of all known jobs in a particular job group. The returned list may contain + * running, failed, and completed jobs, and may vary across invocations of this method. + */ + def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup) + + /** + * Returns job information, or `null` if the job info could not be found or was garbage-collected. + */ def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull + /** + * Returns stage information, or `null` if the stage info could not be found or was + * garbage-collected. 
+ */
   def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull

   /** Distribute a local Scala collection to form an RDD. */

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 6f2fc1bf0ca4..82976f46ef21 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -71,7 +71,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

   override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
     val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
-    val jobData: JobUIData = JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, "running")
+    val jobData: JobUIData =
+      JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, ExecutionStatus.RUNNING)
     jobIdToData(jobStart.jobId) = jobData
     activeJobs(jobStart.jobId) = jobData
   }
@@ -82,10 +83,10 @@
     jobEnd.jobResult match {
       case JobSucceeded =>
         completedJobs += jobData
-        jobData.status = "completed"
+        jobData.status = ExecutionStatus.SUCCEEDED
       case JobFailed(exception) =>
         failedJobs += jobData
-        jobData.status = "failed"
+        jobData.status = ExecutionStatus.FAILED
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 49a292c4601c..1823b69b0062 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.ui.jobs

+import org.apache.spark.ExecutionStatus
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
 import org.apache.spark.util.collection.OpenHashSet
@@ -70,5 +71,5 @@ private[jobs] object UIData {
     jobId: Int,
     stageIds: Seq[Int],
     jobGroup: Option[String],
-    var status: String /* one of "running", "completed", or "failed" */)
+    var status: ExecutionStatus)
 }

diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java
index 6ab130eb47b0..b00355384e1e 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java
@@ -70,18 +70,18 @@ public List<Integer> call() {
     });

     // Monitor the progress of our job
-    while (sc.getJobsIdsForGroup(APP_NAME).length == 0) {
+    while (sc.getJobIdsForGroup(APP_NAME).length == 0) {
       System.out.println("Waiting for job to be submitted to scheduler");
       Thread.sleep(1000);
     }
-    int jobId = sc.getJobsIdsForGroup(APP_NAME)[0];
+    int jobId = sc.getJobIdsForGroup(APP_NAME)[0];
     System.out.println("Job was submitted with id " + jobId);
     while (!jobFuture.isDone()) {
       Thread.sleep(1000); // 1 second
       SparkJobInfo jobInfo = sc.getJobInfo(jobId);
       SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
       System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
-          " active, " + stageInfo.numCompleteTasks() + " complete");
+          " active, " + stageInfo.numCompletedTasks() + " complete");
     }

     System.out.println("Job results are: " + jobFuture.get());

From 3dc79afcf9bdf33ee2869b24e07e8683d5b41121 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 20 Oct 2014 19:18:38 -0700
Subject:
[PATCH 09/18] Remove creation of unused listeners in SparkContext. --- .../scala/org/apache/spark/SparkContext.scala | 16 +------ .../deploy/history/FsHistoryProvider.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 2 +- .../scala/org/apache/spark/ui/SparkUI.scala | 48 ++++++++++++------- .../spark/ui/jobs/JobProgressPage.scala | 9 ++-- .../apache/spark/ui/jobs/JobProgressTab.scala | 7 ++- .../org/apache/spark/ui/jobs/PoolPage.scala | 3 +- 7 files changed, 42 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d763afc9fd06..9b7ec79c7fe7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -50,10 +50,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.env.EnvironmentListener -import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.JobProgressListener -import org.apache.spark.ui.storage.StorageListener import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils} /** @@ -234,23 +231,14 @@ class SparkContext(config: SparkConf) extends Logging { new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf) - private[spark] val environmentListener = new EnvironmentListener - private[spark] val storageStatusListener = new StorageStatusListener - private[spark] val executorsListener = new ExecutorsListener(storageStatusListener) private[spark] val jobProgressListener = new JobProgressListener(conf) - private[spark] val storageListener = new StorageListener(storageStatusListener) - - listenerBus.addListener(environmentListener) - listenerBus.addListener(storageStatusListener) - listenerBus.addListener(executorsListener) listenerBus.addListener(jobProgressListener) - listenerBus.addListener(storageListener) // Initialize the Spark UI: private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(new SparkUI(this, conf, env.securityManager, environmentListener, storageStatusListener, - executorsListener, jobProgressListener, storageListener, appName)) + Some(SparkUI.create(Some(this), conf, listenerBus, env.securityManager, appName, + jobProgressListener = Some(jobProgressListener))) } else { // For tests, do not enable the UI None diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d7a86d8037d9..53ed84414a5b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -112,7 +112,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - SparkUI.create(conf, replayBus, appSecManager, appId, + SparkUI.create(None, conf, replayBus, appSecManager, appId, s"${HistoryServer.UI_PATH_PREFIX}/$appId") // Do not call ui.bind() to avoid creating a new server for each application } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 
b4d4012f49e2..c7207fd4fe49 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -714,7 +714,7 @@ private[spark] class Master( try { val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = SparkUI.create(new SparkConf, replayBus, new SecurityManager(conf), + val ui = SparkUI.create(None, new SparkConf, replayBus, new SecurityManager(conf), appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}") replayBus.replay() appIdToUI(app.id) = ui diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index c77eddfad943..11da8b56d9a2 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -29,8 +29,8 @@ import org.apache.spark.ui.storage.{StorageListener, StorageTab} /** * Top level user interface for a Spark application. */ -private[spark] class SparkUI( - val sc: SparkContext, +private[spark] class SparkUI private[ui] ( + val sc: Option[SparkContext], val conf: SparkConf, val securityManager: SecurityManager, val environmentListener: EnvironmentListener, @@ -39,15 +39,10 @@ private[spark] class SparkUI( val jobProgressListener: JobProgressListener, val storageListener: StorageListener, var appName: String, - val basePath: String = "") + val basePath: String) extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") with Logging { - // If SparkContext is not provided, assume the associated application is not live - val live = sc != null - - initialize() - /** Initialize all components of the server. */ def initialize() { val jobProgressTab = new JobProgressTab(this) @@ -59,10 +54,10 @@ private[spark] class SparkUI( attachHandler(createRedirectHandler("/", "/stages", basePath = basePath)) attachHandler( createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest)) - if (live) { - sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) - } + // If the UI is live, then serve + sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) } } + initialize() def getAppName = appName @@ -100,22 +95,39 @@ private[spark] object SparkUI { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } - // Called by HistoryServer and Master when reconstituting a SparkUI from event logs: - def create(conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, - appName: String, basePath: String): SparkUI = { + /** + * Create a new Spark UI. + * + * @param sc optional SparkContext; this can be None when reconstituting a UI from event logs. + * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the + * web UI will create and register its own JobProgressListener. 
+ */ + def create( + sc: Option[SparkContext], + conf: SparkConf, + listenerBus: SparkListenerBus, + securityManager: SecurityManager, + appName: String, + basePath: String = "", + jobProgressListener: Option[JobProgressListener] = None): SparkUI = { + + val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { + val listener = new JobProgressListener(conf) + listenerBus.addListener(listener) + listener + } + val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener val executorsListener = new ExecutorsListener(storageStatusListener) - val jobProgressListener = new JobProgressListener(conf) val storageListener = new StorageListener(storageStatusListener) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) - listenerBus.addListener(jobProgressListener) listenerBus.addListener(storageListener) - new SparkUI(null, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, jobProgressListener, storageListener, appName) + new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, + executorsListener, _jobProgressListener, storageListener, appName, basePath) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala index a82f71ed0847..40a4a5bd7cc3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala @@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") { - private val live = parent.live private val sc = parent.sc private val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler @@ -47,17 +46,17 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent) // For now, pool information is only accessible in live UIs - val pools = if (live) sc.getAllPools else Seq[Schedulable]() + val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) val poolTable = new PoolTable(pools, parent) val summary: NodeSeq =
      <div>
        <ul class="unstyled">
-          {if (live) {
+          {if (sc.isDefined) {
             // Total duration is not meaningful unless the UI is live
             <li>
               <strong>Total Duration: </strong>
-              {UIUtils.formatDuration(now - sc.startTime)}
+              {UIUtils.formatDuration(now - sc.get.startTime)}
             </li>
           }}
@@ -80,7 +79,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
     val content = summary ++
-      {if (live && isFairScheduler) {
+      {if (sc.isDefined && isFairScheduler) {
         <h4>{pools.size} Fair Scheduler Pools</h4>
++ poolTable.toNodeSeq } else { Seq[Node]() diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index 547c394b55cb..03ca918e2e8b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -25,10 +25,9 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} /** Web UI showing progress status of all jobs in the given SparkContext. */ private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") { - val live = parent.live val sc = parent.sc - val conf = if (live) sc.conf else new SparkConf - val killEnabled = conf.getBoolean("spark.ui.killEnabled", true) + val conf = sc.map(_.conf).getOrElse(new SparkConf) + val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) val listener = parent.jobProgressListener attachPage(new JobProgressPage(this)) @@ -42,7 +41,7 @@ private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "st val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) { - sc.cancelStage(stageId) + sc.get.cancelStage(stageId) } // Do a quick pause here to give Spark time to kill the stage so it shows up as // killed after the refresh. Note that this will block the serving thread so the diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 7a6c7d1a497e..770d99eea1c9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { - private val live = parent.live private val sc = parent.sc private val listener = parent.listener @@ -42,7 +41,7 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent) // For now, pool information is only accessible in live UIs - val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]() + val pools = sc.map(_.getPoolForName(poolName).get).toSeq val poolTable = new PoolTable(pools, parent) val content = From f9a9a0008cc058da1bb1038eafe6df713646cf9d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 20 Oct 2014 19:37:55 -0700 Subject: [PATCH 10/18] More review comments: - "garbage-collected" -> "garbage collected" - ExecutionStatus -> JobExecutionStatus - Make JobUIData into regular class; add default constructor. - Handle missing JobUIData in onJobComplete handler. 
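Editor's note on the last item above: the reworked onJobEnd handler must not assume a matching job-start event was ever delivered (the listener may have been attached mid-application, or the JobUIData may already have been evicted), so it falls back to a placeholder instead of throwing. A minimal self-contained sketch of that defensive pattern — hypothetical names, not the actual JobProgressListener code:

    import scala.collection.mutable

    // Simplified stand-in for a listener's job bookkeeping.
    class JobStatusTracker {
      private val activeJobs = mutable.HashMap[Int, String]()    // jobId -> job group
      private val completedJobs = mutable.HashMap[Int, String]() // jobId -> final status

      def onJobStart(jobId: Int, jobGroup: String): Unit = synchronized {
        activeJobs(jobId) = jobGroup
      }

      def onJobEnd(jobId: Int, succeeded: Boolean): Unit = synchronized {
        // remove() returns None when the start event was never observed;
        // log and continue instead of throwing NoSuchElementException.
        if (activeJobs.remove(jobId).isEmpty) {
          println(s"Job completed for unknown job $jobId")
        }
        completedJobs(jobId) = if (succeeded) "SUCCEEDED" else "FAILED"
      }
    }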
--- ...ecutionStatus.java => JobExecutionStatus.java} | 5 +++-- .../main/java/org/apache/spark/SparkJobInfo.java | 2 +- .../scala/org/apache/spark/SparkContext.scala | 4 ++-- .../scala/org/apache/spark/StatusAPIImpl.scala | 2 +- .../apache/spark/api/java/JavaSparkContext.scala | 4 ++-- .../spark/ui/jobs/JobProgressListener.scala | 12 +++++++----- .../scala/org/apache/spark/ui/jobs/UIData.scala | 15 ++++++++------- 7 files changed, 24 insertions(+), 20 deletions(-) rename core/src/main/java/org/apache/spark/{ExecutionStatus.java => JobExecutionStatus.java} (94%) diff --git a/core/src/main/java/org/apache/spark/ExecutionStatus.java b/core/src/main/java/org/apache/spark/JobExecutionStatus.java similarity index 94% rename from core/src/main/java/org/apache/spark/ExecutionStatus.java rename to core/src/main/java/org/apache/spark/JobExecutionStatus.java index 8e80561b6056..6e161313702b 100644 --- a/core/src/main/java/org/apache/spark/ExecutionStatus.java +++ b/core/src/main/java/org/apache/spark/JobExecutionStatus.java @@ -17,8 +17,9 @@ package org.apache.spark; -public enum ExecutionStatus { +public enum JobExecutionStatus { RUNNING, SUCCEEDED, - FAILED + FAILED, + UNKNOWN } diff --git a/core/src/main/java/org/apache/spark/SparkJobInfo.java b/core/src/main/java/org/apache/spark/SparkJobInfo.java index 40bfd096b582..4e3c983b1170 100644 --- a/core/src/main/java/org/apache/spark/SparkJobInfo.java +++ b/core/src/main/java/org/apache/spark/SparkJobInfo.java @@ -26,5 +26,5 @@ public interface SparkJobInfo { int jobId(); int[] stageIds(); - ExecutionStatus status(); + JobExecutionStatus status(); } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9b7ec79c7fe7..350e4a4de707 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -871,7 +871,7 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Returns job information, or None if the job info could not be found or was garbage-collected. + * Returns job information, or None if the job info could not be found or was garbage collected. */ def getJobInfo(jobId: Int): Option[SparkJobInfo] = { jobProgressListener.synchronized { @@ -883,7 +883,7 @@ class SparkContext(config: SparkConf) extends Logging { /** * Returns stage information, or None if the stage info could not be found or was - * garbage-collected. + * garbage collected. 
*/ def getStageInfo(stageId: Int): Option[SparkStageInfo] = { jobProgressListener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index 7948402f5c29..85ee28353b51 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -20,7 +20,7 @@ package org.apache.spark private class SparkJobInfoImpl ( val jobId: Int, val stageIds: Array[Int], - val status: ExecutionStatus) + val status: JobExecutionStatus) extends SparkJobInfo private class SparkStageInfoImpl( diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 8c481d76e19b..9885ec234ee9 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -140,13 +140,13 @@ class JavaSparkContext(val sc: SparkContext) def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup) /** - * Returns job information, or `null` if the job info could not be found or was garbage-collected. + * Returns job information, or `null` if the job info could not be found or was garbage collected. */ def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull /** * Returns stage information, or `null` if the stage info could not be found or was - * garbage-collected. + * garbage collected. */ def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 82976f46ef21..b5207360510d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -72,21 +72,23 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onJobStart(jobStart: SparkListenerJobStart) = synchronized { val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) val jobData: JobUIData = - JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup,ExecutionStatus.RUNNING) + new JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, JobExecutionStatus.RUNNING) jobIdToData(jobStart.jobId) = jobData activeJobs(jobStart.jobId) = jobData } override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized { - val jobData: JobUIData = activeJobs(jobEnd.jobId) - activeJobs.remove(jobEnd.jobId) + val jobData = activeJobs.remove(jobEnd.jobId).getOrElse { + logWarning(s"Job completed for unknown job ${jobEnd.jobId}") + new JobUIData(jobId = jobEnd.jobId) + } jobEnd.jobResult match { case JobSucceeded => completedJobs += jobData - jobData.status = ExecutionStatus.SUCCEEDED + jobData.status = JobExecutionStatus.SUCCEEDED case JobFailed(exception) => failedJobs += jobData - jobData.status = ExecutionStatus.FAILED + jobData.status = JobExecutionStatus.FAILED } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 1823b69b0062..e2813f8eb5ab 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import org.apache.spark.ExecutionStatus +import 
org.apache.spark.JobExecutionStatus import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} import org.apache.spark.util.collection.OpenHashSet @@ -37,6 +37,13 @@ private[jobs] object UIData { var diskBytesSpilled : Long = 0 } + class JobUIData( + var jobId: Int = -1, + var stageIds: Seq[Int] = Seq.empty, + var jobGroup: Option[String] = None, + var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN + ) + class StageUIData { var numActiveTasks: Int = _ var numCompleteTasks: Int = _ @@ -66,10 +73,4 @@ private[jobs] object UIData { var taskInfo: TaskInfo, var taskMetrics: Option[TaskMetrics] = None, var errorMessage: Option[String] = None) - - case class JobUIData( - jobId: Int, - stageIds: Seq[Int], - jobGroup: Option[String], - var status: ExecutionStatus) } From 787444c4ee20693a8f8c4fb5320ee4c4133a0d91 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 20 Oct 2014 19:46:15 -0700 Subject: [PATCH 11/18] Move status API methods into trait that can be mixed into SparkContext. --- .../scala/org/apache/spark/SparkContext.scala | 111 +------------- .../org/apache/spark/SparkStatusAPI.scala | 141 ++++++++++++++++++ 2 files changed, 143 insertions(+), 109 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/SparkStatusAPI.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 350e4a4de707..588299ec4412 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.{Properties, UUID} import java.util.UUID.randomUUID import scala.collection.{Map, Set} -import scala.collection.JavaConversions._ import scala.collection.generic.Growable import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} @@ -61,7 +60,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat * this config overrides the default configs as well as system properties. */ -class SparkContext(config: SparkConf) extends Logging { +class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It @@ -234,7 +233,7 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val jobProgressListener = new JobProgressListener(conf) listenerBus.addListener(jobProgressListener) - // Initialize the Spark UI: + // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.create(Some(this), conf, listenerBus, env.securityManager, appName, @@ -859,112 +858,6 @@ class SparkContext(config: SparkConf) extends Logging { /** The version of Spark on which this application is running. */ def version = SPARK_VERSION - /** - * Return a list of all known jobs in a particular job group. The returned list may contain - * running, failed, and completed jobs, and may vary across invocations of this method. 
- */ - def getJobIdsForGroup(jobGroup: String): Array[Int] = { - jobProgressListener.synchronized { - val jobData = jobProgressListener.jobIdToData.valuesIterator - jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray - } - } - - /** - * Returns job information, or None if the job info could not be found or was garbage collected. - */ - def getJobInfo(jobId: Int): Option[SparkJobInfo] = { - jobProgressListener.synchronized { - jobProgressListener.jobIdToData.get(jobId).map { data => - new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status) - } - } - } - - /** - * Returns stage information, or None if the stage info could not be found or was - * garbage collected. - */ - def getStageInfo(stageId: Int): Option[SparkStageInfo] = { - jobProgressListener.synchronized { - for ( - info <- jobProgressListener.stageIdToInfo.get(stageId); - data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId)) - ) yield { - new SparkStageInfoImpl( - stageId, - info.name, - info.numTasks, - data.numActiveTasks, - data.numCompleteTasks, - data.numFailedTasks) - } - } - } - - /** - * Return a map from the slave to the max memory available for caching and the remaining - * memory available for caching. - */ - def getExecutorMemoryStatus: Map[String, (Long, Long)] = { - env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) => - (blockManagerId.host + ":" + blockManagerId.port, mem) - } - } - - /** - * :: DeveloperApi :: - * Return information about what RDDs are cached, if they are in mem or on disk, how much space - * they take, etc. - */ - @DeveloperApi - def getRDDStorageInfo: Array[RDDInfo] = { - val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray - StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus) - rddInfos.filter(_.isCached) - } - - /** - * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call. - * Note that this does not necessarily mean the caching or computation was successful. - */ - def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap - - /** - * :: DeveloperApi :: - * Return information about blocks stored in all of the slaves - */ - @DeveloperApi - def getExecutorStorageStatus: Array[StorageStatus] = { - env.blockManager.master.getStorageStatus - } - - /** - * :: DeveloperApi :: - * Return pools for fair scheduler - */ - @DeveloperApi - def getAllPools: Seq[Schedulable] = { - // TODO(xiajunluan): We should take nested pools into account - taskScheduler.rootPool.schedulableQueue.toSeq - } - - /** - * :: DeveloperApi :: - * Return the pool associated with the given name, if one exists - */ - @DeveloperApi - def getPoolForName(pool: String): Option[Schedulable] = { - Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)) - } - - /** - * Return current scheduling mode - */ - def getSchedulingMode: SchedulingMode.SchedulingMode = { - taskScheduler.schedulingMode - } - /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala new file mode 100644 index 000000000000..a721f03ca5b2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.Map +import scala.collection.JavaConversions._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SchedulingMode, Schedulable} +import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo} + + +/** + * Trait that implements Spark's status APIs. This trait is designed to be mixed into + * SparkContext; it allows the status API code to live in its own file. + */ +private[spark] trait SparkStatusAPI { this: SparkContext => + + /** + * Return a map from the slave to the max memory available for caching and the remaining + * memory available for caching. + */ + def getExecutorMemoryStatus: Map[String, (Long, Long)] = { + env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) => + (blockManagerId.host + ":" + blockManagerId.port, mem) + } + } + + /** + * :: DeveloperApi :: + * Return information about what RDDs are cached, if they are in mem or on disk, how much space + * they take, etc. + */ + @DeveloperApi + def getRDDStorageInfo: Array[RDDInfo] = { + val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray + StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus) + rddInfos.filter(_.isCached) + } + + /** + * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call. + * Note that this does not necessarily mean the caching or computation was successful. + */ + def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap + + /** + * :: DeveloperApi :: + * Return information about blocks stored in all of the slaves + */ + @DeveloperApi + def getExecutorStorageStatus: Array[StorageStatus] = { + env.blockManager.master.getStorageStatus + } + + /** + * :: DeveloperApi :: + * Return pools for fair scheduler + */ + @DeveloperApi + def getAllPools: Seq[Schedulable] = { + // TODO(xiajunluan): We should take nested pools into account + taskScheduler.rootPool.schedulableQueue.toSeq + } + + /** + * :: DeveloperApi :: + * Return the pool associated with the given name, if one exists + */ + @DeveloperApi + def getPoolForName(pool: String): Option[Schedulable] = { + Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)) + } + + /** + * Return current scheduling mode + */ + def getSchedulingMode: SchedulingMode.SchedulingMode = { + taskScheduler.schedulingMode + } + + + /** + * Return a list of all known jobs in a particular job group. The returned list may contain + * running, failed, and completed jobs, and may vary across invocations of this method. 
+ */ + def getJobIdsForGroup(jobGroup: String): Array[Int] = { + jobProgressListener.synchronized { + val jobData = jobProgressListener.jobIdToData.valuesIterator + jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray + } + } + + /** + * Returns job information, or None if the job info could not be found or was garbage collected. + */ + def getJobInfo(jobId: Int): Option[SparkJobInfo] = { + jobProgressListener.synchronized { + jobProgressListener.jobIdToData.get(jobId).map { data => + new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status) + } + } + } + + /** + * Returns stage information, or None if the stage info could not be found or was + * garbage collected. + */ + def getStageInfo(stageId: Int): Option[SparkStageInfo] = { + jobProgressListener.synchronized { + for ( + info <- jobProgressListener.stageIdToInfo.get(stageId); + data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId)) + ) yield { + new SparkStageInfoImpl( + stageId, + info.name, + info.numTasks, + data.numActiveTasks, + data.numCompleteTasks, + data.numFailedTasks) + } + } + } +} From 7f47d6dc45659026c87b70c472039cca058d424f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Oct 2014 15:10:33 -0700 Subject: [PATCH 12/18] Clean up SparkUI constructors, per Andrew's feedback. --- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../deploy/history/FsHistoryProvider.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 2 +- .../scala/org/apache/spark/ui/SparkUI.scala | 38 ++++++++++++++----- 4 files changed, 33 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e3a53d8b9724..a16c0071e4f9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -237,8 +237,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(SparkUI.create(Some(this), conf, listenerBus, env.securityManager, appName, - jobProgressListener = Some(jobProgressListener))) + Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener, + env.securityManager,appName)) } else { // For tests, do not enable the UI None diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 53ed84414a5b..2d1609b97360 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -112,7 +112,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - SparkUI.create(None, conf, replayBus, appSecManager, appId, + SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId, s"${HistoryServer.UI_PATH_PREFIX}/$appId") // Do not call ui.bind() to avoid creating a new server for each application } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 733f3254eafc..2f81d472d7b7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -721,7 +721,7 @@ private[spark] class 
Master( try { val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = SparkUI.create(None, new SparkConf, replayBus, new SecurityManager(conf), + val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}") replayBus.replay() appIdToUI(app.id) = ui diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 11da8b56d9a2..7c4ff56de0cb 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -29,7 +29,7 @@ import org.apache.spark.ui.storage.{StorageListener, StorageTab} /** * Top level user interface for a Spark application. */ -private[spark] class SparkUI private[ui] ( +private[spark] class SparkUI private ( val sc: Option[SparkContext], val conf: SparkConf, val securityManager: SecurityManager, @@ -95,6 +95,26 @@ private[spark] object SparkUI { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } + def createLiveUI( + sc: SparkContext, + conf: SparkConf, + listenerBus: SparkListenerBus, + jobProgressListener: JobProgressListener, + securityManager: SecurityManager, + appName: String): SparkUI = { + create(Some(sc), conf, listenerBus, securityManager, appName, + jobProgressListener = Some(jobProgressListener)) + } + + def createHistoryUI( + conf: SparkConf, + listenerBus: ReplayListenerBus, + securityManager: SecurityManager, + appName: String, + basePath: String): SparkUI = { + create(None, conf, listenerBus, securityManager, appName, basePath) + } + /** * Create a new Spark UI. * @@ -102,14 +122,14 @@ private[spark] object SparkUI { * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the * web UI will create and register its own JobProgressListener. */ - def create( - sc: Option[SparkContext], - conf: SparkConf, - listenerBus: SparkListenerBus, - securityManager: SecurityManager, - appName: String, - basePath: String = "", - jobProgressListener: Option[JobProgressListener] = None): SparkUI = { + private def create( + sc: Option[SparkContext], + conf: SparkConf, + listenerBus: SparkListenerBus, + securityManager: SecurityManager, + appName: String, + basePath: String = "", + jobProgressListener: Option[JobProgressListener] = None): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf)
From 646ff1def7bd592160281a3e93b6b5b33a88aeb8 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Oct 2014 15:12:52 -0700 Subject: [PATCH 13/18] Document spark.ui.retainedJobs. --- docs/configuration.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 96fa1377ec39..1a03d2e93aec 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -364,7 +364,16 @@ Apart from these, the following properties are also available, and may be useful spark.ui.retainedStages 1000 - How many stages the Spark UI remembers before garbage collecting. + How many stages the Spark UI and status APIs remember before garbage + collecting. + + + + spark.ui.retainedJobs + 1000 + + How many jobs the Spark UI and status APIs remember before garbage + collecting.
From c28ba762bdc6939e764291e3a0ab3c39cf27ed79 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Oct 2014 15:39:01 -0700 Subject: [PATCH 14/18] Update demo code: - Use collectAsync().
- Re-read job id on each iteration through the loop so that users aren't surprised when adapting this code to multi-job actions, like take(). --- ...tusAPITest.java => JavaStatusAPIDemo.java} | 49 ++++++------------- 1 file changed, 14 insertions(+), 35 deletions(-) rename examples/src/main/java/org/apache/spark/examples/{JavaStatusAPITest.java => JavaStatusAPIDemo.java} (57%) diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java similarity index 57% rename from examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java rename to examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java index b00355384e1e..430e96ab14d9 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPITest.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java @@ -20,22 +20,20 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkJobInfo; import org.apache.spark.SparkStageInfo; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; /** * Example of using Spark's status APIs from Java. */ -public final class JavaStatusAPITest { +public final class JavaStatusAPIDemo { - public static final String APP_NAME = "JavaStatusAPITest"; + public static final String APP_NAME = "JavaStatusAPIDemo"; public static final class IdentityWithDelay implements Function { @Override @@ -49,43 +47,24 @@ public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(APP_NAME); final JavaSparkContext sc = new JavaSparkContext(sparkConf); - // Example of implementing a simple progress reporter for a single-action job. - // TODO: refactor this to use collectAsync() once we've implemented AsyncRDDActions in Java. - - // Submit a single-stage job asynchronously: - ExecutorService pool = Executors.newFixedThreadPool(1); - Future> jobFuture = pool.submit(new Callable>() { - @Override - public List call() { - try { - sc.setJobGroup(APP_NAME, "[Job Group Description]"); - return sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map( - new IdentityWithDelay()).collect(); - } catch (Exception e) { - System.out.println("Got exception while submitting job: "); - e.printStackTrace(); - throw e; - } - } - }); - - // Monitor the progress of our job - while (sc.getJobIdsForGroup(APP_NAME).length == 0) { - System.out.println("Waiting for job to be submitted to scheduler"); - Thread.sleep(1000); - } - int jobId = sc.getJobIdsForGroup(APP_NAME)[0]; - System.out.println("Job was submitted with id " + jobId); + // Example of implementing a progress reporter for a simple job. 
+ JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map( + new IdentityWithDelay()); + JavaFutureAction> jobFuture = rdd.collectAsync(); while (!jobFuture.isDone()) { Thread.sleep(1000); // 1 second - SparkJobInfo jobInfo = sc.getJobInfo(jobId); + List jobIds = jobFuture.jobIds(); + if (jobIds.isEmpty()) { + continue; + } + int currentJobId = jobIds.get(jobIds.size() - 1); + SparkJobInfo jobInfo = sc.getJobInfo(currentJobId); SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]); System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() + " active, " + stageInfo.numCompletedTasks() + " complete"); } System.out.println("Job results are: " + jobFuture.get()); - pool.shutdown(); sc.stop(); } } From 2707f98482a471d9f668643f7248b5d99739a825 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Oct 2014 15:52:16 -0700 Subject: [PATCH 15/18] Expose current stage attempt id --- core/src/main/java/org/apache/spark/SparkStageInfo.java | 1 + core/src/main/scala/org/apache/spark/SparkStatusAPI.scala | 1 + core/src/main/scala/org/apache/spark/StatusAPIImpl.scala | 1 + 3 files changed, 3 insertions(+) diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java index 186431eb3900..04e2247210ec 100644 --- a/core/src/main/java/org/apache/spark/SparkStageInfo.java +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -25,6 +25,7 @@ */ public interface SparkStageInfo { int stageId(); + int currentAttemptId(); String name(); int numTasks(); int numActiveTasks(); diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala index a721f03ca5b2..e733d467985e 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala @@ -130,6 +130,7 @@ private[spark] trait SparkStatusAPI { this: SparkContext => ) yield { new SparkStageInfoImpl( stageId, + info.attemptId, info.name, info.numTasks, data.numActiveTasks, diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index 85ee28353b51..90b47c847fbc 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -25,6 +25,7 @@ private class SparkJobInfoImpl ( private class SparkStageInfoImpl( val stageId: Int, + val currentAttemptId: Int, val name: String, val numTasks: Int, val numActiveTasks: Int, From c96402d4935021edbc41f7a2d50eb2d45f0c959c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 24 Oct 2014 13:21:50 -0700 Subject: [PATCH 16/18] Address review comments. --- core/src/main/scala/org/apache/spark/SparkStatusAPI.scala | 5 ++--- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala index e733d467985e..a739ddf258fe 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala @@ -25,7 +25,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SchedulingMode, Schedulable} import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo} - /** * Trait that implements Spark's status APIs. 
This trait is designed to be mixed into * SparkContext; it allows the status API code to live in its own file. @@ -108,7 +107,7 @@ private[spark] trait SparkStatusAPI { this: SparkContext => } /** - * Returns job information, or None if the job info could not be found or was garbage collected. + * Returns job information, or `None` if the job info could not be found or was garbage collected. */ def getJobInfo(jobId: Int): Option[SparkJobInfo] = { jobProgressListener.synchronized { @@ -119,7 +118,7 @@ private[spark] trait SparkStatusAPI { this: SparkContext => } /** - * Returns stage information, or None if the stage info could not be found or was + * Returns stage information, or `None` if the stage info could not be found or was * garbage collected. */ def getStageInfo(stageId: Int): Option[SparkStageInfo] = { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 7c4ff56de0cb..9f393d131e05 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -98,7 +98,7 @@ private[spark] object SparkUI { def createLiveUI( sc: SparkContext, conf: SparkConf, - listenerBus: SparkListenerBus, + listenerBus: LiveListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, appName: String): SparkUI = { From b585c16c6ca1bf20e0d18ef68b9f3f34060a7586 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 24 Oct 2014 13:31:07 -0700 Subject: [PATCH 17/18] Accept SparkListenerBus instead of more specific subclasses. --- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 9f393d131e05..049938f82729 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -98,7 +98,7 @@ private[spark] object SparkUI { def createLiveUI( sc: SparkContext, conf: SparkConf, - listenerBus: LiveListenerBus, + listenerBus: SparkListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, appName: String): SparkUI = { @@ -108,7 +108,7 @@ private[spark] object SparkUI { def createHistoryUI( conf: SparkConf, - listenerBus: ReplayListenerBus, + listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, basePath: String): SparkUI = { From e6aa78d16314c66903aa7d1d2c52e5237a9a9d0d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 24 Oct 2014 14:55:30 -0700 Subject: [PATCH 18/18] Add tests. --- .../org/apache/spark/SparkStatusAPI.scala | 3 +- .../spark/api/java/JavaSparkContext.scala | 3 +- .../org/apache/spark/StatusAPISuite.scala | 78 +++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/StatusAPISuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala index a739ddf258fe..1982499c5e1d 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala @@ -97,7 +97,8 @@ private[spark] trait SparkStatusAPI { this: SparkContext => /** * Return a list of all known jobs in a particular job group. The returned list may contain - * running, failed, and completed jobs, and may vary across invocations of this method. 
+ * running, failed, and completed jobs, and may vary across invocations of this method. This + * method does not guarantee the order of the elements in its result. */ def getJobIdsForGroup(jobGroup: String): Array[Int] = { jobProgressListener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 9885ec234ee9..45168ba62d3c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -135,7 +135,8 @@ class JavaSparkContext(val sc: SparkContext) /** * Return a list of all known jobs in a particular job group. The returned list may contain - * running, failed, and completed jobs, and may vary across invocations of this method. + * running, failed, and completed jobs, and may vary across invocations of this method. This + * method does not guarantee the order of the elements in its result. */ def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup) diff --git a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala new file mode 100644 index 000000000000..4468fba8c1df --- /dev/null +++ b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import scala.concurrent.duration._ +import scala.language.implicitConversions +import scala.language.postfixOps + +import org.scalatest.{Matchers, FunSuite} +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark.JobExecutionStatus._ +import org.apache.spark.SparkContext._ + +class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext { + + test("basic status API usage") { + val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync() + val jobId: Int = eventually(timeout(10 seconds)) { + val jobIds = jobFuture.jobIds + jobIds.size should be(1) + jobIds.head + } + val jobInfo = eventually(timeout(10 seconds)) { + sc.getJobInfo(jobId).get + } + jobInfo.status() should not be FAILED + val stageIds = jobInfo.stageIds() + stageIds.size should be(2) + + val firstStageInfo = eventually(timeout(10 seconds)) { + sc.getStageInfo(stageIds(0)).get + } + firstStageInfo.stageId() should be(stageIds(0)) + firstStageInfo.currentAttemptId() should be(0) + firstStageInfo.numTasks() should be(2) + eventually(timeout(10 seconds)) { + val updatedFirstStageInfo = sc.getStageInfo(stageIds(0)).get + updatedFirstStageInfo.numCompletedTasks() should be(2) + updatedFirstStageInfo.numActiveTasks() should be(0) + updatedFirstStageInfo.numFailedTasks() should be(0) + } + } + + test("getJobIdsForGroup()") { + sc.setJobGroup("my-job-group", "description") + sc.getJobIdsForGroup("my-job-group") should be (Seq.empty) + val firstJobFuture = sc.parallelize(1 to 1000).countAsync() + val firstJobId = eventually(timeout(10 seconds)) { + firstJobFuture.jobIds.head + } + eventually(timeout(10 seconds)) { + sc.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId)) + } + val secondJobFuture = sc.parallelize(1 to 1000).countAsync() + val secondJobId = eventually(timeout(10 seconds)) { + secondJobFuture.jobIds.head + } + eventually(timeout(10 seconds)) { + sc.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId)) + } + } +} \ No newline at end of file
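Editor's note: the status methods added in this series can be polled from Scala in the same style as JavaStatusAPIDemo. A minimal sketch, assuming a live SparkContext plus the collectAsync / FutureAction.jobIds / getJobInfo / getStageInfo APIs introduced above (the object name is made up; this is not shipped example code):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // rddToAsyncRDDActions, for collectAsync()

    object ScalaStatusAPIDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ScalaStatusAPIDemo"))

        // Submit the job asynchronously so this thread is free to poll its progress.
        val jobFuture = sc.parallelize(1 to 10000, 5).map(identity).collectAsync()

        while (!jobFuture.isCompleted) {
          Thread.sleep(1000)
          for {
            jobId     <- jobFuture.jobIds.lastOption // empty until the job is submitted
            jobInfo   <- sc.getJobInfo(jobId)        // None if already garbage collected
            stageId   <- jobInfo.stageIds.headOption
            stageInfo <- sc.getStageInfo(stageId)
          } {
            println(s"${stageInfo.numTasks} tasks total: ${stageInfo.numActiveTasks} active, " +
              s"${stageInfo.numCompletedTasks} complete")
          }
        }
        sc.stop()
      }
    }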