Closed
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.mllib.evaluation.binary._
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.storage.StorageLevel

/**
* Evaluator for binary classification.
@@ -165,13 +166,17 @@ class BinaryClassificationMetrics @Since("3.0.0") (
confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
// Create a bin for each distinct score value, count weighted positives and
// negatives within each bin, and then sort by score values in descending order.
-val counts = scoreLabelsWeight.combineByKey(
+val binnedWeights = scoreLabelsWeight.combineByKey(
createCombiner = (labelAndWeight: (Double, Double)) =>
new BinaryLabelCounter(0.0, 0.0) += (labelAndWeight._1, labelAndWeight._2),
mergeValue = (c: BinaryLabelCounter, labelAndWeight: (Double, Double)) =>
c += (labelAndWeight._1, labelAndWeight._2),
mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
-).sortByKey(ascending = false)
+)
+if (scoreLabelsWeight.getStorageLevel != StorageLevel.NONE) {
+binnedWeights.persist()
Member

This still isn't unpersisted

Contributor Author

Do you mean it should be unpersisted after use?

Member

Yes, otherwise the caller has no way to unpersist it until it's GCed.

+}
+val counts = binnedWeights.sortByKey(ascending = false)
Member

Wait, hm, I don't understand this. You persist binnedWeights, but it is now only used once. Why? If anything it's binnedCounts that needs persisting. I'm still not clear if it makes enough difference to matter.

Contributor Author

binnedCounts is a child RDD of binnedWeights. And here one action sortByKey is performed on binnedWeights.

Member

Yes, but why bother persisting binnedWeights? You recompute everything between it and binnedCounts twice, when I think the point would be to avoid that.

Contributor Author

@amanomer amanomer Nov 13, 2019

I think binnedWeights is required to be persisted because more than one action is getting applied here.

binnedWeights
      |
      | sortByKey (action)
     V
counts
      |
      | count (action)
     V
binnedCounts (on which action collect is applied to compute agg)

Contributor Author

I might be wrong here. Kindly correct me @srowen

Member

Caching helps where more than one action is performed on the same RDD. That's not the case here. Each of the first two has only one thing executed on it. sortByKey is not an action, anyway.
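
To make the "more than one action on the same RDD" point concrete, here is a minimal sketch, not code from this PR. It assumes a local SparkSession and uses an accumulator to count how often a map function runs; `PersistDemo`, `run`, and `evaluations` are illustrative names. Accumulator updates inside transformations are not guaranteed exactly-once in general, so treat the counts as indicative for a failure-free local run only.

```scala
import org.apache.spark.sql.SparkSession

object PersistDemo {
  // Returns (map evaluations without persist, map evaluations with persist).
  def run(): (Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("persist-demo")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      // Counts how many times the map function below is evaluated.
      val evaluations = sc.longAccumulator("evaluations")
      val mapped = sc.parallelize(1 to 100, numSlices = 4).map { x =>
        evaluations.add(1)
        x * 2
      }

      // Two actions on an unpersisted RDD: each one re-runs the map.
      mapped.count()
      mapped.collect()
      val withoutPersist = evaluations.sum

      // Same two actions, but the RDD is persisted before the first one,
      // so the second action can read the cached partitions.
      evaluations.reset()
      mapped.persist()
      mapped.count()
      mapped.collect()
      val withPersist = evaluations.sum
      mapped.unpersist()

      (withoutPersist, withPersist)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (withoutPersist, withPersist) = run()
    println(s"map evaluations without persist: $withoutPersist, with persist: $withPersist")
  }
}
```

If an RDD feeds only a single downstream computation, as each of the first two RDDs here does, there is nothing for the cache to save, and persisting only adds overhead.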

Contributor Author

Oh, okay. One question here: would it be worth persisting counts, since the actions count and collect are applied directly to it?

Member

Doesn't seem so. But that is the question I'd put to you in these cases - are you sure it makes a difference meaningful enough to overcome the overhead? I could imagine so here, just wondering if these are based on more investigation or benchmarking, vs just trying to persist lots of things.

Contributor Author

> are you sure it makes a difference meaningful enough to overcome the overhead?

I think not. Persisting counts doesn't make sense here; it would just add overhead. I am now getting a clearer picture of where to use persist.
Key learnings from this PR about persist:

  • persist introduces memory and CPU overhead.

  • So only important inputs (such as intermediate results or user data that is already cached) or RDDs on which more than one action is performed should be persisted.

  • Avoid using persist in a loop.

  • A persist should save enough recomputation to outweigh its overhead.
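
The points above, together with the earlier request to unpersist after use, can be sketched as a small scoped helper. This is an illustrative sketch under stated assumptions, not the change made in this PR: `ScopedPersist`, `withPersisted`, and the sample data are hypothetical, and the helper assumes the RDD passed in has not already been given a storage level.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object ScopedPersist {
  // Hypothetical helper: persist `rdd` only while `body` runs, then always
  // unpersist it so the caller is not left with a cached RDD it cannot reach.
  // Assumes `rdd` has no storage level assigned yet.
  def withPersisted[T, R](rdd: RDD[T], enabled: Boolean)(body: RDD[T] => R): R = {
    if (enabled) rdd.persist(StorageLevel.MEMORY_AND_DISK)
    try body(rdd)
    finally if (enabled) rdd.unpersist()
  }

  // Returns (number of distinct scores, total of labels, storage level afterwards).
  def run(): (Long, Double, StorageLevel) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("scoped-persist")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      // Made-up (score, label) pairs standing in for user data that the caller cached.
      val input = sc.parallelize(Seq((0.9, 1.0), (0.9, 0.0), (0.4, 1.0), (0.1, 0.0))).cache()
      val sums = input.reduceByKey(_ + _)

      // Follow the caller's lead: persist the intermediate only if the input is cached.
      val inputIsCached = input.getStorageLevel != StorageLevel.NONE

      // Two actions run on the same intermediate RDD, which is the case where
      // persisting can pay for itself.
      val (n, total) = withPersisted(sums, inputIsCached) { r =>
        (r.count(), r.values.sum())
      }
      (n, total, sums.getStorageLevel)
    } finally {
      spark.stop()
    }
  }
}
```

Wrapping the work in try/finally keeps the unpersist next to the persist, so an exception in one of the actions does not leave the intermediate RDD cached.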

Contributor Author

@amanomer amanomer Nov 13, 2019

TYSM @srowen. Looking forward to more learning opportunities.


val binnedCounts =
// Only down-sample if bins is > 0
@@ -215,6 +220,7 @@ class BinaryClassificationMetrics @Since("3.0.0") (
val partitionwiseCumulativeCounts =
agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
val totalCount = partitionwiseCumulativeCounts.last
+binnedWeights.unpersist()
logInfo(s"Total counts: $totalCount")
val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
(index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {