
[SPARK-22649][PYTHON][SQL] Adding localCheckpoint to Dataset API #19805

Closed

Conversation

Contributor

@ferdonline ferdonline commented Nov 23, 2017

What changes were proposed in this pull request?

This change adds local checkpoint support to Dataset and the corresponding binding in the Python DataFrame API.

When reliability requirements can be relaxed in favor of performance, as in the case of quick further transformations followed by a reliable save, localCheckpoint() fits very well.
Furthermore, reliable checkpoints currently still incur double computation (see #9428).
In general it also makes the API more complete.

How was this patch tested?

A quick use case from the Python side:

>>> from time import sleep
>>> from pyspark.sql import types as T
>>> from pyspark.sql import functions as F

>>> def f(x):
...     sleep(1)
...     return x*2

>>> df1 = spark.range(30, numPartitions=6)
>>> df2 = df1.select(F.udf(f, T.LongType())("id"))

>>> %time _ = df2.collect()
CPU times: user 7.79 ms, sys: 5.84 ms, total: 13.6 ms                           
Wall time: 12.2 s

>>> %time df3 = df2.localCheckpoint()
CPU times: user 2.38 ms, sys: 2.3 ms, total: 4.68 ms                            
Wall time: 10.3 s

>>> %time _ = df3.collect()
CPU times: user 5.09 ms, sys: 410 µs, total: 5.5 ms
Wall time: 148 ms

>>> sc.setCheckpointDir(".")
>>> %time df3 = df2.checkpoint()
CPU times: user 4.04 ms, sys: 1.63 ms, total: 5.67 ms                           
Wall time: 20.3 s

@ferdonline
Contributor Author

retest this please

@ferdonline ferdonline changed the title Adding localCheckpoint to Dataframe API [SQL] Adding localCheckpoint to Dataset API Nov 24, 2017
@ferdonline
Contributor Author

cc @andrewor14 since I believe you know this part of Spark pretty well; maybe you could help me integrate this.
Any idea why Jenkins didn't start the testing?

@felixcheung
Member

Jenkins, test this please

@felixcheung
Member

could you update the title to add [PYTHON]

*
* @group basic
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
def checkpoint(eager: Boolean): Dataset[T] = {
def _checkpoint(eager: Boolean, local: Boolean = false): Dataset[T] = {
Member

@felixcheung felixcheung Nov 26, 2017

I don't think this is right - this is still a public API and it's not the convention here.
Add it as a private instead

Contributor Author

Sounds good. Regarding the naming, I'm not sure adding an underscore is a good choice. Let me know if it should be changed.

@@ -524,22 +524,41 @@ class Dataset[T] private[sql](
*/
@Experimental
@InterfaceStability.Evolving
def checkpoint(): Dataset[T] = checkpoint(eager = true)
def checkpoint(eager: Boolean = true): Dataset[T] = _checkpoint(eager = eager)
Member

@felixcheung felixcheung Nov 26, 2017

I think we should keep this signature - there's already a def checkpoint(eager: Boolean = true) below

Contributor Author

I always try to avoid code duplication, and with docs this takes ~10 lines for nothing. I believe a function with a default parameter is as readable as the function without the parameter. But please let me know if it goes against the code style. Thanks

Member

No, it's going to break Java API compatibility. Default value does not work with Java.
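
The compatibility point can be illustrated with a minimal sketch (hypothetical class, not the PR's actual code): Scala default parameters are compiled into synthetic default-value methods that javac never invokes, so Java callers would lose the zero-argument form. Keeping explicit overloads preserves it.

```scala
// Sketch: why explicit overloads are kept instead of a default parameter.
// `DatasetLike` is an illustrative stand-in for Dataset[T].
class DatasetLike[T](val data: Seq[T]) {
  // Java-callable zero-argument form delegating to the full signature:
  def checkpoint(): DatasetLike[T] = checkpoint(eager = true)

  def checkpoint(eager: Boolean): DatasetLike[T] = {
    // placeholder for the real checkpointing logic
    this
  }
}
```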

* plan may grow exponentially. Local checkpoints are written to executor storage and despite
* potentially faster they are unreliable and may compromise job completion.
*
* @group basic
Member

add @since

@SparkQA

SparkQA commented Nov 26, 2017

Test build #84196 has finished for PR 19805 at commit abe03ab.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

cc @zsxwing @liancheng

@ferdonline ferdonline changed the title [SQL] Adding localCheckpoint to Dataset API [Python][SQL] Adding localCheckpoint to Dataset API Nov 27, 2017
@ferdonline ferdonline changed the title [Python][SQL] Adding localCheckpoint to Dataset API [PYTHON][SQL] Adding localCheckpoint to Dataset API Nov 27, 2017
@ferdonline
Contributor Author

Thanks for your review. I'm working on the changes.

@ferdonline
Contributor Author

ferdonline commented Nov 28, 2017

I think I fixed everything that was suggested. Thanks for the input.
It would be great if someone could enable the tests and review!

@ueshin
Member

ueshin commented Nov 29, 2017

ok to test

@ueshin
Member

ueshin commented Nov 29, 2017

@ferdonline Could you file a JIRA issue and add the id to the title like [SPARK-xxx][PYTHON][SQL] ...?

Member

@ueshin ueshin left a comment

I left some minor comments. Thanks!

@@ -518,13 +518,12 @@ class Dataset[T] private[sql](
* the logical plan of this Dataset, which is especially useful in iterative algorithms where the
* plan may grow exponentially. It will be saved to files inside the checkpoint
* directory set with `SparkContext#setCheckpointDir`.
*
Member

Maybe we should revert this back?

@@ -537,9 +536,55 @@ class Dataset[T] private[sql](
*/
@Experimental
@InterfaceStability.Evolving
def checkpoint(eager: Boolean): Dataset[T] = {
def checkpoint(eager: Boolean = true): Dataset[T] = _checkpoint(eager = eager)
Member

We don't need the default value for eager here.

*/
@Experimental
@InterfaceStability.Evolving
def localCheckpoint(eager: Boolean = true): Dataset[T] = _checkpoint(eager = eager, local = true)
Member

ditto.

*/
@Experimental
@InterfaceStability.Evolving
private[sql] def _checkpoint(eager: Boolean, local: Boolean = false): Dataset[T] = {
Member

We should make this private?

Contributor Author

I changed it according to the first review. I also agree it should be private so that "there's only one way of doing it" when using the API. But if you have a strong feeling, I can surely change it.

Member

I guess we have 2 options here:

  • expose def checkpoint(eager: Boolean, local: Boolean): Dataset[T] as public, which can be used similar to localCheckpoint.
  • make def _checkpoint(eager: Boolean, local: Boolean = false): Dataset[T] private to be used only from the public APIs.

and I'm afraid the current one is not good anyway.

I'd prefer the second option but I don't have a strong feeling.
cc @felixcheung @HyukjinKwon
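
The second option can be sketched roughly as follows (a minimal illustration with stand-in names and placeholder bodies, not the merged code): one private worker method plus thin public entry points.

```scala
// Sketch of option 2: a private worker used only by the public APIs.
// `DatasetLike` and the method bodies are illustrative only.
class DatasetLike[T](val data: Seq[T]) {
  private def checkpoint(eager: Boolean, local: Boolean): DatasetLike[T] = {
    // real code would checkpoint reliably or locally depending on `local`
    this
  }

  def checkpoint(eager: Boolean): DatasetLike[T] =
    checkpoint(eager, local = false)

  def localCheckpoint(eager: Boolean): DatasetLike[T] =
    checkpoint(eager, local = true)
}
```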

Contributor Author

Sounds good to me to remove default param values, keeping private. Let me know

Member

A private checkpoint makes sense to me too. I don't think we need the underscore though, since the method will already be marked private; a leading underscore to signal that would be redundant.

Contributor Author

Alright, so I remove both default values, otherwise we end up with a colliding signature.

@SparkQA

SparkQA commented Nov 29, 2017

Test build #84282 has finished for PR 19805 at commit 59b5562.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ferdonline ferdonline changed the title [PYTHON][SQL] Adding localCheckpoint to Dataset API [SPARK-22649][PYTHON][SQL] Adding localCheckpoint to Dataset API Nov 29, 2017
@SparkQA

SparkQA commented Nov 29, 2017

Test build #84289 has finished for PR 19805 at commit c5f1b2c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 29, 2017

Test build #84299 has finished for PR 19805 at commit 54b7f33.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
@Experimental
@InterfaceStability.Evolving
private[sql] def checkpoint(eager: Boolean, local: Boolean): Dataset[T] = {
Member

Hm, can we just do this private def checkpoint instead of private[sql]? Also, I don't think it needs those @Experimental, @InterfaceStability.Evolving, @since 2.3.0 and @group basic.

@SparkQA

SparkQA commented Nov 30, 2017

Test build #84334 has finished for PR 19805 at commit c743c34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Will review it this weekend.

@@ -537,9 +537,48 @@ class Dataset[T] private[sql](
*/
@Experimental
@InterfaceStability.Evolving
def checkpoint(eager: Boolean): Dataset[T] = {
Member

Could you check the test case of def checkpoint? At least we need to add a test case.

Contributor Author

I can try to create a test to localCheckpoint based on the one for checkpoint, but I'm not very familiar with Scala and the Spark scala API, so currently I don't feel at ease to create a meaningful test. Would anybody be up to add one?

Contributor

So we already test checkpoint in DatasetSuite

@SparkQA

SparkQA commented Dec 11, 2017

Test build #84706 has finished for PR 19805 at commit 9beb375.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 11, 2017

Test build #84709 has finished for PR 19805 at commit da34c4a.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 11, 2017

Test build #84720 has finished for PR 19805 at commit 7532fc4.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ferdonline
Contributor Author

The existing checkpoint tests now apply to localCheckpoint as well, and all pass. Please verify.
I'm getting an unrelated failure in SparkR; did anything change in the build system?

}
}
}
else {
Member

-> } else {

@gatorsmile
Member

retest this please

if (reliableCheckpoint) {
internalRdd.checkpoint()
} else {
internalRdd.localCheckpoint()
Member

@gatorsmile gatorsmile Dec 17, 2017

Could you also issue a logWarning message here to indicate the checkpoint is not reliable? This call is a potential issue when users are running on AWS EC2 Spot instances.

Contributor Author

Hi. Thanks for the review.
As for making the user aware they're doing a local checkpoint, we already force them to use localCheckpoint() (the generic checkpoint is private).
If we should warn users about the potential issues with localCheckpoint(), shouldn't we do it in the RDD API, so that users are always warned?
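
For reference, the requested warning could be sketched on top of the quoted branch (a sketch only: it assumes the surrounding Dataset code where `internalRdd` and `logWarning` are in scope, and the message wording is hypothetical):

```scala
// Sketch: warn before taking the unreliable local checkpoint.
if (reliableCheckpoint) {
  internalRdd.checkpoint()
} else {
  logWarning("Local checkpoints are stored on executors and are not " +
    "reliable; loss of executor storage may cause the job to fail.")
  internalRdd.localCheckpoint()
}
```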


@SparkQA

SparkQA commented Dec 17, 2017

Test build #85017 has finished for PR 19805 at commit 7532fc4.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 17, 2017

Test build #85027 has finished for PR 19805 at commit 45f4bf5.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Dec 20, 2017

Test build #85139 has finished for PR 19805 at commit 45f4bf5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

LGTM

Thanks! Merged to master.

@asfgit asfgit closed this in 13268a5 Dec 20, 2017
7 participants