diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 76aeebd703d4e..f85e181be2a42 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.clustering
 
 import breeze.linalg.{DenseVector => BreezeVector}
+import org.apache.spark.broadcast.Broadcast
 
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
@@ -51,6 +52,9 @@ class GaussianMixtureModel(
   require(weights.length == gaussians.length, "Length of weight and Gaussian arrays must match")
 
+  private var bcWeights: Option[Broadcast[Array[Double]]] = None
+  private var bcDists: Option[Broadcast[Array[MultivariateGaussian]]] = None
+
   override protected def formatVersion = "1.0"
 
   override def save(sc: SparkContext, path: String): Unit = {
@@ -82,10 +86,18 @@ class GaussianMixtureModel(
    */
   def predictSoft(points: RDD[Vector]): RDD[Array[Double]] = {
     val sc = points.sparkContext
-    val bcDists = sc.broadcast(gaussians)
-    val bcWeights = sc.broadcast(weights)
+    bcDists match {
+      case None => bcDists = Some(sc.broadcast(gaussians))
+      case _ =>
+    }
+    val lclBcDists = bcDists
+    bcWeights match {
+      case None => bcWeights = Some(sc.broadcast(weights))
+      case _ =>
+    }
+    val lclBcWeights = bcWeights
     points.map { x =>
-      computeSoftAssignments(x.toBreeze.toDenseVector, bcDists.value, bcWeights.value, k)
+      computeSoftAssignments(x.toBreeze.toDenseVector, lclBcDists.get.value, lclBcWeights.get.value, k)
     }
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 96359024fa228..96b160cfa133a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.mllib.clustering
 
+import org.apache.spark.broadcast.Broadcast
+
 import scala.collection.JavaConverters._
 
 import org.json4s._
@@ -38,6 +40,8 @@ import org.apache.spark.sql.Row
 class KMeansModel (
     val clusterCenters: Array[Vector]) extends Saveable with Serializable with PMMLExportable {
 
+  private var bcCentersWithNorm: Option[Broadcast[Iterable[VectorWithNorm]]] = None
+
   /** A Java-friendly constructor that takes an Iterable of Vectors. */
   def this(centers: java.lang.Iterable[Vector]) = this(centers.asScala.toArray)
 
@@ -51,9 +55,15 @@ class KMeansModel (
 
   /** Maps given points to their cluster indices. */
   def predict(points: RDD[Vector]): RDD[Int] = {
-    val centersWithNorm = clusterCentersWithNorm
-    val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
-    points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1)
+    bcCentersWithNorm match {
+      case None => {
+        val centersWithNorm = clusterCentersWithNorm
+        bcCentersWithNorm = Some(points.context.broadcast(centersWithNorm))
+      }
+      case _ =>
+    }
+    val lclBcCentersWithNorm = bcCentersWithNorm
+    points.map(p => KMeans.findClosest(lclBcCentersWithNorm.get.value, new VectorWithNorm(p))._1)
   }
 
   /** Maps given points to their cluster indices. */
@@ -65,9 +75,15 @@ class KMeansModel (
    * model on the given data.
    */
   def computeCost(data: RDD[Vector]): Double = {
-    val centersWithNorm = clusterCentersWithNorm
-    val bcCentersWithNorm = data.context.broadcast(centersWithNorm)
-    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+    bcCentersWithNorm match {
+      case None => {
+        val centersWithNorm = clusterCentersWithNorm
+        bcCentersWithNorm = Some(data.context.broadcast(centersWithNorm))
+      }
+      case _ =>
+    }
+    val lclBcCentersWithNorm = bcCentersWithNorm
+    data.map(p => KMeans.pointCost(lclBcCentersWithNorm.get.value, new VectorWithNorm(p))).sum()
   }
 
   private def clusterCentersWithNorm: Iterable[VectorWithNorm] =
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 82f05e4a18cee..f03fe88fdf8e2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering
 import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argtopk, normalize, sum}
 import breeze.numerics.{exp, lgamma}
 import org.apache.hadoop.fs.Path
+import org.apache.spark.broadcast.Broadcast
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -192,6 +193,9 @@ class LocalLDAModel private[clustering] (
     override protected[clustering] val gammaShape: Double = 100)
   extends LDAModel with Serializable {
 
+  private var expElogbetaBc: Option[Broadcast[BDM[Double]]] = None
+  private var ElogbetaBc: Option[Broadcast[BDM[Double]]] = None
+
   override def k: Int = topics.numCols
 
   override def vocabSize: Int = topics.numRows
@@ -282,13 +286,17 @@ class LocalLDAModel private[clustering] (
     // transpose because dirichletExpectation normalizes by row and we need to normalize
     // by topic (columns of lambda)
     val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t
-    val ElogbetaBc = documents.sparkContext.broadcast(Elogbeta)
+    ElogbetaBc match {
+      case None => ElogbetaBc = Some(documents.sparkContext.broadcast(Elogbeta))
+      case _ =>
+    }
+    val lclElogbetaBc = ElogbetaBc
 
     // Sum bound components for each document:
     //  component for prob(tokens) + component for prob(document-topic distribution)
     val corpusPart =
       documents.filter(_._2.numNonzeros > 0).map { case (id: Long, termCounts: Vector) =>
-        val localElogbeta = ElogbetaBc.value
+        val localElogbeta = lclElogbetaBc.get.value
         var docBound = 0.0D
         val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, exp(localElogbeta), brzAlpha, gammaShape, k)
@@ -331,7 +339,11 @@ class LocalLDAModel private[clustering] (
     // Double transpose because dirichletExpectation normalizes by row and we need to normalize
     // by topic (columns of lambda)
     val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t)
-    val expElogbetaBc = documents.sparkContext.broadcast(expElogbeta)
+    expElogbetaBc match {
+      case None => expElogbetaBc = Some(documents.sparkContext.broadcast(expElogbeta))
+      case _ =>
+    }
+    val lclExpElogbetaBc = expElogbetaBc
     val docConcentrationBrz = this.docConcentration.toBreeze
     val gammaShape = this.gammaShape
     val k = this.k
@@ -342,7 +354,7 @@ class LocalLDAModel private[clustering] (
       } else {
         val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts,
-          expElogbetaBc.value,
+          lclExpElogbetaBc.get.value,
           docConcentrationBrz,
           gammaShape,
           k)
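Reviewer note: below is a minimal, self-contained sketch of the broadcast-caching pattern these hunks apply, for reference only. The model broadcasts its parameters on the first predict-style call, keeps the handle in an Option field, and reuses it on later calls, copying it into a local val so the task closure does not capture the whole model (the role of the lclBc* locals above). All names here (ToyCentroidModel, bcCenters, ...) are hypothetical and not part of this patch; the sketch only assumes a Spark dependency on the classpath.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Hypothetical model class illustrating the pattern: one Broadcast of the centers
// is created lazily and reused across predict() calls instead of re-broadcasting.
class ToyCentroidModel(val centers: Array[Array[Double]]) extends Serializable {

  // None until the first predict() call on this (driver-side) model instance.
  private var bcCenters: Option[Broadcast[Array[Array[Double]]]] = None

  def predict(points: RDD[Array[Double]]): RDD[Int] = {
    // Broadcast once, then reuse the cached handle on subsequent calls.
    if (bcCenters.isEmpty) {
      bcCenters = Some(points.context.broadcast(centers))
    }
    // Local val so the closure captures only the broadcast handle, not `this`.
    val localCenters = bcCenters.get
    points.map { p =>
      val cs = localCenters.value
      // Index of the nearest center by squared Euclidean distance.
      cs.indices.minBy { i =>
        cs(i).zip(p).map { case (c, x) => (c - x) * (c - x) }.sum
      }
    }
  }
}

// Usage: the second call reuses the broadcast created by the first one.
//   val model = new ToyCentroidModel(Array(Array(0.0, 0.0), Array(5.0, 5.0)))
//   model.predict(rdd1)
//   model.predict(rdd2)  // no second broadcast of `centers`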