From 9d157bdc585ec147749dbe7622e04ae26f3f5734 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 28 Apr 2016 16:23:31 -0700 Subject: [PATCH 01/12] add GMM scala example and document --- docs/ml-clustering.md | 85 ++++++++++++++++ .../ml/JavaGaussianMixtureExample.java | 96 +++++++++++++++++++ .../python/ml/gaussian_mixture_example.py | 75 +++++++++++++++ .../examples/ml/GaussianMixtureExample.scala | 79 +++++++++++++++ 4 files changed, 335 insertions(+) create mode 100644 examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java create mode 100644 examples/src/main/python/ml/gaussian_mixture_example.py create mode 100644 examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index a0955a3855ce..4577db59accc 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ -148,3 +148,88 @@ Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering. {% include_example python/ml/bisecting_k_means_example.py %} + +## Gaussian Mixture Model (GMM) + +A [Gaussian Mixture Model](http://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model) +represents a composite distribution whereby points are drawn from one of *k* Gaussian sub-distributions, +each with its own probability. The spark.ml implementation uses the +[expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm) +algorithm to induce the maximum-likelihood model given a set of samples. + +`GaussianMixture` is implemented as an `Estimator` and generates a `GaussianMixtureModel` as the base +model. + +### Input Columns + + + + + + + + + + + + + + + + + + +
Param nameType(s)DefaultDescription
featuresColVector"features"Feature vector
+ +### Output Columns + + + + + + + + + + + + + + + + + + + + + + + + +
Param nameType(s)DefaultDescription
predictionColInt"prediction"Predicted cluster center
probabilityColVector"probability"Probability of each cluster
+ + +### Example + +
+ +
+Refer to the [Scala API docs](api/scala/index.html#org.apache.spark.ml.clustering.GaussianMixture) for more details. + +{% include_example scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala %} +
+ +
+Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/GaussianMixture.html) for more details. + +{% include_example java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java %} +
+ +
+Refer to the [Python docs](api/python/pyspark.ml.html#pyspark.ml.clustering.GaussianMixture) for more details. + +{% include_example python/ml/gaussian_mixture_example.py %} +
+
+ + diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java new file mode 100644 index 000000000000..7a7bde739a44 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml; + +import java.util.regex.Pattern; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +// $example on$ +import org.apache.spark.ml.clustering.GaussianMixture; +import org.apache.spark.ml.clustering.GaussianMixtureModel; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.VectorUDT; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +// $example off$ + + +/** + * An example demonstrating a Gaussian Mixture Model. + * Run with + *
+ * bin/run-example ml.JavaGaussianMixtureExample
+ * 
+ */ +public class JavaGaussianMixtureExample { + + private static class ParsePoint implements Function { + private static final Pattern separator = Pattern.compile(" "); + + @Override + public Row call(String line) { + String[] tok = separator.split(line.trim()); + double[] point = new double[tok.length]; + for (int i = 0; i < tok.length; ++i) { + point[i] = Double.parseDouble(tok[i]); + } + Vector[] points = {Vectors.dense(point)}; + return new GenericRow(points); + } + } + + public static void main(String[] args) { + + String inputFile = "data/mllib/gmm_data.txt"; + int k = 2; + + // Parses the arguments + SparkSession spark = SparkSession + .builder() + .appName("JavaGaussianMixtureExample") + .getOrCreate(); + + // $example on$ + // Loads data + JavaRDD points = spark.read().text(inputFile).javaRDD().map(new ParsePoint()); + StructField[] fields = {new StructField("features", new VectorUDT(), false, Metadata.empty())}; + StructType schema = new StructType(fields); + Dataset dataset = spark.createDataFrame(points, schema); + + // Trains a GaussianMixture model + GaussianMixture gmm = new GaussianMixture() + .setK(k); + GaussianMixtureModel model = gmm.fit(dataset); + + // Output the parameters of the mixture model + for (int j = 0; j < model.getK(); j++) { + System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n", + model.weights()[j], model.gaussians()[j].mean(), model.gaussians()[j].cov()); + } + // $example off$ + + spark.stop(); + } +} diff --git a/examples/src/main/python/ml/gaussian_mixture_example.py b/examples/src/main/python/ml/gaussian_mixture_example.py new file mode 100644 index 000000000000..e08f6d2620eb --- /dev/null +++ b/examples/src/main/python/ml/gaussian_mixture_example.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +import sys + +import numpy as np +# $example on$ +from pyspark import SparkContext +from pyspark.ml.clustering import GaussianMixture, GaussianMixtureModel +from pyspark.mllib.linalg import VectorUDT, _convert_to_vector +from pyspark.sql import SQLContext +from pyspark.sql.types import Row, StructField, StructType +# $example off$ + +""" +A simple example demonstrating a Gaussian Mixture Model (GMM). +Run with: + bin/spark-submit examples/src/main/python/ml/gaussian_mixture_example.py +""" + + +def parseVector(line): + line_strip = line.strip() + array = np.array([float(x) for x in line_strip.split(' ')]) + return _convert_to_vector(array) + + +if __name__ == "__main__": + + FEATURES_COL = "features" + + path = "data/mllib/gmm_data.txt" + k = 2 + + sc = SparkContext(appName="PythonGuassianMixtureExample") + sqlContext = SQLContext(sc) + + # $example on$ + lines = sc.textFile(path) + data = lines.map(parseVector) + row_rdd = data.map(lambda x: Row(x)) + schema = StructType([StructField("features", VectorUDT(), False)]) + df = sqlContext.createDataFrame(row_rdd, schema) + + gmm = GaussianMixture().setK(k).setSeed(10).setFeaturesCol("features") + model = gmm.fit(df) + + print("Gaussians: ") + model.gaussiansDF.show() + + transformed = model.transform(df).select("prediction") + rows = transformed.collect() + + print("Prediction: ") + for row in rows: + print(row) + # $example off$ + + sc.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala new file mode 100644 index 000000000000..752924a6345b --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml + +// scalastyle:off println + +import org.apache.spark.{SparkConf, SparkContext} +// $example on$ +import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureSummary} +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.sql.{DataFrame, SQLContext} +// $example off$ + +/** + * An example demonstrating Gaussian Mixture Model (GMM). + * Run with + * {{{ + * bin/run-example ml.GaussianMixtureExample + * }}} + */ +object GaussianMixtureExample { + def main(args: Array[String]): Unit = { + // Creates a Spark context and a SQL context + val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}") + val sc = new SparkContext(conf) + val sqlContext = new SQLContext(sc) + + // $example on$ + // Crates a DataFrame + val dataset: DataFrame = sqlContext.createDataFrame(Seq( + (1, Vectors.dense(0.0, 0.0, 0.0)), + (2, Vectors.dense(0.1, 0.1, 0.1)), + (3, Vectors.dense(0.2, 0.2, 0.2)), + (4, Vectors.dense(9.0, 9.0, 9.0)), + (5, Vectors.dense(9.1, 9.1, 9.1)), + (6, Vectors.dense(9.2, 9.2, 9.2)) + )).toDF("id", "features") + + // Trains Gaussian Mixture Model + val gmm = new GaussianMixture() + .setK(2) + .setFeaturesCol("features") + .setPredictionCol("prediction") + .setTol(0.0001) + .setMaxIter(10) + .setSeed(10) + val model = gmm.fit(dataset) + + // Shows the result + val summary: GaussianMixtureSummary = model.summary + println("Size of (number of data points in) each cluster: ") + println(summary.clusterSizes) + + println("Cluster centers of the transformed data:") + summary.cluster.show() + + println("Probability of each cluster:") + summary.probability.show() + // $example off$ + + sc.stop() + } +} +// scalastyle:on println \ No newline at end of file From 02cace33263cca4c9bc4539377edc3d84e077b93 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 29 Apr 2016 12:21:26 -0700 Subject: [PATCH 02/12] add Java example and revise the document --- .../apache/spark/examples/ml/JavaGaussianMixtureExample.java | 5 +++++ .../apache/spark/examples/ml/GaussianMixtureExample.scala | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java index 7a7bde739a44..5e30297598d2 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java @@ -19,8 +19,13 @@ import java.util.regex.Pattern; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.catalyst.expressions.GenericRow; // $example on$ import org.apache.spark.ml.clustering.GaussianMixture; import org.apache.spark.ml.clustering.GaussianMixtureModel; diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala index 752924a6345b..f65765dd6c52 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala @@ -64,7 +64,7 @@ object GaussianMixtureExample { // Shows the result val summary: GaussianMixtureSummary = model.summary println("Size of (number of data points in) each cluster: ") - println(summary.clusterSizes) + println(summary.clusterSizes.foreach(println)) println("Cluster centers of the transformed data:") summary.cluster.show() From 323efeb4a84fc741e54d5c54e963890fca1c30bd Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 29 Apr 2016 14:07:19 -0700 Subject: [PATCH 03/12] add python example and add python source in doc --- docs/ml-clustering.md | 2 -- .../org/apache/spark/examples/ml/GaussianMixtureExample.scala | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index 4577db59accc..605b519e8d02 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ -231,5 +231,3 @@ Refer to the [Python docs](api/python/pyspark.ml.html#pyspark.ml.clustering.Gaus {% include_example python/ml/gaussian_mixture_example.py %} - - diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala index f65765dd6c52..fd294df55fbb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala @@ -76,4 +76,4 @@ object GaussianMixtureExample { sc.stop() } } -// scalastyle:on println \ No newline at end of file +// scalastyle:on println From 179167a5d87ffae796d982d705e087a67b6d5138 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 6 May 2016 10:28:26 -0700 Subject: [PATCH 04/12] commit for updating upstream --- .../org/apache/spark/examples/ml/GaussianMixtureExample.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala index fd294df55fbb..ba4b6356f42b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala @@ -22,7 +22,7 @@ package org.apache.spark.examples.ml import org.apache.spark.{SparkConf, SparkContext} // $example on$ import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureSummary} -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} import org.apache.spark.sql.{DataFrame, SQLContext} // $example off$ From e8b78355a2a5b0209976122e2eaf78e5aee9b541 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 6 May 2016 12:14:06 -0700 Subject: [PATCH 05/12] address review comments --- .../ml/JavaGaussianMixtureExample.java | 5 --- .../examples/ml/GaussianMixtureExample.scala | 40 ++++++++----------- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java index 5e30297598d2..7a7bde739a44 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java @@ -19,13 +19,8 @@ import java.util.regex.Pattern; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.catalyst.expressions.GenericRow; // $example on$ import org.apache.spark.ml.clustering.GaussianMixture; import org.apache.spark.ml.clustering.GaussianMixtureModel; diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala index ba4b6356f42b..c99acee6db48 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala @@ -21,9 +21,10 @@ package org.apache.spark.examples.ml import org.apache.spark.{SparkConf, SparkContext} // $example on$ -import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureSummary} +import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureModel} import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.types.{StructField, StructType} // $example off$ /** @@ -35,21 +36,16 @@ import org.apache.spark.sql.{DataFrame, SQLContext} */ object GaussianMixtureExample { def main(args: Array[String]): Unit = { - // Creates a Spark context and a SQL context - val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}") - val sc = new SparkContext(conf) - val sqlContext = new SQLContext(sc) + // Creates a SparkSession + val spark = SparkSession.builder.appName(s"${this.getClass.getSimpleName}").getOrCreate() + val input = "data/mllib/gmm_data.txt" // $example on$ // Crates a DataFrame - val dataset: DataFrame = sqlContext.createDataFrame(Seq( - (1, Vectors.dense(0.0, 0.0, 0.0)), - (2, Vectors.dense(0.1, 0.1, 0.1)), - (3, Vectors.dense(0.2, 0.2, 0.2)), - (4, Vectors.dense(9.0, 9.0, 9.0)), - (5, Vectors.dense(9.1, 9.1, 9.1)), - (6, Vectors.dense(9.2, 9.2, 9.2)) - )).toDF("id", "features") + val rowRDD = spark.read.text(input).rdd.filter(_.nonEmpty) + .map(_.trim.split(" ").map(_.toDouble)).map(Vectors.dense).map(Row(_)) + val schema = StructType(Array(StructField("features", new VectorUDT, false))) + val dataset: DataFrame = spark.createDataFrame(rowRDD, schema) // Trains Gaussian Mixture Model val gmm = new GaussianMixture() @@ -61,19 +57,15 @@ object GaussianMixtureExample { .setSeed(10) val model = gmm.fit(dataset) - // Shows the result - val summary: GaussianMixtureSummary = model.summary - println("Size of (number of data points in) each cluster: ") - println(summary.clusterSizes.foreach(println)) - println("Cluster centers of the transformed data:") - summary.cluster.show() - - println("Probability of each cluster:") - summary.probability.show() + // output parameters of max-likelihood model + for (i <- 0 until model.getK) { + println("weight=%f\nmu=%s\nsigma=\n%s\n" format + (model.weights(i), model.gaussians(i).mean, model.gaussians(i).cov)) + } // $example off$ - sc.stop() + spark.stop() } } // scalastyle:on println From 91b17c4f546f60a9ab55aa5cfa49568eea509987 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 12 May 2016 11:34:58 -0700 Subject: [PATCH 06/12] fix typo --- .../org/apache/spark/examples/ml/GaussianMixtureExample.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala index c99acee6db48..8a762009d9f9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala @@ -41,7 +41,7 @@ object GaussianMixtureExample { val input = "data/mllib/gmm_data.txt" // $example on$ - // Crates a DataFrame + // Creates a DataFrame val rowRDD = spark.read.text(input).rdd.filter(_.nonEmpty) .map(_.trim.split(" ").map(_.toDouble)).map(Vectors.dense).map(Row(_)) val schema = StructType(Array(StructField("features", new VectorUDT, false))) From c8b6d2364ffa160a816d6dae43c06fa08631248a Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 12 May 2016 14:23:29 -0700 Subject: [PATCH 07/12] change data and simplify the example --- .../ml/JavaGaussianMixtureExample.java | 36 ++--------------- .../python/ml/gaussian_mixture_example.py | 39 +++++-------------- .../examples/ml/GaussianMixtureExample.scala | 12 ++---- 3 files changed, 16 insertions(+), 71 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java index 7a7bde739a44..fa7fab27b424 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java @@ -17,24 +17,13 @@ package org.apache.spark.examples.ml; -import java.util.regex.Pattern; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; // $example on$ import org.apache.spark.ml.clustering.GaussianMixture; import org.apache.spark.ml.clustering.GaussianMixtureModel; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.VectorUDT; -import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.expressions.GenericRow; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; // $example off$ +import org.apache.spark.sql.SparkSession; /** @@ -46,24 +35,8 @@ */ public class JavaGaussianMixtureExample { - private static class ParsePoint implements Function { - private static final Pattern separator = Pattern.compile(" "); - - @Override - public Row call(String line) { - String[] tok = separator.split(line.trim()); - double[] point = new double[tok.length]; - for (int i = 0; i < tok.length; ++i) { - point[i] = Double.parseDouble(tok[i]); - } - Vector[] points = {Vectors.dense(point)}; - return new GenericRow(points); - } - } - public static void main(String[] args) { - String inputFile = "data/mllib/gmm_data.txt"; int k = 2; // Parses the arguments @@ -73,11 +46,8 @@ public static void main(String[] args) { .getOrCreate(); // $example on$ - // Loads data - JavaRDD points = spark.read().text(inputFile).javaRDD().map(new ParsePoint()); - StructField[] fields = {new StructField("features", new VectorUDT(), false, Metadata.empty())}; - StructType schema = new StructType(fields); - Dataset dataset = spark.createDataFrame(points, schema); + // Load data + Dataset dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt"); // Trains a GaussianMixture model GaussianMixture gmm = new GaussianMixture() diff --git a/examples/src/main/python/ml/gaussian_mixture_example.py b/examples/src/main/python/ml/gaussian_mixture_example.py index e08f6d2620eb..9d8978823e3e 100644 --- a/examples/src/main/python/ml/gaussian_mixture_example.py +++ b/examples/src/main/python/ml/gaussian_mixture_example.py @@ -17,16 +17,10 @@ from __future__ import print_function -import sys - -import numpy as np # $example on$ -from pyspark import SparkContext from pyspark.ml.clustering import GaussianMixture, GaussianMixtureModel -from pyspark.mllib.linalg import VectorUDT, _convert_to_vector -from pyspark.sql import SQLContext -from pyspark.sql.types import Row, StructField, StructType # $example off$ +from pyspark.sql import SparkSession """ A simple example demonstrating a Gaussian Mixture Model (GMM). @@ -34,37 +28,24 @@ bin/spark-submit examples/src/main/python/ml/gaussian_mixture_example.py """ - -def parseVector(line): - line_strip = line.strip() - array = np.array([float(x) for x in line_strip.split(' ')]) - return _convert_to_vector(array) - - if __name__ == "__main__": + spark = SparkSession\ + .builder\ + .appName("PythonGuassianMixtureExample")\ + .getOrCreate() - FEATURES_COL = "features" - - path = "data/mllib/gmm_data.txt" k = 2 - - sc = SparkContext(appName="PythonGuassianMixtureExample") - sqlContext = SQLContext(sc) - # $example on$ - lines = sc.textFile(path) - data = lines.map(parseVector) - row_rdd = data.map(lambda x: Row(x)) - schema = StructType([StructField("features", VectorUDT(), False)]) - df = sqlContext.createDataFrame(row_rdd, schema) + # load data + dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt") gmm = GaussianMixture().setK(k).setSeed(10).setFeaturesCol("features") - model = gmm.fit(df) + model = gmm.fit(dataset) print("Gaussians: ") model.gaussiansDF.show() - transformed = model.transform(df).select("prediction") + transformed = model.transform(dataset).select("prediction") rows = transformed.collect() print("Prediction: ") @@ -72,4 +53,4 @@ def parseVector(line): print(row) # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala index 8a762009d9f9..5d3f14a9d3ac 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala @@ -22,9 +22,7 @@ package org.apache.spark.examples.ml import org.apache.spark.{SparkConf, SparkContext} // $example on$ import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureModel} -import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} -import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.SparkSession // $example off$ /** @@ -38,14 +36,10 @@ object GaussianMixtureExample { def main(args: Array[String]): Unit = { // Creates a SparkSession val spark = SparkSession.builder.appName(s"${this.getClass.getSimpleName}").getOrCreate() - val input = "data/mllib/gmm_data.txt" // $example on$ - // Creates a DataFrame - val rowRDD = spark.read.text(input).rdd.filter(_.nonEmpty) - .map(_.trim.split(" ").map(_.toDouble)).map(Vectors.dense).map(Row(_)) - val schema = StructType(Array(StructField("features", new VectorUDT, false))) - val dataset: DataFrame = spark.createDataFrame(rowRDD, schema) + // Load data + val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt") // Trains Gaussian Mixture Model val gmm = new GaussianMixture() From 06253801f380e6ebab06f3e4bdb517c34d6b0cb3 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 12 May 2016 16:28:47 -0700 Subject: [PATCH 08/12] hard code k and make copy & paste example work in pyspark shell --- examples/src/main/python/ml/gaussian_mixture_example.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/src/main/python/ml/gaussian_mixture_example.py b/examples/src/main/python/ml/gaussian_mixture_example.py index 9d8978823e3e..2390942b5b36 100644 --- a/examples/src/main/python/ml/gaussian_mixture_example.py +++ b/examples/src/main/python/ml/gaussian_mixture_example.py @@ -34,12 +34,11 @@ .appName("PythonGuassianMixtureExample")\ .getOrCreate() - k = 2 # $example on$ # load data dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt") - gmm = GaussianMixture().setK(k).setSeed(10).setFeaturesCol("features") + gmm = GaussianMixture().setK(2).setSeed(10).setFeaturesCol("features") model = gmm.fit(dataset) print("Gaussians: ") From 9392430e42a89e00f0e87843f6169fb6d7713c99 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 12 May 2016 16:31:33 -0700 Subject: [PATCH 09/12] hard code k in java code --- .../apache/spark/examples/ml/JavaGaussianMixtureExample.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java index fa7fab27b424..4636f6005a8c 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java @@ -37,8 +37,6 @@ public class JavaGaussianMixtureExample { public static void main(String[] args) { - int k = 2; - // Parses the arguments SparkSession spark = SparkSession .builder() @@ -51,7 +49,7 @@ public static void main(String[] args) { // Trains a GaussianMixture model GaussianMixture gmm = new GaussianMixture() - .setK(k); + .setK(2); GaussianMixtureModel model = gmm.fit(dataset); // Output the parameters of the mixture model From 09407df75fbc5e7df59b796a342fece311773044 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 12 May 2016 16:32:49 -0700 Subject: [PATCH 10/12] remove extra blank line --- .../org/apache/spark/examples/ml/GaussianMixtureExample.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala index 5d3f14a9d3ac..de700f4bb488 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala @@ -51,7 +51,6 @@ object GaussianMixtureExample { .setSeed(10) val model = gmm.fit(dataset) - // output parameters of max-likelihood model for (i <- 0 until model.getK) { println("weight=%f\nmu=%s\nsigma=\n%s\n" format From f2d35dd0ff88656fc4d76b17df2d9be40c8e4acd Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 12 May 2016 23:05:13 -0700 Subject: [PATCH 11/12] address review comments --- docs/ml-clustering.md | 2 +- .../examples/ml/JavaGaussianMixtureExample.java | 10 +++++----- .../main/python/ml/gaussian_mixture_example.py | 15 ++++----------- .../examples/ml/GaussianMixtureExample.scala | 12 +++--------- 4 files changed, 13 insertions(+), 26 deletions(-) diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index 605b519e8d02..e766ead22053 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ -226,7 +226,7 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/GaussianMix
-Refer to the [Python docs](api/python/pyspark.ml.html#pyspark.ml.clustering.GaussianMixture) for more details. +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering.GaussianMixture) for more details. {% include_example python/ml/gaussian_mixture_example.py %}
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java index 4636f6005a8c..79b99095815a 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java @@ -27,7 +27,7 @@ /** - * An example demonstrating a Gaussian Mixture Model. + * An example demonstrating Gaussian Mixture Model. * Run with *
  * bin/run-example ml.JavaGaussianMixtureExample
@@ -37,14 +37,14 @@ public class JavaGaussianMixtureExample {
 
   public static void main(String[] args) {
 
-    // Parses the arguments
+    // Creates a SparkSession 
     SparkSession spark = SparkSession
             .builder()
             .appName("JavaGaussianMixtureExample")
             .getOrCreate();
 
     // $example on$
-    // Load data
+    // Loads data
     Dataset dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
 
     // Trains a GaussianMixture model
@@ -53,9 +53,9 @@ public static void main(String[] args) {
     GaussianMixtureModel model = gmm.fit(dataset);
 
     // Output the parameters of the mixture model
-    for (int j = 0; j < model.getK(); j++) {
+    for (int i = 0; i < model.getK(); i++) {
       System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
-              model.weights()[j], model.gaussians()[j].mean(), model.gaussians()[j].cov());
+              model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
     }
     // $example off$
 
diff --git a/examples/src/main/python/ml/gaussian_mixture_example.py b/examples/src/main/python/ml/gaussian_mixture_example.py
index 2390942b5b36..f5775c92d08b 100644
--- a/examples/src/main/python/ml/gaussian_mixture_example.py
+++ b/examples/src/main/python/ml/gaussian_mixture_example.py
@@ -18,12 +18,12 @@
 from __future__ import print_function
 
 # $example on$
-from pyspark.ml.clustering import GaussianMixture, GaussianMixtureModel
+from pyspark.ml.clustering import GaussianMixture
 # $example off$
 from pyspark.sql import SparkSession
 
 """
-A simple example demonstrating a Gaussian Mixture Model (GMM).
+A simple example demonstrating Gaussian Mixture Model (GMM).
 Run with:
   bin/spark-submit examples/src/main/python/ml/gaussian_mixture_example.py
 """
@@ -35,21 +35,14 @@
         .getOrCreate()
 
     # $example on$
-    # load data
+    # loads data
     dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
 
-    gmm = GaussianMixture().setK(2).setSeed(10).setFeaturesCol("features")
+    gmm = GaussianMixture().setK(2).setSeed(10)
     model = gmm.fit(dataset)
 
     print("Gaussians: ")
     model.gaussiansDF.show()
-
-    transformed = model.transform(dataset).select("prediction")
-    rows = transformed.collect()
-
-    print("Prediction: ")
-    for row in rows:
-        print(row)
     # $example off$
 
     spark.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala
index de700f4bb488..c484ee55569b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala
@@ -19,9 +19,8 @@ package org.apache.spark.examples.ml
 
 // scalastyle:off println
 
-import org.apache.spark.{SparkConf, SparkContext}
 // $example on$
-import org.apache.spark.ml.clustering.{GaussianMixture, GaussianMixtureModel}
+import org.apache.spark.ml.clustering.GaussianMixture
 import org.apache.spark.sql.SparkSession
 // $example off$
 
@@ -38,20 +37,15 @@ object GaussianMixtureExample {
     val spark = SparkSession.builder.appName(s"${this.getClass.getSimpleName}").getOrCreate()
 
     // $example on$
-    // Load data
+    // Loads data
     val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
 
     // Trains Gaussian Mixture Model
     val gmm = new GaussianMixture()
       .setK(2)
-      .setFeaturesCol("features")
-      .setPredictionCol("prediction")
-      .setTol(0.0001)
-      .setMaxIter(10)
-      .setSeed(10)
     val model = gmm.fit(dataset)
 
-    // output parameters of max-likelihood model
+    // output parameters of mixture model model
     for (i <- 0 until model.getK) {
       println("weight=%f\nmu=%s\nsigma=\n%s\n" format
         (model.weights(i), model.gaussians(i).mean, model.gaussians(i).cov))

From f56344b026dfe11ce10d03ac7c222de68668be92 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com" 
Date: Mon, 16 May 2016 09:35:34 -0700
Subject: [PATCH 12/12] address review comments

---
 docs/ml-clustering.md                                   | 3 +--
 examples/src/main/python/ml/gaussian_mixture_example.py | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md
index e766ead22053..33e4b7b0d2cc 100644
--- a/docs/ml-clustering.md
+++ b/docs/ml-clustering.md
@@ -153,7 +153,7 @@ Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering.
 
 A [Gaussian Mixture Model](http://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model)
 represents a composite distribution whereby points are drawn from one of *k* Gaussian sub-distributions,
-each with its own probability. The spark.ml implementation uses the
+each with its own probability. The `spark.ml` implementation uses the
 [expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm)
 algorithm to induce the maximum-likelihood model given a set of samples.
 
@@ -208,7 +208,6 @@ model.
   
 
 
-
 ### Example
 
 
diff --git a/examples/src/main/python/ml/gaussian_mixture_example.py b/examples/src/main/python/ml/gaussian_mixture_example.py index f5775c92d08b..2ca13d68f689 100644 --- a/examples/src/main/python/ml/gaussian_mixture_example.py +++ b/examples/src/main/python/ml/gaussian_mixture_example.py @@ -38,7 +38,7 @@ # loads data dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt") - gmm = GaussianMixture().setK(2).setSeed(10) + gmm = GaussianMixture().setK(2) model = gmm.fit(dataset) print("Gaussians: ")