From 840ac03fa73f6de04759659feaceca5f791320bb Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Fri, 8 Jul 2016 23:17:22 -0700
Subject: [PATCH 1/9] [SPARK-16240][ML] Model loading backward compatibility
 for LDA

---
 .../org/apache/spark/ml/clustering/LDA.scala | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 034f2c3fa2fd..129e794c1347 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -35,6 +35,7 @@ import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Vector => OldVect
   Vectors => OldVectors}
 import org.apache.spark.mllib.linalg.MatrixImplicits._
 import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf}
@@ -574,16 +575,14 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
       val data = sparkSession.read.parquet(dataPath)
-        .select("vocabSize", "topicsMatrix", "docConcentration", "topicConcentration",
-          "gammaShape")
-        .head()
-      val vocabSize = data.getAs[Int](0)
-      val topicsMatrix = data.getAs[Matrix](1)
-      val docConcentration = data.getAs[Vector](2)
-      val topicConcentration = data.getAs[Double](3)
-      val gammaShape = data.getAs[Double](4)
+      val Row(vocabSize: Int, topicsMatrix: Matrix, docConcentration: Vector,
+        topicConcentration: Double, gammaShape: Double, topicDistributionCol: String) =
+        MLUtils.convertVectorColumnsToML(data, "vocabSize", "topicsMatrix",
+          "docConcentration", "topicConcentration", "gammaShape").
+          select("vocabSize", "topicsMatrix", "docConcentration",
+            "topicConcentration", "gammaShape").head()
       val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, topicConcentration,
-        gammaShape)
+          gammaShape)
       val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model

From 190a69fe01161eb022b98aeefa702cc0c13358aa Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Mon, 11 Jul 2016 11:04:42 -0700
Subject: [PATCH 2/9] Fixing topicDistributionCol issue

---
 .../org/apache/spark/ml/clustering/LDA.scala | 27 ++++++++++++++-----
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 129e794c1347..be9a69166701 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -18,6 +18,9 @@ package org.apache.spark.ml.clustering
 
 import org.apache.hadoop.fs.Path
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JObject
+import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.internal.Logging
@@ -575,16 +578,28 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
       val data = sparkSession.read.parquet(dataPath)
+      val vectorConverted = MLUtils.convertVectorColumnsToML(data, "docConcentration")
       val Row(vocabSize: Int, topicsMatrix: Matrix, docConcentration: Vector,
-        topicConcentration: Double, gammaShape: Double, topicDistributionCol: String) =
-        MLUtils.convertVectorColumnsToML(data, "vocabSize", "topicsMatrix",
-          "docConcentration", "topicConcentration", "gammaShape").
-          select("vocabSize", "topicsMatrix", "docConcentration",
-            "topicConcentration", "gammaShape").head()
+        topicConcentration: Double, gammaShape: Double) = MLUtils.convertMatrixColumnsToML(
+        vectorConverted, "topicsMatrix").select("vocabSize", "topicsMatrix", "docConcentration",
+        "topicConcentration", "gammaShape").head()
       val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, topicConcentration,
           gammaShape)
       val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession)
-      DefaultParamsReader.getAndSetParams(model, metadata)
+      implicit val format = DefaultFormats
+      metadata.params match {
+        case JObject(pairs) =>
+          pairs.foreach { case (paramName, jsonValue) =>
+            val variable =
+              if (paramName == "topicDistribution") "topicDistributionCol" else paramName
+            val param = model.getParam(variable)
+            val value = param.jsonDecode(compact(render(jsonValue)))
+            model.set(param, value)
+          }
+        case _ =>
+          throw new IllegalArgumentException(
+            s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
+      }
       model
     }
   }

From 45fa3b06a7ca0ff3bffbdf532e0bbccbf2638bc1 Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Mon, 11 Jul 2016 16:00:43 -0700
Subject: [PATCH 3/9] Review comments and Separate coding logic

---
 .../org/apache/spark/ml/clustering/LDA.scala | 63 +++++++++++--------
 1 file changed, 37 insertions(+), 26 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index be9a69166701..016cd4067341 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -84,7 +84,8 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
    *     - Values should be >= 0
    *     - default = uniformly (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
-   * @group param
+   *
+   * @group param
    */
   @Since("1.6.0")
   final val docConcentration = new DoubleArrayParam(this, "docConcentration",
@@ -125,7 +126,8 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
    *     - Value should be >= 0
    *     - default = (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
-   * @group param
+   *
+   * @group param
    */
   @Since("1.6.0")
   final val topicConcentration = new DoubleParam(this, "topicConcentration",
@@ -553,13 +555,6 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
 
   private[LocalLDAModel] class LocalLDAModelWriter(instance: LocalLDAModel) extends MLWriter {
 
-    private case class Data(
-        vocabSize: Int,
-        topicsMatrix: Matrix,
-        docConcentration: Vector,
-        topicConcentration: Double,
-        gammaShape: Double)
-
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val oldModel = instance.oldLocalModel
@@ -570,36 +565,52 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
     }
   }
 
+  private case class Data(
+      vocabSize: Int,
+      topicsMatrix: Matrix,
+      docConcentration: Vector,
+      topicConcentration: Double,
+      gammaShape: Double)
+
   private class LocalLDAModelReader extends MLReader[LocalLDAModel] {
 
     private val className = classOf[LocalLDAModel].getName
 
     override def load(path: String): LocalLDAModel = {
+      // Import implicits for Dataset Encoder
+      val sparkSession = super.sparkSession
+      import sparkSession.implicits._
+
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
       val data = sparkSession.read.parquet(dataPath)
       val vectorConverted = MLUtils.convertVectorColumnsToML(data, "docConcentration")
       val Row(vocabSize: Int, topicsMatrix: Matrix, docConcentration: Vector,
-        topicConcentration: Double, gammaShape: Double) = MLUtils.convertMatrixColumnsToML(
-        vectorConverted, "topicsMatrix").select("vocabSize", "topicsMatrix", "docConcentration",
-        "topicConcentration", "gammaShape").head()
+        topicConcentration: Double, gammaShape: Double) = MLUtils.convertMatrixColumnsToML(
+        vectorConverted, "topicsMatrix").as[Data]
       val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, topicConcentration,
-          gammaShape)
+        gammaShape)
       val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession)
-      implicit val format = DefaultFormats
-      metadata.params match {
-        case JObject(pairs) =>
-          pairs.foreach { case (paramName, jsonValue) =>
-            val variable =
-              if (paramName == "topicDistribution") "topicDistributionCol" else paramName
-            val param = model.getParam(variable)
-            val value = param.jsonDecode(compact(render(jsonValue)))
-            model.set(param, value)
-          }
-        case _ =>
-          throw new IllegalArgumentException(
-            s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
+
+      metadata.sparkVersion match {
+        case "1.6" =>
+          implicit val format = DefaultFormats
+          metadata.params match {
+            case JObject(pairs) =>
+              pairs.foreach { case (paramName, jsonValue) =>
+                val origParam =
+                  if (paramName == "topicDistribution") "topicDistributionCol" else paramName
+                val param = model.getParam(origParam)
+                val value = param.jsonDecode(compact(render(jsonValue)))
+                model.set(param, value)
+              }
+            case _ =>
+              throw new IllegalArgumentException(
+                s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
           }
+        case "2.x" =>
+          DefaultParamsReader.getAndSetParams(model, metadata)
+      }
       model
     }
   }

From ac874c52008bd12b178dcd2d4e65a3cadf623653 Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Wed, 13 Jul 2016 19:55:26 -0700
Subject: [PATCH 4/9] Added model loading logic to Distributed LDA and fixed
 review comments

---
 .../org/apache/spark/ml/clustering/LDA.scala | 55 ++++++++++++++-----
 1 file changed, 40 insertions(+), 15 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 016cd4067341..4915afb7ca39 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -84,8 +84,8 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
    *     - Values should be >= 0
    *     - default = uniformly (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
-   * 
-   * @group param 
+   *
+   * @group param
    */
   @Since("1.6.0")
   final val docConcentration = new DoubleArrayParam(this, "docConcentration",
@@ -126,8 +126,8 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
    *     - Value should be >= 0
    *     - default = (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
-   * 
-   * @group param 
+   *
+   * @group param
    */
   @Since("1.6.0")
   final val topicConcentration = new DoubleParam(this, "topicConcentration",
@@ -566,11 +566,11 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
   }
 
   private case class Data(
-    vocabSize: Int,
-    topicsMatrix: Matrix,
-    docConcentration: Vector,
-    topicConcentration: Double,
-    gammaShape: Double)
+      vocabSize: Int,
+      topicsMatrix: Matrix,
+      docConcentration: Vector,
+      topicConcentration: Double,
+      gammaShape: Double)
 
   private class LocalLDAModelReader extends MLReader[LocalLDAModel] {
 
@@ -580,6 +580,8 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       // Import implicits for Dataset Encoder
       val sparkSession = super.sparkSession
       import sparkSession.implicits._
+      // Pattern to determine sparkversion
+      val pattern = """\\d+.\\d+(.\\d+)?(-SNAPSHOT)?""".r
 
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
@@ -587,11 +589,10 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       val vectorConverted = MLUtils.convertVectorColumnsToML(data, "docConcentration")
       val Row(vocabSize: Int, topicsMatrix: Matrix, docConcentration: Vector,
         topicConcentration: Double, gammaShape: Double) = MLUtils.convertMatrixColumnsToML(
-        vectorConverted, "topicsMatrix").as[Data]
+        vectorConverted, "topicsMatrix").as[Data].head()
       val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, topicConcentration,
         gammaShape)
       val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession)
-
       metadata.sparkVersion match {
         case "1.6" =>
           implicit val format = DefaultFormats
@@ -608,7 +609,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
               throw new IllegalArgumentException(
                 s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
           }
-        case "2.x" =>
+        case pattern =>
           DefaultParamsReader.getAndSetParams(model, metadata)
       }
       model
@@ -753,16 +754,40 @@ object DistributedLDAModel extends MLReadable[DistributedLDAModel] {
     private val className = classOf[DistributedLDAModel].getName
 
     override def load(path: String): DistributedLDAModel = {
+
+      // Pattern to determine sparkversion
+      val pattern =
+        """\\d+.\\d+(.\\d+)?(-SNAPSHOT)?""".r
+
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val modelPath = new Path(path, "oldModel").toString
       val oldModel = OldDistributedLDAModel.load(sc, modelPath)
-      val model = new DistributedLDAModel(
-        metadata.uid, oldModel.vocabSize, oldModel, sparkSession, None)
-      DefaultParamsReader.getAndSetParams(model, metadata)
+      val model = new DistributedLDAModel(metadata.uid, oldModel.vocabSize,
+        oldModel, sparkSession, None)
+      metadata.sparkVersion match {
+        case "1.6" =>
+          implicit val format = DefaultFormats
+          metadata.params match {
+            case JObject(pairs) =>
+              pairs.foreach { case (paramName, jsonValue) =>
+                val origParam =
+                  if (paramName == "topicDistribution") "topicDistributionCol" else paramName
+                val param = model.getParam(origParam)
+                val value = param.jsonDecode(compact(render(jsonValue)))
+                model.set(param, value)
+              }
+            case _ =>
+              throw new IllegalArgumentException(
+                s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
+          }
+        case pattern =>
+          DefaultParamsReader.getAndSetParams(model, metadata)
+      }
       model
     }
   }
 
+
   @Since("1.6.0")
   override def read: MLReader[DistributedLDAModel] = new DistributedLDAModelReader

From 3b69b889bfd5acd4e4ee870c66607e9ec81a3e3b Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Wed, 13 Jul 2016 21:17:47 -0700
Subject: [PATCH 5/9] Bug fix

---
 mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 4915afb7ca39..4401c0bd4d36 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -589,7 +589,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       val vectorConverted = MLUtils.convertVectorColumnsToML(data, "docConcentration")
       val Row(vocabSize: Int, topicsMatrix: Matrix, docConcentration: Vector,
         topicConcentration: Double, gammaShape: Double) = MLUtils.convertMatrixColumnsToML(
-        vectorConverted, "topicsMatrix").as[Data].head()
+        vectorConverted, "topicsMatrix").as[Data]
       val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, topicConcentration,
         gammaShape)
       val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession)

From 34f8f3b5fbe159bc09aa3a8d96bd67f3af9606b0 Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Wed, 13 Jul 2016 22:47:18 -0700
Subject: [PATCH 6/9] Bug fix

---
 .../src/main/scala/org/apache/spark/ml/clustering/LDA.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 4401c0bd4d36..9b6a38818f4e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -581,7 +581,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       val sparkSession = super.sparkSession
       import sparkSession.implicits._
       // Pattern to determine sparkversion
-      val pattern = """\\d+.\\d+(.\\d+)?(-SNAPSHOT)?""".r
+      val versionRegex = """\\d+.\\d+(.\\d+)?(-SNAPSHOT)?""".r
 
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
@@ -589,7 +589,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       val vectorConverted = MLUtils.convertVectorColumnsToML(data, "docConcentration")
       val Row(vocabSize: Int, topicsMatrix: Matrix, docConcentration: Vector,
         topicConcentration: Double, gammaShape: Double) = MLUtils.convertMatrixColumnsToML(
-        vectorConverted, "topicsMatrix").as[Data]
+        vectorConverted, "topicsMatrix").as[Data].head()
       val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, topicConcentration,
         gammaShape)
       val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession)
@@ -609,7 +609,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
               throw new IllegalArgumentException(
s"Cannot recognize JSON metadata: ${metadata.metadataJson}.") } - case pattern => + case versionRegex => DefaultParamsReader.getAndSetParams(model, metadata) } model From 22fe753362bd0274b7b7c79245cfb1fba3c349eb Mon Sep 17 00:00:00 2001 From: GayathriMurali Date: Thu, 14 Jul 2016 10:57:36 -0700 Subject: [PATCH 7/9] Selecting columns directly instead of as[Data]- Unit test issue --- mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 9b6a38818f4e..4c50cf3811cc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -589,7 +589,8 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] { val vectorConverted = MLUtils.convertVectorColumnsToML(data, "docConcentration") val Row(vocabSize: Int, topicsMatrix: Matrix, docConcentration: Vector, topicConcentration: Double, gammaShape: Double) = MLUtils.convertMatrixColumnsToML( - vectorConverted, "topicsMatrix").as[Data].head() + vectorConverted, "topicsMatrix").select("vocabSize", "topicsMatrix", "docConcentration", + "topicConcentration", "gammaShape").head() val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, topicConcentration, gammaShape) val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession) From a8bdd7a748e63a1948f98130a704972b9cbca54b Mon Sep 17 00:00:00 2001 From: GayathriMurali Date: Tue, 19 Jul 2016 17:33:41 -0700 Subject: [PATCH 8/9] Added loading logic to LDA and wrapped topicdistribution loading in a separate function --- .../org/apache/spark/ml/clustering/LDA.scala | 371 +++++++++--------- 1 file changed, 192 insertions(+), 179 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 4c50cf3811cc..3f16392bf3f1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -21,27 +21,23 @@ import org.apache.hadoop.fs.Path import org.json4s.DefaultFormats import org.json4s.JsonAST.JObject import org.json4s.jackson.JsonMethods._ - import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} import org.apache.spark.internal.Logging import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.linalg.{Matrix, Vector, VectorUDT, Vectors} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed} +import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.ml.util._ -import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, - EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, - LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, - OnlineLDAOptimizer => OldOnlineLDAOptimizer} +import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, OnlineLDAOptimizer => OldOnlineLDAOptimizer} import org.apache.spark.mllib.impl.PeriodicCheckpointer -import org.apache.spark.mllib.linalg.{Matrices => 
OldMatrices, Vector => OldVector, - Vectors => OldVectors} +import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.MatrixImplicits._ import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf} +import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf} import org.apache.spark.sql.types.StructType @@ -49,10 +45,10 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM with HasSeed with HasCheckpointInterval { /** - * Param for the number of topics (clusters) to infer. Must be > 1. Default: 10. - * - * @group param - */ + * Param for the number of topics (clusters) to infer. Must be > 1. Default: 10. + * + * @group param + */ @Since("1.6.0") final val k = new IntParam(this, "k", "The number of topics (clusters) to infer. " + "Must be > 1.", ParamValidators.gt(1)) @@ -62,31 +58,31 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM def getK: Int = $(k) /** - * Concentration parameter (commonly named "alpha") for the prior placed on documents' - * distributions over topics ("theta"). - * - * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing - * (more regularization). - * - * If not set by the user, then docConcentration is set automatically. If set to - * singleton vector [alpha], then alpha is replicated to a vector of length k in fitting. - * Otherwise, the [[docConcentration]] vector must be length k. - * (default = automatic) - * - * Optimizer-specific parameter settings: - * - EM - * - Currently only supports symmetric distributions, so all values in the vector should be - * the same. - * - Values should be > 1.0 - * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows - * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. - * - Online - * - Values should be >= 0 - * - default = uniformly (1.0 / k), following the implementation from - * [[https://github.com/Blei-Lab/onlineldavb]]. - * - * @group param - */ + * Concentration parameter (commonly named "alpha") for the prior placed on documents' + * distributions over topics ("theta"). + * + * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing + * (more regularization). + * + * If not set by the user, then docConcentration is set automatically. If set to + * singleton vector [alpha], then alpha is replicated to a vector of length k in fitting. + * Otherwise, the [[docConcentration]] vector must be length k. + * (default = automatic) + * + * Optimizer-specific parameter settings: + * - EM + * - Currently only supports symmetric distributions, so all values in the vector should be + * the same. + * - Values should be > 1.0 + * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows + * from Asuncion et al. (2009), who recommend a +1 adjustment for EM. + * - Online + * - Values should be >= 0 + * - default = uniformly (1.0 / k), following the implementation from + * [[https://github.com/Blei-Lab/onlineldavb]]. 
+    *
+    * @group param
+    */
   @Since("1.6.0")
   final val docConcentration = new DoubleArrayParam(this, "docConcentration",
     "Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" +
@@ -106,29 +102,29 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   }
 
   /**
-   * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
-   * distributions over terms.
-   *
-   * This is the parameter to a symmetric Dirichlet distribution.
-   *
-   * Note: The topics' distributions over terms are called "beta" in the original LDA paper
-   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
-   *
-   * If not set by the user, then topicConcentration is set automatically.
-   *  (default = automatic)
-   *
-   * Optimizer-specific parameter settings:
-   *  - EM
-   *     - Value should be > 1.0
-   *     - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
-   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-   *  - Online
-   *     - Value should be >= 0
-   *     - default = (1.0 / k), following the implementation from
-   *       [[https://github.com/Blei-Lab/onlineldavb]].
-   *
-   * @group param
-   */
+    * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
+    * distributions over terms.
+    *
+    * This is the parameter to a symmetric Dirichlet distribution.
+    *
+    * Note: The topics' distributions over terms are called "beta" in the original LDA paper
+    * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
+    *
+    * If not set by the user, then topicConcentration is set automatically.
+    *  (default = automatic)
+    *
+    * Optimizer-specific parameter settings:
+    *  - EM
+    *     - Value should be > 1.0
+    *     - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
+    *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+    *  - Online
+    *     - Value should be >= 0
+    *     - default = (1.0 / k), following the implementation from
+    *       [[https://github.com/Blei-Lab/onlineldavb]].
+    *
+    * @group param
+    */
   @Since("1.6.0")
   final val topicConcentration = new DoubleParam(this, "topicConcentration",
     "Concentration parameter (commonly named \"beta\" or \"eta\") for the prior placed on topic'" +
@@ -152,23 +148,23 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   final val supportedOptimizers: Array[String] = Array("online", "em")
 
   /**
-   * Optimizer or inference algorithm used to estimate the LDA model.
-   * Currently supported (case-insensitive):
-   *  - "online": Online Variational Bayes (default)
-   *  - "em": Expectation-Maximization
-   *
-   * For details, see the following papers:
-   *  - Online LDA:
-   *     Hoffman, Blei and Bach. "Online Learning for Latent Dirichlet Allocation."
-   *     Neural Information Processing Systems, 2010.
-   *     [[http://www.cs.columbia.edu/~blei/papers/HoffmanBleiBach2010b.pdf]]
-   *  - EM:
-   *     Asuncion et al. "On Smoothing and Inference for Topic Models."
-   *     Uncertainty in Artificial Intelligence, 2009.
-   *     [[http://arxiv.org/pdf/1205.2662.pdf]]
-   *
-   * @group param
-   */
+    * Optimizer or inference algorithm used to estimate the LDA model.
+    * Currently supported (case-insensitive):
+    *  - "online": Online Variational Bayes (default)
+    *  - "em": Expectation-Maximization
+    *
+    * For details, see the following papers:
+    *  - Online LDA:
+    *     Hoffman, Blei and Bach. "Online Learning for Latent Dirichlet Allocation."
+    *     Neural Information Processing Systems, 2010.
+    *     [[http://www.cs.columbia.edu/~blei/papers/HoffmanBleiBach2010b.pdf]]
+    *  - EM:
+    *     Asuncion et al. "On Smoothing and Inference for Topic Models."
+    *     Uncertainty in Artificial Intelligence, 2009.
+    *     [[http://arxiv.org/pdf/1205.2662.pdf]]
+    *
+    * @group param
+    */
   @Since("1.6.0")
   final val optimizer = new Param[String](this, "optimizer", "Optimizer or inference" +
     " algorithm used to estimate the LDA model. Supported: " + supportedOptimizers.mkString(", "),
@@ -179,15 +175,15 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getOptimizer: String = $(optimizer)
 
   /**
-   * Output column with estimates of the topic mixture distribution for each document (often called
-   * "theta" in the literature). Returns a vector of zeros for an empty document.
-   *
-   * This uses a variational approximation following Hoffman et al. (2010), where the approximate
-   * distribution is called "gamma." Technically, this method returns this approximation "gamma"
-   * for each document.
-   *
-   * @group param
-   */
+    * Output column with estimates of the topic mixture distribution for each document (often called
+    * "theta" in the literature). Returns a vector of zeros for an empty document.
+    *
+    * This uses a variational approximation following Hoffman et al. (2010), where the approximate
+    * distribution is called "gamma." Technically, this method returns this approximation "gamma"
+    * for each document.
+    *
+    * @group param
+    */
   @Since("1.6.0")
   final val topicDistributionCol = new Param[String](this, "topicDistributionCol", "Output column" +
     " with estimates of the topic mixture distribution for each document (often called \"theta\"" +
@@ -200,15 +196,15 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getTopicDistributionCol: String = $(topicDistributionCol)
 
   /**
-   * For Online optimizer only: [[optimizer]] = "online".
-   *
-   * A (positive) learning parameter that downweights early iterations. Larger values make early
-   * iterations count less.
-   * This is called "tau0" in the Online LDA paper (Hoffman et al., 2010)
-   * Default: 1024, following Hoffman et al.
-   *
-   * @group expertParam
-   */
+    * For Online optimizer only: [[optimizer]] = "online".
+    *
+    * A (positive) learning parameter that downweights early iterations. Larger values make early
+    * iterations count less.
+    * This is called "tau0" in the Online LDA paper (Hoffman et al., 2010)
+    * Default: 1024, following Hoffman et al.
+    *
+    * @group expertParam
+    */
   @Since("1.6.0")
   final val learningOffset = new DoubleParam(this, "learningOffset", "(For online optimizer)" +
     " A (positive) learning parameter that downweights early iterations. Larger values make early" +
@@ -220,15 +216,15 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getLearningOffset: Double = $(learningOffset)
 
   /**
-   * For Online optimizer only: [[optimizer]] = "online".
-   *
-   * Learning rate, set as an exponential decay rate.
-   * This should be between (0.5, 1.0] to guarantee asymptotic convergence.
-   * This is called "kappa" in the Online LDA paper (Hoffman et al., 2010).
-   * Default: 0.51, based on Hoffman et al.
-   *
-   * @group expertParam
-   */
+    * For Online optimizer only: [[optimizer]] = "online".
+    *
+    * Learning rate, set as an exponential decay rate.
+    * This should be between (0.5, 1.0] to guarantee asymptotic convergence.
+    * This is called "kappa" in the Online LDA paper (Hoffman et al., 2010).
+    * Default: 0.51, based on Hoffman et al.
+    *
+    * @group expertParam
+    */
   @Since("1.6.0")
   final val learningDecay = new DoubleParam(this, "learningDecay", "(For online optimizer)" +
     " Learning rate, set as an exponential decay rate. This should be between (0.5, 1.0] to" +
@@ -239,22 +235,22 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getLearningDecay: Double = $(learningDecay)
 
   /**
-   * For Online optimizer only: [[optimizer]] = "online".
-   *
-   * Fraction of the corpus to be sampled and used in each iteration of mini-batch gradient descent,
-   * in range (0, 1].
-   *
-   * Note that this should be adjusted in synch with [[LDA.maxIter]]
-   * so the entire corpus is used. Specifically, set both so that
-   * maxIterations * miniBatchFraction >= 1.
-   *
-   * Note: This is the same as the `miniBatchFraction` parameter in
-   * [[org.apache.spark.mllib.clustering.OnlineLDAOptimizer]].
-   *
-   * Default: 0.05, i.e., 5% of total documents.
-   *
-   * @group param
-   */
+    * For Online optimizer only: [[optimizer]] = "online".
+    *
+    * Fraction of the corpus to be sampled and used in each iteration of mini-batch
+    * gradient descent, in range (0, 1].
+    *
+    * Note that this should be adjusted in synch with [[LDA.maxIter]]
+    * so the entire corpus is used. Specifically, set both so that
+    * maxIterations * miniBatchFraction >= 1.
+    *
+    * Note: This is the same as the `miniBatchFraction` parameter in
+    * [[org.apache.spark.mllib.clustering.OnlineLDAOptimizer]].
+    *
+    * Default: 0.05, i.e., 5% of total documents.
+    *
+    * @group param
+    */
   @Since("1.6.0")
   final val subsamplingRate = new DoubleParam(this, "subsamplingRate", "(For online optimizer)" +
     " Fraction of the corpus to be sampled and used in each iteration of mini-batch" +
@@ -266,15 +262,15 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getSubsamplingRate: Double = $(subsamplingRate)
 
   /**
-   * For Online optimizer only (currently): [[optimizer]] = "online".
-   *
-   * Indicates whether the docConcentration (Dirichlet parameter for
-   * document-topic distribution) will be optimized during training.
-   * Setting this to true will make the model more expressive and fit the training data better.
-   * Default: false
-   *
-   * @group expertParam
-   */
+    * For Online optimizer only (currently): [[optimizer]] = "online".
+    *
+    * Indicates whether the docConcentration (Dirichlet parameter for
+    * document-topic distribution) will be optimized during training.
+    * Setting this to true will make the model more expressive and fit the training data better.
+    * Default: false
+    *
+    * @group expertParam
+    */
   @Since("1.6.0")
   final val optimizeDocConcentration = new BooleanParam(this, "optimizeDocConcentration",
     "(For online optimizer only, currently) Indicates whether the docConcentration" +
@@ -285,20 +281,20 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getOptimizeDocConcentration: Boolean = $(optimizeDocConcentration)
 
   /**
-   * For EM optimizer only: [[optimizer]] = "em".
-   *
-   * If using checkpointing, this indicates whether to keep the last
-   * checkpoint. If false, then the checkpoint will be deleted. Deleting the checkpoint can
-   * cause failures if a data partition is lost, so set this bit with care.
-   * Note that checkpoints will be cleaned up via reference counting, regardless.
-   *
-   * See [[DistributedLDAModel.getCheckpointFiles]] for getting remaining checkpoints and
-   * [[DistributedLDAModel.deleteCheckpointFiles]] for removing remaining checkpoints.
-   *
-   * Default: true
-   *
-   * @group expertParam
-   */
+    * For EM optimizer only: [[optimizer]] = "em".
+    *
+    * If using checkpointing, this indicates whether to keep the last
+    * checkpoint. If false, then the checkpoint will be deleted. Deleting the checkpoint can
+    * cause failures if a data partition is lost, so set this bit with care.
+    * Note that checkpoints will be cleaned up via reference counting, regardless.
+    *
+    * See [[DistributedLDAModel.getCheckpointFiles]] for getting remaining checkpoints and
+    * [[DistributedLDAModel.deleteCheckpointFiles]] for removing remaining checkpoints.
+    *
+    * Default: true
+    *
+    * @group expertParam
+    */
   @Since("2.0.0")
   final val keepLastCheckpoint = new BooleanParam(this, "keepLastCheckpoint",
     "(For EM optimizer) If using checkpointing, this indicates whether to keep the last" +
@@ -310,11 +306,11 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getKeepLastCheckpoint: Boolean = $(keepLastCheckpoint)
 
   /**
-   * Validates and transforms the input schema.
-   *
-   * @param schema input schema
-   * @return output schema
-   */
+    * Validates and transforms the input schema.
+    *
+    * @param schema input schema
+    * @return output schema
+    */
   protected def validateAndTransformSchema(schema: StructType): StructType = {
     if (isSet(docConcentration)) {
       if (getDocConcentration.length != 1) {
@@ -358,6 +354,25 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
       new OldEMLDAOptimizer()
         .setKeepLastCheckpoint($(keepLastCheckpoint))
     }
+
+  def extractTopicDistributionCol(metadata: Metadata, model: Params): Unit =
+  {
+    implicit val format = DefaultFormats
+    metadata.params match {
+      case JObject(pairs) =>
+        pairs.foreach { case (paramName, jsonValue) =>
+          val origParam =
+            if (paramName == "topicDistribution") "topicDistributionCol" else paramName
+          val param = model.getParam(origParam)
+          val value = param.jsonDecode(compact(render(jsonValue)))
+          model.set(param, value)
+        }
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
+    }
+  }
+
 }
@@ -546,8 +561,8 @@ class LocalLDAModel private[ml] (
 
   @Since("1.6.0")
   override def write: MLWriter = new LocalLDAModel.LocalLDAModelWriter(this)
-}
 
+}
 @Since("1.6.0")
 object LocalLDAModel extends MLReadable[LocalLDAModel] {
 
@@ -596,20 +611,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession)
       metadata.sparkVersion match {
         case "1.6" =>
-          implicit val format = DefaultFormats
-          metadata.params match {
-            case JObject(pairs) =>
-              pairs.foreach { case (paramName, jsonValue) =>
-                val origParam =
-                  if (paramName == "topicDistribution") "topicDistributionCol" else paramName
-                val param = model.getParam(origParam)
-                val value = param.jsonDecode(compact(render(jsonValue)))
-                model.set(param, value)
-              }
-            case _ =>
-              throw new IllegalArgumentException(
-                s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
-          }
+          model.extractTopicDistributionCol(metadata, model)
         case versionRegex =>
           DefaultParamsReader.getAndSetParams(model, metadata)
       }
@@ -767,20 +769,7 @@ object DistributedLDAModel extends MLReadable[DistributedLDAModel] {
         oldModel, sparkSession, None)
       metadata.sparkVersion match {
         case "1.6" =>
-          implicit val format = DefaultFormats
-          metadata.params match {
-            case JObject(pairs) =>
-              pairs.foreach { case (paramName, jsonValue) =>
-                val origParam =
-                  if (paramName == "topicDistribution") "topicDistributionCol" else paramName
-                val param = model.getParam(origParam)
-                val value = param.jsonDecode(compact(render(jsonValue)))
-                model.set(param, value)
-              }
-            case _ =>
-              throw new IllegalArgumentException(
-                s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
-          }
+          model.extractTopicDistributionCol(metadata, model)
         case pattern =>
           DefaultParamsReader.getAndSetParams(model, metadata)
       }
@@ -929,6 +918,7 @@ class LDA @Since("1.6.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
 }
 
 @Since("2.0.0")
@@ -939,7 +929,7 @@ object LDA extends DefaultParamsReadable[LDA] {
       dataset: Dataset[_],
       featuresCol: String): RDD[(Long, OldVector)] = {
     dataset
-      .withColumn("docId", monotonically_increasing_id())
+      .withColumn("docId", monotonicallyIncreasingId())
       .select("docId", featuresCol)
      .rdd
      .map { case Row(docId: Long, features: Vector) =>
@@ -947,6 +937,29 @@ object LDA extends DefaultParamsReadable[LDA] {
       }
   }
 
+  private class LDAReader extends MLReader[LDA] {
+
+    private val className = classOf[LDA].getName
+
+    override def load(path: String): LDA = {
+
+      // Pattern to determine sparkversion
+      val pattern =
+        """\\d+.\\d+(.\\d+)?(-SNAPSHOT)?""".r
+
+      val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+      val model = new LDA(metadata.uid)
+      metadata.sparkVersion match {
+        case "1.6" =>
+          model.extractTopicDistributionCol(metadata, model)
+        case pattern =>
+          DefaultParamsReader.getAndSetParams(model, metadata)
+      }
+      model
+    }
+  }
+
   @Since("2.0.0")
   override def load(path: String): LDA = super.load(path)
 }

From b16b3682a50486a81913ce5b5b67711d73b7589f Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Tue, 19 Jul 2016 18:15:28 -0700
Subject: [PATCH 9/9] Style issues

---
 .../org/apache/spark/ml/clustering/LDA.scala | 27 ++++++++++---------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 3f16392bf3f1..a46f1144fb0b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -21,14 +21,15 @@ import org.apache.hadoop.fs.Path
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.JObject
 import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{Matrix, Vector, VectorUDT, Vectors}
+import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
-import org.apache.spark.ml.util.DefaultParamsReader.Metadata
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.DefaultParamsReader.Metadata
 import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, OnlineLDAOptimizer => OldOnlineLDAOptimizer}
 import org.apache.spark.mllib.impl.PeriodicCheckpointer
 import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Vector => OldVector, Vectors => OldVectors}
@@ -44,7 +45,7 @@ import org.apache.spark.sql.types.StructType
 
 private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter
   with HasSeed with HasCheckpointInterval {
 
-  /**
+  /*
     * Param for the number of topics (clusters) to infer. Must be > 1. Default: 10.
     *
     * @group param
@@ -57,7 +58,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getK: Int = $(k)
 
-  /**
+  /*
    * Concentration parameter (commonly named "alpha") for the prior placed on documents'
    * distributions over topics ("theta").
    *
@@ -101,7 +102,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
     }
   }
 
-  /**
+  /*
    * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
    * distributions over terms.
    *
@@ -147,7 +148,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   final val supportedOptimizers: Array[String] = Array("online", "em")
 
-  /**
+  /*
    * Optimizer or inference algorithm used to estimate the LDA model.
    * Currently supported (case-insensitive):
    *  - "online": Online Variational Bayes (default)
@@ -174,7 +175,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getOptimizer: String = $(optimizer)
 
-  /**
+  /*
   * Output column with estimates of the topic mixture distribution for each document (often called
   * "theta" in the literature). Returns a vector of zeros for an empty document.
   *
@@ -195,7 +196,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getTopicDistributionCol: String = $(topicDistributionCol)
 
-  /**
+  /*
   * For Online optimizer only: [[optimizer]] = "online".
   *
   * A (positive) learning parameter that downweights early iterations. Larger values make early
@@ -215,7 +216,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getLearningOffset: Double = $(learningOffset)
 
-  /**
+  /*
   * For Online optimizer only: [[optimizer]] = "online".
   *
   * Learning rate, set as an exponential decay rate.
@@ -234,7 +235,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getLearningDecay: Double = $(learningDecay)
 
-  /**
+  /*
   * For Online optimizer only: [[optimizer]] = "online".
   *
   * Fraction of the corpus to be sampled and used in each iteration of mini-batch
@@ -261,7 +262,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getSubsamplingRate: Double = $(subsamplingRate)
 
-  /**
+  /*
   * For Online optimizer only (currently): [[optimizer]] = "online".
   *
   * Indicates whether the docConcentration (Dirichlet parameter for
@@ -280,7 +281,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getOptimizeDocConcentration: Boolean = $(optimizeDocConcentration)
 
-  /**
+  /*
   * For EM optimizer only: [[optimizer]] = "em".
   *
   * If using checkpointing, this indicates whether to keep the last
@@ -305,7 +306,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("2.0.0")
   def getKeepLastCheckpoint: Boolean = $(keepLastCheckpoint)
 
-  /**
+  /*
   * Validates and transforms the input schema.
   *
   * @param schema input schema
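
Editor's note (appended commentary, not part of the patch series): the version dispatch introduced in patches 4-8 has two Scala pitfalls worth flagging. First, in `metadata.sparkVersion match { ...; case pattern => ... }` a lowercase identifier in case position declares a fresh binding rather than referring to the `pattern`/`versionRegex` value defined above, so that branch matches any string and the regex is never applied; applying it requires extractor syntax such as `case versionRegex(_*) =>`. Second, escape processing is disabled inside triple-quoted strings, so `"""\\d+.\\d+..."""` denotes a literal backslash followed by "d" and cannot match a version string like "2.0.0"; the intended pattern would be written `"""\d+\.\d+..."""`. The sketch below is one hedged way to dispatch on the stored version; the helper name setLoadedParams is hypothetical, and it assumes (as the patches do) that the only 1.6-to-2.x difference is the "topicDistribution" to "topicDistributionCol" param rename.

  // Hypothetical helper, not from the patches above: dispatch on major/minor
  // version. A Regex in case position with arguments is an extractor pattern:
  // it matches the whole string and compares the captured groups.
  private val versionRegex = """(\d+)\.(\d+).*""".r

  def setLoadedParams(model: Params, metadata: DefaultParamsReader.Metadata): Unit = {
    metadata.sparkVersion match {
      case versionRegex("1", "6") =>
        // 1.6 metadata stored the param as "topicDistribution"; remap it to
        // "topicDistributionCol", as extractTopicDistributionCol does.
        extractTopicDistributionCol(metadata, model)
      case _ =>
        // 2.x metadata already uses the current param names.
        DefaultParamsReader.getAndSetParams(model, metadata)
    }
  }

A related subtlety: the patches' `case "1.6" =>` compares the full version string, so metadata written by, say, 1.6.2 would fall through to the default branch. Matching only major/minor, as sketched here (or via a utility along the lines of org.apache.spark.util.VersionUtils.majorMinorVersion in later Spark versions), avoids that.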