diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 406686e6df724..c642b2a47ae7f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -367,6 +367,20 @@ def checkpoint(self, eager=True):
         jdf = self._jdf.checkpoint(eager)
         return DataFrame(jdf, self.sql_ctx)
 
+    @since(2.3)
+    def localCheckpoint(self, eager=True):
+        """Returns a locally checkpointed version of this Dataset. Checkpointing can be used to
+        truncate the logical plan of this DataFrame, which is especially useful in iterative
+        algorithms where the plan may grow exponentially. Local checkpoints are stored in the
+        executors using the caching subsystem and therefore they are not reliable.
+
+        :param eager: Whether to checkpoint this DataFrame immediately
+
+        .. note:: Experimental
+        """
+        jdf = self._jdf.localCheckpoint(eager)
+        return DataFrame(jdf, self.sql_ctx)
+
     @since(2.1)
     def withWatermark(self, eventTime, delayThreshold):
         """Defines an event time watermark for this :class:`DataFrame`. A watermark tracks a point
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 1620ab3aa2094..4fb4dbcb3fab9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -514,8 +514,8 @@ class Dataset[T] private[sql](
   def isStreaming: Boolean = logicalPlan.isStreaming
 
   /**
-   * Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate
-   * the logical plan of this Dataset, which is especially useful in iterative algorithms where the
+   * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+   * logical plan of this Dataset, which is especially useful in iterative algorithms where the
    * plan may grow exponentially. It will be saved to files inside the checkpoint
    * directory set with `SparkContext#setCheckpointDir`.
    *
@@ -524,22 +524,41 @@ class Dataset[T] private[sql](
    */
   @Experimental
   @InterfaceStability.Evolving
-  def checkpoint(): Dataset[T] = checkpoint(eager = true)
+  def checkpoint(eager: Boolean = true): Dataset[T] = _checkpoint(eager = eager)
+
+  /**
+   * Locally checkpoints a Dataset and returns the new Dataset. Checkpointing can be used to
+   * truncate the logical plan of this Dataset, which is especially useful in iterative algorithms
+   * where the plan may grow exponentially. Local checkpoints are written to executor storage and,
+   * while potentially faster, they are unreliable and may compromise job completion.
+   *
+   * @group basic
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def localCheckpoint(eager: Boolean = true): Dataset[T] = _checkpoint(eager = eager, local = true)
 
   /**
    * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
    * logical plan of this Dataset, which is especially useful in iterative algorithms where the
-   * plan may grow exponentially. It will be saved to files inside the checkpoint
-   * directory set with `SparkContext#setCheckpointDir`.
+   * plan may grow exponentially.
+   * By default, reliable checkpoints are created and saved to files inside the checkpoint
+   * directory set with `SparkContext#setCheckpointDir`. If `local` is set to true, a local
+   * checkpoint is performed instead. Local checkpoints are written to executor storage and,
+   * while potentially faster, they are unreliable and may compromise job completion.
    *
    * @group basic
    * @since 2.1.0
    */
   @Experimental
   @InterfaceStability.Evolving
-  def checkpoint(eager: Boolean): Dataset[T] = {
+  def _checkpoint(eager: Boolean, local: Boolean = false): Dataset[T] = {
     val internalRdd = queryExecution.toRdd.map(_.copy())
-    internalRdd.checkpoint()
+    if (local) {
+      internalRdd.localCheckpoint()
+    } else {
+      internalRdd.checkpoint()
+    }
 
     if (eager) {
       internalRdd.count()
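
For reference, a minimal PySpark sketch of how the two checkpoint flavors touched by this patch would be used; the session setup and checkpoint directory path are illustrative assumptions, not part of the patch:

```python
from pyspark.sql import SparkSession

# Illustrative local session; any Spark 2.3+ session with executors will do.
spark = SparkSession.builder.master("local[2]").appName("localCheckpoint-demo").getOrCreate()

# Reliable checkpoints require a checkpoint directory on a fault-tolerant filesystem.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

df = spark.range(1000)

# Reliable checkpoint: plan is truncated, data is persisted to the checkpoint directory.
reliable = df.checkpoint()

# Local checkpoint (added by this patch): plan is truncated, data lives in executor
# storage via the caching subsystem -- faster, but lost if an executor fails.
local = df.localCheckpoint()

# Both return a new DataFrame whose lineage stops at the checkpoint.
print(reliable.count(), local.count())
```

Note that with the default `eager=True`, both entry points materialize the checkpoint immediately via a `count()` on the internal RDD, as the final Scala hunk shows.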