GaussianMixtureModel.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseVector => BreezeVector}
+import org.apache.spark.broadcast.Broadcast

import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
@@ -51,6 +52,9 @@ class GaussianMixtureModel(

require(weights.length == gaussians.length, "Length of weight and Gaussian arrays must match")

+private var bcWeights: Option[Broadcast[Array[Double]]] = None
+private var bcDists: Option[Broadcast[Array[MultivariateGaussian]]] = None

override protected def formatVersion = "1.0"

override def save(sc: SparkContext, path: String): Unit = {
@@ -82,10 +86,18 @@
*/
def predictSoft(points: RDD[Vector]): RDD[Array[Double]] = {
val sc = points.sparkContext
-val bcDists = sc.broadcast(gaussians)
-val bcWeights = sc.broadcast(weights)
+bcDists match {
+  case None => bcDists = Some(sc.broadcast(gaussians))
+  case _ =>
+}
+val lclBcDists = bcDists
+bcWeights match {
+  case None => bcWeights = Some(sc.broadcast(weights))
+  case _ =>
+}
+val lclBcWeights = bcWeights
points.map { x =>
-computeSoftAssignments(x.toBreeze.toDenseVector, bcDists.value, bcWeights.value, k)
+computeSoftAssignments(x.toBreeze.toDenseVector, lclBcDists.get.value, lclBcWeights.get.value, k)
}
}
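The change above follows one pattern: broadcast the model parameters the first time an RDD method needs them, cache the Broadcast handle in an Option field, and copy it into a local val so the map closure captures only the broadcast, not the whole model. A minimal standalone sketch of that pattern; the class, field, and method names here (CachedBroadcastModel, score, weights) are hypothetical and not part of the patch:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Hypothetical model class, shown only to isolate the caching pattern used in the diff.
class CachedBroadcastModel(val weights: Array[Double]) extends Serializable {

  // Lazily created broadcast of the weights; None until the first RDD method is called.
  private var bcWeights: Option[Broadcast[Array[Double]]] = None

  def score(points: RDD[Array[Double]]): RDD[Double] = {
    // Broadcast once; later calls on the same SparkContext reuse the cached handle.
    if (bcWeights.isEmpty) {
      bcWeights = Some(points.sparkContext.broadcast(weights))
    }
    // Copy into a local val so the map closure serializes the Broadcast, not `this`,
    // mirroring the lclBc* locals introduced in the diff.
    val localBc = bcWeights.get
    points.map { p =>
      p.zip(localBc.value).map { case (x, w) => x * w }.sum
    }
  }
}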

KMeansModel.scala
@@ -17,6 +17,8 @@

package org.apache.spark.mllib.clustering

+import org.apache.spark.broadcast.Broadcast

import scala.collection.JavaConverters._

import org.json4s._
@@ -38,6 +40,8 @@ import org.apache.spark.sql.Row
class KMeansModel (
val clusterCenters: Array[Vector]) extends Saveable with Serializable with PMMLExportable {

+private var bcCentersWithNorm: Option[Broadcast[Iterable[VectorWithNorm]]] = None

/** A Java-friendly constructor that takes an Iterable of Vectors. */
def this(centers: java.lang.Iterable[Vector]) = this(centers.asScala.toArray)

@@ -51,9 +55,15 @@

/** Maps given points to their cluster indices. */
def predict(points: RDD[Vector]): RDD[Int] = {
-val centersWithNorm = clusterCentersWithNorm
-val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
-points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1)
+bcCentersWithNorm match {
+  case None => {
+    val centersWithNorm = clusterCentersWithNorm
+    bcCentersWithNorm = Some(points.context.broadcast(centersWithNorm))
+  }
+  case _ =>
+}
+val lclBcCentersWithNorm = bcCentersWithNorm
+points.map(p => KMeans.findClosest(lclBcCentersWithNorm.get.value, new VectorWithNorm(p))._1)
}

/** Maps given points to their cluster indices. */
@@ -65,9 +75,15 @@
* model on the given data.
*/
def computeCost(data: RDD[Vector]): Double = {
-val centersWithNorm = clusterCentersWithNorm
-val bcCentersWithNorm = data.context.broadcast(centersWithNorm)
-data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+bcCentersWithNorm match {
+  case None => {
+    val centersWithNorm = clusterCentersWithNorm
+    bcCentersWithNorm = Some(data.context.broadcast(centersWithNorm))
+  }
+  case _ =>
+}
+val lclBcCentersWithNorm = bcCentersWithNorm
+data.map(p => KMeans.pointCost(lclBcCentersWithNorm.get.value, new VectorWithNorm(p))).sum()
}

private def clusterCentersWithNorm: Iterable[VectorWithNorm] =
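A usage-level view of what the KMeansModel change buys: repeated predict or computeCost calls on the same model now broadcast the cluster centers once instead of once per call. A hedged driver sketch; the scoreBatches helper and its inputs are hypothetical, while the KMeans and KMeansModel calls are existing MLlib API:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Hypothetical driver code, shown only to illustrate the call pattern the cache helps.
def scoreBatches(sc: SparkContext, batches: Seq[RDD[Vector]]): Unit = {
  val training = sc.parallelize(Seq(
    Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 1.0), Vectors.dense(9.0, 9.0)))
  val model = KMeans.train(training, 2, 10)

  // Previously each of these calls re-broadcast the centers; with the Option-cached
  // broadcast only the first call pays that cost.
  batches.foreach(batch => model.predict(batch).count())
  println(s"WSSSE on training data: ${model.computeCost(training)}")
}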
LDAModel.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argtopk, normalize, sum}
import breeze.numerics.{exp, lgamma}
import org.apache.hadoop.fs.Path
+import org.apache.spark.broadcast.Broadcast
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
@@ -192,6 +193,9 @@ class LocalLDAModel private[clustering] (
override protected[clustering] val gammaShape: Double = 100)
extends LDAModel with Serializable {

+private var expElogbetaBc: Option[Broadcast[BDM[Double]]] = None
+private var ElogbetaBc: Option[Broadcast[BDM[Double]]] = None

override def k: Int = topics.numCols

override def vocabSize: Int = topics.numRows
@@ -282,13 +286,17 @@
// transpose because dirichletExpectation normalizes by row and we need to normalize
// by topic (columns of lambda)
val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t
-val ElogbetaBc = documents.sparkContext.broadcast(Elogbeta)
+ElogbetaBc match {
+  case None => ElogbetaBc = Some(documents.sparkContext.broadcast(Elogbeta))
+  case _ =>
+}
+val lclElogbetaBc = ElogbetaBc

// Sum bound components for each document:
// component for prob(tokens) + component for prob(document-topic distribution)
val corpusPart =
documents.filter(_._2.numNonzeros > 0).map { case (id: Long, termCounts: Vector) =>
-val localElogbeta = ElogbetaBc.value
+val localElogbeta = lclElogbetaBc.get.value
var docBound = 0.0D
val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts, exp(localElogbeta), brzAlpha, gammaShape, k)
@@ -331,7 +339,11 @@ class LocalLDAModel private[clustering] (
// Double transpose because dirichletExpectation normalizes by row and we need to normalize
// by topic (columns of lambda)
val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t)
-val expElogbetaBc = documents.sparkContext.broadcast(expElogbeta)
+expElogbetaBc match {
+  case None => expElogbetaBc = Some(documents.sparkContext.broadcast(expElogbeta))
+  case _ =>
+}
+val lclExpElogbetaBc = expElogbetaBc
val docConcentrationBrz = this.docConcentration.toBreeze
val gammaShape = this.gammaShape
val k = this.k
@@ -342,7 +354,7 @@
} else {
val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts,
-expElogbetaBc.value,
+lclExpElogbetaBc.get.value,
docConcentrationBrz,
gammaShape,
k)
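The LocalLDAModel changes apply the same caching to the dense topic-term matrices (Elogbeta and exp(Elogbeta)), which were otherwise re-broadcast on every evaluation call. A hedged usage sketch; the evaluate helper, ldaModel, and corpus are hypothetical, while logLikelihood, logPerplexity, and topicDistributions are existing LocalLDAModel methods that exercise these code paths:

import org.apache.spark.mllib.clustering.LocalLDAModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical evaluation helper: `ldaModel` is a trained LocalLDAModel and
// `corpus` is the usual RDD of (document id, term-count vector) pairs.
def evaluate(ldaModel: LocalLDAModel, corpus: RDD[(Long, Vector)]): Unit = {
  // Both calls use the Elogbeta-based bound; with the cached broadcast the
  // topic-term matrix is shipped to executors once rather than once per call.
  val logLik = ldaModel.logLikelihood(corpus)
  val perplexity = ldaModel.logPerplexity(corpus)
  println(s"log-likelihood = $logLik, log-perplexity = $perplexity")

  // topicDistributions uses the separate exp(Elogbeta) broadcast, cached the same way.
  ldaModel.topicDistributions(corpus).count()
}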