diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 71e0390f39a4..eee9d8455f17 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -917,9 +917,6 @@ private[spark] class AppStatusListener(
     // Update the block entry in the RDD info, keeping track of the deltas above so that we
     // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
-      if (updatedStorageLevel.isDefined) {
-        rdd.setStorageLevel(updatedStorageLevel.get)
-      }
       val partition = rdd.partition(block.name)
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 6d7b34ae979f..24f364ae013e 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -507,7 +507,6 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
   import LiveEntityHelpers._
 
-  var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L
 
@@ -516,8 +515,8 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
   private val distributions = new HashMap[String, LiveRDDDistribution]()
 
-  def setStorageLevel(level: String): Unit = {
-    this.storageLevel = weakIntern(level)
+  def storageLevel: String = {
+    weakIntern(info.storageLevel.description)
   }
 
   def partition(blockName: String): LiveRDDPartition = {
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index b5800661efa7..986be9771bbe 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1520,6 +1520,46 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("storage description should display correct replication if storage replication is 2") {
+    val listener = new AppStatusListener(store, conf, true)
+    // Register a couple of block managers.
+    val bm1 = BlockManagerId("1", "host-1", 1234)
+    val bm2 = BlockManagerId("2", "host-2", 2345)
+
+    Seq(bm1, bm2).foreach { bm =>
+      listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
+        new ExecutorInfo(bm.host, 1, Map.empty, Map.empty)))
+      listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, 42L))
+    }
+    val rddBlock = RddBlock(1, 1, 1L, 2L)
+
+    val level = StorageLevel.MEMORY_AND_DISK_2
+    // `replication` value of the replicated block will be 1.
+    val levelBlockReplica = StorageLevel.MEMORY_AND_DISK
+
+    // Submit a stage and make sure the RDDs are recorded.
+    val rdd1Info = new RDDInfo(rddBlock.rddId, "rdd1", 2, level, Nil)
+    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    // Block update event, where replication = 2
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rddBlock.blockId, level, rddBlock.memSize, rddBlock.diskSize)))
+    // Block update event, where replication = 1
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm2, rddBlock.blockId, levelBlockReplica, rddBlock.memSize,
+        rddBlock.diskSize)))
+
+    check[RDDStorageInfoWrapper](rddBlock.rddId) { wrapper =>
+      val partitionInfo = wrapper.info.partitions.get.find(_.blockName === rddBlock.blockId.name)
+        .get
+      assert(partitionInfo.storageLevel === level.description)
+      assert(partitionInfo.memoryUsed === 2 * rddBlock.memSize)
+      assert(partitionInfo.diskUsed === 2 * rddBlock.diskSize)
+      assert(partitionInfo.executors === Seq(bm1.executorId, bm2.executorId))
+    }
+  }
+
   test("storage information on executor lost/down") {
     val listener = new AppStatusListener(store, conf, true)
     val maxMemory = 42L
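
Why the fix works: `StorageLevel.description` encodes the replication factor, but the block update for a replica reports that copy with `replication = 1`, since each physical copy is a single block. The removed `setStorageLevel` path overwrote `LiveRDD.storageLevel` on every block update, so the replica's update could downgrade the displayed level from "2x Replicated" to "1x Replicated". Deriving `storageLevel` from `info.storageLevel` instead pins it to the level the user requested. Below is a minimal sketch of the distinction, assuming `spark-core` is on the classpath; the demo object is illustrative only and not part of the patch.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object, not part of the patch.
object ReplicationDescriptionDemo {
  def main(args: Array[String]): Unit = {
    // The level requested for the RDD: two copies of every block.
    val requested = StorageLevel.MEMORY_AND_DISK_2
    // The level reported for an individual replica: one physical copy.
    val perReplica = StorageLevel.MEMORY_AND_DISK

    println(requested.replication)   // 2
    println(perReplica.replication)  // 1

    // `description` is the string the UI renders, e.g.
    //   "Disk Memory Deserialized 2x Replicated"  vs.
    //   "Disk Memory Deserialized 1x Replicated"
    // Computing LiveRDD.storageLevel from info.storageLevel.description
    // therefore keeps the requested 2x text no matter which per-replica
    // levels arrive in block update events.
    println(requested.description)
    println(perReplica.description)
  }
}
```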