[SPARK-30390][MLLIB] Avoid double caching in mllib.KMeans#runWithWeights. #27052
Conversation
if (data.getStorageLevel == StorageLevel.NONE) {
  zippedData.persist(StorageLevel.MEMORY_AND_DISK)
}
I guess we can remove the two warnings in this method? it's not a big deal now if the source data is uncached.
Done
what about caching norms if data is already cached? like this:
val handlePersistence = data.getStorageLevel == StorageLevel.NONE
val norms = ...
val zippedData = if (handlePersistence) {
  data.zip(norms).map { case ((v, w), norm) =>
    (new VectorWithNorm(v, norm), w)
  }.persist(StorageLevel.MEMORY_AND_DISK)
} else {
  norms.persist(StorageLevel.MEMORY_AND_DISK)
  data.zip(norms).map { case ((v, w), norm) =>
    (new VectorWithNorm(v, norm), w)
  }
}
...
if (handlePersistence) {
  zippedData.unpersist()
} else {
  norms.unpersist()
}
what about caching norms if data is already cached?
Won't this lead to double caching problem which we are trying to avoid?
Yeah, I thought that was your point. If zippedData were expensive, I'd agree that caching the intermediate values too is worthwhile, and we do that in some places. Here it's not, and the original behavior was to always cache internally, so this is less of a change. This at least skips caching where the data can be inexpensively recomputed.
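To make the trade-off concrete, here is a minimal, self-contained sketch of the handlePersistence ownership pattern the thread is discussing. The StubLevel and StubRDD types below are hypothetical plain-Scala stand-ins, not Spark's API; the point is only the control flow: take ownership of caching when the caller has not cached the input, and release exactly what was persisted.

```scala
// Hypothetical stand-ins for Spark's caching types (not the real API).
sealed trait StubLevel
case object StubNone extends StubLevel
case object StubMemoryAndDisk extends StubLevel

final class StubRDD(var level: StubLevel = StubNone) {
  def persist(l: StubLevel): this.type = { level = l; this }
  def unpersist(): this.type = { level = StubNone; this }
}

// Returns true if this run took ownership of caching the intermediate RDD.
def runSketch(data: StubRDD): Boolean = {
  val handlePersistence = data.level == StubNone
  val zippedData = new StubRDD()
  if (handlePersistence) zippedData.persist(StubMemoryAndDisk)
  // ... iterations would read zippedData here ...
  if (handlePersistence) zippedData.unpersist()
  handlePersistence
}
```

With an uncached input, the sketch persists and later releases the intermediate RDD; with a cached input it never touches caching, which is the double caching this PR avoids.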
Jenkins, test this please
Test build #115963 has finished for PR 27052 at commit
We can change this further, but this is an improvement and less of a change than anything else we'd do. I'll merge it.
Thanks @srowen |
zippedData.unpersist()

// Warn at the end of the run as well, for increased visibility.
if (data.getStorageLevel == StorageLevel.NONE) {
Hi, I was testing Spark KMeans. There seems to be an issue: no matter whether we persist the parent RDD, data.getStorageLevel here will always be NONE because of the following operation, and this causes double caching.
def run(data: RDD[Vector]): KMeansModel = {
  val instances: RDD[(Vector, Double)] = data.map {
    case (point) => (point, 1.0)
  }
  runWithWeight(instances, None)
}
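The point above can be illustrated without Spark: a transformation such as map produces a new RDD whose storage level starts at NONE regardless of the parent's level, so a check on the derived RDD never sees the caller's caching. A minimal sketch with hypothetical plain-Scala stubs (not Spark's API):

```scala
sealed trait StubLevel
case object StubNone extends StubLevel
case object StubMemoryAndDisk extends StubLevel

// Immutable stub RDD: map yields a fresh, uncached RDD, mirroring Spark's
// behavior that derived RDDs never inherit the parent's persistence.
final case class StubRDD[T](level: StubLevel) {
  def map[U](f: T => U): StubRDD[U] = StubRDD[U](StubNone)
  def persist(l: StubLevel): StubRDD[T] = copy(level = l)
}

val data = StubRDD[Double](StubNone).persist(StubMemoryAndDisk) // caller cached the input
val instances = data.map(v => (v, 1.0))                         // derived RDD is uncached
```

So a storage-level check on the mapped RDD sees NONE even when the caller cached the original input, which is why the guard has to consider what the caller already persisted.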
What changes were proposed in this pull request?
Check before caching zippedData (as suggested in #26483 (comment)).
Why are the changes needed?
If the data is already cached before calling the run method of KMeans, then zippedData.persist() will hurt performance. Hence, persisting it conditionally.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Manually.