Skip to content

Commit a99d284

Browse files
huaxingao authored and mengxr committed
[SPARK-19826][ML][PYTHON] add spark.ml Python API for PIC
## What changes were proposed in this pull request?

Add spark.ml Python API for PIC.

## How was this patch tested?

Added doctests.

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #21513 from huaxingao/spark--19826.
1 parent 3e5b4ae commit a99d284

File tree

1 file changed

+179
-5
lines changed

1 file changed

+179
-5
lines changed

python/pyspark/ml/clustering.py

Lines changed: 179 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919

2020
from pyspark import since, keyword_only
2121
from pyspark.ml.util import *
22-
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
22+
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaWrapper
2323
from pyspark.ml.param.shared import *
2424
from pyspark.ml.common import inherit_doc
25+
from pyspark.sql import DataFrame
2526

2627
__all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
2728
'KMeans', 'KMeansModel',
2829
'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
29-
'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
30+
'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']
3031

3132

3233
class ClusteringSummary(JavaWrapper):
@@ -836,7 +837,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
836837
837838
Terminology:
838839
839-
- "term" = "word": an el
840+
- "term" = "word": an element of the vocabulary
840841
- "token": instance of a term appearing in a document
841842
- "topic": multinomial distribution over terms representing some concept
842843
- "document": one piece of text, corresponding to one row in the input data
@@ -938,7 +939,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
938939
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
939940
subsamplingRate=0.05, optimizeDocConcentration=True,\
940941
docConcentration=None, topicConcentration=None,\
941-
topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
942+
topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
942943
"""
943944
super(LDA, self).__init__()
944945
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +968,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
967968
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
968969
subsamplingRate=0.05, optimizeDocConcentration=True,\
969970
docConcentration=None, topicConcentration=None,\
970-
topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
971+
topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
971972
972973
Sets params for LDA.
973974
"""
@@ -1156,6 +1157,179 @@ def getKeepLastCheckpoint(self):
11561157
return self.getOrDefault(self.keepLastCheckpoint)
11571158

11581159

1160+
@inherit_doc
class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReadable,
                               JavaMLWritable):
    """
    .. note:: Experimental

    Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
    `Lin and Cohen <http://www.icml2010.org/papers/387.pdf>`_. From the abstract:
    PIC finds a very low-dimensional embedding of a dataset using truncated power
    iteration on a normalized pair-wise similarity matrix of the data.

    This class is not yet an Estimator/Transformer, use :py:func:`assignClusters` method
    to run the PowerIterationClustering algorithm.

    .. seealso:: `Wikipedia on Spectral clustering \
    <http://en.wikipedia.org/wiki/Spectral_clustering>`_

    >>> data = [(1, 0, 0.5), \
                (2, 0, 0.5), (2, 1, 0.7), \
                (3, 0, 0.5), (3, 1, 0.7), (3, 2, 0.9), \
                (4, 0, 0.5), (4, 1, 0.7), (4, 2, 0.9), (4, 3, 1.1), \
                (5, 0, 0.5), (5, 1, 0.7), (5, 2, 0.9), (5, 3, 1.1), (5, 4, 1.3)]
    >>> df = spark.createDataFrame(data).toDF("src", "dst", "weight")
    >>> pic = PowerIterationClustering(k=2, maxIter=40, weightCol="weight")
    >>> assignments = pic.assignClusters(df)
    >>> assignments.sort(assignments.id).show(truncate=False)
    +---+-------+
    |id |cluster|
    +---+-------+
    |0  |1      |
    |1  |1      |
    |2  |1      |
    |3  |1      |
    |4  |1      |
    |5  |0      |
    +---+-------+
    ...
    >>> pic_path = temp_path + "/pic"
    >>> pic.save(pic_path)
    >>> pic2 = PowerIterationClustering.load(pic_path)
    >>> pic2.getK()
    2
    >>> pic2.getMaxIter()
    40

    .. versionadded:: 2.4.0
    """

    # Param descriptions mirror the Scala implementation in
    # org.apache.spark.ml.clustering.PowerIterationClustering, which this class wraps.
    k = Param(Params._dummy(), "k",
              "The number of clusters to create. Must be > 1.",
              typeConverter=TypeConverters.toInt)
    initMode = Param(Params._dummy(), "initMode",
                     "The initialization algorithm. This can be either " +
                     "'random' to use a random vector as vertex properties, or 'degree' to use " +
                     "a normalized sum of similarities with other vertices. Supported options: " +
                     "'random' and 'degree'.",
                     typeConverter=TypeConverters.toString)
    srcCol = Param(Params._dummy(), "srcCol",
                   "Name of the input column for source vertex IDs.",
                   typeConverter=TypeConverters.toString)
    dstCol = Param(Params._dummy(), "dstCol",
                   "Name of the input column for destination vertex IDs.",
                   typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
                 weightCol=None):
        """
        __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
                 weightCol=None)
        """
        super(PowerIterationClustering, self).__init__()
        # Create the backing JVM object; all params are transferred to it lazily.
        self._java_obj = self._new_java_obj(
            "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
        self._setDefault(k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    @since("2.4.0")
    def setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
                  weightCol=None):
        """
        setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
                  weightCol=None)
        Sets params for PowerIterationClustering.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    @since("2.4.0")
    def setK(self, value):
        """
        Sets the value of :py:attr:`k`.
        """
        return self._set(k=value)

    @since("2.4.0")
    def getK(self):
        """
        Gets the value of :py:attr:`k` or its default value.
        """
        return self.getOrDefault(self.k)

    @since("2.4.0")
    def setInitMode(self, value):
        """
        Sets the value of :py:attr:`initMode`.
        """
        return self._set(initMode=value)

    @since("2.4.0")
    def getInitMode(self):
        """
        Gets the value of :py:attr:`initMode` or its default value.
        """
        return self.getOrDefault(self.initMode)

    @since("2.4.0")
    def setSrcCol(self, value):
        """
        Sets the value of :py:attr:`srcCol`.
        """
        return self._set(srcCol=value)

    @since("2.4.0")
    def getSrcCol(self):
        """
        Gets the value of :py:attr:`srcCol` or its default value.
        """
        return self.getOrDefault(self.srcCol)

    @since("2.4.0")
    def setDstCol(self, value):
        """
        Sets the value of :py:attr:`dstCol`.
        """
        return self._set(dstCol=value)

    @since("2.4.0")
    def getDstCol(self):
        """
        Gets the value of :py:attr:`dstCol` or its default value.
        """
        return self.getOrDefault(self.dstCol)

    @since("2.4.0")
    def assignClusters(self, dataset):
        """
        Run the PIC algorithm and returns a cluster assignment for each input vertex.

        :param dataset:
          A dataset with columns src, dst, weight representing the affinity matrix,
          which is the matrix A in the PIC paper. Suppose the src column value is i,
          the dst column value is j, the weight column value is similarity s,,ij,,
          which must be nonnegative. This is a symmetric matrix and hence
          s,,ij,, = s,,ji,,. For any (i, j) with nonzero similarity, there should be
          either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows with i = j are
          ignored, because we assume s,,ij,, = 0.0.

        :return:
          A dataset that contains columns of vertex id and the corresponding cluster for
          the id. The schema of it will be:
          - id: Long
          - cluster: Int

        .. versionadded:: 2.4.0
        """
        # Push any Python-side param changes to the JVM before running the algorithm,
        # since this class is not an Estimator and bypasses the usual fit() path.
        self._transfer_params_to_java()
        jdf = self._java_obj.assignClusters(dataset._jdf)
        return DataFrame(jdf, dataset.sql_ctx)
1331+
1332+
11591333
if __name__ == "__main__":
11601334
import doctest
11611335
import pyspark.ml.clustering

0 commit comments

Comments
 (0)