4 changes: 4 additions & 0 deletions R/pkg/R/utils.R
@@ -376,6 +376,7 @@ varargsToStrEnv <- function(...) {

getStorageLevel <- function(newLevel = c("DISK_ONLY",
"DISK_ONLY_2",
"DISK_ONLY_3",
"MEMORY_AND_DISK",
"MEMORY_AND_DISK_2",
"MEMORY_AND_DISK_SER",
@@ -390,6 +391,7 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
storageLevel <- switch(newLevel,
"DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
"DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
"DISK_ONLY_3" = callJStatic(storageLevelClass, "DISK_ONLY_3"),
"MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
"MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
"MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
@@ -415,6 +417,8 @@ storageLevelToString <- function(levelObj) {
"DISK_ONLY"
} else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
"DISK_ONLY_2"
+ } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 3) {
+ "DISK_ONLY_3"
} else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
"MEMORY_ONLY"
} else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
@@ -26,6 +26,7 @@ public class StorageLevels {
public static final StorageLevel NONE = create(false, false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
+ public static final StorageLevel DISK_ONLY_3 = create(true, false, false, false, 3);
public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
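For orientation, the five arguments to `create` are `(useDisk, useMemory, useOffHeap, deserialized, replication)`, so `DISK_ONLY_3` means serialized, on-disk storage with three replicas. A minimal sketch of the same construction from the Scala side, using the `StorageLevel` factory shown further down in this diff:

```scala
import org.apache.spark.storage.StorageLevel

// StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
val level = StorageLevel(true, false, false, false, 3)
assert(level == StorageLevel.DISK_ONLY_3) // equality compares the flags plus replication
```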
@@ -153,6 +153,7 @@ object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
+ val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
@@ -172,6 +173,7 @@ object StorageLevel {
case "NONE" => NONE
case "DISK_ONLY" => DISK_ONLY
case "DISK_ONLY_2" => DISK_ONLY_2
case "DISK_ONLY_3" => DISK_ONLY_3
case "MEMORY_ONLY" => MEMORY_ONLY
case "MEMORY_ONLY_2" => MEMORY_ONLY_2
case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
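A hedged usage sketch of the new level (`fromString` and `persist` are the real APIs touched or exercised by this diff; the RDD and `sc` are illustrative):

```scala
import org.apache.spark.storage.StorageLevel

// Resolve the level by name, as SQL's CACHE TABLE and SparkR do internally.
val byName = StorageLevel.fromString("DISK_ONLY_3")
assert(byName == StorageLevel.DISK_ONLY_3)

// Persist an RDD with three serialized, on-disk replicas.
val rdd = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_3)
```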
7 changes: 4 additions & 3 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -38,7 +38,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
implicit val defaultSignaler: Signaler = ThreadSignaler

val clusterUrl = "local-cluster[2,1,1024]"
val clusterUrl = "local-cluster[3,1,1024]"

test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
@@ -174,7 +174,7 @@

private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = {
sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test"))
- TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+ TestUtils.waitUntilExecutorsUp(sc, 3, 60000)
val data = sc.parallelize(1 to 1000, 10)
val cachedData = data.persist(storageLevel)
assert(cachedData.count === 1000)
@@ -206,7 +206,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
"caching on disk" -> StorageLevel.DISK_ONLY,
"caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2,
"caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
"caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
"caching on disk, replicated 2" -> StorageLevel.DISK_ONLY_2,
"caching on disk, replicated 3" -> StorageLevel.DISK_ONLY_3,
Contributor:
So what happens if there aren't 3 executors? Do we have a test that needs updating?

Member Author:
> So what happens if there aren't 3 executors?

The number of copies becomes 2, and this test case fails, as it should.

> Do we have a test that needs updating?

Yes. This test suite is updated at line 41 (the `clusterUrl` change above).

"caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
"caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2
).foreach { case (testName, storageLevel) =>
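To make the thread's point concrete: replication is best-effort, so on a two-executor cluster a `DISK_ONLY_3` block ends up with only two replicas. A hedged, test-style sketch of that check, modeled on what `DistributedSuite` does (the block-manager calls are Spark internals; the exact assertion is illustrative):

```scala
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

// Ask for 3 replicas on a cluster that can only host 2.
val rdd = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_3)
rdd.count() // materialize the cached blocks

// Count where the first partition's block actually landed.
val locations = sc.env.blockManager.master.getLocations(RDDBlockId(rdd.id, 0))
// locations.size == 2 here, so a test expecting 3 fails, as the author notes.
```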
@@ -44,6 +44,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
assert(transform(StorageLevel.MEMORY_ONLY_SER_2) === StorageLevel.MEMORY_AND_DISK_SER_2)
assert(transform(StorageLevel.DISK_ONLY) === StorageLevel.DISK_ONLY)
assert(transform(StorageLevel.DISK_ONLY_2) === StorageLevel.DISK_ONLY_2)
+ assert(transform(StorageLevel.DISK_ONLY_3) === StorageLevel.DISK_ONLY_3)
assert(transform(StorageLevel.MEMORY_AND_DISK) === StorageLevel.MEMORY_AND_DISK)
assert(transform(StorageLevel.MEMORY_AND_DISK_SER) === StorageLevel.MEMORY_AND_DISK_SER)
assert(transform(StorageLevel.MEMORY_AND_DISK_2) === StorageLevel.MEMORY_AND_DISK_2)
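Context for these assertions: local checkpointing rewrites a storage level so blocks can always be re-read from disk, which is why disk-backed levels pass through unchanged. A hedged sketch of the identity being tested, with `transform` standing in for Spark's internal `LocalRDDCheckpointData.transformStorageLevel`:

```scala
import org.apache.spark.storage.StorageLevel

// Force useDisk = true, preserving the memory/serialization flags and replication.
def transform(level: StorageLevel): StorageLevel =
  StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)

assert(transform(StorageLevel.DISK_ONLY_3) == StorageLevel.DISK_ONLY_3)
assert(transform(StorageLevel.MEMORY_ONLY_2) == StorageLevel.MEMORY_AND_DISK_2)
```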
@@ -170,6 +170,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testStorageLevel(StorageLevel.NONE)
testStorageLevel(StorageLevel.DISK_ONLY)
testStorageLevel(StorageLevel.DISK_ONLY_2)
+ testStorageLevel(StorageLevel.DISK_ONLY_3)
testStorageLevel(StorageLevel.MEMORY_ONLY)
testStorageLevel(StorageLevel.MEMORY_ONLY_2)
testStorageLevel(StorageLevel.MEMORY_ONLY_SER)
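The `testStorageLevel` helper asserts a JSON round-trip; a hedged sketch of the equivalent check (`JsonProtocol.storageLevelToJson`/`storageLevelFromJson` are Spark-internal, so treat the signatures as best-effort):

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.JsonProtocol

// Serializing and re-parsing must preserve all flags, including replication = 3.
val json = JsonProtocol.storageLevelToJson(StorageLevel.DISK_ONLY_3)
assert(JsonProtocol.storageLevelFromJson(json) == StorageLevel.DISK_ONLY_3)
```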
2 changes: 1 addition & 1 deletion docs/rdd-programming-guide.md
@@ -1256,7 +1256,7 @@ storage levels is:

**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
- `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
+ `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2`, and `DISK_ONLY_3`.*
Contributor:
It looks like we need to update the table above as well? It might be nice to say what happens if you specify a level > 1 but you don't have that many executors.

Member Author:
Let me rephrase the request.

1. Add both DISK_ONLY_2 and DISK_ONLY_3 to the table above.
2. Add a description of the corner case for MEMORY_ONLY_2, MEMORY_AND_DISK_2, DISK_ONLY_2, and DISK_ONLY_3.

Is there something more I can do, @tgravescs?

Contributor:
That's it.

Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.

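A hedged illustration of the corner case raised in the thread above (the API calls are standard Spark; the behavioral comment paraphrases the author's reply rather than output from a real run):

```scala
import org.apache.spark.storage.StorageLevel

// Requesting 3 replicas on a cluster with fewer than 3 executors does not
// fail the job: Spark places as many replicas as it can (e.g. 2). This is
// the corner case the reviewers asked the guide to document for the
// MEMORY_ONLY_2, MEMORY_AND_DISK_2, DISK_ONLY_2, and DISK_ONLY_3 levels.
val lines = sc.textFile("data.txt").persist(StorageLevel.DISK_ONLY_3)
lines.count()
```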
1 change: 1 addition & 0 deletions docs/sql-ref-syntax-aux-cache-cache-table.md
@@ -49,6 +49,7 @@ CACHE [ LAZY ] TABLE table_identifier
* `NONE`
* `DISK_ONLY`
* `DISK_ONLY_2`
+ * `DISK_ONLY_3`
* `MEMORY_ONLY`
* `MEMORY_ONLY_2`
* `MEMORY_ONLY_SER`
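A hedged SQL usage sketch (the `OPTIONS ('storageLevel' ...)` clause is the documented `CACHE TABLE` syntax; the table name is illustrative):

```scala
// Cache a table on disk with three replicas using the syntax this page documents.
spark.sql("CACHE TABLE events OPTIONS ('storageLevel' 'DISK_ONLY_3')")
```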
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.rst
@@ -249,6 +249,7 @@ Management
SparkFiles.getRootDirectory
StorageLevel.DISK_ONLY
StorageLevel.DISK_ONLY_2
+ StorageLevel.DISK_ONLY_3
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.MEMORY_ONLY
1 change: 1 addition & 0 deletions python/pyspark/storagelevel.py
@@ -51,6 +51,7 @@ def __str__(self):

StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
+ StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)