Skip to content

Conversation

@szhem
Copy link
Contributor

@szhem szhem commented Sep 27, 2017

What changes were proposed in this pull request?

Fix for SPARK-22150 JIRA issue.

In case of checkpointing RDDs which depend on previously checkpointed RDDs (for example in iterative algorithms) PeriodicCheckpointer removes already checkpointed materialized RDDs too early leading to FileNotFoundExceptions.

Consider the following snippet

// create a periodic checkpointer with interval of 2
val checkpointer = new PeriodicRDDCheckpointer[Double](2, sc)

val rdd1 = createRDD(sc)
checkpointer.update(rdd1)
// on the second update rdd1 is checkpointed
checkpointer.update(rdd1)
// on action checkpointed rdd is materialized and its lineage is truncated
rdd1.count() 

// rdd2 depends on rdd1
val rdd2 = rdd1.filter(_ => true)
checkpointer.update(rdd2)
// on the second update rdd2 is checkpointed and checkpoint files of rdd1 are deleted
checkpointer.update(rdd2)
// on action it's necessary to read already removed checkpoint files of rdd1
rdd2.count()

This PR proposes to preserve all the checkpoints the last one depends on to be able to evaluate the final RDD even if the last checkpoint (the final RDD depends on) is not yet materialized.

How was this patch tested?

Unit tests as well as manually in production jobs which previously were failing until this PR applied.

@szhem szhem changed the title [SPARK-22150][CORE] PeriodicCheckpointer fails in case of dependant RDDs [SPARK-22150][CORE] PeriodicCheckpointer fails in case of dependent RDDs Oct 3, 2017
@szhem
Copy link
Contributor Author

szhem commented Oct 16, 2017

I would be happy if anyone can take a look at this PR.

*
* TODO: Move this out of MLlib?
*/
private[spark] class PeriodicRDDCheckpointer[T](

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scaladoc for this class needs to be updated to include this new behaviour. Particularly, the 'WARNINGS' section.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sujithjay, thanks a lot for noticing!
Just updated the docs a little bit to clarify the new behaviour.

@sujithjay
Copy link

sujithjay commented Mar 26, 2018

Hi @szhem , you could consider identifying contributors who have worked on the code being changed, and reach out to them for review.

@sujithjay
Copy link

cc: @felixcheung @jkbradley @mengxr
Could you please review this PR?

Copy link
Member

@felixcheung felixcheung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is deleting earlier checkpoint after the current checkpoint is called though?

is this just an issue with DataSet.checkpoint(eager = true)?

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@checkpoint(eager:Boolean):org.apache.spark.sql.Dataset[T]

@szhem
Copy link
Contributor Author

szhem commented Mar 27, 2018

@felixcheung

It is deleting earlier checkpoint after the current checkpoint is called though?

Currently PeriodicCheckpointer can fail in case of checkpointing RDDs which depend on each other like in the sample below.

// create a periodic checkpointer with interval of 2
val checkpointer = new PeriodicRDDCheckpointer[Double](2, sc)
val rdd1 = createRDD(sc)

// rdd2 depends on rdd1
val rdd2 = rdd1.filter(_ => true)
checkpointer.update(rdd2)
// on the second update rdd2 is checkpointed and checkpoint files of rdd1 are deleted
checkpointer.update(rdd2)
// on action it's necessary to read already removed checkpoint files of rdd1
rdd2.count()

It's about deleting files of the already checkpointed and materialized RDD in case of another RDD depends on it.

If RDDs are cached before checkpointing (like it is often recommended) then this issue is likely to be not visible, because the checkpointed RDD will be read from cache and not from the materiazed files.

The good example of such a behaviour is described in this PR - #19410, where GraphX fails with FileNotFoundException in case of insufficient memory resources when cached blocks of checkpointed and materialized RDDs are evicted from memory, causing them to be read from already deleted files.

is this just an issue with DataSet.checkpoint(eager = true)?

This PR does not include modifications to DataSet API and affects mainly PeriodicCheckpointer and PeriodicRDDCheckpointer.
It was created as a preliminary PR to this one - #19410 (where GraphX fails in case of reading cached RDDs already evicted from memory).

@szhem
Copy link
Contributor Author

szhem commented Mar 27, 2018

BTW, how do you think guys, may be it would be better to merge changes from #19410 into this one?
The #19410 is almost about the same issue and fixes the described behaviour for GraphX.

@felixcheung
Copy link
Member

to clarify, what I mean is the issue is caused by checkpointing being lazy - so therefore if you remove the previous checkpoint before the new checkpoint is started or completed, this fails.

so the fix might be to change to call checkpoint() to checkpoint(eager: true) - this ensures by the time checkpoint call is returned the checkpointing is completed.

@szhem
Copy link
Contributor Author

szhem commented Mar 29, 2018

@felixcheung,
Unfortunately, RDDs, PeriodicRDDCheckpointer is based on, do not have checkpoint(eager: true) yet.
It's a functionality of DataSets.

I've experimented with the similar method for RDDs ...

def checkpoint(eager: Boolean): RDD[T] = {
  checkpoint()
  if (eager) {
    count()
  }
  this
}

... and it does not work for PeriodicRDDCheckpointer in some scenarios.
Please, consider the following example

val checkpointInterval = 2

val checkpointer = new PeriodicRDDCheckpointer[(Int, Int)](checkpointInterval, sc)
val rdd1 = sc.makeRDD((0 until 10).map(i => i -> i))

// rdd1 is not materialized yet, checkpointer(update=1, checkpointInterval=2)
checkpointer.update(rdd1)
// rdd2 depends on rdd1
val rdd2 = rdd1.filter(_ => true)

// rdd1 is materialized, checkpointer(update=2, checkpointInterval=2)
checkpointer.update(rdd1)
// rdd3 depends on rdd1
val rdd3 = rdd1.filter(_ => true)

// rdd3 is not materialized yet, checkpointer(update=3, checkpointInterval=2)
checkpointer.update(rdd3)
// rdd3 is materialized, rdd1's files are removed, checkpointer(update=4, checkpointInterval=2)
checkpointer.update(rdd3)

// fails with FileNotFoundException because
// rdd1's files were removed on the previous step and
// rdd2 depends on rdd1
rdd2.count()

It fails with FileNotFoundException even in case of eager checkpointing, and passes in case of preserving parent checkpointed RDDs like it's done in this PR.

@szhem
Copy link
Contributor Author

szhem commented Apr 2, 2018

so the fix might be to change to call checkpoint() to checkpoint(eager: true) - this ensures by the time checkpoint call is returned the checkpointing is completed.

Even if checkpoint is completed, PeriodicRDDCheckpointer removes files of the checkpointed and materialized RDDs later on, so it may happen that another RDD depends on the already removed files.

@szhem
Copy link
Contributor Author

szhem commented Jun 25, 2018

Just a kind remainder...

@szhem
Copy link
Contributor Author

szhem commented Sep 30, 2018

Hello @sujithjay, @felixcheung, @jkbradley, @mengxr, it's already more than a year passed since this pull request has been opened.
I'm just wondering whether there is any chance for this PR to be reviewed (understanding that all of you have a little or probably no time having your own more important activities) by someone and either rejected or merged.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 15, 2020
@github-actions github-actions bot closed this Jan 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants