From a0798a7c28fea3651ca152817f4b49543d52245c Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Sun, 24 Jan 2016 17:57:18 +0800
Subject: [PATCH 1/4] Add Python API for spark.ml bisecting k-means

---
 python/pyspark/ml/clustering.py | 128 +++++++++++++++++++++++++++++++-
 1 file changed, 127 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 60d1c9aaec988..0c6aa163006c5 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -21,7 +21,7 @@
 from pyspark.ml.param.shared import *
 from pyspark.mllib.common import inherit_doc

-__all__ = ['KMeans', 'KMeansModel']
+__all__ = ['KMeans', 'KMeansModel', 'BisectingKMeans', 'BisectingKMeansModel']


 class KMeansModel(JavaModel):
@@ -170,6 +170,132 @@ def getInitSteps(self):
         return self.getOrDefault(self.initSteps)


+class BisectingKMeansModel(JavaModel):
+    """
+    Model fitted by BisectingKMeans.
+
+    .. versionadded:: 2.0.0
+    """
+
+    @since("2.0.0")
+    def clusterCenters(self):
+        """Get the cluster centers, represented as a list of NumPy arrays."""
+        return [c.toArray() for c in self._call_java("clusterCenters")]
+
+    @since("2.0.0")
+    def computeCost(self, dataset):
+        """
+        Computes the sum of squared distances between the input points
+        and their corresponding cluster centers.
+        """
+        return self._call_java("computeCost", dataset)
+
+
+@inherit_doc
+class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasSeed):
+    """
+    .. note:: Experimental
+
+    A bisecting k-means algorithm based on the paper "A comparison of document clustering
+    techniques" by Steinbach, Karypis, and Kumar, with modification to fit Spark.
+    The algorithm starts from a single cluster that contains all points.
+    Iteratively it finds divisible clusters on the bottom level and bisects each of them using
+    k-means, until there are `k` leaf clusters in total or no leaf clusters are divisible.
+    The bisecting steps of clusters on the same level are grouped together to increase parallelism.
+    If bisecting all divisible clusters on the bottom level would result in more than `k` leaf
+    clusters, larger clusters get higher priority.
+
+    >>> from pyspark.mllib.linalg import Vectors
+    >>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
+    ...         (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
+    >>> df = sqlContext.createDataFrame(data, ["features"])
+    >>> bkm = BisectingKMeans(k=2)
+    >>> model = bkm.fit(df)
+    >>> centers = model.clusterCenters()
+    >>> len(centers)
+    2
+    >>> model.computeCost(df)
+    2.000...
+    >>> transformed = model.transform(df).select("features", "prediction")
+    >>> rows = transformed.collect()
+    >>> rows[0].prediction == rows[1].prediction
+    True
+    >>> rows[2].prediction == rows[3].prediction
+    True
+
+    .. versionadded:: 2.0.0
+    """
+
+    # a placeholder to make it appear in the generated doc
+    k = Param(Params._dummy(), "k", "number of clusters to create")
+    minDivisibleClusterSize = Param(Params._dummy(), "minDivisibleClusterSize",
+                                    "the minimum number of points (if >= 1.0) " +
+                                    "or the minimum proportion")
+
+    @keyword_only
+    def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20,
+                 seed=-1888008604, k=4, minDivisibleClusterSize=1.0):
+        """
+        __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
+                 seed=-1888008604, k=4, minDivisibleClusterSize=1.0)
+        """
+        super(BisectingKMeans, self).__init__()
+        self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.BisectingKMeans",
+                                            self.uid)
+        self.k = Param(self, "k", "number of clusters to create")
+        self.minDivisibleClusterSize = Param(self, "minDivisibleClusterSize",
+                                             "the minimum number of points (if >= 1.0) " +
+                                             "or the minimum proportion")
+        self._setDefault(maxIter=20, seed=-1888008604, k=4, minDivisibleClusterSize=1.0)
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.0.0")
+    def setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20,
+                  seed=-1888008604, k=4, minDivisibleClusterSize=1.0):
+        """
+        setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
+                  seed=-1888008604, k=4, minDivisibleClusterSize=1.0):
+        Sets params for BisectingKMeans.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.0.0")
+    def setK(self, value):
+        """
+        Sets the value of :py:attr:`k`.
+        """
+        self._paramMap[self.k] = value
+        return self
+
+    @since("2.0.0")
+    def getK(self):
+        """
+        Gets the value of `k` or its default value.
+        """
+        return self.getOrDefault(self.k)
+
+    @since("2.0.0")
+    def setMinDivisibleClusterSize(self, value):
+        """
+        Sets the value of :py:attr:`minDivisibleClusterSize`.
+        """
+        self._paramMap[self.minDivisibleClusterSize] = value
+        return self
+
+    @since("2.0.0")
+    def getMinDivisibleClusterSize(self):
+        """
+        Gets the value of `minDivisibleClusterSize` or its default value.
+        """
+        return self.getOrDefault(self.minDivisibleClusterSize)
+
+    def _create_model(self, java_model):
+        return BisectingKMeansModel(java_model)
+
+
 if __name__ == "__main__":
     import doctest
     from pyspark.context import SparkContext

From 57dc7f4f6da64b1a0419bc51f805cd11ec70f4e1 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Sun, 24 Jan 2016 18:08:13 +0800
Subject: [PATCH 2/4] Fix typos

---
 python/pyspark/ml/clustering.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 0c6aa163006c5..71dc69ace8b50 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -209,7 +209,7 @@ class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIte
     >>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
     ...         (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
     >>> df = sqlContext.createDataFrame(data, ["features"])
-    >>> bkm = BisectingKMeans(k=2)
+    >>> bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
     >>> model = bkm.fit(df)
     >>> centers = model.clusterCenters()
     >>> len(centers)
@@ -256,7 +256,7 @@ def setParams(self, featuresCol="features", predictionCol="prediction", maxIter=
                   seed=-1888008604, k=4, minDivisibleClusterSize=1.0):
         """
         setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
-                  seed=-1888008604, k=4, minDivisibleClusterSize=1.0):
+                  seed=-1888008604, k=4, minDivisibleClusterSize=1.0)
         Sets params for BisectingKMeans.
         """
         kwargs = self.setParams._input_kwargs

From 10a3ac46ba91fe850ed7ac67643d5d575b645f26 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Mon, 25 Jan 2016 13:04:46 +0800
Subject: [PATCH 3/4] fix seed

---
 python/pyspark/ml/clustering.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 71dc69ace8b50..a7a448511ddb1 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -234,10 +234,10 @@ class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIte

     @keyword_only
     def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20,
-                 seed=-1888008604, k=4, minDivisibleClusterSize=1.0):
+                 seed=None, k=4, minDivisibleClusterSize=1.0):
         """
         __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
-                 seed=-1888008604, k=4, minDivisibleClusterSize=1.0)
+                 seed=None, k=4, minDivisibleClusterSize=1.0)
         """
         super(BisectingKMeans, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.BisectingKMeans",
@@ -246,17 +246,17 @@ def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=2
         self.minDivisibleClusterSize = Param(self, "minDivisibleClusterSize",
                                              "the minimum number of points (if >= 1.0) " +
                                              "or the minimum proportion")
-        self._setDefault(maxIter=20, seed=-1888008604, k=4, minDivisibleClusterSize=1.0)
+        self._setDefault(maxIter=20, k=4, minDivisibleClusterSize=1.0)
         kwargs = self.__init__._input_kwargs
         self.setParams(**kwargs)

     @keyword_only
     @since("2.0.0")
     def setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20,
-                  seed=-1888008604, k=4, minDivisibleClusterSize=1.0):
+                  seed=None, k=4, minDivisibleClusterSize=1.0):
         """
         setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
-                  seed=-1888008604, k=4, minDivisibleClusterSize=1.0)
+                  seed=None, k=4, minDivisibleClusterSize=1.0)
         Sets params for BisectingKMeans.
         """
         kwargs = self.setParams._input_kwargs

From 581914ce31d2bf02b3284fd28ffad81fe31fbb15 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Wed, 27 Jan 2016 15:39:32 +0800
Subject: [PATCH 4/4] Remove duplicated code after SPARK-10509

---
 python/pyspark/ml/clustering.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index a7a448511ddb1..9c5371ed90a1f 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -172,6 +172,8 @@ def getInitSteps(self):

 class BisectingKMeansModel(JavaModel):
     """
+    .. note:: Experimental
+
     Model fitted by BisectingKMeans.

     .. versionadded:: 2.0.0
@@ -226,7 +228,6 @@ class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIte
     .. versionadded:: 2.0.0
     """
-    # a placeholder to make it appear in the generated doc
     k = Param(Params._dummy(), "k", "number of clusters to create")
     minDivisibleClusterSize = Param(Params._dummy(), "minDivisibleClusterSize",
                                     "the minimum number of points (if >= 1.0) " +
                                     "or the minimum proportion")
@@ -242,10 +243,6 @@ def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=2
         super(BisectingKMeans, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.BisectingKMeans",
                                             self.uid)
-        self.k = Param(self, "k", "number of clusters to create")
-        self.minDivisibleClusterSize = Param(self, "minDivisibleClusterSize",
-                                             "the minimum number of points (if >= 1.0) " +
-                                             "or the minimum proportion")
         self._setDefault(maxIter=20, k=4, minDivisibleClusterSize=1.0)
         kwargs = self.__init__._input_kwargs
         self.setParams(**kwargs)
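
For readers who want to try the API this series adds, the sketch below expands the class docstring's doctest into a standalone script. It is illustrative only: the SparkContext/SQLContext setup and the app name are assumptions, not part of the patch, while the estimator, parameter names, and model methods follow the code added above. Note that after PATCH 3/4 the `seed` argument defaults to None instead of the hard-coded -1888008604.

    # Usage sketch for the BisectingKMeans Python API added by this patch series.
    # The SparkContext/SQLContext setup below is assumed (Spark 2.0-era API).
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.mllib.linalg import Vectors
    from pyspark.ml.clustering import BisectingKMeans

    if __name__ == "__main__":
        sc = SparkContext(appName="bisecting_kmeans_example")
        sqlContext = SQLContext(sc)

        # Two well-separated groups of points, mirroring the doctest.
        data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
                (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
        df = sqlContext.createDataFrame(data, ["features"])

        # k and minDivisibleClusterSize are defined in this patch; featuresCol,
        # predictionCol, maxIter and seed come from the shared Param mixins.
        bkm = BisectingKMeans(k=2, maxIter=20, minDivisibleClusterSize=1.0)
        model = bkm.fit(df)

        print(model.clusterCenters())   # two centers, one per leaf cluster
        print(model.computeCost(df))    # sum of squared distances to the centers
        model.transform(df).select("features", "prediction").show()

        sc.stop()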