diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index b3d5fb17f6b8..4aa1cf84b582 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -19,14 +19,15 @@
from pyspark import since, keyword_only
from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
+from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaWrapper
from pyspark.ml.param.shared import *
from pyspark.ml.common import inherit_doc
+from pyspark.sql import DataFrame
__all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
'KMeans', 'KMeansModel',
'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
- 'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
+ 'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']
class ClusteringSummary(JavaWrapper):
@@ -836,7 +837,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
Terminology:
- - "term" = "word": an el
+ - "term" = "word": an element of the vocabulary
- "token": instance of a term appearing in a document
- "topic": multinomial distribution over terms representing some concept
- "document": one piece of text, corresponding to one row in the input data
@@ -938,7 +939,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
- topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+ topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
"""
super(LDA, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +968,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
- topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+ topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
Sets params for LDA.
"""
@@ -1156,6 +1157,179 @@ def getKeepLastCheckpoint(self):
return self.getOrDefault(self.keepLastCheckpoint)
+@inherit_doc
+class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReadable,
+ JavaMLWritable):
+ """
+ .. note:: Experimental
+
+    Power Iteration Clustering (PIC) is a scalable graph clustering algorithm developed by
+ Lin and Cohen. From the abstract:
+ PIC finds a very low-dimensional embedding of a dataset using truncated power
+ iteration on a normalized pair-wise similarity matrix of the data.
+
+    This class is not yet an Estimator/Transformer; use the :py:func:`assignClusters` method
+    to run the PowerIterationClustering algorithm.
+
+    .. seealso:: `Wikipedia on Spectral clustering \
+        <http://en.wikipedia.org/wiki/Spectral_clustering>`_
+
+ >>> data = [(1, 0, 0.5), \
+ (2, 0, 0.5), (2, 1, 0.7), \
+ (3, 0, 0.5), (3, 1, 0.7), (3, 2, 0.9), \
+ (4, 0, 0.5), (4, 1, 0.7), (4, 2, 0.9), (4, 3, 1.1), \
+ (5, 0, 0.5), (5, 1, 0.7), (5, 2, 0.9), (5, 3, 1.1), (5, 4, 1.3)]
+ >>> df = spark.createDataFrame(data).toDF("src", "dst", "weight")
+ >>> pic = PowerIterationClustering(k=2, maxIter=40, weightCol="weight")
+ >>> assignments = pic.assignClusters(df)
+ >>> assignments.sort(assignments.id).show(truncate=False)
+ +---+-------+
+ |id |cluster|
+ +---+-------+
+    |0  |1      |
+    |1  |1      |
+    |2  |1      |
+    |3  |1      |
+    |4  |1      |
+    |5  |0      |
+ +---+-------+
+ ...
+ >>> pic_path = temp_path + "/pic"
+ >>> pic.save(pic_path)
+ >>> pic2 = PowerIterationClustering.load(pic_path)
+ >>> pic2.getK()
+ 2
+ >>> pic2.getMaxIter()
+ 40
+
+ .. versionadded:: 2.4.0
+ """
+
+ k = Param(Params._dummy(), "k",
+ "The number of clusters to create. Must be > 1.",
+ typeConverter=TypeConverters.toInt)
+ initMode = Param(Params._dummy(), "initMode",
+ "The initialization algorithm. This can be either " +
+ "'random' to use a random vector as vertex properties, or 'degree' to use " +
+ "a normalized sum of similarities with other vertices. Supported options: " +
+ "'random' and 'degree'.",
+ typeConverter=TypeConverters.toString)
+ srcCol = Param(Params._dummy(), "srcCol",
+ "Name of the input column for source vertex IDs.",
+ typeConverter=TypeConverters.toString)
+ dstCol = Param(Params._dummy(), "dstCol",
+ "Name of the input column for destination vertex IDs.",
+ typeConverter=TypeConverters.toString)
+
+ @keyword_only
+ def __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
+ weightCol=None):
+ """
+ __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
+ weightCol=None)
+ """
+ super(PowerIterationClustering, self).__init__()
+ self._java_obj = self._new_java_obj(
+ "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
+ self._setDefault(k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst")
+ kwargs = self._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ @since("2.4.0")
+ def setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
+ weightCol=None):
+ """
+ setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
+ weightCol=None)
+ Sets params for PowerIterationClustering.
+ """
+ kwargs = self._input_kwargs
+ return self._set(**kwargs)
+
+ @since("2.4.0")
+ def setK(self, value):
+ """
+ Sets the value of :py:attr:`k`.
+ """
+ return self._set(k=value)
+
+ @since("2.4.0")
+ def getK(self):
+ """
+ Gets the value of :py:attr:`k` or its default value.
+ """
+ return self.getOrDefault(self.k)
+
+ @since("2.4.0")
+ def setInitMode(self, value):
+ """
+ Sets the value of :py:attr:`initMode`.
+ """
+ return self._set(initMode=value)
+
+ @since("2.4.0")
+ def getInitMode(self):
+ """
+ Gets the value of :py:attr:`initMode` or its default value.
+ """
+ return self.getOrDefault(self.initMode)
+
+ @since("2.4.0")
+ def setSrcCol(self, value):
+ """
+ Sets the value of :py:attr:`srcCol`.
+ """
+ return self._set(srcCol=value)
+
+ @since("2.4.0")
+ def getSrcCol(self):
+ """
+ Gets the value of :py:attr:`srcCol` or its default value.
+ """
+ return self.getOrDefault(self.srcCol)
+
+ @since("2.4.0")
+ def setDstCol(self, value):
+ """
+ Sets the value of :py:attr:`dstCol`.
+ """
+ return self._set(dstCol=value)
+
+ @since("2.4.0")
+ def getDstCol(self):
+ """
+ Gets the value of :py:attr:`dstCol` or its default value.
+ """
+ return self.getOrDefault(self.dstCol)
+
+ @since("2.4.0")
+ def assignClusters(self, dataset):
+ """
+        Runs the PIC algorithm and returns a cluster assignment for each input vertex.
+
+ :param dataset:
+ A dataset with columns src, dst, weight representing the affinity matrix,
+ which is the matrix A in the PIC paper. Suppose the src column value is i,
+ the dst column value is j, the weight column value is similarity s,,ij,,
+ which must be nonnegative. This is a symmetric matrix and hence
+ s,,ij,, = s,,ji,,. For any (i, j) with nonzero similarity, there should be
+ either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows with i = j are
+ ignored, because we assume s,,ij,, = 0.0.
+
+ :return:
+ A dataset that contains columns of vertex id and the corresponding cluster for
+ the id. The schema of it will be:
+ - id: Long
+ - cluster: Int
+
+ .. versionadded:: 2.4.0
+ """
+ self._transfer_params_to_java()
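+        # Call the Scala implementation and wrap the returned Java DataFrame for Python use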
+ jdf = self._java_obj.assignClusters(dataset._jdf)
+ return DataFrame(jdf, dataset.sql_ctx)
+
+
if __name__ == "__main__":
import doctest
import pyspark.ml.clustering