Skip to content

Conversation

@WeichenXu123
Copy link
Contributor

@WeichenXu123 WeichenXu123 commented Jul 24, 2016

What changes were proposed in this pull request?

update unused broadcast in KMeans/Word2Vec,
use destroy(false) to release memory in time.

and several place destroy() update to destroy(false) so that it will be async-called,
it will better than blocking called.

and update bcNewCenters in KMeans to make it destroy in correct time.
I use a list to store all historical bcNewCenters generated in each loop iteration and delay them to release at the end of loop.

fix TODO in BisectingKMeans.run "unpersist old indices",
Implements the pattern "persist current step RDD, and unpersist previous one" in the loop iteration.

How was this patch tested?

Existing tests.

@WeichenXu123 WeichenXu123 force-pushed the broadvar_unpersist_to_destroy branch from 52afc03 to c40f7f8 Compare July 24, 2016 13:31
@SparkQA
Copy link

SparkQA commented Jul 24, 2016

Test build #62768 has finished for PR 14333 at commit 52afc03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 24, 2016

Test build #62769 has finished for PR 14333 at commit c40f7f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Copy link
Member

srowen commented Jul 24, 2016

How about the same for bcNewCenters in KMeans?

Yeah, it seems like it's pretty rare to want to call unpersist here. The only context where it seems valid are the two left in Word2Vec like bcSyn0Global where the driver's state needs to be rebroadcast on each loop. But even then you could make a new broadcast for the same variable and destroy it in the loop.

I suppose it saves the overhead of new bookkeeping for a Broadcast. But unless I miss something it's not really worth the separate API. I wouldn't go so far as deprecating unpersist but doesn't look like something that would be added today if it weren't there.

@WeichenXu123
Copy link
Contributor Author

WeichenXu123 commented Jul 25, 2016

@srowen
I think the bcNewCenters in KMeans can't be destroyed directly in each loop but should be delayed until the whole loop finished. I update the code about it, in which I using a list to store bcNewCenters generated in each loop and release them when loop finished. The reason is described in the following, in my opinion.

The second problem, what's the meaning of broadcast.unpersist, eh, I think, there is another senario, suppose there is a RDD lineage, when executing in normal case, it executed successfully, and in code we can unpersist useless broadcast var in time, but, if some exception happened, the spark can recovery from it, it need to recovery the broken RDD from the RDD lineage and in such case may re-use the broadcast var we had unpersisted. If we simply destroy it, the broadcast var cannot be recover
so that the recovery will fail.
for example:
val midRdd = rdd1.transform1().transform2().cache() //suppose the transform here will use broadvar1
broadvar1.unpersist() //here can unpersist broadvar1 to save memory
var result = midRDD.transform3().action1()
broadvar1.destroy() //here it can safely destroy

So that I think the safe place to use broadcast.destroy is the place where some action to RDD has successfully executed, and the whole RDD lineage is no longer needed.

@SparkQA
Copy link

SparkQA commented Jul 25, 2016

Test build #62778 has finished for PR 14333 at commit f129a2b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 WeichenXu123 force-pushed the broadvar_unpersist_to_destroy branch from f129a2b to ecd15b2 Compare July 25, 2016 02:30
@WeichenXu123 WeichenXu123 changed the title [SPARK-16696][ML][MLLib] unused broadcast variables do destroy call to release memory in time [SPARK-16696][ML][MLLib] fix KMeans bcNewCenters unpersist in wrong time and update unused broadcast variables do destroy call to release memory in time Jul 25, 2016
@SparkQA
Copy link

SparkQA commented Jul 25, 2016

Test build #62779 has finished for PR 14333 at commit ecd15b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 WeichenXu123 force-pushed the broadvar_unpersist_to_destroy branch from ecd15b2 to 01f4d3a Compare July 25, 2016 05:07
@WeichenXu123 WeichenXu123 changed the title [SPARK-16696][ML][MLLib] fix KMeans bcNewCenters unpersist in wrong time and update unused broadcast variables do destroy call to release memory in time [SPARK-16696][ML][MLLib] destroy KMeans bcNewCenters when loop finished and update unused broadcast variables do destroy call to release memory in time Jul 25, 2016
@srowen
Copy link
Member

srowen commented Jul 25, 2016

I don't understand the last change. As far as I can see it can be destroyed inside the loop iteration. It's also possible to reuse the broadcast (declare outside the loop), and unpersist each iteration, and destroy afterwards. But I don't see the need to hold on to all these broadcasts?

Yes you make a fair point about recovery. I think your changes are safe in this respect, good.

@SparkQA
Copy link

SparkQA commented Jul 25, 2016

Test build #62793 has finished for PR 14333 at commit 01f4d3a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123
Copy link
Contributor Author

@srowen
I check the code about KMean bcNewCenters again, if we want to make sure the recovery of RDD will successful in any unexcepted case, we have to keep all the bcNewCenters generated in each loop, until the loop is done.
because each loop the costs:RDD is build using preCosts:RDD, so that it became a RDD link. and the loop will and only will keep latest two RDDs being persisted. if the last two RDDs is broken, spark will need to rebuild them from the first RDD, in such case, each historical bcNewCenters generated will be used.

@srowen
Copy link
Member

srowen commented Jul 25, 2016

Oh, it is indeed building up a lineage. I think it's easier to leave this broadcast as-is then unless we know that destroying them is essential for reclaiming driver resources.

Here's another issue though: we now have a lineage where all of the RDDs are persisted (the cost RDDs), but I think only the last one is unpersisted. Ideally each would be persisted, materialized, and then unpersist the previous one. We had this problem and solution pattern in Word2Vec. Does that make sense?

It might be worth tackling here, because, if that's implemented, then I think the broadcast can safely be disposed at each iteration too, rather than only at the end.

@WeichenXu123
Copy link
Contributor Author

@srowen The KMeans.initKMeansParallel already implements the pattern "persist current step RDD, and unpersist previous one", but I think an RDD persisted can also break down because of disk error or something? If we want to reach the goal "the broadcast can safely be disposed(broadcast.destroy) at each iteration too, rather than only at the end", I think it need to use RDD.checkpoint instead of RDD.persist ?

@srowen
Copy link
Member

srowen commented Jul 25, 2016

Yeah, I think you're right, because the unpersisted RDD can still be recomputed but not a destroyed Broadcast. Hm, then isn't this also true of bcSyn0Global?

I suppose I think we should prefer to keep it simple and correct first, and only introduce complexity to optimize while preserving correctness. If some of the current unpersist calls can't be safely changed to destroy, maybe best to leave them rather than find a way to destroy them, if we don't know that it's a problem.

I think we still have an RDD-related problem here in that the intermediate RDDs aren't unpersisted, and all of them remain persisted after the loop. Kind of a separate issue, I suppose.

@WeichenXu123
Copy link
Contributor Author

WeichenXu123 commented Jul 25, 2016

@srowen yeah, but the bcSyn0Global in Word2Vec is a difference case, it looks safe there to destroy,
because in each loop iteration, the RDD transform which use bcSyn0Global ends with a collect,
after the collect action, we no longer need the RDD(the RDD's all computation has done, no more possible recovery) so we also can destroy the bcSyn0Global directly in each loop iteration.

@srowen
Copy link
Member

srowen commented Jul 25, 2016

OK sounds good. So maybe we're back to this: for bcNewCenters, is it really worth the overhead to track and destroy them? or just settle for unpersisting within the loop? I could go either way, your call.

@WeichenXu123
Copy link
Contributor Author

WeichenXu123 commented Jul 25, 2016

@srowen
The sparkContext, by default, will running a cleaner to release unused RDD/broadcasts on background. But, I think, we'd better to release them by ourselves because the SparkContext auto-cleaner depends on java-gc. If gc not triggered the cleaner won't release the unused broadcasts.
As we can see, if a broadcast has been unpersisted but not destroyed, its metadata will be keep in the sparkContext and the serialized data of the var broadcasted is kept in driver-side, here in KMeans bcNewCenters is a two-dimension array so I think it is better to release them as soon as possible.
About the overhead, I think to track these historical bcNewCenters broadcasts only need a reference list and it can destroyed in async way so the overhead is acceptable.

@srowen
Copy link
Member

srowen commented Jul 26, 2016

Yes, I suppose the issue is consistency ... there are loads of places where RDDs and broadcasts aren't really cleaned up properly in the code. Maybe it's fine to take extra steps here to at least ensure that all broadcasts we know of are handled correctly. OK I think it's pretty good if you're OK with the change as is.

@WeichenXu123
Copy link
Contributor Author

WeichenXu123 commented Jul 27, 2016

@srowen
I check RDD.persist referenced place:
AFTSuvivalRegression, LinearRegression, LogisticRegression, will persist input training RDD and unpersist them when train return, seems OK.
recommend.ALS persist many RDDs and seems unpersist them all OK.
mllib BisectingKMeans.run contains a TODO "unpersist old indices",
it also need the pattern "persist current step RDD, and unpersist previous one", which is similar to the one in KMeans.initKMeansParallel. Now I update the code here.
Others seems OK.

Broadcast.persist referenced place already checked in this PR I think they are all properly handled here.

@SparkQA
Copy link

SparkQA commented Jul 27, 2016

Test build #62907 has finished for PR 14333 at commit dc17da8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 WeichenXu123 changed the title [SPARK-16696][ML][MLLib] destroy KMeans bcNewCenters when loop finished and update unused broadcast variables do destroy call to release memory in time [SPARK-16696][ML][MLLib] destroy KMeans bcNewCenters when loop finished and update code where should release unused broadcast/RDD in proper time Jul 27, 2016
@SparkQA
Copy link

SparkQA commented Jul 27, 2016

Test build #62908 has finished for PR 14333 at commit 7f042a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val indices = updateAssignments(assignments, divisibleIndices, newClusterCenters).keys
if (preIndices != null) preIndices.unpersist()
preIndices = indices
indices = updateAssignments(assignments, divisibleIndices, newClusterCenters).keys
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are probably ahead of me on this, but let me check something. To compute assignments in the next line, the current indices is needed, and that in turn needs the current value of assignments, which needs the previous copy of indices (preIndices). But that was unpersisted just above. Should preIndices be unpersisted later, after indices is materialized?

But ... is there even an action here? if this just creating a large lineage then the persisting isn't helping much.

@WeichenXu123
Copy link
Contributor Author

WeichenXu123 commented Jul 27, 2016

@srowen yeah, the code logic here seems confusing, but I think it is right.
Now I can explain it in a clear way:
in essence, the logic can be expressed as following:
A0->I1->A1->I2->A2->...
A0 is the initial assignments, I1 is step-1 indices, A1 is step-1 assignment, I2 is step-2 indices, and so on.
There is dependency between them as the arrows show.
Now the key point is that when we compute I(K), we must make sure I(K-1) is persisted, and I(K-2) and older ones can be unpersisted.
NOW, check my code logic, in fact, in each iteration, I do the following thing:

  1. unpersist I(K-1)
  2. compute I(K+1) using A(K), and because of dependency, A(K) must use I(K), And I(K) is STILL PERSISTED.
  3. compute A(K+1) using I(K+1)

@srowen
Copy link
Member

srowen commented Jul 29, 2016

If the last problem is really pretty related to this code, then it should change here as well. However if you're not sure there's an easy fix, we can leave it for later. Are you comfortable that the current change is ready?

@WeichenXu123
Copy link
Contributor Author

WeichenXu123 commented Jul 30, 2016

I check other modifications in this PR again and it seems no problems.

So I think the PR is OK now. Thanks!

@srowen
Copy link
Member

srowen commented Jul 30, 2016

Merged to master

@asfgit asfgit closed this in bce354c Jul 30, 2016
@WeichenXu123 WeichenXu123 deleted the broadvar_unpersist_to_destroy branch July 31, 2016 05:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants