diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 920b57756b625..c084f60575ab6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -17,7 +17,8 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{DenseMatrix => BDM, normalize, sum => brzSum, DenseVector => BDV} +import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum, normalize} +import breeze.numerics.{lgamma, digamma, exp} import org.apache.hadoop.fs.Path @@ -53,6 +54,34 @@ abstract class LDAModel private[clustering] extends Saveable { /** Vocabulary size (number of terms or terms in the vocabulary) */ def vocabSize: Int + /** Dirichlet parameters for document-topic distribution. */ + protected def docConcentration: Vector + + /** + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution. + */ + def getDocConcentration: Vector = this.docConcentration + + /** Dirichlet parameters for topic-word distribution. */ + protected def topicConcentration: Double + + /** + * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics' + * distributions over terms. + * + * This is the parameter to a symmetric Dirichlet distribution. + * + * Note: The topics' distributions over terms are called "beta" in the original LDA paper + * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009. + */ + def getTopicConcentration: Double = this.topicConcentration + + /** Shape parameter for random initialization of gamma. */ + protected def gammaShape: Double + /** * Inferred topics, where each topic is represented by a distribution over terms. * This is a matrix of size vocabSize x k, where each column is a topic. @@ -168,7 +197,10 @@ abstract class LDAModel private[clustering] extends Saveable { */ @Experimental class LocalLDAModel private[clustering] ( - private val topics: Matrix) extends LDAModel with Serializable { + private val topics: Matrix, + protected val docConcentration: Vector, + protected val topicConcentration: Double, + protected val gammaShape: Double) extends LDAModel with Serializable { override def k: Int = topics.numCols @@ -186,17 +218,124 @@ class LocalLDAModel private[clustering] ( }.toArray } + /** + * Backwards compatibility constructor, assumes symmetric docConcentration=1/numTopics, + * topicConcentration=1, gammaShape=100. This is probably NOT what you want (instead you should + * set the parameters explicitly in constructor) + * @deprecated Provide alpha, eta, and gammaShape in constructor. + */ + @deprecated( + "Provide docConcentration, topicConcentration, and gammaShape in constructor", "1.5.0") + def this(topics: Matrix) { + this(topics, Vectors.dense(Array.fill(topics.numRows)(1.0 / topics.numRows)), 1D, 100D) + } + override protected def formatVersion = "1.0" override def save(sc: SparkContext, path: String): Unit = { LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix) } + // TODO // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ??? // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??? + /** + * Calculate and return per-word likelihood bound, using the `batch` of + * documents as evaluation corpus. 
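 *
 * Usage sketch (illustrative only, not added by this patch): given a trained
 * `model: LocalLDAModel` and an RDD `docs` of `(docId, termCounts)` pairs covering the full
 * corpus, the per-word bound is
 * {{{
 * val perWordBound = model.logPerplexity(docs, totalDocs = docs.count())
 * }}}
 * If `batch` is only a sample of the corpus, pass the full corpus size as `totalDocs` so the
 * bound is rescaled by the subsampling ratio before it is normalized by the word count.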
+ */ + // TODO: calcualte logPerplexity over training set online during training, reusing gammad instead + // of performing variational inference again in [[bound()]] + def logPerplexity( + batch: RDD[(Long, Vector)], + totalDocs: Long): Double = { + val corpusWords = batch + .flatMap { case (_, termCounts) => termCounts.toArray } + .reduce(_ + _) + val subsampleRatio = totalDocs.toDouble / batch.count() + val batchVariationalBound = bound( + batch, + subsampleRatio, + docConcentration, + topicConcentration, + topicsMatrix.toBreeze.toDenseMatrix, + gammaShape, + k, + vocabSize) + val perWordBound = batchVariationalBound / (subsampleRatio * corpusWords) + + perWordBound + } + + + /** + * Estimate the variational bound of documents from `batch`: + * E_q[log p(bath)] - E_q[log q(batch)] + */ + private def bound( + batch: RDD[(Long, Vector)], + subsampleRatio: Double, + alpha: Vector, + eta: Double, + lambda: BDM[Double], + gammaShape: Double, + k: Int, + vocabSize: Long): Double = { + val brzAlpha = alpha.toBreeze.toDenseVector + // transpose because dirichletExpectation normalizes by row and we need to normalize + // by topic (columns of lambda) + val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t + + var score = batch.map { case (id: Long, termCounts: Vector) => + var docScore = 0.0D + val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference( + termCounts, exp(Elogbeta), brzAlpha, gammaShape, k) + val Elogthetad: BDV[Double] = LDAUtils.dirichletExpectation(gammad) + + // E[log p(doc | theta, beta)] + termCounts.foreachActive { case (id, count) => + docScore += LDAUtils.logSumExp(Elogthetad + Elogbeta(id, ::).t) + } + // E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector + docScore += sum((brzAlpha - gammad) :* Elogthetad) + docScore += sum(lgamma(gammad) - lgamma(brzAlpha)) + docScore += lgamma(sum(brzAlpha)) - lgamma(sum(gammad)) + + docScore + }.sum() + + // compensate likelihood for when `corpus` above is only a sample of the whole corpus + score *= subsampleRatio + + // E[log p(beta | eta) - log q (beta | lambda)]; assumes eta is a scalar + score += sum((eta - lambda) :* Elogbeta) + score += sum(lgamma(lambda) - lgamma(eta)) + + val sumEta = eta * vocabSize + score += sum(lgamma(sumEta) - lgamma(sum(lambda(::, breeze.linalg.*)))) + + score + } + + /** + * Predicts the topic mixture distribution gamma for a document. + */ + def topicDistribution(doc: (Long, Vector)): (Long, Vector) = { + // Double transpose because dirichletExpectation normalizes by row and we need to normalize + // by topic (columns of lambda) + val Elogbeta = LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t + + val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference( + doc._2, + exp(Elogbeta), + this.docConcentration, + this.gammaShape, + this.k) + (doc._1, Vectors.dense((gamma / sum(gamma)).toArray)) + } + } @Experimental @@ -212,6 +351,8 @@ object LocalLDAModel extends Loader[LocalLDAModel] { // as a Row in data. 
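A quick usage sketch for the per-document prediction path added above (the names `model` and `docs` are placeholders, not APIs introduced by this patch): `topicDistribution` runs variational inference for one document and normalizes the resulting gamma into a topic-mixture vector, so mapping it over a corpus yields one distribution per document.

    // Assumes: model: LocalLDAModel, docs: RDD[(Long, Vector)] of (docId, term counts).
    val docTopics = docs.map(model.topicDistribution)   // RDD[(Long, Vector)]; each vector sums to 1
    docTopics.take(3).foreach { case (docId, theta) =>
      println(s"doc $docId -> ${theta.toArray.mkString(", ")}")
    }
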
case class Data(topic: Vector, index: Int) + // TODO: explicitly save docConcentration, topicConcentration, and gammaShape for use in + // model.predict() def save(sc: SparkContext, path: String, topicsMatrix: Matrix): Unit = { val sqlContext = SQLContext.getOrCreate(sc) import sqlContext.implicits._ @@ -287,15 +428,26 @@ class DistributedLDAModel private ( private val globalTopicTotals: LDA.TopicCounts, val k: Int, val vocabSize: Int, - private val docConcentration: Double, - private val topicConcentration: Double, + protected val docConcentration: Vector, + protected val topicConcentration: Double, + protected val gammaShape: Double, private[spark] val iterationTimes: Array[Double]) extends LDAModel { import LDA._ - private[clustering] def this(state: EMLDAOptimizer, iterationTimes: Array[Double]) = { - this(state.graph, state.globalTopicTotals, state.k, state.vocabSize, state.docConcentration, - state.topicConcentration, iterationTimes) + private[clustering] def this( + state: EMLDAOptimizer, + iterationTimes: Array[Double], + gammaShape: Double) = { + this( + state.graph, + state.globalTopicTotals, + state.k, + state.vocabSize, + Vectors.dense(Array.fill(state.k)(state.docConcentration)), + state.topicConcentration, + gammaShape, + iterationTimes) } /** @@ -303,7 +455,11 @@ class DistributedLDAModel private ( * The local model stores the inferred topics but not the topic distributions for training * documents. */ - def toLocal: LocalLDAModel = new LocalLDAModel(topicsMatrix) + def toLocal: LocalLDAModel = new LocalLDAModel( + topicsMatrix, + docConcentration, + topicConcentration, + gammaShape) /** * Inferred topics, where each topic is represented by a distribution over terms. @@ -375,8 +531,9 @@ class DistributedLDAModel private ( * hyperparameters. */ lazy val logLikelihood: Double = { - val eta = topicConcentration - val alpha = docConcentration + // TODO: generalize this for asymmetric (non-scalar) alpha + val alpha = this.docConcentration(0) // To avoid closure capture of enclosing object + val eta = this.topicConcentration assert(eta > 1.0) assert(alpha > 1.0) val N_k = globalTopicTotals @@ -400,8 +557,9 @@ class DistributedLDAModel private ( * log P(topics, topic distributions for docs | alpha, eta) */ lazy val logPrior: Double = { - val eta = topicConcentration - val alpha = docConcentration + // TODO: generalize this for asymmetric (non-scalar) alpha + val alpha = this.docConcentration(0) // To avoid closure capture of enclosing object + val eta = this.topicConcentration // Term vertices: Compute phi_{wk}. Use to compute prior log probability. // Doc vertex: Compute theta_{kj}. Use to compute prior log probability. 
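    // Side note (illustration only, not part of this patch): the EM path still assumes a
    // symmetric document-topic prior. The new constructor above expands the scalar into a
    // length-k vector, e.g. for k = 3 and alpha = 0.1:
    //   Vectors.dense(Array.fill(3)(0.1))   // == [0.1, 0.1, 0.1]
    // which is why logLikelihood and logPrior can read the value back as docConcentration(0).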
val N_k = globalTopicTotals @@ -412,12 +570,12 @@ class DistributedLDAModel private ( val N_wk = vertex._2 val smoothed_N_wk: TopicCounts = N_wk + (eta - 1.0) val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k - (eta - 1.0) * brzSum(phi_wk.map(math.log)) + (eta - 1.0) * sum(phi_wk.map(math.log)) } else { val N_kj = vertex._2 val smoothed_N_kj: TopicCounts = N_kj + (alpha - 1.0) val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0) - (alpha - 1.0) * brzSum(theta_kj.map(math.log)) + (alpha - 1.0) * sum(theta_kj.map(math.log)) } } graph.vertices.aggregate(0.0)(seqOp, _ + _) @@ -448,7 +606,7 @@ class DistributedLDAModel private ( override def save(sc: SparkContext, path: String): Unit = { DistributedLDAModel.SaveLoadV1_0.save( sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration, - iterationTimes) + iterationTimes, gammaShape) } } @@ -478,17 +636,20 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] { globalTopicTotals: LDA.TopicCounts, k: Int, vocabSize: Int, - docConcentration: Double, + docConcentration: Vector, topicConcentration: Double, - iterationTimes: Array[Double]): Unit = { + iterationTimes: Array[Double], + gammaShape: Double): Unit = { val sqlContext = SQLContext.getOrCreate(sc) import sqlContext.implicits._ val metadata = compact(render (("class" -> classNameV1_0) ~ ("version" -> thisFormatVersion) ~ - ("k" -> k) ~ ("vocabSize" -> vocabSize) ~ ("docConcentration" -> docConcentration) ~ - ("topicConcentration" -> topicConcentration) ~ - ("iterationTimes" -> iterationTimes.toSeq))) + ("k" -> k) ~ ("vocabSize" -> vocabSize) ~ + ("docConcentration" -> docConcentration.toArray.toSeq) ~ + ("topicConcentration" -> topicConcentration) ~ + ("iterationTimes" -> iterationTimes.toSeq) ~ + ("gammaShape" -> gammaShape))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString @@ -510,9 +671,10 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] { sc: SparkContext, path: String, vocabSize: Int, - docConcentration: Double, + docConcentration: Vector, topicConcentration: Double, - iterationTimes: Array[Double]): DistributedLDAModel = { + iterationTimes: Array[Double], + gammaShape: Double): DistributedLDAModel = { val dataPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString val vertexDataPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString val edgeDataPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString @@ -536,7 +698,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] { val graph: Graph[LDA.TopicCounts, LDA.TokenCount] = Graph(vertices, edges) new DistributedLDAModel(graph, globalTopicTotals, globalTopicTotals.length, vocabSize, - docConcentration, topicConcentration, iterationTimes) + docConcentration, topicConcentration, gammaShape, iterationTimes) } } @@ -546,15 +708,23 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] { implicit val formats = DefaultFormats val expectedK = (metadata \ "k").extract[Int] val vocabSize = (metadata \ "vocabSize").extract[Int] - val docConcentration = (metadata \ "docConcentration").extract[Double] + val docConcentration = + Vectors.dense((metadata \ "docConcentration").extract[Seq[Double]].toArray) val topicConcentration = (metadata \ "topicConcentration").extract[Double] val iterationTimes = (metadata \ "iterationTimes").extract[Seq[Double]] + val gammaShape = (metadata \ "gammaShape").extract[Double] val 
classNameV1_0 = SaveLoadV1_0.classNameV1_0 val model = (loadedClassName, loadedVersion) match { case (className, "1.0") if className == classNameV1_0 => { DistributedLDAModel.SaveLoadV1_0.load( - sc, path, vocabSize, docConcentration, topicConcentration, iterationTimes.toArray) + sc, + path, + vocabSize, + docConcentration, + topicConcentration, + iterationTimes.toArray, + gammaShape) } case _ => throw new Exception( s"DistributedLDAModel.load did not recognize model with (className, format version):" + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index f4170a3d98dd8..5c5434f72c59e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -19,10 +19,9 @@ package org.apache.spark.mllib.clustering import java.util.Random -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, normalize, sum} -import breeze.numerics.{abs, digamma, exp} +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, all, normalize, sum} +import breeze.numerics.{trigamma, abs, digamma, exp} import breeze.stats.distributions.{Gamma, RandBasis} - import org.apache.spark.annotation.DeveloperApi import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl @@ -208,7 +207,9 @@ final class EMLDAOptimizer extends LDAOptimizer { override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = { require(graph != null, "graph is null, EMLDAOptimizer not initialized.") this.graphCheckpointer.deleteAllCheckpoints() - new DistributedLDAModel(this, iterationTimes) + // This assumes gammaShape = 100 in OnlineLDAOptimizer to ensure equivalence in LDAModel.toLocal + // conversion + new DistributedLDAModel(this, iterationTimes, 100) } } @@ -225,7 +226,6 @@ final class EMLDAOptimizer extends LDAOptimizer { */ @DeveloperApi final class OnlineLDAOptimizer extends LDAOptimizer { - // LDA common parameters private var k: Int = 0 private var corpusSize: Long = 0 @@ -250,6 +250,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { private var tau0: Double = 1024 private var kappa: Double = 0.51 private var miniBatchFraction: Double = 0.05 + private var optimizeAlpha: Boolean = false // internal data structure private var docs: RDD[(Long, Vector)] = null @@ -319,6 +320,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer { this } + /** + * Optimize alpha, indicates whether alpha (Dirichlet parameter for document-topic distribution) + * will be optimized during training. + */ + def getOptimzeAlpha: Boolean = this.optimizeAlpha + + /** + * Sets whether to optimize alpha parameter during training. + * + * Default: false + */ + def setOptimzeAlpha(optimizeAlpha: Boolean): this.type = { + this.optimizeAlpha = optimizeAlpha + this + } + /** * (private[clustering]) * Set the Dirichlet parameter for the posterior over topics. 
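A configuration sketch for the new flag (mirroring the test added below; the other setter values are arbitrary examples): enabling it makes `submitMiniBatch` call `updateAlpha` after each lambda update, so the document-topic Dirichlet parameter is learned rather than held fixed.

    val optimizer = new OnlineLDAOptimizer()
      .setMiniBatchFraction(0.05)
      .setOptimzeAlpha(true)   // setter name as introduced in this patch; learns alpha during training
    val lda = new LDA()
      .setK(10)
      .setDocConcentration(0.1)
      .setOptimizer(optimizer)
    // lda.run(corpus) then returns a LocalLDAModel whose getDocConcentration holds the learned alpha.
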
@@ -371,7 +388,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } override private[clustering] def next(): OnlineLDAOptimizer = { - val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong()) + val batch = docs.sample(withReplacement = false, miniBatchFraction, randomGenerator.nextLong()) if (batch.isEmpty()) return this submitMiniBatch(batch) } @@ -385,73 +402,87 @@ final class OnlineLDAOptimizer extends LDAOptimizer { iteration += 1 val k = this.k val vocabSize = this.vocabSize - val Elogbeta = dirichletExpectation(lambda).t - val expElogbeta = exp(Elogbeta) + val expElogbeta = exp(LDAUtils.dirichletExpectation(lambda)).t val alpha = this.alpha.toBreeze val gammaShape = this.gammaShape - val stats: RDD[BDM[Double]] = batch.mapPartitions { docs => + val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => val stat = BDM.zeros[Double](k, vocabSize) - docs.foreach { doc => - val termCounts = doc._2 - val (ids: List[Int], cts: Array[Double]) = termCounts match { - case v: DenseVector => ((0 until v.size).toList, v.values) - case v: SparseVector => (v.indices.toList, v.values) - case v => throw new IllegalArgumentException("Online LDA does not support vector type " - + v.getClass) + var gammaPart = List[BDV[Double]]() + docs.foreach { case (_, termCounts: Vector) => + val ids: List[Int] = termCounts match { + case v: DenseVector => (0 until v.size).toList + case v: SparseVector => v.indices.toList } if (!ids.isEmpty) { - - // Initialize the variational distribution q(theta|gamma) for the mini-batch - val gammad: BDV[Double] = - new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k) // K - val expElogthetad: BDV[Double] = exp(digamma(gammad) - digamma(sum(gammad))) // K - val expElogbetad: BDM[Double] = expElogbeta(ids, ::).toDenseMatrix // ids * K - - val phinorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100 // ids - var meanchange = 1D - val ctsVector = new BDV[Double](cts) // ids - - // Iterate between gamma and phi until convergence - while (meanchange > 1e-3) { - val lastgamma = gammad.copy - // K K * ids ids - gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phinorm))) :+ alpha - expElogthetad := exp(digamma(gammad) - digamma(sum(gammad))) - phinorm := expElogbetad * expElogthetad :+ 1e-100 - meanchange = sum(abs(gammad - lastgamma)) / k - } - - stat(::, ids) := expElogthetad.asDenseMatrix.t * (ctsVector :/ phinorm).asDenseMatrix + val (gammad, sstats) = OnlineLDAOptimizer.variationalTopicInference( + termCounts, expElogbeta, alpha, gammaShape, k) + stat(::, ids) := sstats + gammaPart = gammad :: gammaPart } } - Iterator(stat) + Iterator((stat, gammaPart)) } - - val statsSum: BDM[Double] = stats.reduce(_ += _) + val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _) + val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( + stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*) val batchResult = statsSum :* expElogbeta.t // Note that this is an optimization to avoid batch.count - update(batchResult, iteration, (miniBatchFraction * corpusSize).ceil.toInt) + updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) + if (optimizeAlpha) updateAlpha(gammat) this } override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = { - new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) + new LocalLDAModel(Matrices.fromBreeze(lambda).transpose, alpha, eta, gammaShape) } /** * Update lambda based on the batch submitted. 
batchSize can be different for each iteration. */ - private[clustering] def update(stat: BDM[Double], iter: Int, batchSize: Int): Unit = { + private def updateLambda(stat: BDM[Double], batchSize: Int): Unit = { // weight of the mini-batch. - val weight = math.pow(getTau0 + iter, -getKappa) + val weight = rho() // Update lambda based on documents. - lambda = lambda * (1 - weight) + - (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight + lambda := (1 - weight) * lambda + + weight * (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) } + /** + * Update (in place) parameters for the Dirichlet prior on the per-document + * topic weights `alpha` given the last `gammat`. + * Uses Newton's method, + * @see Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters + * (http://www.stanford.edu/~jhuang11/research/dirichlet/dirichlet.pdf) + */ + private def updateAlpha(gammat: BDM[Double]): Unit = { + val weight = rho() + val N = gammat.rows.toDouble + val alpha = this.alpha.toBreeze.toDenseVector + val logphat: BDM[Double] = sum(LDAUtils.dirichletExpectation(gammat)(::, breeze.linalg.*)) / N + val gradf = N * (-LDAUtils.dirichletExpectation(alpha) + logphat.toDenseVector) + + val c = N * trigamma(sum(alpha)) + val q = -N * trigamma(alpha) + + val b = sum(gradf / q) / (1D / c + sum(1D / q)) + + val dalpha = -(gradf - b) / q + + if (all((weight * dalpha + alpha) :> 0D)) { + alpha :+= weight * dalpha + this.alpha = Vectors.dense(alpha.toArray) + } + } + + /** Calculates learning rate rho, which decays as a function of [[iteration]] */ + private def rho(): Double = { + math.pow(getTau0 + this.iteration, -getKappa) + } + + /** * Get a random matrix to initialize lambda */ @@ -462,16 +493,59 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val temp = gammaRandomGenerator.sample(row * col).toArray new BDM[Double](col, row, temp).t } +} + +/** + * Serializable companion object containing helper methods for OnlineLDA + */ +object OnlineLDAOptimizer { + private[clustering] def variationalTopicInference( + termCounts: Vector, + expElogbeta: BDM[Double], + alpha: breeze.linalg.Vector[Double], + gammaShape: Double, + k: Int): (BDV[Double], BDM[Double]) = { + val (ids: List[Int], cts: Array[Double]) = termCounts match { + case v: DenseVector => ((0 until v.size).toList, v.values) + case v: SparseVector => (v.indices.toList, v.values) + case v => throw new IllegalArgumentException("Online LDA does not support vector type " + + v.getClass) + } + // Initialize the variational distribution q(theta|gamma) for the mini-batch + val gammad: BDV[Double] = + new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k) // K + val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K + val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K + + val phinorm: BDV[Double] = expElogbetad * expElogthetad + 1e-100 // ids + var meanchange = 1D + val ctsVector = new BDV[Double](cts) // ids + + // Iterate between gamma and phi until convergence + while (meanchange > 1e-3) { + val lastgamma = gammad.copy + // K K * ids ids + gammad := (expElogthetad :* (expElogbetad.t * (ctsVector / phinorm))) + alpha + expElogthetad := exp(LDAUtils.dirichletExpectation(gammad)) + phinorm := expElogbetad * expElogthetad + 1e-100 + meanchange = sum(abs(gammad - lastgamma)) / k + } + + val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector / phinorm).asDenseMatrix + (gammad, sstatsd) + } - /** - * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. 
Currently the implementation - * uses digamma which is accurate but expensive. - */ - private def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = { - val rowSum = sum(alpha(breeze.linalg.*, ::)) - val digAlpha = digamma(alpha) - val digRowSum = digamma(rowSum) - val result = digAlpha(::, breeze.linalg.*) - digRowSum - result + private[clustering] def variationalTopicInference( + termCounts: Vector, + expElogbeta: BDM[Double], + alpha: Vector, + gammaShape: Double, + k: Int): (BDV[Double], BDM[Double]) = { + OnlineLDAOptimizer.variationalTopicInference( + termCounts, + expElogbeta, + alpha.toBreeze, + gammaShape, + k) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala new file mode 100644 index 0000000000000..f7e5ce1665fe6 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, max, sum} +import breeze.numerics._ + +/** + * Utility methods for LDA. + */ +object LDAUtils { + /** + * Log Sum Exp with overflow protection using the identity: + * For any a: \log \sum_{n=1}^N \exp\{x_n\} = a + \log \sum_{n=1}^N \exp\{x_n - a\} + */ + private[clustering] def logSumExp(x: BDV[Double]): Double = { + val a = max(x) + a + log(sum(exp(x :- a))) + } + + /** + * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation + * uses [[breeze.numerics.digamma]] which is accurate but expensive. + */ + private[clustering] def dirichletExpectation(alpha: BDV[Double]): BDV[Double] = { + digamma(alpha) - digamma(sum(alpha)) + } + + /** + * Computes [[dirichletExpectation()]] row-wise, assuming each row of alpha are + * Dirichlet parameters. 
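 *
 * Small illustrative example (values arbitrary): each row is one Dirichlet parameter vector,
 * e.g. a per-document gamma, and the corresponding output row is E[log(theta)] under that
 * Dirichlet. This is what `OnlineLDAOptimizer.updateAlpha` averages over a mini-batch.
 * {{{
 * val gammas = new BDM(2, 3, Array(1.0, 4.0, 2.0, 5.0, 3.0, 6.0)) // 2 docs x 3 topics
 * val eLogTheta = LDAUtils.dirichletExpectation(gammas)           // 2 x 3 result
 * }}}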
+ */ + private[clustering] def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = { + val rowSum = sum(alpha(breeze.linalg.*, ::)) + val digAlpha = digamma(alpha) + val digRowSum = digamma(rowSum) + val result = digAlpha(::, breeze.linalg.*) - digRowSum + result + } + +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index da70d9bd7c790..87e88110d653f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{DenseMatrix => BDM} +import breeze.linalg.{DenseMatrix => BDM, max, argmax} import org.apache.spark.SparkFunSuite import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix, Vector, Vectors} @@ -221,7 +221,8 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { .setOptimizer(op) .setSeed(12345) - val ldaModel = lda.run(docs) + // NOTE: this assumes OnlineLDAOptimizer returns a LocalLDAModel + val ldaModel: LocalLDAModel = lda.run(docs).asInstanceOf[LocalLDAModel] val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10) val topics = topicIndices.map { case (terms, termWeights) => terms.zip(termWeights) @@ -234,6 +235,104 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { } } + test("LocalLDAModel logPerplexity and predict") { + val k = 2 + val vocabSize = 6 + val alpha = 0.01 + val eta = 0.01 + val gammaShape = 100 + val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array( + 1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597, + 0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124)) + + def toydata: Array[(Long, Vector)] = Array( + Vectors.sparse(6, Array(0, 1), Array(1, 1)), + Vectors.sparse(6, Array(1, 2), Array(1, 1)), + Vectors.sparse(6, Array(0, 2), Array(1, 1)), + Vectors.sparse(6, Array(3, 4), Array(1, 1)), + Vectors.sparse(6, Array(3, 5), Array(1, 1)), + Vectors.sparse(6, Array(4, 5), Array(1, 1)) + ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + val docs = sc.parallelize(toydata) + + + val ldaModel: LocalLDAModel = new LocalLDAModel( + topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape) + + /* Verify results using gensim: + import numpy as np + from gensim import models + corpus = [ + [(0, 1.0), (1, 1.0)], + [(1, 1.0), (2, 1.0)], + [(0, 1.0), (2, 1.0)], + [(3, 1.0), (4, 1.0)], + [(3, 1.0), (5, 1.0)], + [(4, 1.0), (5, 1.0)]] + np.random.seed(2345) + lda = models.ldamodel.LdaModel( + corpus=corpus, alpha=0.01, eta=0.01, num_topics=2, update_every=0, passes=100, + decay=0.51, offset=1024) + print(lda.log_perplexity(corpus)) + > -3.69051285096 + print(list(lda.get_document_topics(corpus))) + > [[(0, 0.99504950495049516)], [(0, 0.99504950495049516)], + > [(0, 0.99504950495049516)], [(1, 0.99504950495049516)], + > [(1, 0.99504950495049516)], [(1, 0.99504950495049516)]] + */ + + // check logPerplexity + assert(ldaModel.logPerplexity(docs, docs.count()) ~== -3.690D relTol 1E-3D) + + // check predicting topic distribution for a document + val expectedPredictions = List( + (0, 0.99504), (0, 0.99504), + (0, 0.99504), (1, 0.99504), + (1, 0.99504), (1, 0.99504)) + expectedPredictions.zip(docs.map { doc => + val (_, topics) = ldaModel.topicDistribution(doc) + val topicsBz = topics.toBreeze.toDenseVector + (argmax(topicsBz), max(topicsBz)) + }.collect()).forall { 
+ case (expected, actual) => + expected._1 === actual._1 && (expected._2 ~== actual._2 relTol 1E-3D) + } + } + + test("OnlineLDAOptimizer alpha hyperparameter optimization") { + def toydata: Array[(Long, Vector)] = Array( + Vectors.sparse(6, Array(4, 5), Array(1, 1)) + ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + + val k = 2 + val docs = sc.parallelize(toydata) + val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51) + .setGammaShape(100).setOptimzeAlpha(true) + val lda = new LDA().setK(k) + .setDocConcentration(1D / k) + .setTopicConcentration(0.01) + .setMaxIterations(100) + .setOptimizer(op) + .setSeed(2345) + + val ldaModel: LocalLDAModel = lda.run(docs).asInstanceOf[LocalLDAModel] + + /* Verify the results with gensim: + import numpy as np + from gensim import models + corpus = [ + [(4, 1.0), (5, 1.0)]] + np.random.seed(2345) + lda = models.ldamodel.LdaModel( + corpus=corpus, alpha='auto', eta=0.01, num_topics=2, update_every=0, passes=100, + decay=0.51, offset=1024) + print(lda.alpha) + > [ 0.3210193 1.83267602 ] + */ + + assert(ldaModel.getDocConcentration ~== Vectors.dense(0.321, 1.832) relTol 0.05) + } + test("OnlineLDAOptimizer with asymmetric prior") { def toydata: Array[(Long, Vector)] = Array( Vectors.sparse(6, Array(0, 1), Array(1, 1)),