diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index c85b3caf8a5e..7da0a9d2285b 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -234,8 +234,8 @@ private[spark] class AppStatusListener( (partition.memoryUsed / partition.executors.length) * -1) rdd.diskUsed = addDeltaToValue(rdd.diskUsed, (partition.diskUsed / partition.executors.length) * -1) - partition.update(partition.executors - .filter(!_.equals(event.executorId)), rdd.storageLevel, + partition.update( + partition.executors.filter(!_.equals(event.executorId)), addDeltaToValue(partition.memoryUsed, (partition.memoryUsed / partition.executors.length) * -1), addDeltaToValue(partition.diskUsed, @@ -495,7 +495,7 @@ private[spark] class AppStatusListener( event.stageInfo.rddInfos.foreach { info => if (info.storageLevel.isValid) { - liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now) + liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info, info.storageLevel)), now) } } @@ -916,12 +916,6 @@ private[spark] class AppStatusListener( val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) - val updatedStorageLevel = if (storageLevel.isValid) { - Some(storageLevel.description) - } else { - None - } - // We need information about the executor to update some memory accounting values in the // RDD info, so read that beforehand. val maybeExec = liveExecutors.get(executorId) @@ -936,13 +930,9 @@ private[spark] class AppStatusListener( // Update the block entry in the RDD info, keeping track of the deltas above so that we // can update the executor information too. 
liveRDDs.get(block.rddId).foreach { rdd => - if (updatedStorageLevel.isDefined) { - rdd.setStorageLevel(updatedStorageLevel.get) - } - val partition = rdd.partition(block.name) - val executors = if (updatedStorageLevel.isDefined) { + val executors = if (storageLevel.isValid) { val current = partition.executors if (current.contains(executorId)) { current @@ -957,7 +947,7 @@ private[spark] class AppStatusListener( // Only update the partition if it's still stored in some executor, otherwise get rid of it. if (executors.nonEmpty) { - partition.update(executors, rdd.storageLevel, + partition.update(executors, addDeltaToValue(partition.memoryUsed, memoryDelta), addDeltaToValue(partition.diskUsed, diskDelta)) } else { diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index aa4a21c1bb81..00c991b49920 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -30,7 +30,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.resource.ResourceInformation import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo} import org.apache.spark.status.api.v1 -import org.apache.spark.storage.RDDInfo +import org.apache.spark.storage.{RDDInfo, StorageLevel} import org.apache.spark.ui.SparkUI import org.apache.spark.util.AccumulatorContext import org.apache.spark.util.collection.OpenHashSet @@ -458,7 +458,13 @@ private class LiveStage extends LiveEntity { } -private class LiveRDDPartition(val blockName: String) { +/** + * Data about a single partition of a cached RDD. The RDD storage level is used to compute the + * effective storage level of the partition, which takes into account the storage actually being + * used by the partition in the executors, and thus may differ from the storage level requested + * by the application. 
+ */ +private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { import LiveEntityHelpers._ @@ -476,12 +482,13 @@ private class LiveRDDPartition(val blockName: String) { def update( executors: Seq[String], - storageLevel: String, memoryUsed: Long, diskUsed: Long): Unit = { + val level = StorageLevel(diskUsed > 0, memoryUsed > 0, rddLevel.useOffHeap, + if (memoryUsed > 0) rddLevel.deserialized else false, executors.size) value = new v1.RDDPartitionInfo( blockName, - weakIntern(storageLevel), + weakIntern(level.description), memoryUsed, diskUsed, executors) @@ -520,27 +527,31 @@ private class LiveRDDDistribution(exec: LiveExecutor) { } -private class LiveRDD(val info: RDDInfo) extends LiveEntity { +/** + * Tracker for data related to a persisted RDD. + * + * The RDD storage level is immutable, following the current behavior of `RDD.persist()`, even + * though it is mutable in the `RDDInfo` structure. Since the listener does not track unpersisted + * RDDs, this covers the case where an early stage is run on the unpersisted RDD, and a later stage + * is started after the RDD is marked for caching. 
+ */ +private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity { import LiveEntityHelpers._ - var storageLevel: String = weakIntern(info.storageLevel.description) var memoryUsed = 0L var diskUsed = 0L + private val levelDescription = weakIntern(storageLevel.description) private val partitions = new HashMap[String, LiveRDDPartition]() private val partitionSeq = new RDDPartitionSeq() private val distributions = new HashMap[String, LiveRDDDistribution]() - def setStorageLevel(level: String): Unit = { - this.storageLevel = weakIntern(level) - } - def partition(blockName: String): LiveRDDPartition = { partitions.getOrElseUpdate(blockName, { - val part = new LiveRDDPartition(blockName) - part.update(Nil, storageLevel, 0L, 0L) + val part = new LiveRDDPartition(blockName, storageLevel) + part.update(Nil, 0L, 0L) partitionSeq.addPartition(part) part }) @@ -578,7 +589,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity { info.name, info.numPartitions, partitions.size, - storageLevel, + levelDescription, memoryUsed, diskUsed, dists, diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 4b71a4844bde..6bf163506e0c 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -42,6 +42,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) .set(ASYNC_TRACKING_ENABLED, false) + private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2) + private var time: Long = _ private var testDir: File = _ private var store: ElementTrackingStore = _ @@ -697,8 +699,16 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { val rdd2b1 = RddBlock(2, 1, 5L, 6L) val level = StorageLevel.MEMORY_AND_DISK + // Submit a stage for the first RDD 
before it's marked for caching, to make sure later + // the listener picks up the correct storage level. + val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, StorageLevel.NONE, false, Nil) + val stage0 = new StageInfo(0, 0, "stage0", 4, Seq(rdd1Info), Nil, "details0") + listener.onStageSubmitted(SparkListenerStageSubmitted(stage0, new Properties())) + listener.onStageCompleted(SparkListenerStageCompleted(stage0)) + assert(store.count(classOf[RDDStorageInfoWrapper]) === 0) + // Submit a stage and make sure the RDDs are recorded. - val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil) + rdd1Info.storageLevel = level val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, false, Nil) val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info, rdd2Info), Nil, "details1") listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) @@ -763,6 +773,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(part.memoryUsed === rdd1b1.memSize * 2) assert(part.diskUsed === rdd1b1.diskSize * 2) assert(part.executors === Seq(bm1.executorId, bm2.executorId)) + assert(part.storageLevel === twoReplicaMemAndDiskLevel.description) } check[ExecutorSummaryWrapper](bm2.executorId) { exec => @@ -800,9 +811,30 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize) } - // Remove block 1 from bm 1. + // Evict block 1 from memory in bm 1. Note that because of SPARK-29319, the disk size + // is reported as "0" here to avoid double-counting; the current behavior of the block + // manager is to provide the actual disk size of the block. 
+ listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.DISK_ONLY, + rdd1b1.memSize, 0L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.numCachedPartitions === 2L) + assert(wrapper.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize) + assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize) + assert(wrapper.info.dataDistribution.get.size === 2L) + assert(wrapper.info.partitions.get.size === 2L) + } + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 2L) + assert(exec.info.memoryUsed === rdd1b2.memSize) + assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize) + } + + // Remove block 1 from bm 1; note memSize = 0 due to the eviction above. listener.onBlockUpdated(SparkListenerBlockUpdated( - BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, rdd1b1.memSize, rdd1b1.diskSize))) + BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, 0, rdd1b1.diskSize))) check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => assert(wrapper.info.numCachedPartitions === 2L) @@ -1571,7 +1603,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get - assert(part1.storageLevel === level.description) + assert(part1.storageLevel === twoReplicaMemAndDiskLevel.description) assert(part1.memoryUsed === 2 * rdd1b1.memSize) assert(part1.diskUsed === 2 * rdd1b1.diskSize) assert(part1.executors === Seq(bm1.executorId, bm2.executorId)) diff --git a/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala b/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala index 8e23de0053e0..2c11f7816fd6 100644 --- a/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala +++ b/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala @@ -18,6 +18,7 
@@ package org.apache.spark.status import org.apache.spark.SparkFunSuite +import org.apache.spark.storage.StorageLevel class LiveEntitySuite extends SparkFunSuite { @@ -59,8 +60,8 @@ class LiveEntitySuite extends SparkFunSuite { } private def newPartition(i: Int): LiveRDDPartition = { - val part = new LiveRDDPartition(i.toString) - part.update(Seq(i.toString), i.toString, i, i) + val part = new LiveRDDPartition(i.toString, StorageLevel.MEMORY_AND_DISK) + part.update(Seq(i.toString), i, i) part }