RDDOperationGraph.scala
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
 
 /**
  * A representation of a generic cluster graph used for storing information on RDD operations.
@@ -107,7 +107,7 @@ private[ui] object RDDOperationGraph extends Logging {
    * supporting in the future if we decide to group certain stages within the same job under
    * a common scope (e.g. part of a SQL query).
    */
-  def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
+  def makeOperationGraph(stage: StageInfo, retainedNodes: Int): RDDOperationGraph = {
     val edges = new ListBuffer[RDDOperationEdge]
     val nodes = new mutable.HashMap[Int, RDDOperationNode]
     val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
@@ -119,18 +119,37 @@ private[ui] object RDDOperationGraph extends Logging {
       { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
     val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
 
+    var rootNodeCount = 0
+    val addRDDIds = new mutable.HashSet[Int]()
+    val dropRDDIds = new mutable.HashSet[Int]()
+
     // Find nodes, edges, and operation scopes that belong to this stage
-    stage.rddInfos.foreach { rdd =>
-      edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
-
+    stage.rddInfos.sortBy(_.id).foreach { rdd =>
+      val parentIds = rdd.parentIds
+      val isAllowed =
+        if (parentIds.isEmpty) {
+          rootNodeCount += 1
+          rootNodeCount <= retainedNodes
+        } else {
+          parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id))
+        }
+
+      if (isAllowed) {
+        addRDDIds += rdd.id
+        edges ++= parentIds.filter(id => !dropRDDIds.contains(id)).map(RDDOperationEdge(_, rdd.id))
+      } else {
+        dropRDDIds += rdd.id
+      }
+
       // TODO: differentiate between the intention to cache an RDD and whether it's actually cached
       val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
         rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
-
       if (rdd.scope.isEmpty) {
         // This RDD has no encompassing scope, so we put it directly in the root cluster
         // This should happen only if an RDD is instantiated outside of a public RDD API
-        rootCluster.attachChildNode(node)
+        if (isAllowed) {
+          rootCluster.attachChildNode(node)
+        }
       } else {
         // Otherwise, this RDD belongs to an inner cluster,
         // which may be nested inside of other clusters
@@ -154,7 +173,9 @@
           rootCluster.attachChildCluster(cluster)
         }
       }
-      rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+      if (isAllowed) {
+        rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+      }
       }
     }
 
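Note for reviewers: below is a minimal, self-contained sketch of the retention rule this patch applies in makeOperationGraph. The first retainedNodes root RDDs (in id order) are kept, and a non-root RDD survives only if at least one parent survived or never appeared in this stage. RddInfoLite and retainedIds are illustrative names, not Spark API.

import scala.collection.mutable

// Illustrative stand-in for the fields of RDDInfo that the rule reads.
case class RddInfoLite(id: Int, parentIds: Seq[Int])

def retainedIds(rdds: Seq[RddInfoLite], retainedNodes: Int): Set[Int] = {
  var rootNodeCount = 0
  val added = new mutable.HashSet[Int]()
  val dropped = new mutable.HashSet[Int]()
  rdds.sortBy(_.id).foreach { rdd =>
    val isAllowed =
      if (rdd.parentIds.isEmpty) {
        // Root RDD: keep only the first `retainedNodes` of them.
        rootNodeCount += 1
        rootNodeCount <= retainedNodes
      } else {
        // Non-root RDD: keep it if some parent was kept, or if a parent id
        // was never seen in this stage (it is then not in `dropped`).
        rdd.parentIds.exists(id => added.contains(id) || !dropped.contains(id))
      }
    if (isAllowed) added += rdd.id else dropped += rdd.id
  }
  added.toSet
}

// With retainedNodes = 1, root 1 and its descendant 3 are pruned.
val rdds = Seq(RddInfoLite(0, Nil), RddInfoLite(1, Nil), RddInfoLite(2, Seq(0)), RddInfoLite(3, Seq(1)))
assert(retainedIds(rdds, retainedNodes = 1) == Set(0, 2))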
RDDOperationGraphListener.scala
@@ -41,6 +41,10 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener
   private[ui] val jobIds = new mutable.ArrayBuffer[Int]
   private[ui] val stageIds = new mutable.ArrayBuffer[Int]
 
+  // How many root nodes to retain in DAG Graph
+  private[ui] val retainedNodes =
+    conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue)
+
   // How many jobs or stages to retain graph metadata for
   private val retainedJobs =
     conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
@@ -82,7 +86,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener
       val stageId = stageInfo.stageId
       stageIds += stageId
       stageIdToJobId(stageId) = jobId
-      stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+      stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo, retainedNodes)
       trimStagesIfNecessary()
     }
 
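Usage note: assuming this change is merged, the new cap is set like any other Spark UI configuration; the value 200 below is arbitrary.

import org.apache.spark.SparkConf

// Retain at most 200 root RDDs per stage in the DAG visualization.
// The default is Int.MaxValue, i.e. the current unbounded behavior.
val conf = new SparkConf()
  .setAppName("dag-graph-pruning-example")
  .set("spark.ui.dagGraph.retainedRootRDDs", "200")

The same setting can be passed on the command line, e.g. spark-submit --conf spark.ui.dagGraph.retainedRootRDDs=200.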