From e49fbd32cd3cb6b047c1dc305a193e62bc5f3cd0 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Wed, 31 May 2017 11:09:14 +0800
Subject: [PATCH 1/4] create pr

---
 .../scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index df2a9c0dd509..713d09fa358c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -85,7 +85,9 @@ class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vec
   @Since("0.8.0")
   def computeCost(data: RDD[Vector]): Double = {
     val bcCentersWithNorm = data.context.broadcast(clusterCentersWithNorm)
-    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+    val cost = data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+    bcCentersWithNorm.destroy(blocking = false)
+    cost
   }

From c390ab238b8c65bf88ae0ef0bd0a070b5f811da5 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Wed, 31 May 2017 11:09:38 +0800
Subject: [PATCH 2/4] create pr

---
 .../scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 713d09fa358c..3ad08c46d204 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -85,7 +85,8 @@ class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vec
   @Since("0.8.0")
   def computeCost(data: RDD[Vector]): Double = {
     val bcCentersWithNorm = data.context.broadcast(clusterCentersWithNorm)
-    val cost = data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+    val cost = data
+      .map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
     bcCentersWithNorm.destroy(blocking = false)
     cost
   }

From 9280726dc1031db23758998e612d15ba783b6c20 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Thu, 1 Jun 2017 10:08:41 +0800
Subject: [PATCH 3/4] fix gd & lda

---
 .../main/scala/org/apache/spark/mllib/clustering/LDAModel.scala | 1 +
 .../org/apache/spark/mllib/optimization/GradientDescent.scala   | 1 +
 2 files changed, 2 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 663f63c25a94..9597c0412f51 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -320,6 +320,7 @@ class LocalLDAModel private[spark] (
       docBound
     }.sum()
+    ElogbetaBc.destroy(blocking = false)

     // Bound component for prob(topic-term distributions):
     //   E[log p(beta | eta) - log q(beta | lambda)]
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 07a67a9e719d..593cdd602faf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -246,6 +246,7 @@ object GradientDescent extends Logging {
           // c: (grad, loss, count)
           (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
         })
+      bcWeights.destroy(blocking = false)

       if (miniBatchSize > 0) {
         /**

From 3fd52a82c1cf8e7c1219c2a7dc9c93b3ee9a7a95 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Mon, 5 Jun 2017 10:16:44 +0800
Subject: [PATCH 4/4] del unused broadcasted object

---
 .../scala/org/apache/spark/mllib/clustering/LDAModel.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 9597c0412f51..4ab420058f33 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -373,7 +373,6 @@ class LocalLDAModel private[spark] (
    */
   private[spark] def getTopicDistributionMethod(sc: SparkContext): Vector => Vector = {
     val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
-    val expElogbetaBc = sc.broadcast(expElogbeta)
     val docConcentrationBrz = this.docConcentration.asBreeze
     val gammaShape = this.gammaShape
     val k = this.k
@@ -384,7 +383,7 @@ class LocalLDAModel private[spark] (
       } else {
         val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts,
-          expElogbetaBc.value,
+          expElogbeta,
           docConcentrationBrz,
           gammaShape,
           k)
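
Note on the pattern: the first three commits all make the same change — capture the result of the Spark job in a local val, then destroy the broadcast variable explicitly so its copies on the driver and the executors are freed eagerly rather than waiting for the Broadcast object to be garbage collected; PATCH 4/4 instead removes a broadcast that was never needed for a driver-local value. A minimal self-contained sketch of the idiom follows, assuming a local SparkContext; the object name, the toy data, and the toy cost function are invented for illustration and are not taken from the patches — only the broadcast/destroy calls mirror them.

// Sketch only: broadcast a value, use it in a job, capture the result,
// then destroy the broadcast to free its copies on driver and executors.
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDestroySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("broadcast-destroy"))

    val data = sc.parallelize(1 to 1000).map(_.toDouble)
    val centers = Array(100.0, 500.0, 900.0)  // stand-in for cluster centers
    val bcCenters = sc.broadcast(centers)

    // Capture the job's result in a local val first, as the patches do with `cost`...
    val cost = data.map(p => bcCenters.value.map(c => math.abs(p - c)).min).sum()

    // ...so the broadcast can be destroyed as soon as the job is done.
    bcCenters.destroy(blocking = false)

    println(s"total cost: $cost")
    sc.stop()
  }
}

destroy(blocking = false) releases the broadcast's blocks asynchronously and is irreversible; unpersist would only drop the executor-side copies and allow the value to be re-broadcast on the next use. destroy is the right call here because the value is provably dead once the result is captured.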