Skip to content

Conversation

@szhem
Copy link
Contributor

@szhem szhem commented Oct 2, 2017

What changes were proposed in this pull request?

Fix for SPARK-22184 JIRA issue (and also includes the related #19373).

In case of GraphX jobs, when checkpoints are enabled, GraphX can fail with FileNotFoundException.

The failure can happen during Pregel iterations or when Pregel completes only in cases of insufficient memory when checkpointed RDDs are evicted from memory and have to be read from disk (but already removed from there).

This PR proposes to preserve all the checkpoints the last one (checkpoint) of messages and graph depends on during the iterations, and all the checkpoints of messages and graph the resulting graph depends at the end of Pregel iterations.

How was this patch tested?

Unit tests as well as manually in production jobs which previously were failing until this PR applied.

@szhem
Copy link
Contributor Author

szhem commented Oct 16, 2017

I would be happy if anyone can take a look at this PR.

@szhem
Copy link
Contributor Author

szhem commented Mar 30, 2018

Hello @dding3, @viirya, @mallman, @felixcheung,

You were reviewing graph checkpointing, introduced here #15125, and this PR changes the behaviour a little bit.

Could you please review this PR too if possible?

@szhem
Copy link
Contributor Author

szhem commented Jun 25, 2018

Just a kind remainder...

@mallman
Copy link
Contributor

mallman commented Jun 26, 2018

Hi @szhem. Thanks for the kind reminder and thanks for your contribution. I'm sorry I did not respond sooner.

I no longer work where I regularly used the checkpointing code with large graphs. And I don't have access to any similar graph to test with now. I'm somewhat hamstrung by that limitation. That being said, I'll do my best to help.

With respect to the failure you're seeing, can you tell me what happens if you set your graph's storage level to StorageLevel.MEMORY_AND_DISK or StorageLevel.MEMORY_AND_DISK_SER_2 without applying this patch?

@szhem
Copy link
Contributor Author

szhem commented Jun 27, 2018

Hi @mallman!

In case of

StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_SER_2

... tests pass.

They still fail in case of

StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_ONLY_SER

Although it works, I'm not sure that changing the caching level of the graph is really a good option to go with as Spark starts complaining here and here

18/06/27 16:08:46.802 Executor task launch worker for task 3 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:47.000 Executor task launch worker for task 4 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:47.164 Executor task launch worker for task 5 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:48.724 Executor task launch worker for task 18 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
18/06/27 16:08:48.749 Executor task launch worker for task 18 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:48.868 Executor task launch worker for task 19 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
18/06/27 16:08:48.899 Executor task launch worker for task 19 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.
18/06/27 16:08:49.008 Executor task launch worker for task 20 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
18/06/27 16:08:49.028 Executor task launch worker for task 20 WARN ShippableVertexPartitionOps: Diffing two VertexPartitions with different indexes is slow.

P.S. To emulate the lack of memory I just set the following options like here.

spark.testing.reservedMemory
spark.testing.memory

@mallman
Copy link
Contributor

mallman commented Jul 2, 2018

Hi @szhem. I dug deeper and think I understand the problem better.

To state the obvious, the periodic checkpointer deletes checkpoint files of RDDs that are potentially still accessible. In fact, that's the problem here. It deletes the checkpoint files of an RDD that's later used.

The algorithm being used to find checkpoint files that can be "safely" deleted is flawed, and this PR aims to fix that.

I have a few thoughts from this.

  1. Why does the periodic checkpointer delete checkpoint files? I absolutely understand the preciousness of cache memory and wanting to keep the cache as clean as possible, but this has nothing to do with that. We're talking about deleting files from disk storage. I'm making some assumptions, like we're using a filesystem that's not backed by RAM, but disk storage is dirt cheap these days. Why can't we just let the user delete the checkpoint files themselves?

    1.a. Can we and should we support a mode where the automatic deletion of checkpoint files is an option (with a warning of potential failures)? To maintain backwards compatibility, we set this option to true by default, but "power" users can set this value to false to do the cleanup themselves and ensure the checkpointer doesn't delete files it shouldn't.

  2. I think the JVM gives us a built-in solution for the automatic and safe deletion of checkpoint files, and the ContextCleaner does just that (and more). Can we leverage that functionality?

What do you think? @felixcheung or @viirya, can you weigh in on this, please?

@szhem
Copy link
Contributor Author

szhem commented Jul 3, 2018

@mallman

Just my two cents regarding built-in solutions:

Periodic checkpointer deletes checkpoint files not to pollute the hard drive. Although disk storage is cheap it's not free.

For example, in my case (graph with >1B vertices and about the same amount of edges) checkpoint directory with a single checkpoint took about 150-200GB.
Checkpoint interval was set to 5, and then job was able to complete in about 100 iterations.
So in case of not cleaning up unnecessary checkpoints, the checkpoint directory could grow up to 3-4TB (which is quite a lot) in my case.

@EthanRock
Copy link

Hi, I met the same problem today.

@mallman
Copy link
Contributor

mallman commented Jul 9, 2018

Hi @szhem.

Thanks for the information regarding disk use for your scenario. What do you think about my second point, using the ContextCleaner?

@szhem
Copy link
Contributor Author

szhem commented Jul 9, 2018

Hi @mallman,

I believe, that ContextCleaner currently does not delete checkpoint data it case of unexpected failures.
Also as it works at the end of the job then there is still a chance that a job processing quite a big graph with a lot of iterations can influence other running jobs by consuming a lot of disk during its run.

@mallman
Copy link
Contributor

mallman commented Jul 9, 2018

Hi @szhem.

I understand you've put a lot of work into this implementation, however I think you should try a simpler approach before we consider something more complicated. I believe an approach based on weak references and a reference queue would be a much simpler alternative. Can you give that a try?

@EthanRock
Copy link

I have tried to set graph's storage level to StorageLevel.MEMORY_AND_DISK in my case and the error still happens.

@szhem
Copy link
Contributor Author

szhem commented Sep 1, 2018

Hi @mallman ,

I believe that the solution with weak references will work and probably with ContextCleaner too, but there are some points I'd like to discuss if you don't mind

  • Let's take ContextCleaner first. In that case we should have a pretty simple solution, but the backward compatibility of PeriodicRDDCheckpointer and PeriodicGraphCheckpointer will be lost, because

    • it will be necessary to update these classes to prevent deleting checkpoint files
    • user will always have to provide spark.cleaner.referenceTracking.cleanCheckpoints property in order to clean unnecessary checkpoints.
    • the users who haven't specified spark.cleaner.referenceTracking.cleanCheckpoints previously (and I believe there will be most of them) will be affected by this new unexpected behaviour
  • In case of custom solution based on weak references

    • it will be necessary to poll a reference queue at some place and moment to remove unnecessary checkpoint files.
    • polling the reference queue in the separate thread will complicate the code
    • polling the reference queue synchronously does not guarantee deleting all the unnecessary checkpoint files.

In case of reference queue, could you please recommend the convenient place in the source code to do it?

As for me

  • setting spark.cleaner.referenceTracking.cleanCheckpoints to true by default should work
  • setting spark.cleaner.referenceTracking.cleanCheckpoints to true by default will allow us to simplify PeriodicRDDCheckpointer and PeriodicGraphCheckpointer too by deleting unnecessary code that cleans up checkpoint files
  • setting spark.cleaner.referenceTracking.cleanCheckpoints to true by default sounds reasonable and should not break the code of those users who didn't do it previously
  • but setting spark.cleaner.referenceTracking.cleanCheckpoints to true will probably break the backward compatibility (although this PR tries to preserve it)

What do you think? Will the community accept setting spark.cleaner.referenceTracking.cleanCheckpoints to trueby default?

@szhem
Copy link
Contributor Author

szhem commented Sep 3, 2018

I've tested the mentioned checkpointers with spark.cleaner.referenceTracking.cleanCheckpoints set to true and without explicit checkpoint files removal.

It seems that there are somewhere hard references remained - old checkpoint files are not deleted at all and it seems that ContextCleaner.doCleanCheckpoint is never called.

@szhem
Copy link
Contributor Author

szhem commented Sep 30, 2018

Hello @mallman, @sujithjay, @felixcheung, @jkbradley, @mengxr, it's already about a year passed since this pull request has been opened.
I'm just wondering whether there is any chance to get any feedback for this PR (understanding that all of you have a little or probably no time having your own more important activities) and get it either rejected or merged?

@mallman
Copy link
Contributor

mallman commented Oct 29, 2018

Hi @szhem.

I'm sorry I haven't been more responsive here. I can relate to your frustration, and I do want to help you make progress on this PR and merge it in. I have indeed been busy with other responsibilities, but I can rededicate time to reviewing this PR.

Of all the approaches you've proposed so far, I like the ContextCleaner-based one the best. Personally, I'm okay with setting spark.cleaner.referenceTracking.cleanCheckpoints to true by default for the next major Spark release and documenting this change of behavior in the release notes. However, that may not be okay with the senior maintainers. As an alternative I wonder if we could instead create a new config just for graph RDD checkpoint cleaning such as spark.cleaner.referenceTracking.cleanGraphCheckpoints and set that to true by default. Then use that config value in PeriodicGraphCheckpointer instead of spark.cleaner.referenceTracking.cleanCheckpoints.

Would you be willing to open another PR with your ContextCleaner-based approach? I'm not suggesting you close this PR. We can call each PR alternative solutions for the same JIRA ticket and cross-reference each PR. If you do that then I will try to debug the problem with the retained checkpoints.

Thank you.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 15, 2020
@maropu
Copy link
Member

maropu commented Jan 15, 2020

I'll close this pr and the corresponding jira because this pr is stale. If necessary, please reopen this. Thanks!

@maropu maropu closed this Jan 15, 2020
@ral51
Copy link

ral51 commented Aug 12, 2020

I ran into same issue today. Is there a workaround? @szhem @EthanRock

@szhem
Copy link
Contributor Author

szhem commented Aug 12, 2020

@ral51 the workaround is this PR, that has been closed without being merged, unfortunately.
You can backport it into your own spark distribution.

@ral51
Copy link

ral51 commented Aug 13, 2020

@szhem Your PR looks like keeping checkpoints around so that's good. Thank you
Reading your comments above, just setting spark.cleaner.referenceTracking.cleanCheckpoints to true didn't work for me. Is that expected?

@szhem
Copy link
Contributor Author

szhem commented Aug 15, 2020

@ral51 setting spark.cleaner.referenceTracking.cleanCheckpoints to true didn't work for me too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants