diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 4c6998d7a8e20..8000071f6558e 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -132,6 +132,12 @@ class StorageLevel private( override def hashCode(): Int = toInt * 41 + replication + /** Name of the storage level if it is predefined or `None` otherwise. */ + def name: Option[String] = { + StorageLevel.PREDEFINED + .collectFirst { case (storageLevel, name) if storageLevel == this => name } + } + def description: String = { var result = "" result += (if (useDisk) "Disk " else "") @@ -163,25 +169,31 @@ object StorageLevel { val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1) + private val PREDEFINED = Seq( + NONE -> "NONE", + DISK_ONLY -> "DISK_ONLY", + DISK_ONLY_2 -> "DISK_ONLY_2", + MEMORY_ONLY -> "MEMORY_ONLY", + MEMORY_ONLY_2 -> "MEMORY_ONLY_2", + MEMORY_ONLY_SER -> "MEMORY_ONLY_SER", + MEMORY_ONLY_SER_2 -> "MEMORY_ONLY_SER_2", + MEMORY_AND_DISK -> "MEMORY_AND_DISK", + MEMORY_AND_DISK_2 -> "MEMORY_AND_DISK_2", + MEMORY_AND_DISK_SER -> "MEMORY_AND_DISK_SER", + MEMORY_AND_DISK_SER_2 -> "MEMORY_AND_DISK_SER_2", + OFF_HEAP -> "OFF_HEAP") + /** * :: DeveloperApi :: * Return the StorageLevel object with the specified name. */ @DeveloperApi - def fromString(s: String): StorageLevel = s match { - case "NONE" => NONE - case "DISK_ONLY" => DISK_ONLY - case "DISK_ONLY_2" => DISK_ONLY_2 - case "MEMORY_ONLY" => MEMORY_ONLY - case "MEMORY_ONLY_2" => MEMORY_ONLY_2 - case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER - case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2 - case "MEMORY_AND_DISK" => MEMORY_AND_DISK - case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2 - case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER - case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2 - case "OFF_HEAP" => OFF_HEAP - case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s") + def fromString(s: String): StorageLevel = { + PREDEFINED.collectFirst { + case (storageLevel, name) if name == s => storageLevel + }.getOrElse { + throw new IllegalArgumentException(s"Invalid StorageLevel: $s") + } } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 5e60218c5740b..240b178d519e5 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -445,10 +445,14 @@ private[spark] object JsonProtocol { } def storageLevelToJson(storageLevel: StorageLevel): JValue = { - ("Use Disk" -> storageLevel.useDisk) ~ - ("Use Memory" -> storageLevel.useMemory) ~ - ("Deserialized" -> storageLevel.deserialized) ~ - ("Replication" -> storageLevel.replication) + storageLevel.name match { + case Some(name) => name + case None => + ("Use Disk" -> storageLevel.useDisk) ~ + ("Use Memory" -> storageLevel.useMemory) ~ + ("Deserialized" -> storageLevel.deserialized) ~ + ("Replication" -> storageLevel.replication) + } } def blockStatusToJson(blockStatus: BlockStatus): JValue = { @@ -988,12 +992,20 @@ private[spark] object JsonProtocol { rddInfo } - def storageLevelFromJson(json: JValue): StorageLevel = { - val useDisk = (json \ "Use Disk").extract[Boolean] - val useMemory = (json \ "Use Memory").extract[Boolean] - val deserialized = (json \ "Deserialized").extract[Boolean] - val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, deserialized, replication) + def storageLevelFromJson(json: JValue): StorageLevel = json match { + case _: JString => + // One of the predefined storage levels, e.g. "DISK_ONLY". + StorageLevel.fromString(json.extract[String]) + case _: JObject => + // Generic case for compatibility with older event logs and for + // user-defined storage levels. + val useDisk = (json \ "Use Disk").extract[Boolean] + val useMemory = (json \ "Use Memory").extract[Boolean] + val deserialized = (json \ "Deserialized").extract[Boolean] + val replication = (json \ "Replication").extract[Int] + StorageLevel(useDisk, useMemory, deserialized, replication) + case _ => + throw new IllegalArgumentException(s"unexpected json value $json for storage level") } def blockStatusFromJson(json: JValue): BlockStatus = { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 4abbb8e7894f5..49ded37797c8d 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -22,7 +22,7 @@ import java.util.Properties import scala.collection.JavaConverters._ import scala.collection.Map -import org.json4s.JsonAST.{JArray, JInt, JString, JValue} +import org.json4s.JsonAST.{JArray, JInt, JObject, JString, JValue} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.scalatest.Assertions @@ -148,6 +148,7 @@ class JsonProtocolSuite extends SparkFunSuite { testStorageLevel(StorageLevel.MEMORY_AND_DISK_2) testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER) testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2) + testStorageLevel(StorageLevel(true, false, false, false, 3)) // JobResult val exception = new Exception("Out of Memory! Please restock film.") @@ -436,6 +437,19 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), 123, JString("123")) } + test("StorageLevel backward compatibility") { + testStorageLevelLegacyFormat(StorageLevel.NONE) + testStorageLevelLegacyFormat(StorageLevel.DISK_ONLY) + testStorageLevelLegacyFormat(StorageLevel.DISK_ONLY_2) + testStorageLevelLegacyFormat(StorageLevel.MEMORY_ONLY) + testStorageLevelLegacyFormat(StorageLevel.MEMORY_ONLY_2) + testStorageLevelLegacyFormat(StorageLevel.MEMORY_ONLY_SER) + testStorageLevelLegacyFormat(StorageLevel.MEMORY_ONLY_SER_2) + testStorageLevelLegacyFormat(StorageLevel.MEMORY_AND_DISK) + testStorageLevelLegacyFormat(StorageLevel.MEMORY_AND_DISK_2) + testStorageLevelLegacyFormat(StorageLevel.MEMORY_AND_DISK_SER) + testStorageLevelLegacyFormat(StorageLevel.MEMORY_AND_DISK_SER_2) + } } @@ -473,6 +487,16 @@ private[spark] object JsonProtocolSuite extends Assertions { assertEquals(level, newLevel) } + private def testStorageLevelLegacyFormat(level: StorageLevel) { + val levelJson = JObject( + "Use Disk" -> level.useDisk, + "Use Memory" -> level.useMemory, + "Deserialized" -> level.deserialized, + "Replication" -> level.replication) + val newLevel = JsonProtocol.storageLevelFromJson(levelJson) + assertEquals(level, newLevel) + } + private def testTaskMetrics(metrics: TaskMetrics) { val newMetrics = JsonProtocol.taskMetricsFromJson(JsonProtocol.taskMetricsToJson(metrics)) assertEquals(metrics, newMetrics) @@ -936,12 +960,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "101", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 201, | "Number of Cached Partitions": 301, | "Memory Size": 401, @@ -1154,12 +1173,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | { | "Block ID": "rdd_0_0", | "Status": { - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": false, - | "Replication": 2 - | }, + | "Storage Level": "MEMORY_AND_DISK_SER_2", | "Memory Size": 0, | "Disk Size": 0 | } @@ -1255,12 +1269,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | { | "Block ID": "rdd_0_0", | "Status": { - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": false, - | "Replication": 2 - | }, + | "Storage Level": "MEMORY_AND_DISK_SER_2", | "Memory Size": 0, | "Disk Size": 0 | } @@ -1356,12 +1365,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | { | "Block ID": "rdd_0_0", | "Status": { - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": false, - | "Replication": 2 - | }, + | "Storage Level": "MEMORY_AND_DISK_SER_2", | "Memory Size": 0, | "Disk Size": 0 | } @@ -1389,12 +1393,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "1", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 200, | "Number of Cached Partitions": 300, | "Memory Size": 400, @@ -1433,12 +1432,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "2", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 400, | "Number of Cached Partitions": 600, | "Memory Size": 800, @@ -1449,12 +1443,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "3", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 401, | "Number of Cached Partitions": 601, | "Memory Size": 801, @@ -1493,12 +1482,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "3", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 600, | "Number of Cached Partitions": 900, | "Memory Size": 1200, @@ -1509,12 +1493,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "4", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 601, | "Number of Cached Partitions": 901, | "Memory Size": 1201, @@ -1525,12 +1504,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "5", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 602, | "Number of Cached Partitions": 902, | "Memory Size": 1202, @@ -1569,12 +1543,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "4", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 800, | "Number of Cached Partitions": 1200, | "Memory Size": 1600, @@ -1585,12 +1554,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "5", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 801, | "Number of Cached Partitions": 1201, | "Memory Size": 1601, @@ -1601,12 +1565,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "6", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 802, | "Number of Cached Partitions": 1202, | "Memory Size": 1602, @@ -1617,12 +1576,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "mayor", | "Callsite": "7", | "Parent IDs": [1, 4, 7], - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_AND_DISK", | "Number of Partitions": 803, | "Number of Cached Partitions": 1203, | "Memory Size": 1603, @@ -1886,12 +1840,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | { | "Block ID": "rdd_0_0", | "Status": { - | "Storage Level": { - | "Use Disk": true, - | "Use Memory": true, - | "Deserialized": false, - | "Replication": 2 - | }, + | "Storage Level": "MEMORY_AND_DISK_SER_2", | "Memory Size": 0, | "Disk Size": 0 | } @@ -2022,12 +1971,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Port": 300 | }, | "Block ID": "rdd_0_0", - | "Storage Level": { - | "Use Disk": false, - | "Use Memory": true, - | "Deserialized": true, - | "Replication": 1 - | }, + | "Storage Level": "MEMORY_ONLY", | "Memory Size": 100, | "Disk Size": 0 | }