core/src/main/scala/org/apache/spark/SparkContext.scala (67 additions, 1 deletion)
@@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -61,7 +62,7 @@ import org.apache.spark.util._
  * this config overrides the default configs as well as system properties.
  */
 
-class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
+class SparkContext(config: SparkConf) extends Logging {
 
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   private[spark] val jobProgressListener = new JobProgressListener(conf)
   listenerBus.addListener(jobProgressListener)
 
+  val statusTracker = new SparkStatusTracker(this)
+
   // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
@@ -1001,6 +1004,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
 
+  /**
+   * Return a map from each slave to its max memory available for caching and the remaining
+   * memory available for caching.
+   */
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
+    env.blockManager.master.getMemoryStatus.map { case (blockManagerId, mem) =>
+      (blockManagerId.host + ":" + blockManagerId.port, mem)
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about what RDDs are cached, whether they are in memory or on disk,
+   * how much space they take, etc.
+   */
+  @DeveloperApi
+  def getRDDStorageInfo: Array[RDDInfo] = {
+    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.filter(_.isCached)
+  }
+
+  /**
+   * Returns an immutable map of RDDs that have marked themselves as persistent via a cache() call.
+   * Note that this does not necessarily mean the caching or computation was successful.
+   */
+  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about blocks stored in all of the slaves.
+   */
+  @DeveloperApi
+  def getExecutorStorageStatus: Array[StorageStatus] = {
+    env.blockManager.master.getStorageStatus
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return pools for the fair scheduler.
+   */
+  @DeveloperApi
+  def getAllPools: Seq[Schedulable] = {
+    // TODO(xiajunluan): We should take nested pools into account
+    taskScheduler.rootPool.schedulableQueue.toSeq
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return the pool associated with the given name, if one exists.
+   */
+  @DeveloperApi
+  def getPoolForName(pool: String): Option[Schedulable] = {
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
+  }
+
+  /**
+   * Return the current scheduling mode.
+   */
+  def getSchedulingMode: SchedulingMode.SchedulingMode = {
+    taskScheduler.schedulingMode
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
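
Editor's note: the block of methods added above simply surfaces existing state from the block manager and the task scheduler. A minimal driver-side sketch of how they might be used (hypothetical example code, assuming a local master; only the method names come from the diff above):

import org.apache.spark.{SparkConf, SparkContext}

object StatusMethodsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("status-example"))
    val rdd = sc.parallelize(1 to 1000).cache()
    rdd.count() // materialize the cached blocks

    // "host:port" -> (max memory for caching, remaining memory for caching)
    sc.getExecutorMemoryStatus.foreach { case (slave, (max, remaining)) =>
      println(s"$slave: $remaining of $max bytes free")
    }

    // getRDDStorageInfo only returns RDDs that are actually cached.
    sc.getRDDStorageInfo.foreach(println)

    println(s"scheduling mode: ${sc.getSchedulingMode}")
    sc.stop()
  }
}
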
core/src/main/scala/org/apache/spark/SparkStatusAPI.scala (0 additions, 142 deletions)

This file was deleted.

core/src/main/scala/org/apache/spark/SparkStatusTracker.scala (107 additions, 0 deletions)
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Low-level status reporting APIs for monitoring job and stage progress.
+ *
+ * These APIs intentionally provide very weak consistency semantics; consumers of these APIs should
+ * be prepared to handle empty / missing information. For example, a job's stage ids may be known
+ * but the status API may not have any information about the details of those stages, so
+ * `getStageInfo` could potentially return `None` for a valid stage id.
+ *
+ * To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs
+ * will provide information for the last `spark.ui.retainedStages` stages and
+ * `spark.ui.retainedJobs` jobs.
+ *
+ * NOTE: this class's constructor should be considered private and may be subject to change.
+ */
+class SparkStatusTracker private[spark] (sc: SparkContext) {
+
+  private val jobProgressListener = sc.jobProgressListener
+
+  /**
+   * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then
+   * returns all known jobs that are not associated with a job group.
+   *
+   * The returned list may contain running, failed, and completed jobs, and may vary across
+   * invocations of this method. This method does not guarantee the order of the elements in
+   * its result.
+   */
+  def getJobIdsForGroup(jobGroup: String): Array[Int] = {
+    jobProgressListener.synchronized {
+      val jobData = jobProgressListener.jobIdToData.valuesIterator
+      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+    }
+  }
+
+  /**
+   * Returns an array containing the ids of all active stages.
+   *
+   * This method does not guarantee the order of the elements in its result.
+   */
+  def getActiveStageIds(): Array[Int] = {
+    jobProgressListener.synchronized {
+      jobProgressListener.activeStages.values.map(_.stageId).toArray
+    }
+  }
+
+  /**
+   * Returns an array containing the ids of all active jobs.
+   *
+   * This method does not guarantee the order of the elements in its result.
+   */
+  def getActiveJobIds(): Array[Int] = {
+    jobProgressListener.synchronized {
+      jobProgressListener.activeJobs.values.map(_.jobId).toArray
+    }
+  }
+
+  /**
+   * Returns job information, or `None` if the job info could not be found or was garbage collected.
+   */
+  def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
+    jobProgressListener.synchronized {
+      jobProgressListener.jobIdToData.get(jobId).map { data =>
+        new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
+      }
+    }
+  }
+
+  /**
+   * Returns stage information, or `None` if the stage info could not be found or was
+   * garbage collected.
+   */
+  def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
+    jobProgressListener.synchronized {
+      for (
+        info <- jobProgressListener.stageIdToInfo.get(stageId);
+        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
+      ) yield {
+        new SparkStageInfoImpl(
+          stageId,
+          info.attemptId,
+          info.name,
+          info.numTasks,
+          data.numActiveTasks,
+          data.numCompleteTasks,
+          data.numFailedTasks)
+      }
+    }
+  }
+}
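
Editor's note: because the tracker only returns weakly-consistent snapshots, a typical consumer polls it from the driver while the job runs on another thread. A minimal polling sketch (hypothetical application code, assuming a local master; the SparkJobInfo / SparkStageInfo accessors used here — stageIds, numTasks, numCompletedTasks — are assumed from the companion Java-friendly interfaces, which this diff does not show):

import org.apache.spark.{SparkConf, SparkContext}

object TrackerPollingExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("tracker-example"))

    // Run the job on a background thread so the main thread is free to poll.
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        sc.setJobGroup("demo", "status tracker demo")
        sc.parallelize(1 to 100000, 16).map(_ * 2).count()
      }
    })
    worker.start()

    while (worker.isAlive) {
      for (jobId <- sc.statusTracker.getJobIdsForGroup("demo");
           job <- sc.statusTracker.getJobInfo(jobId);
           stageId <- job.stageIds();
           stage <- sc.statusTracker.getStageInfo(stageId)) {
        // Any of these lookups may come back empty or stale; that is the
        // documented contract of the status APIs.
        println(s"job $jobId stage $stageId: " +
          s"${stage.numCompletedTasks()} of ${stage.numTasks()} tasks complete")
      }
      Thread.sleep(100)
    }
    sc.stop()
  }
}
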
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala (2 additions, 19 deletions)
@@ -105,6 +105,8 @@ class JavaSparkContext(val sc: SparkContext)
 
   private[spark] val env = sc.env
 
+  def statusTracker = new JavaSparkStatusTracker(sc)
+
   def isLocal: java.lang.Boolean = sc.isLocal
 
   def sparkUser: String = sc.sparkUser
@@ -134,25 +136,6 @@ class JavaSparkContext(val sc: SparkContext)
   /** Default min number of partitions for Hadoop RDDs when not given by user */
   def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
-
-
-  /**
-   * Return a list of all known jobs in a particular job group. The returned list may contain
-   * running, failed, and completed jobs, and may vary across invocations of this method. This
-   * method does not guarantee the order of the elements in its result.
-   */
-  def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup)
-
-  /**
-   * Returns job information, or `null` if the job info could not be found or was garbage collected.
-   */
-  def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
-
-  /**
-   * Returns stage information, or `null` if the stage info could not be found or was
-   * garbage collected.
-   */
-  def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
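
Editor's note: the three monitoring methods deleted from JavaSparkContext above have Java-friendly counterparts behind the new statusTracker accessor. A small migration sketch (hypothetical caller code; it assumes JavaSparkStatusTracker keeps the null-returning convention documented on the deleted methods, since that class is not shown in this diff):

import org.apache.spark.api.java.JavaSparkContext

object JavaSideMigrationSketch {
  def printJobStatus(jsc: JavaSparkContext, jobId: Int): Unit = {
    // Before this patch: jsc.getJobInfo(jobId)
    // After this patch: the same lookup, routed through the tracker.
    // Assumed to return null (rather than Option) for unknown or GC'd jobs.
    val info = jsc.statusTracker.getJobInfo(jobId)
    if (info == null) {
      println(s"job $jobId: no longer tracked")
    } else {
      println(s"job $jobId: status ${info.status()}")
    }
  }
}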