184 changes: 179 additions & 5 deletions python/pyspark/ml/clustering.py
@@ -19,14 +19,15 @@

from pyspark import since, keyword_only
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaWrapper
from pyspark.ml.param.shared import *
from pyspark.ml.common import inherit_doc
from pyspark.sql import DataFrame

__all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
'KMeans', 'KMeansModel',
'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']


class ClusteringSummary(JavaWrapper):
@@ -836,7 +837,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter

Terminology:

- "term" = "word": an el
- "term" = "word": an element of the vocabulary
- "token": instance of a term appearing in a document
- "topic": multinomial distribution over terms representing some concept
- "document": one piece of text, corresponding to one row in the input data
@@ -938,7 +939,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
"""
super(LDA, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +968,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
topicDistributionCol="topicDistribution", keepLastCheckpoint=True)

Sets params for LDA.
"""
@@ -1156,6 +1157,179 @@ def getKeepLastCheckpoint(self):
return self.getOrDefault(self.keepLastCheckpoint)


@inherit_doc
class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReadable,
JavaMLWritable):
"""
.. note:: Experimental

Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
`Lin and Cohen <http://www.icml2010.org/papers/387.pdf>`_. From the abstract:
PIC finds a very low-dimensional embedding of a dataset using truncated power
iteration on a normalized pair-wise similarity matrix of the data.

This class is not yet an Estimator/Transformer; use the :py:func:`assignClusters` method
to run the PowerIterationClustering algorithm.

.. seealso:: `Wikipedia on Spectral clustering \
<http://en.wikipedia.org/wiki/Spectral_clustering>`_

>>> data = [(1, 0, 0.5), \
(2, 0, 0.5), (2, 1, 0.7), \
(3, 0, 0.5), (3, 1, 0.7), (3, 2, 0.9), \
(4, 0, 0.5), (4, 1, 0.7), (4, 2, 0.9), (4, 3, 1.1), \
(5, 0, 0.5), (5, 1, 0.7), (5, 2, 0.9), (5, 3, 1.1), (5, 4, 1.3)]
>>> df = spark.createDataFrame(data).toDF("src", "dst", "weight")
>>> pic = PowerIterationClustering(k=2, maxIter=40, weightCol="weight")
>>> assignments = pic.assignClusters(df)
>>> assignments.sort(assignments.id).show(truncate=False)
+---+-------+
|id |cluster|
+---+-------+
|0 |1 |
|1 |1 |
|2 |1 |
|3 |1 |
|4 |1 |
|5 |0 |
+---+-------+
...
>>> pic_path = temp_path + "/pic"
>>> pic.save(pic_path)
>>> pic2 = PowerIterationClustering.load(pic_path)
>>> pic2.getK()
2
>>> pic2.getMaxIter()
40

.. versionadded:: 2.4.0
"""

k = Param(Params._dummy(), "k",
"The number of clusters to create. Must be > 1.",
typeConverter=TypeConverters.toInt)
initMode = Param(Params._dummy(), "initMode",
"The initialization algorithm. This can be either " +
"'random' to use a random vector as vertex properties, or 'degree' to use " +
"a normalized sum of similarities with other vertices. Supported options: " +
"'random' and 'degree'.",
typeConverter=TypeConverters.toString)
srcCol = Param(Params._dummy(), "srcCol",
"Name of the input column for source vertex IDs.",
typeConverter=TypeConverters.toString)
dstCol = Param(Params._dummy(), "dstCol",
"Name of the input column for destination vertex IDs.",
typeConverter=TypeConverters.toString)

@keyword_only
def __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
weightCol=None):
"""
__init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
weightCol=None)
"""
super(PowerIterationClustering, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
self._setDefault(k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst")
kwargs = self._input_kwargs
self.setParams(**kwargs)

@keyword_only
@since("2.4.0")
def setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
weightCol=None):
"""
setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
weightCol=None)
Sets params for PowerIterationClustering.
"""
kwargs = self._input_kwargs
return self._set(**kwargs)

@since("2.4.0")
def setK(self, value):
"""
Sets the value of :py:attr:`k`.
"""
return self._set(k=value)

@since("2.4.0")
def getK(self):
"""
Gets the value of :py:attr:`k` or its default value.
"""
return self.getOrDefault(self.k)

@since("2.4.0")
def setInitMode(self, value):
"""
Sets the value of :py:attr:`initMode`.
"""
return self._set(initMode=value)

@since("2.4.0")
def getInitMode(self):
"""
Gets the value of :py:attr:`initMode` or its default value.
"""
return self.getOrDefault(self.initMode)

@since("2.4.0")
def setSrcCol(self, value):
"""
Sets the value of :py:attr:`srcCol`.
"""
return self._set(srcCol=value)

@since("2.4.0")
def getSrcCol(self):
"""
Gets the value of :py:attr:`srcCol` or its default value.
"""
return self.getOrDefault(self.srcCol)

@since("2.4.0")
def setDstCol(self, value):
"""
Sets the value of :py:attr:`dstCol`.
"""
return self._set(dstCol=value)

@since("2.4.0")
def getDstCol(self):
"""
Gets the value of :py:attr:`dstCol` or its default value.
"""
return self.getOrDefault(self.dstCol)

@since("2.4.0")
def assignClusters(self, dataset):
"""
Runs the PIC algorithm and returns a cluster assignment for each input vertex.

:param dataset:
A dataset with columns src, dst, weight representing the affinity matrix,
which is the matrix A in the PIC paper. Suppose the src column value is i,
the dst column value is j, and the weight column value is the similarity
:math:`s_{ij}`, which must be nonnegative. This is a symmetric matrix and hence
:math:`s_{ij} = s_{ji}`. For any (i, j) with nonzero similarity, there should be
either (i, j, :math:`s_{ij}`) or (j, i, :math:`s_{ji}`) in the input. Rows with i = j are
ignored, because we assume :math:`s_{ij} = 0.0`.

:return:
A dataset that contains a column of vertex ids and the corresponding cluster
assignment for each id. Its schema is:
- id: Long
- cluster: Int

.. versionadded:: 2.4.0
"""
self._transfer_params_to_java()
jdf = self._java_obj.assignClusters(dataset._jdf)
return DataFrame(jdf, dataset.sql_ctx)
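
As a complement to the doctest in the class docstring, the following minimal usage sketch illustrates the input convention documented for `assignClusters`: the affinity matrix is symmetric, so each similarity s_ij only needs to appear once, as either (i, j, s_ij) or (j, i, s_ji). The SparkSession variable `spark` and the specific edge weights are assumptions made for illustration; column names follow the defaults `src`, `dst`, and `weight`.

```python
# Illustrative usage sketch (not part of the diff); assumes an active SparkSession `spark`.
from pyspark.ml.clustering import PowerIterationClustering

# Two tightly connected triangles joined by one weak edge.
# Each undirected edge is listed once; mirrored (dst, src) rows are not needed
# because the affinity matrix is treated as symmetric.
edges = [(0, 1, 1.0), (0, 2, 1.0), (1, 2, 1.0),
         (3, 4, 1.0), (4, 5, 1.0), (3, 5, 1.0),
         (2, 3, 0.1)]
df = spark.createDataFrame(edges).toDF("src", "dst", "weight")

pic = PowerIterationClustering(k=2, maxIter=20, weightCol="weight")
assignments = pic.assignClusters(df)   # DataFrame with columns id (long) and cluster (int)
assignments.orderBy("id").show()
```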


if __name__ == "__main__":
import doctest
import pyspark.ml.clustering
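
For intuition about what the algorithm added in this diff computes, here is a hedged, self-contained NumPy sketch of the idea from the Lin and Cohen abstract quoted in the class docstring: row-normalize the affinity matrix, run a few truncated power-iteration steps to obtain a one-dimensional embedding, then split that embedding into k groups. This is an illustration only, not Spark's implementation; the example graph, the fixed iteration count, and the threshold split standing in for the k-means step are all assumptions.

```python
# Conceptual sketch of Power Iteration Clustering (illustration only, not Spark's code).
import numpy as np

def pic_embedding(W, num_iter=15):
    """Truncated power iteration on the row-normalized affinity matrix W
    (symmetric, nonnegative, zero diagonal)."""
    degrees = W.sum(axis=1)
    P = W / degrees[:, None]        # D^-1 W: every row sums to 1
    v = degrees / degrees.sum()     # the paper's "degree" initialization
    for _ in range(num_iter):       # truncated: stop well before full convergence
        v = P @ v
        v /= np.abs(v).sum()        # rescale to unit L1 norm
    return v                        # 1-D embedding; vertices in the same cluster
                                    # end up with nearly identical values

# Two tight triangles (weights 1.0 and 0.8) joined by one weak 0.1 edge.
W = np.zeros((6, 6))
for i, j, s in [(0, 1, 1.0), (0, 2, 1.0), (1, 2, 1.0),
                (3, 4, 0.8), (4, 5, 0.8), (3, 5, 0.8),
                (2, 3, 0.1)]:
    W[i, j] = W[j, i] = s           # the matrix is symmetric, so fill both entries

v = pic_embedding(W)
# Spark runs k-means on the embedding; a threshold split is enough for this tiny example.
labels = (v > np.median(v)).astype(int)
print(labels)                       # vertices 0-2 and 3-5 fall into different groups
```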