Adding localCheckpoint to Dataframe API
ferdonline committed Nov 23, 2017
1 parent b4edafa commit abe03ab
Showing 2 changed files with 40 additions and 7 deletions.
14 changes: 14 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -367,6 +367,20 @@ def checkpoint(self, eager=True):
        jdf = self._jdf.checkpoint(eager)
        return DataFrame(jdf, self.sql_ctx)

+    @since(2.3)
+    def localCheckpoint(self, eager=True):
+        """Returns a locally checkpointed version of this :class:`DataFrame`. Checkpointing
+        can be used to truncate the logical plan of this DataFrame, which is especially
+        useful in iterative algorithms where the plan may grow exponentially. Local
+        checkpoints are stored in the executors using the caching subsystem and therefore
+        they are not reliable.
+
+        :param eager: Whether to checkpoint this DataFrame immediately
+
+        .. note:: Experimental
+        """
+        jdf = self._jdf.localCheckpoint(eager)
+        return DataFrame(jdf, self.sql_ctx)

    @since(2.1)
    def withWatermark(self, eventTime, delayThreshold):
        """Defines an event time watermark for this :class:`DataFrame`. A watermark tracks a point
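For readers trying out the new Python API, here is a minimal usage sketch. It is not part of the commit; it assumes a Spark build that includes this patch, and the app name is chosen only for illustration.

```python
# Usage sketch (not part of this commit): truncating a long lineage with
# the new localCheckpoint(). Assumes a build that includes this patch.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("localCheckpointDemo").getOrCreate()

df = spark.range(1000)

# Simulate an iterative algorithm whose logical plan keeps growing.
for _ in range(50):
    df = df.withColumn("id", F.col("id") + 1)

# Truncate the plan. Blocks live in executor storage (caching subsystem),
# so no checkpoint directory is needed, but the result is not fault tolerant.
df = df.localCheckpoint()  # eager=True by default

print(df.count())
spark.stop()
```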
33 changes: 26 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -514,8 +514,8 @@ class Dataset[T] private[sql](
  def isStreaming: Boolean = logicalPlan.isStreaming

  /**
-   * Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate
-   * the logical plan of this Dataset, which is especially useful in iterative algorithms where the
+   * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+   * logical plan of this Dataset, which is especially useful in iterative algorithms where the
   * plan may grow exponentially. It will be saved to files inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir`.
   *
@@ -524,22 +524,41 @@ class Dataset[T] private[sql](
   */
  @Experimental
  @InterfaceStability.Evolving
-  def checkpoint(): Dataset[T] = checkpoint(eager = true)
+  def checkpoint(eager: Boolean = true): Dataset[T] = _checkpoint(eager = eager)

+  /**
+   * Locally checkpoints a Dataset and returns the new Dataset. Checkpointing can be used to
+   * truncate the logical plan of this Dataset, which is especially useful in iterative
+   * algorithms where the plan may grow exponentially. Local checkpoints are written to
+   * executor storage and, while potentially faster, they are unreliable and may compromise
+   * job completion.
+   *
+   * @group basic
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def localCheckpoint(eager: Boolean = true): Dataset[T] = _checkpoint(eager = eager, local = true)

  /**
   * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
   * logical plan of this Dataset, which is especially useful in iterative algorithms where the
-   * plan may grow exponentially. It will be saved to files inside the checkpoint
-   * directory set with `SparkContext#setCheckpointDir`.
+   * plan may grow exponentially.
+   * By default, reliable checkpoints are created and saved to files inside the checkpoint
+   * directory set with `SparkContext#setCheckpointDir`. If `local` is set to true, a local
+   * checkpoint is performed instead. Local checkpoints are written to executor storage and,
+   * while potentially faster, they are unreliable and may compromise job completion.
   *
   * @group basic
   * @since 2.1.0
   */
  @Experimental
  @InterfaceStability.Evolving
-  def checkpoint(eager: Boolean): Dataset[T] = {
+  def _checkpoint(eager: Boolean, local: Boolean = false): Dataset[T] = {
    val internalRdd = queryExecution.toRdd.map(_.copy())
-    internalRdd.checkpoint()
+    if (local) {
+      internalRdd.localCheckpoint()
+    } else {
+      internalRdd.checkpoint()
+    }

    if (eager) {
      internalRdd.count()
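Putting the Scala change in user-facing terms, the new `_checkpoint` helper backs both public methods, so the reliable/local and eager/lazy combinations compose freely. A sketch contrasting them through the Python API follows (again not part of the commit; it assumes an active SparkSession `spark`, and the checkpoint directory path is a placeholder):

```python
# Sketch (not part of this commit) contrasting the two checkpoint flavors.
# Assumes an active SparkSession `spark`; the directory is a placeholder.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

df = spark.range(1000000)

# Reliable checkpoint: data is written under the checkpoint directory and
# survives executor loss. eager=True (the default) materializes it now.
reliable = df.checkpoint(eager=True)

# Local checkpoint: data stays in executor storage via the caching
# subsystem. eager=False only marks the plan; nothing runs until an action.
lazy_local = df.localCheckpoint(eager=False)
lazy_local.count()  # first action materializes the local checkpoint

# Both DataFrames now read from the checkpointed data instead of
# replaying the full lineage.
reliable.explain()
lazy_local.explain()
```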
