core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

@@ -917,9 +917,6 @@ private[spark] class AppStatusListener(
     // Update the block entry in the RDD info, keeping track of the deltas above so that we
     // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
-      if (updatedStorageLevel.isDefined) {
-        rdd.setStorageLevel(updatedStorageLevel.get)
-      }
 
       val partition = rdd.partition(block.name)
 
core/src/main/scala/org/apache/spark/status/LiveEntity.scala (2 additions, 3 deletions)

@@ -507,7 +507,6 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
   import LiveEntityHelpers._
 
-  var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L
 
@@ -516,8 +515,8 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
   private val distributions = new HashMap[String, LiveRDDDistribution]()
 
-  def setStorageLevel(level: String): Unit = {
-    this.storageLevel = weakIntern(level)
+  def storageLevel: String = {
+    weakIntern(info.storageLevel.description)
   }
 
   def partition(blockName: String): LiveRDDPartition = {
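
The net effect of the LiveEntity.scala change is that an RDD's displayed storage level is no longer mutable state that each block update can overwrite, but is always derived from the RDDInfo the RDD was registered with. A minimal standalone sketch of the before/after behavior, not Spark's actual classes: Info stands in for RDDInfo, the description strings mimic StorageLevel.description, and the weakIntern caching is omitted.

// Stand-in for RDDInfo; the strings mimic StorageLevel.description.
case class Info(storageLevelDescription: String)

// Before: mutable state. A block update carrying a replica's level
// (replication = 1) overwrites the level the RDD was declared with.
class LiveRddBefore(info: Info) {
  var storageLevel: String = info.storageLevelDescription
  def setStorageLevel(level: String): Unit = { this.storageLevel = level }
}

// After: derived on demand from the registered RDDInfo,
// so replica updates cannot clobber it.
class LiveRddAfter(info: Info) {
  def storageLevel: String = info.storageLevelDescription
}

object StorageLevelSketch extends App {
  val declared = Info("Disk Memory Deserialized 2x Replicated")
  val before = new LiveRddBefore(declared)
  before.setStorageLevel("Disk Memory Deserialized 1x Replicated") // replica's level
  println(before.storageLevel)                     // "1x Replicated": the stale value removed here
  println(new LiveRddAfter(declared).storageLevel) // stays "2x Replicated"
}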
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

@@ -1520,6 +1520,46 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
test("storage description should display correct replication if storage replication is 2") {
val listener = new AppStatusListener(store, conf, true)
// Register a couple of block managers.
val bm1 = BlockManagerId("1", "host-1", 1234)
val bm2 = BlockManagerId("2", "host-2", 2345)

Seq(bm1, bm2).foreach { bm =>
listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
new ExecutorInfo(bm.host, 1, Map.empty, Map.empty)))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, 42L))
}
val rddBlock = RddBlock(1, 1, 1L, 2L)

val level = StorageLevel.MEMORY_AND_DISK_2
// `replication` value of the replicated block will be 1.
val levelBlockReplica = StorageLevel.MEMORY_AND_DISK

// Submit a stage and make sure the RDDs are recorded.
val rdd1Info = new RDDInfo(rddBlock.rddId, "rdd1", 2, level, Nil)
val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))

// Block update event, where replication = 2
listener.onBlockUpdated(SparkListenerBlockUpdated(
BlockUpdatedInfo(bm1, rddBlock.blockId, level, rddBlock.memSize, rddBlock.diskSize)))
// Block update event, where replication = 1
listener.onBlockUpdated(SparkListenerBlockUpdated(
BlockUpdatedInfo(bm2, rddBlock.blockId, levelBlockReplica, rddBlock.memSize,
rddBlock.diskSize)))

check[RDDStorageInfoWrapper](rddBlock.rddId) { wrapper =>
val partitionInfo = wrapper.info.partitions.get.find(_.blockName === rddBlock.blockId.name)
.get
assert(partitionInfo.storageLevel === level.description)
assert(partitionInfo.memoryUsed === 2 * rddBlock.memSize)
assert(partitionInfo.diskUsed === 2 * rddBlock.diskSize)
assert(partitionInfo.executors === Seq(bm1.executorId, bm2.executorId))
}
}

test("storage information on executor lost/down") {
val listener = new AppStatusListener(store, conf, true)
val maxMemory = 42L
Expand Down
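
For reference, the two levels the new test feeds to the listener differ only in their replication factor, which appears as the last token of StorageLevel.description; the assertion on partitionInfo.storageLevel checks that the 2x description declared on the RDD survives the replica's 1x update. A quick check of those description strings, assuming spark-core on the classpath (the expected output in the comments follows StorageLevel.description's format):

import org.apache.spark.storage.StorageLevel

object ReplicationDemo extends App {
  // Declared level of the cached RDD: replication = 2.
  println(StorageLevel.MEMORY_AND_DISK_2.description) // expected: Disk Memory Deserialized 2x Replicated
  // Level reported by the block manager holding the replica: replication = 1.
  // The listener must keep showing the 2x level declared on the RDD.
  println(StorageLevel.MEMORY_AND_DISK.description)   // expected: Disk Memory Deserialized 1x Replicated
}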