Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vec
@Since("0.8.0")
def computeCost(data: RDD[Vector]): Double = {
  // Broadcast the centers (with precomputed norms) once so every task reuses
  // the same serialized copy instead of shipping them per closure.
  val bcCentersWithNorm = data.context.broadcast(clusterCentersWithNorm)
  // Sum, over all points, of the squared distance to the closest center.
  val cost = data
    .map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
  // Eagerly release the broadcast on driver and executors now that the job is
  // done; non-blocking so we do not wait on cleanup before returning.
  bcCentersWithNorm.destroy(blocking = false)
  cost
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ class LocalLDAModel private[spark] (

docBound
}.sum()
ElogbetaBc.destroy(blocking = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good; how about in the getTopicDistributionMethod method too? I actually think that broadcast is pointless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getTopicDistributionMethod method is only used in ml.clustering.LDA#transform, so I think this may be useful if the model size is large.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhengruifeng but the broadcast isn't actually used. Its .value is called locally, not from a distributed method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen You are right. I removed the unnecessary broadcasting.


// Bound component for prob(topic-term distributions):
// E[log p(beta | eta) - log q(beta | lambda)]
Expand Down Expand Up @@ -372,7 +373,6 @@ class LocalLDAModel private[spark] (
*/
private[spark] def getTopicDistributionMethod(sc: SparkContext): Vector => Vector = {
val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
val expElogbetaBc = sc.broadcast(expElogbeta)
val docConcentrationBrz = this.docConcentration.asBreeze
val gammaShape = this.gammaShape
val k = this.k
Expand All @@ -383,7 +383,7 @@ class LocalLDAModel private[spark] (
} else {
val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts,
expElogbetaBc.value,
expElogbeta,
docConcentrationBrz,
gammaShape,
k)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ object GradientDescent extends Logging {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})
bcWeights.destroy(blocking = false)

if (miniBatchSize > 0) {
/**
Expand Down