diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index fe6a37fd6dc3..4fc5a81b985a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -18,7 +18,10 @@ package org.apache.spark.ml.clustering
 
 import org.apache.hadoop.fs.Path
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
 
@@ -457,6 +460,31 @@ sealed abstract class LDAModel private[ml] (
   def describeTopics(): DataFrame = describeTopics(10)
 }
 
+object LDAModel extends MLReadable[LDAModel] {
+
+  private class LDAModelReader extends MLReader[LDAModel] {
+    override def load(path: String): LDAModel = {
+      val metadataPath = new Path(path, "metadata").toString
+      val metadata = parse(sc.textFile(metadataPath, 1).first())
+      implicit val format = DefaultFormats
+      val className = (metadata \ "class").extract[String]
+      className match {
+        case c if c == classOf[LocalLDAModel].getName =>
+          LocalLDAModel.load(path)
+        case c if c == classOf[DistributedLDAModel].getName =>
+          DistributedLDAModel.load(path)
+        case _ => throw new SparkException(s"$className in $path is not an LDAModel")
+      }
+    }
+  }
+
+  @Since("2.0.0")
+  override def read: MLReader[LDAModel] = new LDAModelReader
+
+  @Since("2.0.0")
+  override def load(path: String): LDAModel = super.load(path)
+}
+
 /**
  * :: Experimental ::
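The new `LDAModel` companion object gives callers a single, optimizer-agnostic entry point: the reader parses the saved metadata, extracts its `class` field, and dispatches to the matching concrete loader. A minimal usage sketch (the model path and the surrounding SparkContext setup are hypothetical, not part of the patch):

import org.apache.spark.ml.clustering.{DistributedLDAModel, LDAModel, LocalLDAModel}

// The caller does not need to know whether the model was trained with the
// "em" or the "online" optimizer; the metadata-based dispatch picks the loader.
val model: LDAModel = LDAModel.load("/tmp/lda-model") // hypothetical path

model match {
  case m: DistributedLDAModel => println(s"EM-trained model, vocabSize = ${m.vocabSize}")
  case m: LocalLDAModel => println(s"online-trained model, vocabSize = ${m.vocabSize}")
}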
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index baf9e5e7d1fd..0ec8975fed8f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -64,11 +64,10 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
    */
   def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = {
     val n = data.count().toDouble
-    val localData = data.sortBy(x => x).mapPartitions { part =>
-      val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
-      searchOneSampleCandidates(partDiffs) // candidates: local extrema
-    }.collect()
-    val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
+    val ksStat = data.sortBy(x => x).zipWithIndex().map { case (v, i) =>
+      val f = cdf(v)
+      math.max(f - i / n, (i + 1) / n - f)
+    }.max()
     evalOneSampleP(ksStat, n.toLong)
   }
 
@@ -84,74 +83,6 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
     testOneSample(data, cdf)
   }
 
-  /**
-   * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
-   * partition
-   * @param partData `Iterator[Double]` 1 partition of a sorted RDD
-   * @param n `Double` the total size of the RDD
-   * @param cdf `Double => Double` a function that calculates the theoretical CDF of a value
-   * @return `Iterator[(Double, Double)]` Unadjusted (i.e. off by a constant) potential extrema
-   *         in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF,
-   *         the second element corresponds to empirical CDF - CDF. We can then search the resulting
-   *         iterator for the minimum of the first and the maximum of the second element, and provide
-   *         this as a partition's candidate extrema
-   */
-  private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
-    : Iterator[(Double, Double)] = {
-    // zip data with index (within that partition)
-    // calculate local (unadjusted) empirical CDF and subtract CDF
-    partData.zipWithIndex.map { case (v, ix) =>
-      // dp and dl are later adjusted by constant, when global info is available
-      val dp = (ix + 1) / n
-      val dl = ix / n
-      val cdfVal = cdf(v)
-      (dl - cdfVal, dp - cdfVal)
-    }
-  }
-
-  /**
-   * Search the unadjusted differences in a partition and return the
-   * two extrema (furthest below and furthest above CDF), along with a count of elements in that
-   * partition
-   * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF
-   *                  and CDF in a partition, which come as a tuple of
-   *                  (empirical CDF - 1/N - CDF, empirical CDF - CDF)
-   * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
-   */
-  private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
-    : Iterator[(Double, Double, Double)] = {
-    val initAcc = (Double.MaxValue, Double.MinValue, 0.0)
-    val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) =>
-      (math.min(pMin, dl), math.max(pMax, dp), pCt + 1)
-    }
-    val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults)
-    results.iterator
-  }
-
-  /**
-   * Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after
-   * adjusting local extrema estimates from individual partitions with the amount of elements in
-   * preceding partitions
-   * @param localData `Array[(Double, Double, Double)]` A local array containing the collected
-   *                  results of `searchOneSampleCandidates` across all partitions
-   * @param n `Double` The size of the RDD
-   * @return The one-sample Kolmogorov-Smirnov statistic
-   */
-  private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
-    : Double = {
-    val initAcc = (Double.MinValue, 0.0)
-    // adjust differences based on the number of elements preceding it, which should provide
-    // the correct distance between empirical CDF and CDF
-    val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
-      val adjConst = prevCt / n
-      val dist1 = math.abs(minCand + adjConst)
-      val dist2 = math.abs(maxCand + adjConst)
-      val maxVal = Array(prevMax, dist1, dist2).max
-      (maxVal, prevCt + ct)
-    }
-    results._1
-  }
-
   /**
    * A convenience function that allows running the KS test for 1 set of sample data against
    * a named distribution
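The rewrite above relies on a standard fact about the one-sample KS statistic: after a global sort, `zipWithIndex()` gives every element its global rank i, the empirical CDF steps from i/n to (i + 1)/n at the i-th order statistic, and the supremum of |F_n(x) - F(x)| is always attained at a sample point, so ksStat = max over i of max(F(x_i) - i/n, (i + 1)/n - F(x_i)). That makes the per-partition candidate search and the preceding-count adjustment in the deleted helpers unnecessary. For intuition, the same computation on a plain local array (`ksStatistic` is an illustrative helper, not part of the patch):

// Single-machine mirror of the simplified statistic; `cdf` is the
// theoretical CDF under the null hypothesis.
def ksStatistic(sample: Array[Double], cdf: Double => Double): Double = {
  val n = sample.length.toDouble
  sample.sorted.zipWithIndex.map { case (v, i) =>
    val f = cdf(v)
    // the empirical CDF is i / n just below v and (i + 1) / n at v
    math.max(f - i / n, (i + 1) / n - f)
  }.max
}

// Example against Uniform(0, 1): ksStatistic(Array(0.1, 0.4, 0.8), x => x)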
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index dd3f4c6e5391..af5dc40f2776 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.ml.clustering
 
+import java.io.File
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.util.Utils
 
 object LDASuite {
 
@@ -261,4 +266,22 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
     testEstimatorAndModelReadWrite(lda, dataset, LDASuite.allParamSettings ++
       Map("optimizer" -> "em"), checkModelData)
   }
+
+  test("load LDAModel") {
+    val lda = new LDA().setK(k).setSeed(1).setOptimizer("em").setMaxIter(2)
+    val distributedModel = lda.fit(dataset)
+    val localModel = lda.setOptimizer("online").fit(dataset)
+
+    val tempDir1 = Utils.createTempDir()
+    val distributedPath = new File(tempDir1, "distributed").getPath
+    val localPath = new File(tempDir1, "local").getPath
+    try {
+      distributedModel.save(distributedPath)
+      localModel.save(localPath)
+      assert(LDAModel.load(distributedPath).isInstanceOf[DistributedLDAModel])
+      assert(LDAModel.load(localPath).isInstanceOf[LocalLDAModel])
+    } finally {
+      Utils.deleteRecursively(tempDir1)
+    }
+  }
 }
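Note on the imports: the earlier draft added `import org.apache.spark.mllib.clustering.{DistributedLDAModel, LocalLDAModel}` and an unused `import org.apache.spark.graphx.Edge`. Since the suite lives in `org.apache.spark.ml.clustering`, the explicit mllib import would shadow the same-package ml model classes and make both `isInstanceOf` assertions check the wrong types, so those imports are dropped and the assertions resolve against the ml classes that `LDAModel.load` actually returns.

The test exercises both positive dispatch paths. A negative-path check along the same lines could assert the `SparkException` branch of the reader; a sketch, assuming `saved` is a directory holding metadata written by some other `MLWritable` model (the setup is hypothetical):

import org.apache.spark.SparkException

test("load of a non-LDAModel path fails") {
  val e = intercept[SparkException] {
    LDAModel.load(saved) // hypothetical path saved by a different model class
  }
  assert(e.getMessage.contains("is not an LDAModel"))
}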