From c47e294c1f065e377855d2030d54547d1407c968 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 10 Nov 2014 14:38:27 -0800
Subject: [PATCH 1/6] Remove StatusAPI mixin trait.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This makes binary compatibility easier to reason about and might avoid some
pitfalls that I’ve run into while attempting to refactor other parts of
SparkContext to use mixin traits (see #3071, for example).

Requiring users to access status API methods through `sc.statusAPI.*` also
avoids SparkContext bloat and buys us extra freedom for adding parallel
higher / lower-level APIs.
---
 .../scala/org/apache/spark/SparkContext.scala | 68 +++++++++++++-
 .../org/apache/spark/SparkStatusAPI.scala     | 92 ++++---------------
 .../spark/api/java/JavaSparkContext.scala     | 21 +----
 .../org/apache/spark/StatusAPISuite.scala     | 12 +--
 .../spark/examples/JavaStatusAPIDemo.java     |  4 +-
 5 files changed, 95 insertions(+), 102 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 03ea672c813d..b8ed5317f204 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -61,7 +62,7 @@ import org.apache.spark.util._
  *   this config overrides the default configs as well as system properties.
  */
 
-class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
+class SparkContext(config: SparkConf) extends Logging {
 
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   private[spark] val jobProgressListener = new JobProgressListener(conf)
   listenerBus.addListener(jobProgressListener)
 
+  val statusAPI = SparkStatusAPI(this)
+
   // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
@@ -1001,6 +1004,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
 
+  /**
+   * Return a map from the slave to the max memory available for caching and the remaining
+   * memory available for caching.
+   */
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
+    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
+      (blockManagerId.host + ":" + blockManagerId.port, mem)
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
+   * they take, etc.
+   */
+  @DeveloperApi
+  def getRDDStorageInfo: Array[RDDInfo] = {
+    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.filter(_.isCached)
+  }
+
+  /**
+   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
+   * Note that this does not necessarily mean the caching or computation was successful.
+   */
+  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about blocks stored in all of the slaves
+   */
+  @DeveloperApi
+  def getExecutorStorageStatus: Array[StorageStatus] = {
+    env.blockManager.master.getStorageStatus
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return pools for fair scheduler
+   */
+  @DeveloperApi
+  def getAllPools: Seq[Schedulable] = {
+    // TODO(xiajunluan): We should take nested pools into account
+    taskScheduler.rootPool.schedulableQueue.toSeq
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return the pool associated with the given name, if one exists
+   */
+  @DeveloperApi
+  def getPoolForName(pool: String): Option[Schedulable] = {
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
+  }
+
+  /**
+   * Return current scheduling mode
+   */
+  def getSchedulingMode: SchedulingMode.SchedulingMode = {
+    taskScheduler.schedulingMode
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
index 1982499c5e1d..5e5052409bb4 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
@@ -17,83 +17,21 @@
 
 package org.apache.spark
 
-import scala.collection.Map
-import scala.collection.JavaConversions._
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
-import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}
-
 /**
- * Trait that implements Spark's status APIs. This trait is designed to be mixed into
- * SparkContext; it allows the status API code to live in its own file.
+ * Low-level status reporting APIs for monitoring job and stage progress.
+ *
+ * These APIs intentionally provide very weak consistency semantics; consumers of these APIs should
+ * be prepared to handle empty / missing information. For example, a job's stage ids may be known
+ * but the status API may not have any information about the details of those stages, so
+ * `getStageInfo` could potentially return `None` for a valid stage id.
+ *
+ * To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs
+ * will provide information for the last `spark.ui.retainedStages` stages and
+ * `spark.ui.retainedJobs` jobs.
  */
-private[spark] trait SparkStatusAPI { this: SparkContext =>
-
-  /**
-   * Return a map from the slave to the max memory available for caching and the remaining
-   * memory available for caching.
-   */
-  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
-    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
-      (blockManagerId.host + ":" + blockManagerId.port, mem)
-    }
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
-   * they take, etc.
-   */
-  @DeveloperApi
-  def getRDDStorageInfo: Array[RDDInfo] = {
-    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
-    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
-    rddInfos.filter(_.isCached)
-  }
-
-  /**
-   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
-   * Note that this does not necessarily mean the caching or computation was successful.
-   */
-  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
-
-  /**
-   * :: DeveloperApi ::
-   * Return information about blocks stored in all of the slaves
-   */
-  @DeveloperApi
-  def getExecutorStorageStatus: Array[StorageStatus] = {
-    env.blockManager.master.getStorageStatus
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return pools for fair scheduler
-   */
-  @DeveloperApi
-  def getAllPools: Seq[Schedulable] = {
-    // TODO(xiajunluan): We should take nested pools into account
-    taskScheduler.rootPool.schedulableQueue.toSeq
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return the pool associated with the given name, if one exists
-   */
-  @DeveloperApi
-  def getPoolForName(pool: String): Option[Schedulable] = {
-    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
-  }
-
-  /**
-   * Return current scheduling mode
-   */
-  def getSchedulingMode: SchedulingMode.SchedulingMode = {
-    taskScheduler.schedulingMode
-  }
+class SparkStatusAPI private (sc: SparkContext) {
+
+  private val jobProgressListener = sc.jobProgressListener
 
   /**
    * Return a list of all known jobs in a particular job group. The returned list may contain
@@ -140,3 +78,9 @@ private[spark] trait SparkStatusAPI { this: SparkContext =>
     }
   }
 }
+
+private[spark] object SparkStatusAPI {
+  def apply(sc: SparkContext): SparkStatusAPI = {
+    new SparkStatusAPI(sc)
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 5c6e8d32c5c8..665d30acba0e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -105,6 +105,8 @@ class JavaSparkContext(val sc: SparkContext)
 
   private[spark] val env = sc.env
 
+  def statusAPI = JavaSparkStatusAPI(sc)
+
   def isLocal: java.lang.Boolean = sc.isLocal
 
   def sparkUser: String = sc.sparkUser
@@ -134,25 +136,6 @@ class JavaSparkContext(val sc: SparkContext)
   /** Default min number of partitions for Hadoop RDDs when not given by user */
   def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
 
-
-  /**
-   * Return a list of all known jobs in a particular job group. The returned list may contain
-   * running, failed, and completed jobs, and may vary across invocations of this method. This
-   * method does not guarantee the order of the elements in its result.
-   */
-  def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup)
-
-  /**
-   * Returns job information, or `null` if the job info could not be found or was garbage collected.
-   */
-  def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
-
-  /**
-   * Returns stage information, or `null` if the stage info could not be found or was
-   * garbage collected.
-   */
-  def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull
-
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
diff --git a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
index 4468fba8c1df..b90fe39e5b3d 100644
--- a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
@@ -37,20 +37,20 @@ class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
       jobIds.head
     }
     val jobInfo = eventually(timeout(10 seconds)) {
-      sc.getJobInfo(jobId).get
+      sc.statusAPI.getJobInfo(jobId).get
     }
     jobInfo.status() should not be FAILED
 
     val stageIds = jobInfo.stageIds()
     stageIds.size should be(2)
 
     val firstStageInfo = eventually(timeout(10 seconds)) {
-      sc.getStageInfo(stageIds(0)).get
+      sc.statusAPI.getStageInfo(stageIds(0)).get
     }
     firstStageInfo.stageId() should be(stageIds(0))
     firstStageInfo.currentAttemptId() should be(0)
     firstStageInfo.numTasks() should be(2)
 
     eventually(timeout(10 seconds)) {
-      val updatedFirstStageInfo = sc.getStageInfo(stageIds(0)).get
+      val updatedFirstStageInfo = sc.statusAPI.getStageInfo(stageIds(0)).get
       updatedFirstStageInfo.numCompletedTasks() should be(2)
       updatedFirstStageInfo.numActiveTasks() should be(0)
       updatedFirstStageInfo.numFailedTasks() should be(0)
@@ -59,20 +59,20 @@ class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
 
   test("getJobIdsForGroup()") {
     sc.setJobGroup("my-job-group", "description")
-    sc.getJobIdsForGroup("my-job-group") should be (Seq.empty)
+    sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq.empty)
     val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
     val firstJobId = eventually(timeout(10 seconds)) {
       firstJobFuture.jobIds.head
     }
     eventually(timeout(10 seconds)) {
-      sc.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
+      sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
     }
     val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
     val secondJobId = eventually(timeout(10 seconds)) {
       secondJobFuture.jobIds.head
     }
     eventually(timeout(10 seconds)) {
-      sc.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
+      sc.statusAPI.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
     }
   }
 }
\ No newline at end of file
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
index 430e96ab14d9..1d9d0076fb5e 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
@@ -58,8 +58,8 @@ public static void main(String[] args) throws Exception {
         continue;
       }
       int currentJobId = jobIds.get(jobIds.size() - 1);
-      SparkJobInfo jobInfo = sc.getJobInfo(currentJobId);
-      SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
+      SparkJobInfo jobInfo = sc.statusAPI().getJobInfo(currentJobId);
+      SparkStageInfo stageInfo = sc.statusAPI().getStageInfo(jobInfo.stageIds()[0]);
       System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
           " active, " + stageInfo.numCompletedTasks() + " complete");
     }

From a227984df660c7e8eb108c35ac376dfc0acb58fd Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 10 Nov 2014 15:05:22 -0800
Subject: [PATCH 2/6] getJobIdsForGroup(null) should return jobs for default
 group

---
 .../scala/org/apache/spark/SparkStatusAPI.scala | 11 +++++++----
 .../scala/org/apache/spark/StatusAPISuite.scala | 13 ++++++++++++-
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
index 5e5052409bb4..e2f4ff7987f7 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
@@ -34,14 +34,17 @@ class SparkStatusAPI private (sc: SparkContext) {
   private val jobProgressListener = sc.jobProgressListener
 
   /**
-   * Return a list of all known jobs in a particular job group. The returned list may contain
-   * running, failed, and completed jobs, and may vary across invocations of this method. This
-   * method does not guarantee the order of the elements in its result.
+   * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then
+   * returns all known jobs that are not associated with a job group.
+   *
+   * The returned list may contain running, failed, and completed jobs, and may vary across
+   * invocations of this method. This method does not guarantee the order of the elements in
+   * its result.
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
       val jobData = jobProgressListener.jobIdToData.valuesIterator
-      jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray
+      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
index b90fe39e5b3d..def3716a319e 100644
--- a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
@@ -27,9 +27,10 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.JobExecutionStatus._
 import org.apache.spark.SparkContext._
 
-class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
+class StatusAPISuite extends FunSuite with Matchers with LocalSparkContext {
 
   test("basic status API usage") {
+    sc = new SparkContext("local", "test", new SparkConf(false))
     val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
     val jobId: Int = eventually(timeout(10 seconds)) {
       val jobIds = jobFuture.jobIds
@@ -59,6 +59,16 @@ class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
   }
 
   test("getJobIdsForGroup()") {
+    sc = new SparkContext("local", "test", new SparkConf(false))
+    // Passing `null` should return jobs that were not run in a job group:
+    val defaultJobGroupFuture = sc.parallelize(1 to 1000).countAsync()
+    val defaultJobGroupJobId = eventually(timeout(10 seconds)) {
+      defaultJobGroupFuture.jobIds.head
+    }
+    eventually(timeout(10 seconds)) {
+      sc.statusAPI.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId))
+    }
+    // Test jobs submitted in job groups:
     sc.setJobGroup("my-job-group", "description")
     sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq.empty)
     val firstJobFuture = sc.parallelize(1 to 1000).countAsync()

From d5eab1f2378451ad86470d01147d9d63a5c89fc8 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 10 Nov 2014 16:50:52 -0800
Subject: [PATCH 3/6] Add getActive[Stage|Job]Ids() methods.
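
For reviewers, a minimal usage sketch of the two new getters (the job
submission around them and the `sc` value are illustrative, not part of
this patch):

    import org.apache.spark.SparkContext._  // brings countAsync() into scope
    val future = sc.parallelize(1 to 10000, 2).countAsync()
    // Each getter snapshots the listener state under a lock; the order of
    // the returned ids is not guaranteed.
    while (!future.isCompleted) {
      println("active jobs:   " + sc.statusAPI.getActiveJobIds().mkString(", "))
      println("active stages: " + sc.statusAPI.getActiveStageIds().mkString(", "))
      Thread.sleep(100)
    }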
---
 .../org/apache/spark/SparkStatusAPI.scala | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
index e2f4ff7987f7..670830bae56e 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
@@ -48,6 +48,28 @@ class SparkStatusAPI private (sc: SparkContext) {
     }
   }
 
+  /**
+   * Returns an array containing the ids of all active stages.
+   *
+   * This method does not guarantee the order of the elements in its result.
+   */
+  def getActiveStageIds(): Array[Int] = {
+    jobProgressListener.synchronized {
+      jobProgressListener.activeStages.values.map(_.stageId).toArray
+    }
+  }
+
+  /**
+   * Returns an array containing the ids of all active jobs.
+   *
+   * This method does not guarantee the order of the elements in its result.
+   */
+  def getActiveJobIds(): Array[Int] = {
+    jobProgressListener.synchronized {
+      jobProgressListener.activeJobs.values.map(_.jobId).toArray
+    }
+  }
+
   /**
    * Returns job information, or `None` if the job info could not be found or was garbage collected.
    */

From 2cc7353ba9af00c5dfe390b6b27d477187c1e090 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 10 Nov 2014 16:51:22 -0800
Subject: [PATCH 4/6] Add missing file.

---
 .../spark/api/java/JavaSparkStatusAPI.scala | 76 +++++++++++++++++++
 1 file changed, 76 insertions(+)
 create mode 100644 core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala
new file mode 100644
index 000000000000..f37059997417
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import org.apache.spark.{SparkStageInfo, SparkJobInfo, SparkContext}
+
+/**
+ * Low-level status reporting APIs for monitoring job and stage progress.
+ *
+ * These APIs intentionally provide very weak consistency semantics; consumers of these APIs should
+ * be prepared to handle empty / missing information. For example, a job's stage ids may be known
+ * but the status API may not have any information about the details of those stages, so
+ * `getStageInfo` could potentially return `null` for a valid stage id.
+ *
+ * To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs
+ * will provide information for the last `spark.ui.retainedStages` stages and
+ * `spark.ui.retainedJobs` jobs.
+ */ +class JavaSparkStatusAPI private (sc: SparkContext) { + + /** + * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then + * returns all known jobs that are not associated with a job group. + * + * The returned list may contain running, failed, and completed jobs, and may vary across + * invocations of this method. This method does not guarantee the order of the elements in + * its result. + */ + def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.statusAPI.getJobIdsForGroup(jobGroup) + + /** + * Returns an array containing the ids of all active stages. + * + * This method does not guarantee the order of the elements in its result. + */ + def getActiveStageIds(): Array[Int] = sc.statusAPI.getActiveStageIds() + + /** + * Returns an array containing the ids of all active jobs. + * + * This method does not guarantee the order of the elements in its result. + */ + def getActiveJobIds(): Array[Int] = sc.statusAPI.getActiveJobIds() + + /** + * Returns job information, or `null` if the job info could not be found or was garbage collected. + */ + def getJobInfo(jobId: Int): SparkJobInfo = sc.statusAPI.getJobInfo(jobId).orNull + + /** + * Returns stage information, or `null` if the stage info could not be found or was + * garbage collected. + */ + def getStageInfo(stageId: Int): SparkStageInfo = sc.statusAPI.getStageInfo(stageId).orNull +} + +private[spark] object JavaSparkStatusAPI { + def apply(sc: SparkContext): JavaSparkStatusAPI = { + new JavaSparkStatusAPI(sc) + } +} \ No newline at end of file From d1b08d8c78c5aab0b858c730c5ed7f1806cffbe6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 10 Nov 2014 17:16:16 -0800 Subject: [PATCH 5/6] Add missing newlines --- core/src/main/scala/org/apache/spark/SparkStatusAPI.scala | 3 ++- .../scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala index 670830bae56e..6dc4b5dc8eb8 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala @@ -108,4 +108,5 @@ private[spark] object SparkStatusAPI { def apply(sc: SparkContext): SparkStatusAPI = { new SparkStatusAPI(sc) } -} \ No newline at end of file +} + diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala index f37059997417..5f56d5dcbe62 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala @@ -73,4 +73,5 @@ private[spark] object JavaSparkStatusAPI { def apply(sc: SparkContext): JavaSparkStatusAPI = { new JavaSparkStatusAPI(sc) } -} \ No newline at end of file +} + From 30b0afa7e2f0b4e40f56867a515ec1cea07c43e9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 14 Nov 2014 21:37:48 -0800 Subject: [PATCH 6/6] Rename SparkStatusAPI to SparkStatusTracker. Remove factory methods and replace with private[spark] constructors. 
---
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 ...atusAPI.scala => SparkStatusTracker.scala} | 11 +++--------
 .../spark/api/java/JavaSparkContext.scala     |  2 +-
 ...API.scala => JavaSparkStatusTracker.scala} | 21 ++++++++-------------
 ...PISuite.scala => StatusTrackerSuite.scala} | 16 ++++++++--------
 ...PIDemo.java => JavaStatusTrackerDemo.java} |  6 +++---
 6 files changed, 24 insertions(+), 34 deletions(-)
 rename core/src/main/scala/org/apache/spark/{SparkStatusAPI.scala => SparkStatusTracker.scala} (95%)
 rename core/src/main/scala/org/apache/spark/api/java/{JavaSparkStatusAPI.scala => JavaSparkStatusTracker.scala} (82%)
 rename core/src/test/scala/org/apache/spark/{StatusAPISuite.scala => StatusTrackerSuite.scala} (82%)
 rename examples/src/main/java/org/apache/spark/examples/{JavaStatusAPIDemo.java => JavaStatusTrackerDemo.java} (92%)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b8ed5317f204..65edeeffb837 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -229,7 +229,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val jobProgressListener = new JobProgressListener(conf)
   listenerBus.addListener(jobProgressListener)
 
-  val statusAPI = SparkStatusAPI(this)
+  val statusTracker = new SparkStatusTracker(this)
 
   // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
similarity index 95%
rename from core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
rename to core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 6dc4b5dc8eb8..c18d763d7ff4 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -28,8 +28,10 @@ package org.apache.spark
  * To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs
  * will provide information for the last `spark.ui.retainedStages` stages and
  * `spark.ui.retainedJobs` jobs.
+ *
+ * NOTE: this class's constructor should be considered private and may be subject to change.
  */
-class SparkStatusAPI private (sc: SparkContext) {
+class SparkStatusTracker private[spark] (sc: SparkContext) {
 
   private val jobProgressListener = sc.jobProgressListener
 
@@ -103,10 +105,3 @@
     }
   }
 }
-
-private[spark] object SparkStatusAPI {
-  def apply(sc: SparkContext): SparkStatusAPI = {
-    new SparkStatusAPI(sc)
-  }
-}
-
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 665d30acba0e..d50ed32ca085 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -105,7 +105,7 @@ class JavaSparkContext(val sc: SparkContext)
 
   private[spark] val env = sc.env
 
-  def statusAPI = JavaSparkStatusAPI(sc)
+  def statusTracker = new JavaSparkStatusTracker(sc)
 
   def isLocal: java.lang.Boolean = sc.isLocal
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusTracker.scala
similarity index 82%
rename from core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala
rename to core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusTracker.scala
index 5f56d5dcbe62..3300cad9efba 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusAPI.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkStatusTracker.scala
@@ -30,8 +30,10 @@ import org.apache.spark.{SparkStageInfo, SparkJobInfo, SparkContext}
  * To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs
  * will provide information for the last `spark.ui.retainedStages` stages and
  * `spark.ui.retainedJobs` jobs.
+ *
+ * NOTE: this class's constructor should be considered private and may be subject to change.
  */
-class JavaSparkStatusAPI private (sc: SparkContext) {
+class JavaSparkStatusTracker private[spark] (sc: SparkContext) {
 
   /**
    * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then
@@ -41,37 +43,30 @@ class JavaSparkStatusAPI private (sc: SparkContext) {
    * invocations of this method. This method does not guarantee the order of the elements in
    * its result.
    */
-  def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.statusAPI.getJobIdsForGroup(jobGroup)
+  def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.statusTracker.getJobIdsForGroup(jobGroup)
 
   /**
    * Returns an array containing the ids of all active stages.
    *
    * This method does not guarantee the order of the elements in its result.
    */
-  def getActiveStageIds(): Array[Int] = sc.statusAPI.getActiveStageIds()
+  def getActiveStageIds(): Array[Int] = sc.statusTracker.getActiveStageIds()
 
   /**
    * Returns an array containing the ids of all active jobs.
    *
    * This method does not guarantee the order of the elements in its result.
    */
-  def getActiveJobIds(): Array[Int] = sc.statusAPI.getActiveJobIds()
+  def getActiveJobIds(): Array[Int] = sc.statusTracker.getActiveJobIds()
 
   /**
    * Returns job information, or `null` if the job info could not be found or was garbage collected.
   */
-  def getJobInfo(jobId: Int): SparkJobInfo = sc.statusAPI.getJobInfo(jobId).orNull
+  def getJobInfo(jobId: Int): SparkJobInfo = sc.statusTracker.getJobInfo(jobId).orNull
 
   /**
    * Returns stage information, or `null` if the stage info could not be found or was
    * garbage collected.
    */
-  def getStageInfo(stageId: Int): SparkStageInfo = sc.statusAPI.getStageInfo(stageId).orNull
-}
-
-private[spark] object JavaSparkStatusAPI {
-  def apply(sc: SparkContext): JavaSparkStatusAPI = {
-    new JavaSparkStatusAPI(sc)
-  }
+  def getStageInfo(stageId: Int): SparkStageInfo = sc.statusTracker.getStageInfo(stageId).orNull
 }
-
diff --git a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
similarity index 82%
rename from core/src/test/scala/org/apache/spark/StatusAPISuite.scala
rename to core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index def3716a319e..8577e4ac7e33 100644
--- a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.JobExecutionStatus._
 import org.apache.spark.SparkContext._
 
-class StatusAPISuite extends FunSuite with Matchers with LocalSparkContext {
+class StatusTrackerSuite extends FunSuite with Matchers with LocalSparkContext {
 
   test("basic status API usage") {
     sc = new SparkContext("local", "test", new SparkConf(false))
@@ -38,20 +38,20 @@ class StatusTrackerSuite extends FunSuite with Matchers with LocalSparkContext {
       jobIds.head
     }
     val jobInfo = eventually(timeout(10 seconds)) {
-      sc.statusAPI.getJobInfo(jobId).get
+      sc.statusTracker.getJobInfo(jobId).get
     }
     jobInfo.status() should not be FAILED
 
     val stageIds = jobInfo.stageIds()
     stageIds.size should be(2)
 
     val firstStageInfo = eventually(timeout(10 seconds)) {
-      sc.statusAPI.getStageInfo(stageIds(0)).get
+      sc.statusTracker.getStageInfo(stageIds(0)).get
     }
     firstStageInfo.stageId() should be(stageIds(0))
     firstStageInfo.currentAttemptId() should be(0)
     firstStageInfo.numTasks() should be(2)
 
     eventually(timeout(10 seconds)) {
-      val updatedFirstStageInfo = sc.statusAPI.getStageInfo(stageIds(0)).get
+      val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds(0)).get
       updatedFirstStageInfo.numCompletedTasks() should be(2)
       updatedFirstStageInfo.numActiveTasks() should be(0)
       updatedFirstStageInfo.numFailedTasks() should be(0)
@@ -66,24 +66,24 @@ class StatusTrackerSuite extends FunSuite with Matchers with LocalSparkContext {
       defaultJobGroupFuture.jobIds.head
     }
     eventually(timeout(10 seconds)) {
-      sc.statusAPI.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId))
+      sc.statusTracker.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId))
     }
     // Test jobs submitted in job groups:
     sc.setJobGroup("my-job-group", "description")
-    sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq.empty)
+    sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq.empty)
     val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
     val firstJobId = eventually(timeout(10 seconds)) {
       firstJobFuture.jobIds.head
     }
     eventually(timeout(10 seconds)) {
-      sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
+      sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
     }
     val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
     val secondJobId = eventually(timeout(10 seconds)) {
       secondJobFuture.jobIds.head
     }
     eventually(timeout(10 seconds)) {
-      sc.statusAPI.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
+      sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
     }
   }
 }
\ No newline at end of file
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
similarity index 92%
rename from examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
rename to examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
index 1d9d0076fb5e..e68ec74c3ed5 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
@@ -31,7 +31,7 @@
 /**
  * Example of using Spark's status APIs from Java.
  */
-public final class JavaStatusAPIDemo {
+public final class JavaStatusTrackerDemo {
 
   public static final String APP_NAME = "JavaStatusAPIDemo";
 
@@ -58,8 +58,8 @@ public static void main(String[] args) throws Exception {
         continue;
       }
       int currentJobId = jobIds.get(jobIds.size() - 1);
-      SparkJobInfo jobInfo = sc.statusAPI().getJobInfo(currentJobId);
-      SparkStageInfo stageInfo = sc.statusAPI().getStageInfo(jobInfo.stageIds()[0]);
+      SparkJobInfo jobInfo = sc.statusTracker().getJobInfo(currentJobId);
+      SparkStageInfo stageInfo = sc.statusTracker().getStageInfo(jobInfo.stageIds()[0]);
       System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
           " active, " + stageInfo.numCompletedTasks() + " complete");
     }
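
Taken together, the series leaves Scala users with the polling pattern
below. This is a hedged end-to-end sketch, not part of the patches: the
SparkConf setup, local master, and polling cadence are assumptions, and
countAsync() relies on the Spark 1.x implicit import.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // enables countAsync() on RDDs

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("status-demo"))
    val future = sc.parallelize(1 to 100000, 4).map(identity).countAsync()
    // The tracker is only weakly consistent: a job id can be known before its
    // details are, so every lookup below is guarded by Option.
    while (!future.isCompleted) {
      for {
        jobId     <- sc.statusTracker.getActiveJobIds().headOption
        jobInfo   <- sc.statusTracker.getJobInfo(jobId)
        stageId   <- jobInfo.stageIds().headOption
        stageInfo <- sc.statusTracker.getStageInfo(stageId)
      } {
        println("stage " + stageId + ": " + stageInfo.numCompletedTasks() +
          "/" + stageInfo.numTasks() + " tasks complete")
      }
      Thread.sleep(100)
    }
    sc.stop()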