python/pyspark/sql/dataframe.py (30 additions, 6 deletions)

@@ -376,24 +376,48 @@ def foreachPartition(self, f):

@since(1.3)
def cache(self):
""" Persists with the default storage level (C{MEMORY_ONLY}).
"""Persists the :class:`DataFrame` with the default storage level (C{MEMORY_AND_DISK}).
Review comment (Contributor Author):
@rxin I updated the default in the doc, as it was actually incorrect previously.


.. note:: the default storage level has changed to C{MEMORY_AND_DISK} to match Scala in 2.0.
"""
self.is_cached = True
self._jdf.cache()
return self

@since(1.3)
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
"""Sets the storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
If no storage level is specified defaults to (C{MEMORY_ONLY}).
def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK):
Review comment (Contributor Author @MLnick, Jun 21, 2016):
@rxin I updated the default here in persist, to match cache.

But actually, it's still not quite correct - the default storage levels for Python are all serialized. But the MEMORY-based ones don't match the Scala side (which are deserialized). This was done for RDDs but doesn't quite work for DataFrames (since DF on the Scala side is cached deserialized by default).

So here df.cache() results in MEMORY_AND_DISK (deser) while df.persist() results in MEMORY_AND_DISK (ser). Ideally I'd say we don't want to encourage users to accidentally use the serialized forms for memory-based DF caching (since it is less efficient, as I understand it?). Let me know what you think.
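A minimal sketch of how that mismatch would surface through the new storageLevel property (assuming a running SparkSession bound to `spark`; the printed levels are illustrative):

    # Sketch only: illustrates the cache()/persist() default mismatch described above.
    from pyspark.storagelevel import StorageLevel

    df = spark.range(10)

    # cache() delegates to the JVM Dataset default: MEMORY_AND_DISK, deserialized.
    df.cache()
    print(df.storageLevel)   # StorageLevel(True, True, False, True, 1)
    df.unpersist()

    # persist() goes through Python's MEMORY_AND_DISK, which is a serialized level.
    df.persist(StorageLevel.MEMORY_AND_DISK)
    print(df.storageLevel)   # StorageLevel(True, True, False, False, 1)
    df.unpersist()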

Review comment (Contributor Author):
One option is to set the default storage level here to None instead, and if it's not set call _jdf.persist() to ensure behaviour is the same as cache.
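For concreteness, a hypothetical sketch of that option (not the code in this PR): default the storage level to None and fall back to the JVM-side default so persist() behaves like cache().

    # Hypothetical variant of DataFrame.persist, not the implementation in this PR.
    def persist(self, storageLevel=None):
        self.is_cached = True
        if storageLevel is None:
            # Same path as cache(): let the Scala side apply its default
            # (MEMORY_AND_DISK, deserialized).
            self._jdf.persist()
        else:
            javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
            self._jdf.persist(javaStorageLevel)
        return self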

Review comment (Contributor Author):
ping @rxin

Review comment (Contributor Author):
ping @rxin on this comment?

Review comment (Contributor):
One downside of that approach is that the user can't easily request deserialized in-memory-only caching explicitly, or deserialized caching replicated on two machines.

Review comment (Contributor Author):
Yes, this is true. The issue is that the deser versions were deprecated in #10092 and made equal to the ser versions, so they can't actually be specified in Python any more. Hence we now have a discrepancy between Python RDDs (always stored ser) and DataFrames (stored deser in the tungsten/spark sql binary format by default; it is possible to store them ser, though AFAIK that will always be non-optimal and should certainly be discouraged).

cc @gatorsmile @davies @rxin
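For reference, a quick illustration of the point above, assuming the pyspark.storagelevel definitions as of 2.0 (the fourth flag in the repr is `deserialized`):

    # The Python memory-based levels are all serialized, so a deserialized
    # in-memory level cannot be requested from Python.
    from pyspark import StorageLevel

    print(StorageLevel.MEMORY_ONLY)       # StorageLevel(False, True, False, False, 1)
    print(StorageLevel.MEMORY_AND_DISK)   # StorageLevel(True, True, False, False, 1)
    # Scala's MEMORY_ONLY / MEMORY_AND_DISK, by contrast, are deserialized.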

"""Sets the storage level to persist the contents of the :class:`DataFrame` across
operations after the first time it is computed. This can only be used to assign
a new storage level if the :class:`DataFrame` does not have a storage level set yet.
If no storage level is specified defaults to (C{MEMORY_AND_DISK}).

.. note:: the default storage level has changed to C{MEMORY_AND_DISK} to match Scala in 2.0.
"""
self.is_cached = True
javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
self._jdf.persist(javaStorageLevel)
return self

@property
@since(2.0)
def storageLevel(self):
Review comment (Contributor):
Maybe mark this as a property so we don't need to use parentheses?

"""Get the :class:`DataFrame`'s current storage level.

>>> df.storageLevel
StorageLevel(False, False, False, False, 1)
>>> df.cache().storageLevel
StorageLevel(True, True, False, True, 1)
>>> df2.persist(StorageLevel.DISK_ONLY_2).storageLevel
StorageLevel(True, False, False, False, 2)
"""
java_storage_level = self._jdf.storageLevel()
storage_level = StorageLevel(java_storage_level.useDisk(),
java_storage_level.useMemory(),
java_storage_level.useOffHeap(),
java_storage_level.deserialized(),
java_storage_level.replication())
return storage_level

@since(1.3)
def unpersist(self, blocking=False):
"""Marks the :class:`DataFrame` as non-persistent, and remove all blocks for it from
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (12 additions, 0 deletions)

@@ -2307,6 +2307,18 @@ class Dataset[T] private[sql](
this
}

/**
* Get the Dataset's current storage level, or StorageLevel.NONE if not persisted.
*
* @group basic
* @since 2.0.0
*/
def storageLevel: StorageLevel = {
sparkSession.sharedState.cacheManager.lookupCachedData(this).map { cachedData =>
cachedData.cachedRepresentation.storageLevel
}.getOrElse(StorageLevel.NONE)
}

/**
* Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
*
sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

@@ -21,11 +21,32 @@ import scala.language.postfixOps

import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.storage.StorageLevel


class DatasetCacheSuite extends QueryTest with SharedSQLContext {
import testImplicits._

test("get storage level") {
val ds1 = Seq("1", "2").toDS().as("a")
val ds2 = Seq(2, 3).toDS().as("b")

// default storage level
ds1.persist()
ds2.cache()
assert(ds1.storageLevel == StorageLevel.MEMORY_AND_DISK)
assert(ds2.storageLevel == StorageLevel.MEMORY_AND_DISK)
// unpersist
ds1.unpersist()
assert(ds1.storageLevel == StorageLevel.NONE)
// non-default storage level
ds1.persist(StorageLevel.MEMORY_ONLY_2)
Review comment (Member @gatorsmile, Jun 20, 2016):
When writing black-box tests, I might just try all the levels in the test case. We could even include a custom StorageLevel that differs from the predefined ones.

    import org.apache.spark.storage.StorageLevel._
    Seq(NONE, DISK_ONLY, DISK_ONLY_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER,
      MEMORY_ONLY_SER_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER,
      MEMORY_AND_DISK_SER_2, OFF_HEAP).foreach { level =>
      ds1.persist(level)
      assert(ds1.storageLevel == level)
      ds1.unpersist()
      assert(ds1.storageLevel == StorageLevel.NONE)
    }

Review comment (Contributor Author):
I'm kinda neutral on this - it doesn't really seem necessary to me, since pretty much by definition if one storage level works then they all do.

Review comment (Member):
I knew. : ) That is white box testing. Normally, writing test cases should not be done by the same person who wrote the code.

assert(ds1.storageLevel == StorageLevel.MEMORY_ONLY_2)
// joined Dataset should not be persisted
val joined = ds1.joinWith(ds2, $"a.value" === $"b.value")
assert(joined.storageLevel == StorageLevel.NONE)
}

test("persist and unpersist") {
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS().select(expr("_2 + 1").as[Int])
val cached = ds.cache()
@@ -39,8 +60,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
2, 3, 4)
// Drop the cache.
cached.unpersist()
assert(spark.sharedState.cacheManager.lookupCachedData(cached).isEmpty,
"The Dataset should not be cached.")
assert(cached.storageLevel == StorageLevel.NONE, "The Dataset should not be cached.")
}

test("persist and then rebind right encoder when join 2 datasets") {
@@ -57,11 +77,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
assertCached(joined, 2)

ds1.unpersist()
assert(spark.sharedState.cacheManager.lookupCachedData(ds1).isEmpty,
"The Dataset ds1 should not be cached.")
assert(ds1.storageLevel == StorageLevel.NONE, "The Dataset ds1 should not be cached.")
ds2.unpersist()
assert(spark.sharedState.cacheManager.lookupCachedData(ds2).isEmpty,
"The Dataset ds2 should not be cached.")
assert(ds2.storageLevel == StorageLevel.NONE, "The Dataset ds2 should not be cached.")
}

test("persist and then groupBy columns asKey, map") {
@@ -76,10 +94,8 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
assertCached(agged.filter(_._1 == "b"))

ds.unpersist()
assert(spark.sharedState.cacheManager.lookupCachedData(ds).isEmpty,
"The Dataset ds should not be cached.")
assert(ds.storageLevel == StorageLevel.NONE, "The Dataset ds should not be cached.")
agged.unpersist()
assert(spark.sharedState.cacheManager.lookupCachedData(agged).isEmpty,
"The Dataset agged should not be cached.")
assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
}
}