Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ class StorageLevel private(

override def hashCode(): Int = toInt * 41 + replication

/** Name of the storage level if it is predefined or `None` otherwise. */
def name: Option[String] = {
  // Linear scan over the predefined levels; equality is structural via
  // StorageLevel.equals, so any equivalent level resolves to its canonical name.
  StorageLevel.PREDEFINED
    .find { case (storageLevel, _) => storageLevel == this }
    .map { case (_, levelName) => levelName }
}

def description: String = {
var result = ""
result += (if (useDisk) "Disk " else "")
Expand Down Expand Up @@ -163,25 +169,31 @@ object StorageLevel {
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

// Pairs of (predefined storage level, canonical name). Backs both
// `StorageLevel.name` (level -> name) and `fromString` (name -> level);
// lookups are linear scans, which is fine for this fixed, small set.
private val PREDEFINED = Seq(
NONE -> "NONE",
DISK_ONLY -> "DISK_ONLY",
DISK_ONLY_2 -> "DISK_ONLY_2",
MEMORY_ONLY -> "MEMORY_ONLY",
MEMORY_ONLY_2 -> "MEMORY_ONLY_2",
MEMORY_ONLY_SER -> "MEMORY_ONLY_SER",
MEMORY_ONLY_SER_2 -> "MEMORY_ONLY_SER_2",
MEMORY_AND_DISK -> "MEMORY_AND_DISK",
MEMORY_AND_DISK_2 -> "MEMORY_AND_DISK_2",
MEMORY_AND_DISK_SER -> "MEMORY_AND_DISK_SER",
MEMORY_AND_DISK_SER_2 -> "MEMORY_AND_DISK_SER_2",
OFF_HEAP -> "OFF_HEAP")

/**
* :: DeveloperApi ::
* Return the StorageLevel object with the specified name.
*/
@DeveloperApi
def fromString(s: String): StorageLevel = s match {
case "NONE" => NONE
case "DISK_ONLY" => DISK_ONLY
case "DISK_ONLY_2" => DISK_ONLY_2
case "MEMORY_ONLY" => MEMORY_ONLY
case "MEMORY_ONLY_2" => MEMORY_ONLY_2
case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
case "MEMORY_AND_DISK" => MEMORY_AND_DISK
case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
case "OFF_HEAP" => OFF_HEAP
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
def fromString(s: String): StorageLevel = {
  // Resolve the canonical name against the predefined table; unknown names
  // are rejected rather than silently mapped to a default level.
  PREDEFINED
    .find { case (_, levelName) => levelName == s }
    .map { case (storageLevel, _) => storageLevel }
    .getOrElse {
      throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
    }
}

/**
Expand Down
32 changes: 22 additions & 10 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,14 @@ private[spark] object JsonProtocol {
}

def storageLevelToJson(storageLevel: StorageLevel): JValue = {
  // Predefined levels serialize compactly as their name (a JSON string);
  // anything else falls back to the verbose field-by-field object form.
  // NOTE(review): the verbose form carries no "Use OffHeap" field, so
  // useOffHeap does not round-trip for custom levels — confirm.
  storageLevel.name.fold[JValue] {
    ("Use Disk" -> storageLevel.useDisk) ~
      ("Use Memory" -> storageLevel.useMemory) ~
      ("Deserialized" -> storageLevel.deserialized) ~
      ("Replication" -> storageLevel.replication)
  } { levelName => levelName }
}

def blockStatusToJson(blockStatus: BlockStatus): JValue = {
Expand Down Expand Up @@ -988,12 +992,20 @@ private[spark] object JsonProtocol {
rddInfo
}

def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
StorageLevel(useDisk, useMemory, deserialized, replication)
def storageLevelFromJson(json: JValue): StorageLevel = json match {
  case JString(levelName) =>
    // Compact form: the name of one of the predefined storage levels,
    // e.g. "DISK_ONLY".
    StorageLevel.fromString(levelName)
  case obj: JObject =>
    // Verbose form: kept for compatibility with older event logs and for
    // user-defined storage levels.
    StorageLevel(
      (obj \ "Use Disk").extract[Boolean],
      (obj \ "Use Memory").extract[Boolean],
      (obj \ "Deserialized").extract[Boolean],
      (obj \ "Replication").extract[Int])
  case other =>
    throw new IllegalArgumentException(s"unexpected json value $other for storage level")
}

def blockStatusFromJson(json: JValue): BlockStatus = {
Expand Down
138 changes: 41 additions & 97 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.Map

import org.json4s.JsonAST.{JArray, JInt, JString, JValue}
import org.json4s.JsonAST.{JArray, JInt, JObject, JString, JValue}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.scalatest.Assertions
Expand Down Expand Up @@ -148,6 +148,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testStorageLevel(StorageLevel.MEMORY_AND_DISK_2)
testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER)
testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
testStorageLevel(StorageLevel(true, false, false, false, 3))

// JobResult
val exception = new Exception("Out of Memory! Please restock film.")
Expand Down Expand Up @@ -436,6 +437,19 @@ class JsonProtocolSuite extends SparkFunSuite {
testAccumValue(Some("anything"), 123, JString("123"))
}

test("StorageLevel backward compatibility") {
  // Every predefined level must still be readable from the legacy verbose
  // JSON object form.
  // NOTE(review): OFF_HEAP is absent here — the legacy format has no
  // "Use OffHeap" field, so it cannot round-trip; confirm this exclusion
  // is intentional.
  Seq(
    StorageLevel.NONE,
    StorageLevel.DISK_ONLY,
    StorageLevel.DISK_ONLY_2,
    StorageLevel.MEMORY_ONLY,
    StorageLevel.MEMORY_ONLY_2,
    StorageLevel.MEMORY_ONLY_SER,
    StorageLevel.MEMORY_ONLY_SER_2,
    StorageLevel.MEMORY_AND_DISK,
    StorageLevel.MEMORY_AND_DISK_2,
    StorageLevel.MEMORY_AND_DISK_SER,
    StorageLevel.MEMORY_AND_DISK_SER_2
  ).foreach(testStorageLevelLegacyFormat)
}
}


Expand Down Expand Up @@ -473,6 +487,16 @@ private[spark] object JsonProtocolSuite extends Assertions {
assertEquals(level, newLevel)
}

// Round-trips `level` through the legacy (field-by-field) JSON object form
// and asserts the parsed result equals the original level.
private def testStorageLevelLegacyFormat(level: StorageLevel) {
  val legacyJson: JValue =
    ("Use Disk" -> level.useDisk) ~
      ("Use Memory" -> level.useMemory) ~
      ("Deserialized" -> level.deserialized) ~
      ("Replication" -> level.replication)
  val parsedLevel = JsonProtocol.storageLevelFromJson(legacyJson)
  assertEquals(level, parsedLevel)
}

private def testTaskMetrics(metrics: TaskMetrics) {
val newMetrics = JsonProtocol.taskMetricsFromJson(JsonProtocol.taskMetricsToJson(metrics))
assertEquals(metrics, newMetrics)
Expand Down Expand Up @@ -936,12 +960,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "101",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 201,
| "Number of Cached Partitions": 301,
| "Memory Size": 401,
Expand Down Expand Up @@ -1154,12 +1173,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| {
| "Block ID": "rdd_0_0",
| "Status": {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": false,
| "Replication": 2
| },
| "Storage Level": "MEMORY_AND_DISK_SER_2",
| "Memory Size": 0,
| "Disk Size": 0
| }
Expand Down Expand Up @@ -1255,12 +1269,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| {
| "Block ID": "rdd_0_0",
| "Status": {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": false,
| "Replication": 2
| },
| "Storage Level": "MEMORY_AND_DISK_SER_2",
| "Memory Size": 0,
| "Disk Size": 0
| }
Expand Down Expand Up @@ -1356,12 +1365,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| {
| "Block ID": "rdd_0_0",
| "Status": {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": false,
| "Replication": 2
| },
| "Storage Level": "MEMORY_AND_DISK_SER_2",
| "Memory Size": 0,
| "Disk Size": 0
| }
Expand Down Expand Up @@ -1389,12 +1393,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "1",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 200,
| "Number of Cached Partitions": 300,
| "Memory Size": 400,
Expand Down Expand Up @@ -1433,12 +1432,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "2",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 400,
| "Number of Cached Partitions": 600,
| "Memory Size": 800,
Expand All @@ -1449,12 +1443,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "3",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 401,
| "Number of Cached Partitions": 601,
| "Memory Size": 801,
Expand Down Expand Up @@ -1493,12 +1482,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "3",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 600,
| "Number of Cached Partitions": 900,
| "Memory Size": 1200,
Expand All @@ -1509,12 +1493,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "4",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 601,
| "Number of Cached Partitions": 901,
| "Memory Size": 1201,
Expand All @@ -1525,12 +1504,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "5",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 602,
| "Number of Cached Partitions": 902,
| "Memory Size": 1202,
Expand Down Expand Up @@ -1569,12 +1543,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "4",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 800,
| "Number of Cached Partitions": 1200,
| "Memory Size": 1600,
Expand All @@ -1585,12 +1554,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "5",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 801,
| "Number of Cached Partitions": 1201,
| "Memory Size": 1601,
Expand All @@ -1601,12 +1565,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "6",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 802,
| "Number of Cached Partitions": 1202,
| "Memory Size": 1602,
Expand All @@ -1617,12 +1576,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "mayor",
| "Callsite": "7",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_AND_DISK",
| "Number of Partitions": 803,
| "Number of Cached Partitions": 1203,
| "Memory Size": 1603,
Expand Down Expand Up @@ -1886,12 +1840,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| {
| "Block ID": "rdd_0_0",
| "Status": {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Deserialized": false,
| "Replication": 2
| },
| "Storage Level": "MEMORY_AND_DISK_SER_2",
| "Memory Size": 0,
| "Disk Size": 0
| }
Expand Down Expand Up @@ -2022,12 +1971,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Port": 300
| },
| "Block ID": "rdd_0_0",
| "Storage Level": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll also want to retain some tests of the old format to ensure it's still read. Maybe there are outside of the diff and I'm not seeing them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a to/from JSON test for a custom StorageLevel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I mean, doesn't this no longer test whether it can read the verbose, old style format? like this test does here and the ones above, that are being removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These strings are used with testEvent which assert that the serialized representation matches the one given in the string literal. See https://github.com/criteo-forks/spark/blob/7869e63a569a6fb6725996084f0c5c55fc130ac8/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala#L457

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I share sean's concern; I don't think your response is addressing it. You added one test on L151 that makes sure that some event which is not predefined still works. But you don't have a test making sure that the old, verbose string can still be parsed (or is it somewhere else?)

probably this is indirectly covered by HistoryServerSuite but a more direct test would be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a test ensuring all predefined storage levels can be read from the legacy format.

Sidenote: I've also noticed that the legacy format incorrectly handled the predefined StorageLevel.OFF_HEAP and in fact any other custom storage level with useOffHeap = true. It looks like a bug to me, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, I completely agree that off heap is not respected in the json format. can you file a bug? I think its still relevant even after this goes in, for custom levels

| "Use Disk": false,
| "Use Memory": true,
| "Deserialized": true,
| "Replication": 1
| },
| "Storage Level": "MEMORY_ONLY",
| "Memory Size": 100,
| "Disk Size": 0
| }
Expand Down