Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 124 additions & 1 deletion python/pyspark/ml/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pyspark.ml.param.shared import *
from pyspark.mllib.common import inherit_doc

__all__ = ['KMeans', 'KMeansModel']
__all__ = ['KMeans', 'KMeansModel', 'BisectingKMeans', 'BisectingKMeansModel']


class KMeansModel(JavaModel):
Expand Down Expand Up @@ -170,6 +170,129 @@ def getInitSteps(self):
return self.getOrDefault(self.initSteps)


class BisectingKMeansModel(JavaModel):
    """
    .. note:: Experimental

    Model produced by fitting a BisectingKMeans estimator.

    .. versionadded:: 2.0.0
    """

    @since("2.0.0")
    def clusterCenters(self):
        """Return the cluster centers as a list of NumPy arrays."""
        # Fetch the centers from the wrapped Java model, then convert each
        # one with its own toArray().
        java_centers = self._call_java("clusterCenters")
        return [center.toArray() for center in java_centers]

    @since("2.0.0")
    def computeCost(self, dataset):
        """
        Return the sum of squared distances between the input points
        in `dataset` and their corresponding cluster centers.
        """
        # The computation is delegated to the wrapped Java model.
        cost = self._call_java("computeCost", dataset)
        return cost


@inherit_doc
class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasSeed):
    """
    .. note:: Experimental

    A bisecting k-means algorithm based on the paper "A comparison of document clustering
    techniques" by Steinbach, Karypis, and Kumar, with modification to fit Spark.
    The algorithm starts from a single cluster that contains all points.
    Iteratively it finds divisible clusters on the bottom level and bisects each of them using
    k-means, until there are `k` leaf clusters in total or no leaf clusters are divisible.
    The bisecting steps of clusters on the same level are grouped together to increase parallelism.
    If bisecting all divisible clusters on the bottom level would result in more than `k` leaf
    clusters, larger clusters get higher priority.

    >>> from pyspark.mllib.linalg import Vectors
    >>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
    ...         (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
    >>> df = sqlContext.createDataFrame(data, ["features"])
    >>> bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
    >>> model = bkm.fit(df)
    >>> centers = model.clusterCenters()
    >>> len(centers)
    2
    >>> model.computeCost(df)
    2.000...
    >>> transformed = model.transform(df).select("features", "prediction")
    >>> rows = transformed.collect()
    >>> rows[0].prediction == rows[1].prediction
    True
    >>> rows[2].prediction == rows[3].prediction
    True

    .. versionadded:: 2.0.0
    """

    # Desired number of leaf clusters.
    k = Param(Params._dummy(), "k", "number of clusters to create")
    # Threshold below which a cluster is no longer split. The description
    # previously stopped mid-sentence ("or the minimum proportion"); it now
    # states both interpretations of the value in full.
    minDivisibleClusterSize = Param(Params._dummy(), "minDivisibleClusterSize",
                                    "the minimum number of points (if >= 1.0) " +
                                    "or the minimum proportion of points (if < 1.0) " +
                                    "of a divisible cluster")

@keyword_only
def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20,
             seed=None, k=4, minDivisibleClusterSize=1.0):
    """
    __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
             seed=None, k=4, minDivisibleClusterSize=1.0)
    """
    super(BisectingKMeans, self).__init__()
    # Create the backing Java estimator, identified by this instance's uid.
    java_class = "org.apache.spark.ml.clustering.BisectingKMeans"
    self._java_obj = self._new_java_obj(java_class, self.uid)
    # Register defaults first, then apply only the keyword arguments that
    # @keyword_only recorded for this call.
    self._setDefault(maxIter=20, k=4, minDivisibleClusterSize=1.0)
    self.setParams(**self.__init__._input_kwargs)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After #10216, we do not need to declare param variables in __init__ by hand.


@keyword_only
@since("2.0.0")
def setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20,
              seed=None, k=4, minDivisibleClusterSize=1.0):
    """
    setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
              seed=None, k=4, minDivisibleClusterSize=1.0)
    Sets params for BisectingKMeans.
    """
    # Forward the keyword arguments recorded by @keyword_only to _set and
    # return its result.
    return self._set(**self.setParams._input_kwargs)

@since("2.0.0")
def setK(self, value):
    """
    Sets the value of :py:attr:`k`.

    Returns this instance so that calls can be chained.
    """
    param = self.k
    self._paramMap[param] = value
    return self

@since("2.0.0")
def getK(self):
    """
    Gets the value of `k` or its default value.
    """
    param = self.k
    return self.getOrDefault(param)

@since("2.0.0")
def setMinDivisibleClusterSize(self, value):
    """
    Sets the value of :py:attr:`minDivisibleClusterSize`.

    Returns this instance so that calls can be chained.
    """
    param = self.minDivisibleClusterSize
    self._paramMap[param] = value
    return self

@since("2.0.0")
def getMinDivisibleClusterSize(self):
    """
    Gets the value of `minDivisibleClusterSize` or its default value.
    """
    param = self.minDivisibleClusterSize
    return self.getOrDefault(param)

def _create_model(self, java_model):
    """Wrap the given Java model in a BisectingKMeansModel."""
    model = BisectingKMeansModel(java_model)
    return model


if __name__ == "__main__":
import doctest
from pyspark.context import SparkContext
Expand Down