diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 034f2c3fa2fd..a46f1144fb0b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -18,6 +18,9 @@
 package org.apache.spark.ml.clustering
 
 import org.apache.hadoop.fs.Path
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JObject
+import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.internal.Logging
@@ -26,29 +29,27 @@ import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.DefaultParamsReader.Metadata
 import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel,
   EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
   LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
   OnlineLDAOptimizer => OldOnlineLDAOptimizer}
 import org.apache.spark.mllib.impl.PeriodicCheckpointer
 import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Vector => OldVector,
   Vectors => OldVectors}
 import org.apache.spark.mllib.linalg.MatrixImplicits._
 import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf}
 import org.apache.spark.sql.types.StructType
 
 
 private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter
   with HasSeed with HasCheckpointInterval {
 
-  /**
-   * Param for the number of topics (clusters) to infer. Must be > 1. Default: 10.
-   *
-   * @group param
-   */
+  /**
+   * Param for the number of topics (clusters) to infer. Must be > 1. Default: 10.
+   *
+   * @group param
+   */
   @Since("1.6.0")
   final val k = new IntParam(this, "k", "The number of topics (clusters) to infer. " +
     "Must be > 1.", ParamValidators.gt(1))
@@ -57,31 +58,32 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getK: Int = $(k)
 
-  /**
-   * Concentration parameter (commonly named "alpha") for the prior placed on documents'
-   * distributions over topics ("theta").
-   *
-   * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing
-   * (more regularization).
-   *
-   * If not set by the user, then docConcentration is set automatically. If set to
-   * singleton vector [alpha], then alpha is replicated to a vector of length k in fitting.
-   * Otherwise, the [[docConcentration]] vector must be length k.
-   * (default = automatic)
-   *
-   * Optimizer-specific parameter settings:
-   *  - EM
-   *     - Currently only supports symmetric distributions, so all values in the vector should be
-   *       the same.
-   *     - Values should be > 1.0
-   *     - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
-   *       from Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-   *  - Online
-   *     - Values should be >= 0
-   *     - default = uniformly (1.0 / k), following the implementation from
-   *       [[https://github.com/Blei-Lab/onlineldavb]].
-   * @group param
-   */
+  /**
+   * Concentration parameter (commonly named "alpha") for the prior placed on documents'
+   * distributions over topics ("theta").
+   *
+   * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing
+   * (more regularization).
+   *
+   * If not set by the user, then docConcentration is set automatically. If set to
+   * singleton vector [alpha], then alpha is replicated to a vector of length k in fitting.
+   * Otherwise, the [[docConcentration]] vector must be length k.
+   * (default = automatic)
+   *
+   * Optimizer-specific parameter settings:
+   *  - EM
+   *     - Currently only supports symmetric distributions, so all values in the vector should be
+   *       the same.
+   *     - Values should be > 1.0
+   *     - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
+   *       from Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *  - Online
+   *     - Values should be >= 0
+   *     - default = uniformly (1.0 / k), following the implementation from
+   *       [[https://github.com/Blei-Lab/onlineldavb]].
+   *
+   * @group param
+   */
   @Since("1.6.0")
   final val docConcentration = new DoubleArrayParam(this, "docConcentration",
     "Concentration parameter (commonly named \"alpha\") for the prior placed on documents'" +
@@ -100,29 +102,30 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
     }
   }
 
-  /**
-   * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
-   * distributions over terms.
-   *
-   * This is the parameter to a symmetric Dirichlet distribution.
-   *
-   * Note: The topics' distributions over terms are called "beta" in the original LDA paper
-   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
-   *
-   * If not set by the user, then topicConcentration is set automatically.
-   * (default = automatic)
-   *
-   * Optimizer-specific parameter settings:
-   *  - EM
-   *     - Value should be > 1.0
-   *     - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
-   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-   *  - Online
-   *     - Value should be >= 0
-   *     - default = (1.0 / k), following the implementation from
-   *       [[https://github.com/Blei-Lab/onlineldavb]].
-   * @group param
-   */
+  /**
+   * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
+   * distributions over terms.
+   *
+   * This is the parameter to a symmetric Dirichlet distribution.
+   *
+   * Note: The topics' distributions over terms are called "beta" in the original LDA paper
+   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
+   *
+   * If not set by the user, then topicConcentration is set automatically.
+   * (default = automatic)
+   *
+   * Optimizer-specific parameter settings:
+   *  - EM
+   *     - Value should be > 1.0
+   *     - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
+   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *  - Online
+   *     - Value should be >= 0
+   *     - default = (1.0 / k), following the implementation from
+   *       [[https://github.com/Blei-Lab/onlineldavb]].
+   *
+   * @group param
+   */
   @Since("1.6.0")
   final val topicConcentration = new DoubleParam(this, "topicConcentration",
     "Concentration parameter (commonly named \"beta\" or \"eta\") for the prior placed on topic'" +
@@ -145,24 +148,24 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   final val supportedOptimizers: Array[String] = Array("online", "em")
 
-  /**
-   * Optimizer or inference algorithm used to estimate the LDA model.
-   * Currently supported (case-insensitive):
-   *  - "online": Online Variational Bayes (default)
-   *  - "em": Expectation-Maximization
-   *
-   * For details, see the following papers:
-   *  - Online LDA:
-   *     Hoffman, Blei and Bach. "Online Learning for Latent Dirichlet Allocation."
-   *     Neural Information Processing Systems, 2010.
-   *     [[http://www.cs.columbia.edu/~blei/papers/HoffmanBleiBach2010b.pdf]]
-   *  - EM:
-   *     Asuncion et al. "On Smoothing and Inference for Topic Models."
-   *     Uncertainty in Artificial Intelligence, 2009.
-   *     [[http://arxiv.org/pdf/1205.2662.pdf]]
-   *
-   * @group param
-   */
+  /**
+   * Optimizer or inference algorithm used to estimate the LDA model.
+   * Currently supported (case-insensitive):
+   *  - "online": Online Variational Bayes (default)
+   *  - "em": Expectation-Maximization
+   *
+   * For details, see the following papers:
+   *  - Online LDA:
+   *     Hoffman, Blei and Bach. "Online Learning for Latent Dirichlet Allocation."
+   *     Neural Information Processing Systems, 2010.
+   *     [[http://www.cs.columbia.edu/~blei/papers/HoffmanBleiBach2010b.pdf]]
+   *  - EM:
+   *     Asuncion et al. "On Smoothing and Inference for Topic Models."
+   *     Uncertainty in Artificial Intelligence, 2009.
+   *     [[http://arxiv.org/pdf/1205.2662.pdf]]
+   *
+   * @group param
+   */
   @Since("1.6.0")
   final val optimizer = new Param[String](this, "optimizer", "Optimizer or inference" +
     " algorithm used to estimate the LDA model. Supported: " + supportedOptimizers.mkString(", "),
@@ -172,16 +175,16 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getOptimizer: String = $(optimizer)
 
-  /**
-   * Output column with estimates of the topic mixture distribution for each document (often called
-   * "theta" in the literature). Returns a vector of zeros for an empty document.
-   *
-   * This uses a variational approximation following Hoffman et al. (2010), where the approximate
-   * distribution is called "gamma." Technically, this method returns this approximation "gamma"
-   * for each document.
-   *
-   * @group param
-   */
+  /**
+   * Output column with estimates of the topic mixture distribution for each document (often called
+   * "theta" in the literature). Returns a vector of zeros for an empty document.
+   *
+   * This uses a variational approximation following Hoffman et al. (2010), where the approximate
+   * distribution is called "gamma." Technically, this method returns this approximation "gamma"
+   * for each document.
+   *
+   * @group param
+   */
   @Since("1.6.0")
   final val topicDistributionCol = new Param[String](this, "topicDistributionCol", "Output column" +
     " with estimates of the topic mixture distribution for each document (often called \"theta\"" +
@@ -193,16 +196,16 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getTopicDistributionCol: String = $(topicDistributionCol)
 
-  /**
-   * For Online optimizer only: [[optimizer]] = "online".
-   *
-   * A (positive) learning parameter that downweights early iterations. Larger values make early
-   * iterations count less.
-   * This is called "tau0" in the Online LDA paper (Hoffman et al., 2010)
-   * Default: 1024, following Hoffman et al.
-   *
-   * @group expertParam
-   */
+  /**
+   * For Online optimizer only: [[optimizer]] = "online".
+   *
+   * A (positive) learning parameter that downweights early iterations. Larger values make early
+   * iterations count less.
+   * This is called "tau0" in the Online LDA paper (Hoffman et al., 2010)
+   * Default: 1024, following Hoffman et al.
+   *
+   * @group expertParam
+   */
   @Since("1.6.0")
   final val learningOffset = new DoubleParam(this, "learningOffset", "(For online optimizer)" +
     " A (positive) learning parameter that downweights early iterations. Larger values make early" +
@@ -213,16 +216,16 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getLearningOffset: Double = $(learningOffset)
 
-  /**
-   * For Online optimizer only: [[optimizer]] = "online".
-   *
-   * Learning rate, set as an exponential decay rate.
-   * This should be between (0.5, 1.0] to guarantee asymptotic convergence.
-   * This is called "kappa" in the Online LDA paper (Hoffman et al., 2010).
-   * Default: 0.51, based on Hoffman et al.
-   *
-   * @group expertParam
-   */
+  /**
+   * For Online optimizer only: [[optimizer]] = "online".
+   *
+   * Learning rate, set as an exponential decay rate.
+   * This should be between (0.5, 1.0] to guarantee asymptotic convergence.
+   * This is called "kappa" in the Online LDA paper (Hoffman et al., 2010).
+   * Default: 0.51, based on Hoffman et al.
+   *
+   * @group expertParam
+   */
   @Since("1.6.0")
   final val learningDecay = new DoubleParam(this, "learningDecay", "(For online optimizer)" +
     " Learning rate, set as an exponential decay rate. This should be between (0.5, 1.0] to" +
@@ -232,23 +235,23 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getLearningDecay: Double = $(learningDecay)
 
-  /**
-   * For Online optimizer only: [[optimizer]] = "online".
-   *
-   * Fraction of the corpus to be sampled and used in each iteration of mini-batch gradient descent,
-   * in range (0, 1].
-   *
-   * Note that this should be adjusted in synch with [[LDA.maxIter]]
-   * so the entire corpus is used. Specifically, set both so that
-   * maxIterations * miniBatchFraction >= 1.
-   *
-   * Note: This is the same as the `miniBatchFraction` parameter in
-   *       [[org.apache.spark.mllib.clustering.OnlineLDAOptimizer]].
-   *
-   * Default: 0.05, i.e., 5% of total documents.
-   *
-   * @group param
-   */
+  /**
+   * For Online optimizer only: [[optimizer]] = "online".
+   *
+   * Fraction of the corpus to be sampled and used in each iteration of mini-batch
+   * gradient descent, in range (0, 1].
+   *
+   * Note that this should be adjusted in synch with [[LDA.maxIter]]
+   * so the entire corpus is used. Specifically, set both so that
+   * maxIterations * miniBatchFraction >= 1.
+   *
+   * Note: This is the same as the `miniBatchFraction` parameter in
+   *       [[org.apache.spark.mllib.clustering.OnlineLDAOptimizer]].
+   *
+   * Default: 0.05, i.e., 5% of total documents.
+   *
+   * @group param
+   */
   @Since("1.6.0")
   final val subsamplingRate = new DoubleParam(this, "subsamplingRate", "(For online optimizer)" +
     " Fraction of the corpus to be sampled and used in each iteration of mini-batch" +
@@ -259,16 +262,16 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getSubsamplingRate: Double = $(subsamplingRate)
 
-  /**
-   * For Online optimizer only (currently): [[optimizer]] = "online".
-   *
-   * Indicates whether the docConcentration (Dirichlet parameter for
-   * document-topic distribution) will be optimized during training.
-   * Setting this to true will make the model more expressive and fit the training data better.
-   * Default: false
-   *
-   * @group expertParam
-   */
+  /**
+   * For Online optimizer only (currently): [[optimizer]] = "online".
+   *
+   * Indicates whether the docConcentration (Dirichlet parameter for
+   * document-topic distribution) will be optimized during training.
+   * Setting this to true will make the model more expressive and fit the training data better.
+   * Default: false
+   *
+   * @group expertParam
+   */
   @Since("1.6.0")
   final val optimizeDocConcentration = new BooleanParam(this, "optimizeDocConcentration",
     "(For online optimizer only, currently) Indicates whether the docConcentration" +
@@ -278,21 +281,21 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("1.6.0")
   def getOptimizeDocConcentration: Boolean = $(optimizeDocConcentration)
 
-  /**
-   * For EM optimizer only: [[optimizer]] = "em".
-   *
-   * If using checkpointing, this indicates whether to keep the last
-   * checkpoint. If false, then the checkpoint will be deleted. Deleting the checkpoint can
-   * cause failures if a data partition is lost, so set this bit with care.
-   * Note that checkpoints will be cleaned up via reference counting, regardless.
-   *
-   * See [[DistributedLDAModel.getCheckpointFiles]] for getting remaining checkpoints and
-   * [[DistributedLDAModel.deleteCheckpointFiles]] for removing remaining checkpoints.
-   *
-   * Default: true
-   *
-   * @group expertParam
-   */
+  /**
+   * For EM optimizer only: [[optimizer]] = "em".
+   *
+   * If using checkpointing, this indicates whether to keep the last
+   * checkpoint. If false, then the checkpoint will be deleted. Deleting the checkpoint can
+   * cause failures if a data partition is lost, so set this bit with care.
+   * Note that checkpoints will be cleaned up via reference counting, regardless.
+   *
+   * See [[DistributedLDAModel.getCheckpointFiles]] for getting remaining checkpoints and
+   * [[DistributedLDAModel.deleteCheckpointFiles]] for removing remaining checkpoints.
+   *
+   * Default: true
+   *
+   * @group expertParam
+   */
   @Since("2.0.0")
   final val keepLastCheckpoint = new BooleanParam(this, "keepLastCheckpoint",
     "(For EM optimizer) If using checkpointing, this indicates whether to keep the last" +
@@ -303,12 +306,12 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   @Since("2.0.0")
   def getKeepLastCheckpoint: Boolean = $(keepLastCheckpoint)
 
-  /**
-   * Validates and transforms the input schema.
-   *
-   * @param schema input schema
-   * @return output schema
-   */
+  /**
+   * Validates and transforms the input schema.
+   *
+   * @param schema input schema
+   * @return output schema
+   */
   protected def validateAndTransformSchema(schema: StructType): StructType = {
     if (isSet(docConcentration)) {
       if (getDocConcentration.length != 1) {
@@ -352,6 +355,25 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
       new OldEMLDAOptimizer()
         .setKeepLastCheckpoint($(keepLastCheckpoint))
   }
+
+  /**
+   * Sets params on `model` from Spark 1.6 metadata, in which [[topicDistributionCol]]
+   * was stored under the old param name "topicDistribution".
+   */
+  def extractTopicDistributionCol(metadata: Metadata, model: Params): Unit = {
+    implicit val format = DefaultFormats
+    metadata.params match {
+      case JObject(pairs) =>
+        pairs.foreach { case (paramName, jsonValue) =>
+          val origParam =
+            if (paramName == "topicDistribution") "topicDistributionCol" else paramName
+          val param = model.getParam(origParam)
+          val value = param.jsonDecode(compact(render(jsonValue)))
+          model.set(param, value)
+        }
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
+    }
+  }
 }
@@ -540,8 +562,8 @@
 
   @Since("1.6.0")
   override def write: MLWriter = new LocalLDAModel.LocalLDAModelWriter(this)
-}
+}
 
 
 @Since("1.6.0")
 object LocalLDAModel extends MLReadable[LocalLDAModel] {
@@ -549,13 +571,6 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
 
   private[LocalLDAModel] class LocalLDAModelWriter(instance: LocalLDAModel) extends MLWriter {
 
-    private case class Data(
-        vocabSize: Int,
-        topicsMatrix: Matrix,
-        docConcentration: Vector,
-        topicConcentration: Double,
-        gammaShape: Double)
-
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val oldModel = instance.oldLocalModel
@@ -566,26 +581,41 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
     }
   }
 
+  private case class Data(
+      vocabSize: Int,
+      topicsMatrix: Matrix,
+      docConcentration: Vector,
+      topicConcentration: Double,
+      gammaShape: Double)
+
   private class LocalLDAModelReader extends MLReader[LocalLDAModel] {
 
    private val className = classOf[LocalLDAModel].getName
 
     override def load(path: String): LocalLDAModel = {
+      // Import implicits for Dataset Encoder
+      val sparkSession = super.sparkSession
+      import sparkSession.implicits._
+
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
       val data = sparkSession.read.parquet(dataPath)
-        .select("vocabSize", "topicsMatrix", "docConcentration", "topicConcentration",
-          "gammaShape")
-        .head()
-      val vocabSize = data.getAs[Int](0)
-      val topicsMatrix = data.getAs[Matrix](1)
-      val docConcentration = data.getAs[Vector](2)
-      val topicConcentration = data.getAs[Double](3)
-      val gammaShape = data.getAs[Double](4)
+      // Convert vector/matrix columns stored in the old mllib format (e.g. by Spark 1.6)
+      // to the new ml types before extracting them.
+      val vectorConverted = MLUtils.convertVectorColumnsToML(data, "docConcentration")
+      val Row(vocabSize: Int, topicsMatrix: Matrix, docConcentration: Vector,
+        topicConcentration: Double, gammaShape: Double) = MLUtils.convertMatrixColumnsToML(
+        vectorConverted, "topicsMatrix").select("vocabSize", "topicsMatrix", "docConcentration",
+        "topicConcentration", "gammaShape").head()
       val oldModel = new OldLocalLDAModel(topicsMatrix, docConcentration, topicConcentration,
         gammaShape)
       val model = new LocalLDAModel(metadata.uid, vocabSize, oldModel, sparkSession)
-      DefaultParamsReader.getAndSetParams(model, metadata)
+      // Spark 1.6 saved the topic distribution column under the old param name
+      // "topicDistribution"; map it onto topicDistributionCol when loading 1.6.x metadata.
+      metadata.sparkVersion match {
+        case v if v.startsWith("1.6") =>
+          model.extractTopicDistributionCol(metadata, model)
+        case _ =>
+          DefaultParamsReader.getAndSetParams(model, metadata)
+      }
       model
     }
   }
@@ -728,16 +758,27 @@ object DistributedLDAModel extends MLReadable[DistributedLDAModel] {
 
     private val className = classOf[DistributedLDAModel].getName
 
     override def load(path: String): DistributedLDAModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val modelPath = new Path(path, "oldModel").toString
       val oldModel = OldDistributedLDAModel.load(sc, modelPath)
-      val model = new DistributedLDAModel(
-        metadata.uid, oldModel.vocabSize, oldModel, sparkSession, None)
-      DefaultParamsReader.getAndSetParams(model, metadata)
+      val model = new DistributedLDAModel(metadata.uid, oldModel.vocabSize,
+        oldModel, sparkSession, None)
+      // Spark 1.6 saved the topic distribution column under the old param name
+      // "topicDistribution"; map it onto topicDistributionCol when loading 1.6.x metadata.
+      metadata.sparkVersion match {
+        case v if v.startsWith("1.6") =>
+          model.extractTopicDistributionCol(metadata, model)
+        case _ =>
+          DefaultParamsReader.getAndSetParams(model, metadata)
+      }
       model
     }
   }
 
   @Since("1.6.0")
   override def read: MLReader[DistributedLDAModel] = new DistributedLDAModelReader
@@ -878,6 +919,7 @@ class LDA @Since("1.6.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
 }
 
 @Since("2.0.0")
@@ -896,6 +938,29 @@ object LDA extends DefaultParamsReadable[LDA] {
         (docId, OldVectors.fromML(features))
       }
   }
 
+  private class LDAReader extends MLReader[LDA] {
+
+    private val className = classOf[LDA].getName
+
+    override def load(path: String): LDA = {
+      val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+      val model = new LDA(metadata.uid)
+      // Spark 1.6 saved the topic distribution column under the old param name
+      // "topicDistribution"; map it onto topicDistributionCol when loading 1.6.x metadata.
+      metadata.sparkVersion match {
+        case v if v.startsWith("1.6") =>
+          model.extractTopicDistributionCol(metadata, model)
+        case _ =>
+          DefaultParamsReader.getAndSetParams(model, metadata)
+      }
+      model
+    }
+  }
+
+  // Wire in the version-aware reader so that LDA.load(...) also handles 1.6 metadata.
+  @Since("2.0.0")
+  override def read: MLReader[LDA] = new LDAReader
+
   @Since("2.0.0")
   override def load(path: String): LDA = super.load(path)
 }
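
Usage sketch (illustrative only, not part of the patch): loading a model directory that was written by Spark 1.6 exercises the backward-compatible path added above. The path below is a placeholder, and the snippet assumes a spark-shell style session where a 1.6-era LocalLDAModel save directory is available.

    import org.apache.spark.ml.clustering.LocalLDAModel

    // Hypothetical directory produced by model.save(...) under Spark 1.6.
    val oldModelPath = "/tmp/lda-model-saved-by-spark-1.6"

    // LocalLDAModelReader inspects metadata.sparkVersion: for 1.6.x metadata it maps the
    // old param name "topicDistribution" onto topicDistributionCol via
    // extractTopicDistributionCol instead of failing in DefaultParamsReader.getAndSetParams.
    val model = LocalLDAModel.load(oldModelPath)
    println(model.getTopicDistributionCol)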