From d8b066a32d783db8b1d04952969bc5c7ce7b9f2f Mon Sep 17 00:00:00 2001 From: MechCoder Date: Fri, 29 May 2015 16:09:48 +0530 Subject: [PATCH 01/11] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans --- .../mllib/api/python/PythonMLLibAPI.scala | 15 +++ python/pyspark/mllib/clustering.py | 94 ++++++++++++++++++- 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 1812b3ac7cc0e..5cfe793a50183 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -964,6 +964,21 @@ private[python] class PythonMLLibAPI extends Serializable { points.asScala.toArray) } + /** + * Java stub for the update method of StreamingKMeansModel. + */ + def updateStreamingKMeansModel( + clusterCenters: java.util.ArrayList[Vector], + clusterWeights: java.util.ArrayList[Double], + data: JavaRDD[Vector], decayFactor: Double, + timeUnit: String) : JList[Object] = { + val model = new StreamingKMeansModel( + clusterCenters.asScala.toArray, clusterWeights.asScala.toArray) + .update(data, decayFactor, timeUnit) + List(model.clusterCenters, model.clusterWeights). + map(_.asInstanceOf[Object]).asJava + } + } /** diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index b55583f82223f..d313121242a14 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -21,7 +21,9 @@ if sys.version > '3': xrange = range -from numpy import array +from math import exp, log + +from numpy import array, random, tile from pyspark import RDD from pyspark import SparkContext @@ -264,6 +266,96 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia return GaussianMixtureModel(weight, mvg_obj) +class StreamingKMeansModel(KMeansModel): + """ + .. note:: Experimental + """ + def __init__(self, clusterCenters, clusterWeights): + super(StreamingKMeansModel, self).__init__(centers=clusterCenters) + self._clusterWeights = list(clusterWeights) + + def update(self, data, decayFactor, timeUnit): + if not isinstance(data, RDD): + raise TypeError("data should be of a RDD, got %s." % type(data)) + decayFactor = float(decayFactor) + if timeUnit not in ["batches", "points"]: + raise ValueError( + "timeUnit should be 'batches' or 'points', got %s." % timeUnit) + vectorCenters = [_convert_to_vector(center) for center in self.centers] + updatedModel = callMLlibFunc( + "updateStreamingKMeansModel", vectorCenters, self._clusterWeights, + data, decayFactor, timeUnit) + self.centers = array(updatedModel[0]) + self._clusterWeights = list(updatedModel[1]) + return self + + +class StreamingKMeans(object): + """ + .. note:: Experimental + """ + def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): + self._k = k + self._decayFactor = decayFactor + if timeUnit not in ["batches", "points"]: + raise ValueError( + "timeUnit should be 'batches' or 'points', got %s." 
% timeUnit) + self._timeUnit = timeUnit + self.model = None + + def _validate(self, dstream): + if self.model is None: + raise ValueError( + "Initial centers should be set either by setInitialCenters ") + "or setRandomCenters.") + if not isinstance(dstream, DStream): + raise TypeError( + "Expected dstream to be of type DStream, " + "got type %d" % type(dstream)) + + def setK(self, k): + self._k = k + return self + + def setDecayFactor(self, decayFactor): + self._decayFactor = decayFactor + return self + + def setHalfLife(self, halfLife, timeUnit): + self._timeUnit = timeUnit + self._decayFactor = exp(log(0.5) / halfLife) + return self + + def setInitialCenters(self, centers, weights): + self.model = StreamingKMeansModel(centers, weights) + return self + + def setRandomCenters(self, dim, weight, seed): + rng = random.RandomState(seed) + clusterCenters = rng.randn(self._k, dim) + clusterWeights = tile(weight, self._k) + self.model = StreamingKMeansModel(clusterCenters, clusterWeights) + return self + + def trainOn(self, dstream): + self._validate(dstream) + + def update(_, rdd): + if rdd: + self.model = self.model.update(rdd) + + dstream.foreachRDD(update) + return self + + def predictOn(self, dstream): + self._validate(dstream) + dstream.map(model.predict) + + def predictOnValues(self, dstream): + self._validate(dstream) + dstream.mapValues(model.predict) + + def _test(): import doctest globs = globals().copy() From 4b1481f0973d1eb02f9d5dc5e4cd12824484165b Mon Sep 17 00:00:00 2001 From: MechCoder Date: Sat, 30 May 2015 18:53:47 +0530 Subject: [PATCH 02/11] Some changes and tests --- python/pyspark/mllib/clustering.py | 43 ++++++++++++++++++++++++++---- python/pyspark/mllib/tests.py | 40 +++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index d313121242a14..94ccecfe48f72 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -28,9 +28,10 @@ from pyspark import RDD from pyspark import SparkContext from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py -from pyspark.mllib.linalg import SparseVector, _convert_to_vector +from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector from pyspark.mllib.stat.distribution import MultivariateGaussian from pyspark.mllib.util import Saveable, Loader, inherit_doc +from pyspark.streaming import DStream __all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture'] @@ -269,14 +270,46 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia class StreamingKMeansModel(KMeansModel): """ .. note:: Experimental + + >>> initCenters, initWeights = [[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0] + >>> stkm = StreamingKMeansModel(initCenters, initWeights) + >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1], + ... 
[0.9, 0.9], [1.1, 1.1]]) + >>> stkm = stkm.update(data, 1.0, "batches") + >>> stkm.centers + array([[ 0., 0.], + [ 1., 1.]]) + >>> stkm.predict([-0.1, -0.1]) == stkm.predict([0.1, 0.1]) == 0 + True + >>> stkm.predict([0.9, 0.9]) == stkm.predict([1.1, 1.1]) == 1 + True + >>> stkm.getClusterWeights + [3.0, 3.0] + >>> decayFactor = 0.0 + >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])]) + >>> stkm = stkm.update(data, 0.0, "batches") + >>> stkm.centers + array([[ 0.2, 0.2], + [ 1.5, 1.5]]) + >>> stkm.getClusterWeights + [1.0, 1.0] + >>> stkm.predict([0.2, 0.2]) + 0 + >>> stkm.predict([1.5, 1.5]) + 1 """ def __init__(self, clusterCenters, clusterWeights): super(StreamingKMeansModel, self).__init__(centers=clusterCenters) self._clusterWeights = list(clusterWeights) + @property + def getClusterWeights(self): + return self._clusterWeights + def update(self, data, decayFactor, timeUnit): if not isinstance(data, RDD): raise TypeError("data should be of a RDD, got %s." % type(data)) + data = data.map(_convert_to_vector) decayFactor = float(decayFactor) if timeUnit not in ["batches", "points"]: raise ValueError( @@ -306,7 +339,7 @@ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): def _validate(self, dstream): if self.model is None: raise ValueError( - "Initial centers should be set either by setInitialCenters ") + "Initial centers should be set either by setInitialCenters " "or setRandomCenters.") if not isinstance(dstream, DStream): raise TypeError( @@ -342,18 +375,18 @@ def trainOn(self, dstream): def update(_, rdd): if rdd: - self.model = self.model.update(rdd) + self.model = self.model.update(rdd, self._decayFactor, self._timeUnit) dstream.foreachRDD(update) return self def predictOn(self, dstream): self._validate(dstream) - dstream.map(model.predict) + dstream.map(self.model.predict) def predictOnValues(self, dstream): self._validate(dstream) - dstream.mapValues(model.predict) + dstream.mapValues(self.model.predict) def _test(): diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index c482e6b0681e3..5b34b91ef20d6 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -38,6 +38,7 @@ from pyspark import SparkContext from pyspark.mllib.common import _to_java_object_rdd +from pyspark.mllib.clustering import StreamingKMeans from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\ DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT from pyspark.mllib.regression import LabeledPoint @@ -48,6 +49,7 @@ from pyspark.mllib.feature import StandardScaler from pyspark.mllib.feature import ElementwiseProduct from pyspark.serializers import PickleSerializer +from pyspark.streaming import StreamingContext from pyspark.sql import SQLContext _have_scipy = False @@ -863,6 +865,44 @@ def test_model_transform(self): eprod.transform(sparsevec), SparseVector(3, [0], [3])) +class StreamingKMeansTest(MLlibTestCase): + def test_model_params(self): + stkm = StreamingKMeans() + stkm.setK(5).setDecayFactor(0.0) + self.assertEquals(stkm._k, 5) + self.assertEquals(stkm._decayFactor, 0.0) + + # Model not set yet. 
+ self.assertIsNone(stkm.model) + self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0]) + + stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0]) + self.assertEqual(stkm.model.centers, [[0.0, 0.0], [1.0, 1.0]]) + self.assertEqual(stkm.model.getClusterWeights, [1.0, 1.0]) + + def test_model(self): + stkm = StreamingKMeans() + initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] + weights = [1.0, 1.0, 1.0, 1.0] + stkm.setInitialCenters(initCenters, weights) + + offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]] + batches = [] + + for offset in offsets: + batches.append([[offset[0] + center[0], offset[1] + center[1]] + for center in initCenters]) + + batches = [self.sc.parallelize(batch, 1) for batch in batches] + ssc = StreamingContext(self.sc, 2.0) + input_stream = ssc.queueStream(batches) + stkm.trainOn(input_stream) + ssc.start() + finalModel = stkm.model + self.assertEqual(finalModel.centers, initCenters) + # self.assertEqual(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0]) + + if __name__ == "__main__": if not _have_scipy: print("NOTE: Skipping SciPy tests as it does not seem to be installed") From ee8ce16528c359a37796969ffe8f0a5b6d597e46 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Tue, 2 Jun 2015 19:43:05 +0530 Subject: [PATCH 03/11] Update tests, doc and examples --- docs/mllib-clustering.md | 56 ++++++++++++++++++-- python/pyspark/mllib/clustering.py | 82 +++++++++++++++++++++++++++--- python/pyspark/mllib/tests.py | 68 ++++++++++++++++++++----- 3 files changed, 182 insertions(+), 24 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 1b088969ddc25..fdee8e7300abd 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -593,6 +593,58 @@ ssc.start() ssc.awaitTermination() {% endhighlight %} + + +
+First we import the neccessary classes. + +{% highlight python %} + +from pyspark.mllib.linalg import Vectors +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.clustering import StreamingKMeans + +{% endhighlight %} + +Then we make an input stream of vectors for training, as well as a stream of labeled data +points for testing. We assume a StreamingContext `ssc` has been created, see +[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. + +{% highlight python %} + +trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse) +testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse) + +{% endhighlight %} + +We create a model with random clusters and specify the number of clusters to find + +{% highlight python %} + +numDimensions = 3 +numClusters = 2 +model = StreamingKMeans() +model.setK(numClusters) +model.setDecayFactor(1.0) +model.setRandomCenters(numDimensions, 0.0) + +{% endhighlight %} + +Now register the streams for training and testing and start the job, printing +the predicted cluster assignments on new data points as they arrive. + +{% highlight python %} + +model.trainOn(trainingData) +model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))) + +ssc.start() +ssc.awaitTermination() + +{% endhighlight %} +
+ + As you add new text files with data the cluster centers will update. Each training point should be formatted as `[x1, x2, x3]`, and each test data point @@ -600,7 +652,3 @@ should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or id (e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir` the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions. With new data, the cluster centers will change! - - - - diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 94ccecfe48f72..26a19afdd9323 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -101,6 +101,9 @@ def predict(self, x): """Find the cluster to which x belongs in this model.""" best = 0 best_distance = float("inf") + if isinstance(x, RDD): + return x.map(self.predict) + x = _convert_to_vector(x) for i in xrange(len(self.centers)): distance = x.squared_distance(self.centers[i]) @@ -270,12 +273,30 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia class StreamingKMeansModel(KMeansModel): """ .. note:: Experimental + Clustering model which can perform an online update of the centroids. + + The update formula is given by + c_t+1 = [(c_t * n_t * a) + (x_t * m_t)] / [n_t + m_t] + n_t+1 = n_t * a + m_t + + where + c_t: Centroid at the n_th iteration. + n_t: Number of weights at the n_th iteration. + x_t: Centroid of the new data closest to c_t + m_t: Number of weights of the new data closest to c_t + c_t+1: New centroid + n_t+1: New number of weights. + a: Decay Factor, which gives the forgetfulnes + + Note that if a is set to 1, it is the weighted mean of the previous + and new data. If it set to zero, the old centroids are completely + forgotten. >>> initCenters, initWeights = [[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0] >>> stkm = StreamingKMeansModel(initCenters, initWeights) >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1], ... [0.9, 0.9], [1.1, 1.1]]) - >>> stkm = stkm.update(data, 1.0, "batches") + >>> stkm = stkm.update(data, 1.0, u"batches") >>> stkm.centers array([[ 0., 0.], [ 1., 1.]]) @@ -287,7 +308,7 @@ class StreamingKMeansModel(KMeansModel): [3.0, 3.0] >>> decayFactor = 0.0 >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])]) - >>> stkm = stkm.update(data, 0.0, "batches") + >>> stkm = stkm.update(data, 0.0, u"batches") >>> stkm.centers array([[ 0.2, 0.2], [ 1.5, 1.5]]) @@ -304,9 +325,22 @@ def __init__(self, clusterCenters, clusterWeights): @property def getClusterWeights(self): + """Convenience method to return the cluster weights.""" return self._clusterWeights def update(self, data, decayFactor, timeUnit): + """Update the centroids, according to data + + Parameters + ---------- + data: Should be a RDD that represents the new data. + + decayFactor: forgetfulness of the previous centroids. + + timeUnit: Can be "batches" or "points" + If points, then the decay factor is raised to the power of + number of new points and if batches, it is used as it is. + """ if not isinstance(data, RDD): raise TypeError("data should be of a RDD, got %s." % type(data)) data = data.map(_convert_to_vector) @@ -326,6 +360,21 @@ def update(self, data, decayFactor, timeUnit): class StreamingKMeans(object): """ .. 
note:: Experimental + + Provides methods to set k, decayFactor, timeUnit to train and + predict the incoming data + + Parameters + ---------- + k: int + Number of clusters + + decayFactor: float + Forgetfulness of the previous centroid. + + timeUnit: str, "batches" or "points" + If points, then the decayfactor is raised to the power of new + points. """ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._k = k @@ -347,14 +396,20 @@ def _validate(self, dstream): "got type %d" % type(dstream)) def setK(self, k): + """Set number of clusters.""" self._k = k return self def setDecayFactor(self, decayFactor): + """Set decay factor.""" self._decayFactor = decayFactor return self def setHalfLife(self, halfLife, timeUnit): + """ + Set number of instances after which the centroids at + has 0.5 weightage + """ self._timeUnit = timeUnit self._decayFactor = exp(log(0.5) / halfLife) return self @@ -364,6 +419,10 @@ def setInitialCenters(self, centers, weights): return self def setRandomCenters(self, dim, weight, seed): + """ + Set the initial centres to be random samples from + a gaussian population with constant weights. + """ rng = random.RandomState(seed) clusterCenters = rng.randn(self._k, dim) clusterWeights = tile(weight, self._k) @@ -371,22 +430,29 @@ def setRandomCenters(self, dim, weight, seed): return self def trainOn(self, dstream): + """Train the model on the incoming dstream.""" self._validate(dstream) - def update(_, rdd): - if rdd: - self.model = self.model.update(rdd, self._decayFactor, self._timeUnit) + def update(rdd): + self.model.update(rdd, self._decayFactor, self._timeUnit) dstream.foreachRDD(update) - return self def predictOn(self, dstream): + """ + Make predictions on a dstream. + Returns a transformed dstream object + """ self._validate(dstream) - dstream.map(self.model.predict) + return dstream.map(self.model.predict) def predictOnValues(self, dstream): + """ + Make predictions on a keyed dstream. + Returns a transformed dstream object. 
+ """ self._validate(dstream) - dstream.mapValues(self.model.predict) + return dstream.mapValues(self.model.predict) def _test(): diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 5b34b91ef20d6..5be26e286f00f 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -23,8 +23,9 @@ import sys import tempfile import array as pyarray +from time import time, sleep -from numpy import array, array_equal, zeros, inf +from numpy import array, array_equal, zeros, inf, all from py4j.protocol import Py4JJavaError if sys.version_info[:2] <= (2, 6): @@ -38,7 +39,7 @@ from pyspark import SparkContext from pyspark.mllib.common import _to_java_object_rdd -from pyspark.mllib.clustering import StreamingKMeans +from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\ DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT from pyspark.mllib.regression import LabeledPoint @@ -69,6 +70,15 @@ def setUp(self): self.sc = sc +class MLLibStreamingTestCase(unittest.TestCase): + def setUp(self): + self.sc = sc + self.ssc = StreamingContext(self.sc, 1.0) + + def tearDown(self): + self.ssc.stop(False) + + def _squared_distance(a, b): if isinstance(a, Vector): return a.squared_distance(b) @@ -865,7 +875,7 @@ def test_model_transform(self): eprod.transform(sparsevec), SparseVector(3, [0], [3])) -class StreamingKMeansTest(MLlibTestCase): +class StreamingKMeansTest(MLLibStreamingTestCase): def test_model_params(self): stkm = StreamingKMeans() stkm.setK(5).setDecayFactor(0.0) @@ -877,30 +887,64 @@ def test_model_params(self): self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0]) stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0]) - self.assertEqual(stkm.model.centers, [[0.0, 0.0], [1.0, 1.0]]) - self.assertEqual(stkm.model.getClusterWeights, [1.0, 1.0]) + self.assertEquals(stkm.model.centers, [[0.0, 0.0], [1.0, 1.0]]) + self.assertEquals(stkm.model.getClusterWeights, [1.0, 1.0]) + + def _ssc_wait(self, start_time, end_time, sleep_time): + while time() - start_time < end_time: + sleep(0.01) - def test_model(self): + def test_trainOn_model(self): + # Test the model on toy data. stkm = StreamingKMeans() initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] weights = [1.0, 1.0, 1.0, 1.0] stkm.setInitialCenters(initCenters, weights) + # Create a toy dataset by setting a tiny offest for each point. offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]] batches = [] - for offset in offsets: batches.append([[offset[0] + center[0], offset[1] + center[1]] for center in initCenters]) batches = [self.sc.parallelize(batch, 1) for batch in batches] - ssc = StreamingContext(self.sc, 2.0) - input_stream = ssc.queueStream(batches) + input_stream = self.ssc.queueStream(batches) stkm.trainOn(input_stream) - ssc.start() + t = time() + self.ssc.start() + + # Give enough time to train the model. 
+ self._ssc_wait(t, 6.0, 0.01) finalModel = stkm.model - self.assertEqual(finalModel.centers, initCenters) - # self.assertEqual(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0]) + self.assertTrue(all(finalModel.centers == array(initCenters))) + self.assertEquals(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0]) + + def test_predictOn_model(self): + initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] + weights = [1.0, 1.0, 1.0, 1.0] + model = StreamingKMeansModel(initCenters, weights) + stkm = StreamingKMeans() + stkm.model = model + + predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]] + predict_data = [sc.parallelize(batch, 1) for batch in predict_data] + predict_stream = self.ssc.queueStream(predict_data) + predict_val = stkm.predictOn(predict_stream) + + result = [] + + def update(rdd): + if rdd: + rdd_collect = rdd.collect() + if rdd_collect: + result.append(rdd_collect) + + predict_val.foreachRDD(update) + t = time() + self.ssc.start() + self._ssc_wait(t, 6.0, 0.01) + self.assertEquals(result, [[0], [1], [2], [3]]) if __name__ == "__main__": From c80e45120a5181046844846eb71442cf6e05cd8c Mon Sep 17 00:00:00 2001 From: MechCoder Date: Wed, 3 Jun 2015 22:20:39 +0530 Subject: [PATCH 04/11] Add ignore_unicode_prefix --- python/pyspark/mllib/clustering.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 26a19afdd9323..8d0c8c7654934 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -25,8 +25,8 @@ from numpy import array, random, tile -from pyspark import RDD from pyspark import SparkContext +from pyspark.rdd import RDD, ignore_unicode_prefix from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector from pyspark.mllib.stat.distribution import MultivariateGaussian @@ -328,6 +328,7 @@ def getClusterWeights(self): """Convenience method to return the cluster weights.""" return self._clusterWeights + @ignore_unicode_prefix def update(self, data, decayFactor, timeUnit): """Update the centroids, according to data From a9817df9c59919a754f05adda161c840173ccb22 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Thu, 4 Jun 2015 12:22:58 +0530 Subject: [PATCH 05/11] Better tests and minor fixes --- python/pyspark/mllib/clustering.py | 16 ++++----- python/pyspark/mllib/tests.py | 55 +++++++++++++++++++++++------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 8d0c8c7654934..1e148452d2cc2 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -384,17 +384,17 @@ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): raise ValueError( "timeUnit should be 'batches' or 'points', got %s." 
% timeUnit) self._timeUnit = timeUnit - self.model = None + self.latestModel = None def _validate(self, dstream): - if self.model is None: + if self.latestModel is None: raise ValueError( "Initial centers should be set either by setInitialCenters " "or setRandomCenters.") if not isinstance(dstream, DStream): raise TypeError( "Expected dstream to be of type DStream, " - "got type %d" % type(dstream)) + "got type %s" % type(dstream)) def setK(self, k): """Set number of clusters.""" @@ -416,7 +416,7 @@ def setHalfLife(self, halfLife, timeUnit): return self def setInitialCenters(self, centers, weights): - self.model = StreamingKMeansModel(centers, weights) + self.latestModel = StreamingKMeansModel(centers, weights) return self def setRandomCenters(self, dim, weight, seed): @@ -427,7 +427,7 @@ def setRandomCenters(self, dim, weight, seed): rng = random.RandomState(seed) clusterCenters = rng.randn(self._k, dim) clusterWeights = tile(weight, self._k) - self.model = StreamingKMeansModel(clusterCenters, clusterWeights) + self.latestModel = StreamingKMeansModel(clusterCenters, clusterWeights) return self def trainOn(self, dstream): @@ -435,7 +435,7 @@ def trainOn(self, dstream): self._validate(dstream) def update(rdd): - self.model.update(rdd, self._decayFactor, self._timeUnit) + self.latestModel.update(rdd, self._decayFactor, self._timeUnit) dstream.foreachRDD(update) @@ -445,7 +445,7 @@ def predictOn(self, dstream): Returns a transformed dstream object """ self._validate(dstream) - return dstream.map(self.model.predict) + return dstream.map(self.latestModel.predict) def predictOnValues(self, dstream): """ @@ -453,7 +453,7 @@ def predictOnValues(self, dstream): Returns a transformed dstream object. """ self._validate(dstream) - return dstream.mapValues(self.model.predict) + return dstream.mapValues(self.latestModel.predict) def _test(): diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 5be26e286f00f..aaffeeefaf666 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -25,7 +25,7 @@ import array as pyarray from time import time, sleep -from numpy import array, array_equal, zeros, inf, all +from numpy import array, array_equal, zeros, inf, all, random, sum from py4j.protocol import Py4JJavaError if sys.version_info[:2] <= (2, 6): @@ -883,19 +883,50 @@ def test_model_params(self): self.assertEquals(stkm._decayFactor, 0.0) # Model not set yet. 
- self.assertIsNone(stkm.model) + self.assertIsNone(stkm.latestModel) self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0]) stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0]) - self.assertEquals(stkm.model.centers, [[0.0, 0.0], [1.0, 1.0]]) - self.assertEquals(stkm.model.getClusterWeights, [1.0, 1.0]) + self.assertEquals(stkm.latestModel.centers, [[0.0, 0.0], [1.0, 1.0]]) + self.assertEquals(stkm.latestModel.getClusterWeights, [1.0, 1.0]) - def _ssc_wait(self, start_time, end_time, sleep_time): + @staticmethod + def _ssc_wait(start_time, end_time, sleep_time): while time() - start_time < end_time: sleep(0.01) + def test_accuracy_for_single_center(self): + numBatches, numPoints, k, d, r, seed = 5, 5, 1, 5, 0.1, 0 + centers, batches = self.streamingKMeansDataGenerator( + numBatches, numPoints, k, d, r, seed) + stkm = StreamingKMeans(1) + stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.]) + input_stream = self.ssc.queueStream( + [self.sc.parallelize(batch, 1) for batch in batches]) + stkm.trainOn(input_stream) + t = time() + self.ssc.start() + self._ssc_wait(t, 10.0, 0.01) + self.assertEquals(stkm.latestModel.getClusterWeights, [25.0]) + realCenters = sum(array(centers), axis=0) + for i in range(d): + modelCenters = stkm.latestModel.centers[0][i] + self.assertAlmostEqual(centers[0][i], modelCenters, 1) + self.assertAlmostEqual(realCenters[i], modelCenters, 1) + + def streamingKMeansDataGenerator(self, batches, numPoints, + k, d, r, seed, centers=None): + rng = random.RandomState(seed) + + # Generate centers. + centers = [rng.randn(d) for i in range(k)] + + return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d)) + for j in range(numPoints)] + for i in range(batches)] + def test_trainOn_model(self): - # Test the model on toy data. + # Test the model on toy data with four clusters. stkm = StreamingKMeans() initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] weights = [1.0, 1.0, 1.0, 1.0] @@ -916,16 +947,15 @@ def test_trainOn_model(self): # Give enough time to train the model. 
self._ssc_wait(t, 6.0, 0.01) - finalModel = stkm.model + finalModel = stkm.latestModel self.assertTrue(all(finalModel.centers == array(initCenters))) self.assertEquals(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0]) def test_predictOn_model(self): initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] weights = [1.0, 1.0, 1.0, 1.0] - model = StreamingKMeansModel(initCenters, weights) stkm = StreamingKMeans() - stkm.model = model + stkm.latestModel = StreamingKMeansModel(initCenters, weights) predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]] predict_data = [sc.parallelize(batch, 1) for batch in predict_data] @@ -935,10 +965,9 @@ def test_predictOn_model(self): result = [] def update(rdd): - if rdd: - rdd_collect = rdd.collect() - if rdd_collect: - result.append(rdd_collect) + rdd_collect = rdd.collect() + if rdd_collect: + result.append(rdd_collect) predict_val.foreachRDD(update) t = time() From 8ab9e89b91580a13e7efbb16657003ec4cd6f5ce Mon Sep 17 00:00:00 2001 From: MechCoder Date: Fri, 5 Jun 2015 01:17:18 +0530 Subject: [PATCH 06/11] Fix Python3 error --- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 3 +-- python/pyspark/mllib/tests.py | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 5cfe793a50183..75e64f88543f9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -975,8 +975,7 @@ private[python] class PythonMLLibAPI extends Serializable { val model = new StreamingKMeansModel( clusterCenters.asScala.toArray, clusterWeights.asScala.toArray) .update(data, decayFactor, timeUnit) - List(model.clusterCenters, model.clusterWeights). 
- map(_.asInstanceOf[Object]).asJava + List[AnyRef](model.clusterCenters, Vectors.dense(model.clusterWeights)).asJava } } diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index aaffeeefaf666..146851a5e00b3 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -25,7 +25,8 @@ import array as pyarray from time import time, sleep -from numpy import array, array_equal, zeros, inf, all, random, sum +from numpy import array, array_equal, zeros, inf, all, random +from numpy import sum as array_sum from py4j.protocol import Py4JJavaError if sys.version_info[:2] <= (2, 6): @@ -908,7 +909,7 @@ def test_accuracy_for_single_center(self): self.ssc.start() self._ssc_wait(t, 10.0, 0.01) self.assertEquals(stkm.latestModel.getClusterWeights, [25.0]) - realCenters = sum(array(centers), axis=0) + realCenters = array_sum(array(centers), axis=0) for i in range(d): modelCenters = stkm.latestModel.centers[0][i] self.assertAlmostEqual(centers[0][i], modelCenters, 1) From 5d9fe610531fcdd064b86068590ad7c26b1b00e3 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Thu, 11 Jun 2015 12:42:23 +0530 Subject: [PATCH 07/11] predictOn should take into account the latest model --- python/pyspark/mllib/clustering.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 1e148452d2cc2..34badcaab94bb 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -445,7 +445,11 @@ def predictOn(self, dstream): Returns a transformed dstream object """ self._validate(dstream) - return dstream.map(self.latestModel.predict) + + def predict(ds): + return self.latestModel.predict(ds) + + return dstream.map(predict) def predictOnValues(self, dstream): """ @@ -453,7 +457,11 @@ def predictOnValues(self, dstream): Returns a transformed dstream object. """ self._validate(dstream) - return dstream.mapValues(self.latestModel.predict) + + def predict(ds): + return self.latestModel.predict(ds) + + return dstream.mapValues(predict) def _test(): From 81482fd31ff597779724c2ccec7f223aa97a0f60 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Wed, 17 Jun 2015 00:49:11 +0530 Subject: [PATCH 08/11] minor --- python/pyspark/mllib/clustering.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 34badcaab94bb..3a88a6aa68c0d 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -445,11 +445,7 @@ def predictOn(self, dstream): Returns a transformed dstream object """ self._validate(dstream) - - def predict(ds): - return self.latestModel.predict(ds) - - return dstream.map(predict) + return dstream.map(lambda x: self.latestModel.predict(x)) def predictOnValues(self, dstream): """ @@ -457,11 +453,7 @@ def predictOnValues(self, dstream): Returns a transformed dstream object. 
""" self._validate(dstream) - - def predict(ds): - return self.latestModel.predict(ds) - - return dstream.mapValues(predict) + return dstream.mapValues(lambda x: self.latestModel.predict(x)) def _test(): From 2061a765a1dc4c0059636d2db6274562019bce57 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Fri, 19 Jun 2015 01:40:58 +0530 Subject: [PATCH 09/11] Add tests for simultaneous training and prediction Minor style fixes --- docs/mllib-clustering.md | 20 ++---- .../mllib/api/python/PythonMLLibAPI.scala | 13 ++-- python/pyspark/mllib/clustering.py | 66 +++++++++---------- python/pyspark/mllib/tests.py | 62 +++++++++++++---- 4 files changed, 93 insertions(+), 68 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index fdee8e7300abd..d1f0a7f06c526 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -599,11 +599,9 @@ ssc.awaitTermination() First we import the neccessary classes. {% highlight python %} - from pyspark.mllib.linalg import Vectors from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.clustering import StreamingKMeans - {% endhighlight %} Then we make an input stream of vectors for training, as well as a stream of labeled data @@ -611,36 +609,30 @@ points for testing. We assume a StreamingContext `ssc` has been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. {% highlight python %} +def parse(lp): + label = float(lp[lp.find('(') + 1: lp.find(',')]) + vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(',')) + return LabeledPoint(label, vec) trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse) -testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse) - +testData = ssc.textFileStream("/testing/data/dir").map(parse) {% endhighlight %} We create a model with random clusters and specify the number of clusters to find {% highlight python %} - -numDimensions = 3 -numClusters = 2 -model = StreamingKMeans() -model.setK(numClusters) -model.setDecayFactor(1.0) -model.setRandomCenters(numDimensions, 0.0) - +model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0) {% endhighlight %} Now register the streams for training and testing and start the job, printing the predicted cluster assignments on new data points as they arrive. {% highlight python %} - model.trainOn(trainingData) model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))) ssc.start() ssc.awaitTermination() - {% endhighlight %} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 75e64f88543f9..2897865af6912 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -968,12 +968,13 @@ private[python] class PythonMLLibAPI extends Serializable { * Java stub for the update method of StreamingKMeansModel. 
*/ def updateStreamingKMeansModel( - clusterCenters: java.util.ArrayList[Vector], - clusterWeights: java.util.ArrayList[Double], - data: JavaRDD[Vector], decayFactor: Double, - timeUnit: String) : JList[Object] = { - val model = new StreamingKMeansModel( - clusterCenters.asScala.toArray, clusterWeights.asScala.toArray) + clusterCenters: JList[Vector], + clusterWeights: JList[Double], + data: JavaRDD[Vector], + decayFactor: Double, + timeUnit: String): JList[Object] = { + val model = new StreamingKMeansModel( + clusterCenters.asScala.toArray, clusterWeights.asScala.toArray) .update(data, decayFactor, timeUnit) List[AnyRef](model.clusterCenters, Vectors.dense(model.clusterWeights)).asJava } diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 3a88a6aa68c0d..a680adaf2744a 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -275,18 +275,19 @@ class StreamingKMeansModel(KMeansModel): .. note:: Experimental Clustering model which can perform an online update of the centroids. - The update formula is given by + The update formula for each centroid is given by c_t+1 = [(c_t * n_t * a) + (x_t * m_t)] / [n_t + m_t] n_t+1 = n_t * a + m_t where - c_t: Centroid at the n_th iteration. - n_t: Number of weights at the n_th iteration. - x_t: Centroid of the new data closest to c_t - m_t: Number of weights of the new data closest to c_t - c_t+1: New centroid + c_t: Centroid at the n_th iteration. + n_t: Number of samples (or) weights associated with the centroid + at the n_th iteration. + x_t: Centroid of the new data closest to c_t. + m_t: Number of samples (or) weights of the new data closest to c_t + c_t+1: New centroid. n_t+1: New number of weights. - a: Decay Factor, which gives the forgetfulnes + a: Decay Factor, which gives the forgetfulness. Note that if a is set to 1, it is the weighted mean of the previous and new data. If it set to zero, the old centroids are completely @@ -304,7 +305,7 @@ class StreamingKMeansModel(KMeansModel): True >>> stkm.predict([0.9, 0.9]) == stkm.predict([1.1, 1.1]) == 1 True - >>> stkm.getClusterWeights + >>> stkm.clusterWeights [3.0, 3.0] >>> decayFactor = 0.0 >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])]) @@ -312,19 +313,22 @@ class StreamingKMeansModel(KMeansModel): >>> stkm.centers array([[ 0.2, 0.2], [ 1.5, 1.5]]) - >>> stkm.getClusterWeights + >>> stkm.clusterWeights [1.0, 1.0] >>> stkm.predict([0.2, 0.2]) 0 >>> stkm.predict([1.5, 1.5]) 1 + + :param clusterCenters: Initial cluster centers. + :param clusterWeights: List of weights assigned to each cluster. """ def __init__(self, clusterCenters, clusterWeights): super(StreamingKMeansModel, self).__init__(centers=clusterCenters) self._clusterWeights = list(clusterWeights) @property - def getClusterWeights(self): + def clusterWeights(self): """Convenience method to return the cluster weights.""" return self._clusterWeights @@ -332,13 +336,10 @@ def getClusterWeights(self): def update(self, data, decayFactor, timeUnit): """Update the centroids, according to data - Parameters - ---------- - data: Should be a RDD that represents the new data. - - decayFactor: forgetfulness of the previous centroids. + :param data: Should be a RDD that represents the new data. + :param decayFactor: forgetfulness of the previous centroids. 
+ :param timeUnit: Can be "batches" or "points" - timeUnit: Can be "batches" or "points" If points, then the decay factor is raised to the power of number of new points and if batches, it is used as it is. """ @@ -365,17 +366,10 @@ class StreamingKMeans(object): Provides methods to set k, decayFactor, timeUnit to train and predict the incoming data - Parameters - ---------- - k: int - Number of clusters - - decayFactor: float - Forgetfulness of the previous centroid. - - timeUnit: str, "batches" or "points" - If points, then the decayfactor is raised to the power of new - points. + :param k: int, number of clusters + :param decayFactor: float, forgetfulness of the previous centroids. + :param timeUnit: can be "batches" or "points". If points, then the + decayfactor is raised to the power of no. of new points. """ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._k = k @@ -384,10 +378,14 @@ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): raise ValueError( "timeUnit should be 'batches' or 'points', got %s." % timeUnit) self._timeUnit = timeUnit - self.latestModel = None + self._model = None + + def latestModel(self): + """Return the latest model""" + return self._model def _validate(self, dstream): - if self.latestModel is None: + if self._model is None: raise ValueError( "Initial centers should be set either by setInitialCenters " "or setRandomCenters.") @@ -416,7 +414,7 @@ def setHalfLife(self, halfLife, timeUnit): return self def setInitialCenters(self, centers, weights): - self.latestModel = StreamingKMeansModel(centers, weights) + self._model = StreamingKMeansModel(centers, weights) return self def setRandomCenters(self, dim, weight, seed): @@ -427,7 +425,7 @@ def setRandomCenters(self, dim, weight, seed): rng = random.RandomState(seed) clusterCenters = rng.randn(self._k, dim) clusterWeights = tile(weight, self._k) - self.latestModel = StreamingKMeansModel(clusterCenters, clusterWeights) + self._model = StreamingKMeansModel(clusterCenters, clusterWeights) return self def trainOn(self, dstream): @@ -435,7 +433,7 @@ def trainOn(self, dstream): self._validate(dstream) def update(rdd): - self.latestModel.update(rdd, self._decayFactor, self._timeUnit) + self._model.update(rdd, self._decayFactor, self._timeUnit) dstream.foreachRDD(update) @@ -445,7 +443,7 @@ def predictOn(self, dstream): Returns a transformed dstream object """ self._validate(dstream) - return dstream.map(lambda x: self.latestModel.predict(x)) + return dstream.map(lambda x: self._model.predict(x)) def predictOnValues(self, dstream): """ @@ -453,7 +451,7 @@ def predictOnValues(self, dstream): Returns a transformed dstream object. 
""" self._validate(dstream) - return dstream.mapValues(lambda x: self.latestModel.predict(x)) + return dstream.mapValues(lambda x: self._model.predict(x)) def _test(): diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 146851a5e00b3..6c67b3489577a 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -79,6 +79,11 @@ def setUp(self): def tearDown(self): self.ssc.stop(False) + @staticmethod + def _ssc_wait(start_time, end_time, sleep_time): + while time() - start_time < end_time: + sleep(0.01) + def _squared_distance(a, b): if isinstance(a, Vector): @@ -878,25 +883,23 @@ def test_model_transform(self): class StreamingKMeansTest(MLLibStreamingTestCase): def test_model_params(self): + """Test that the model params are set correctly""" stkm = StreamingKMeans() stkm.setK(5).setDecayFactor(0.0) self.assertEquals(stkm._k, 5) self.assertEquals(stkm._decayFactor, 0.0) # Model not set yet. - self.assertIsNone(stkm.latestModel) + self.assertIsNone(stkm.latestModel()) self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0]) stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0]) - self.assertEquals(stkm.latestModel.centers, [[0.0, 0.0], [1.0, 1.0]]) - self.assertEquals(stkm.latestModel.getClusterWeights, [1.0, 1.0]) - - @staticmethod - def _ssc_wait(start_time, end_time, sleep_time): - while time() - start_time < end_time: - sleep(0.01) + self.assertEquals( + stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]]) + self.assertEquals(stkm.latestModel().clusterWeights, [1.0, 1.0]) def test_accuracy_for_single_center(self): + """Test that the parameters obtained are correct for a single center.""" numBatches, numPoints, k, d, r, seed = 5, 5, 1, 5, 0.1, 0 centers, batches = self.streamingKMeansDataGenerator( numBatches, numPoints, k, d, r, seed) @@ -905,13 +908,14 @@ def test_accuracy_for_single_center(self): input_stream = self.ssc.queueStream( [self.sc.parallelize(batch, 1) for batch in batches]) stkm.trainOn(input_stream) + t = time() self.ssc.start() self._ssc_wait(t, 10.0, 0.01) - self.assertEquals(stkm.latestModel.getClusterWeights, [25.0]) + self.assertEquals(stkm.latestModel().clusterWeights, [25.0]) realCenters = array_sum(array(centers), axis=0) for i in range(d): - modelCenters = stkm.latestModel.centers[0][i] + modelCenters = stkm.latestModel().centers[0][i] self.assertAlmostEqual(centers[0][i], modelCenters, 1) self.assertAlmostEqual(realCenters[i], modelCenters, 1) @@ -927,7 +931,7 @@ def streamingKMeansDataGenerator(self, batches, numPoints, for i in range(batches)] def test_trainOn_model(self): - # Test the model on toy data with four clusters. + """Test the model on toy data with four clusters.""" stkm = StreamingKMeans() initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] weights = [1.0, 1.0, 1.0, 1.0] @@ -948,15 +952,16 @@ def test_trainOn_model(self): # Give enough time to train the model. 
self._ssc_wait(t, 6.0, 0.01) - finalModel = stkm.latestModel + finalModel = stkm.latestModel() self.assertTrue(all(finalModel.centers == array(initCenters))) - self.assertEquals(finalModel.getClusterWeights, [5.0, 5.0, 5.0, 5.0]) + self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0]) def test_predictOn_model(self): + """Test that the model predicts correctly on toy data.""" initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] weights = [1.0, 1.0, 1.0, 1.0] stkm = StreamingKMeans() - stkm.latestModel = StreamingKMeansModel(initCenters, weights) + stkm._model = StreamingKMeansModel(initCenters, weights) predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]] predict_data = [sc.parallelize(batch, 1) for batch in predict_data] @@ -976,6 +981,35 @@ def update(rdd): self._ssc_wait(t, 6.0, 0.01) self.assertEquals(result, [[0], [1], [2], [3]]) + def test_trainOn_predictOn(self): + """Test that prediction happens on the updated model.""" + stkm = StreamingKMeans(decayFactor=0.0, k=2) + stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0]) + + # Since decay factor is set to zero, once the first batch + # is passed the clusterCenters are updated to [-0.5, 0.7] + # which causes 0.2 & 0.3 to be classified as 1, even though the + # classification based in the initial model would have been 0 + # proving that the model is updated. + batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]] + batches = [sc.parallelize(batch) for batch in batches] + input_stream = self.ssc.queueStream(batches) + predict_results = [] + + def collect(rdd): + rdd_collect = rdd.collect() + if rdd_collect: + predict_results.append(rdd_collect) + + stkm.trainOn(input_stream) + predict_stream = stkm.predictOn(input_stream) + predict_stream.foreachRDD(collect) + + t = time() + self.ssc.start() + self._ssc_wait(t, 6.0, 0.01) + self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]]) + if __name__ == "__main__": if not _have_scipy: From 51052d30397229d3a32ca5d1e2d4ed7d9f7ddde6 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Fri, 19 Jun 2015 11:54:13 +0530 Subject: [PATCH 10/11] Doc fixes --- docs/mllib-clustering.md | 2 +- python/pyspark/mllib/clustering.py | 71 +++++++++++++++++------------- 2 files changed, 41 insertions(+), 32 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index d1f0a7f06c526..dcaa3784be874 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -629,7 +629,7 @@ the predicted cluster assignments on new data points as they arrive. {% highlight python %} model.trainOn(trainingData) -model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))) +print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features)))) ssc.start() ssc.awaitTermination() diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index a680adaf2744a..c38229864d3b4 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -33,7 +33,8 @@ from pyspark.mllib.util import Saveable, Loader, inherit_doc from pyspark.streaming import DStream -__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture'] +__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture', + 'StreamingKMeans', 'StreamingKMeansModel'] @inherit_doc @@ -273,27 +274,34 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia class StreamingKMeansModel(KMeansModel): """ .. 
note:: Experimental + Clustering model which can perform an online update of the centroids. The update formula for each centroid is given by - c_t+1 = [(c_t * n_t * a) + (x_t * m_t)] / [n_t + m_t] - n_t+1 = n_t * a + m_t + + * c_t+1 = ((c_t * n_t * a) + (x_t * m_t)) / (n_t + m_t) + * n_t+1 = n_t * a + m_t where - c_t: Centroid at the n_th iteration. - n_t: Number of samples (or) weights associated with the centroid + + * c_t: Centroid at the n_th iteration. + * n_t: Number of samples (or) weights associated with the centroid at the n_th iteration. - x_t: Centroid of the new data closest to c_t. - m_t: Number of samples (or) weights of the new data closest to c_t - c_t+1: New centroid. - n_t+1: New number of weights. - a: Decay Factor, which gives the forgetfulness. + * x_t: Centroid of the new data closest to c_t. + * m_t: Number of samples (or) weights of the new data closest to c_t + * c_t+1: New centroid. + * n_t+1: New number of weights. + * a: Decay Factor, which gives the forgetfulness. Note that if a is set to 1, it is the weighted mean of the previous and new data. If it set to zero, the old centroids are completely forgotten. - >>> initCenters, initWeights = [[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0] + :param clusterCenters: Initial cluster centers. + :param clusterWeights: List of weights assigned to each cluster. + + >>> initCenters = [[0.0, 0.0], [1.0, 1.0]] + >>> initWeights = [1.0, 1.0] >>> stkm = StreamingKMeansModel(initCenters, initWeights) >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1], ... [0.9, 0.9], [1.1, 1.1]]) @@ -301,10 +309,10 @@ class StreamingKMeansModel(KMeansModel): >>> stkm.centers array([[ 0., 0.], [ 1., 1.]]) - >>> stkm.predict([-0.1, -0.1]) == stkm.predict([0.1, 0.1]) == 0 - True - >>> stkm.predict([0.9, 0.9]) == stkm.predict([1.1, 1.1]) == 1 - True + >>> stkm.predict([-0.1, -0.1]) + 0 + >>> stkm.predict([0.9, 0.9]) + 1 >>> stkm.clusterWeights [3.0, 3.0] >>> decayFactor = 0.0 @@ -319,9 +327,6 @@ class StreamingKMeansModel(KMeansModel): 0 >>> stkm.predict([1.5, 1.5]) 1 - - :param clusterCenters: Initial cluster centers. - :param clusterWeights: List of weights assigned to each cluster. """ def __init__(self, clusterCenters, clusterWeights): super(StreamingKMeansModel, self).__init__(centers=clusterCenters) @@ -329,7 +334,7 @@ def __init__(self, clusterCenters, clusterWeights): @property def clusterWeights(self): - """Convenience method to return the cluster weights.""" + """Return the cluster weights.""" return self._clusterWeights @ignore_unicode_prefix @@ -338,13 +343,12 @@ def update(self, data, decayFactor, timeUnit): :param data: Should be a RDD that represents the new data. :param decayFactor: forgetfulness of the previous centroids. - :param timeUnit: Can be "batches" or "points" - - If points, then the decay factor is raised to the power of - number of new points and if batches, it is used as it is. + :param timeUnit: Can be "batches" or "points". If points, then the + decay factor is raised to the power of number of new + points and if batches, it is used as it is. """ if not isinstance(data, RDD): - raise TypeError("data should be of a RDD, got %s." % type(data)) + raise TypeError("Data should be of an RDD, got %s." % type(data)) data = data.map(_convert_to_vector) decayFactor = float(decayFactor) if timeUnit not in ["batches", "points"]: @@ -363,13 +367,15 @@ class StreamingKMeans(object): """ .. 
note:: Experimental - Provides methods to set k, decayFactor, timeUnit to train and - predict the incoming data + Provides methods to set k, decayFactor, timeUnit to configure the + KMeans algorithm for fitting and predicting on incoming dstreams. + More details on how the centroids are updated are provided under the + docs of StreamingKMeansModel. - :param k: int, number of clusters + :param k: int, number of clusters :param decayFactor: float, forgetfulness of the previous centroids. - :param timeUnit: can be "batches" or "points". If points, then the - decayfactor is raised to the power of no. of new points. + :param timeUnit: can be "batches" or "points". If points, then the + decayfactor is raised to the power of no. of new points. """ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._k = k @@ -406,14 +412,17 @@ def setDecayFactor(self, decayFactor): def setHalfLife(self, halfLife, timeUnit): """ - Set number of instances after which the centroids at - has 0.5 weightage + Set number of batches after which the centroids of that + particular batch has half the weightage. """ self._timeUnit = timeUnit self._decayFactor = exp(log(0.5) / halfLife) return self def setInitialCenters(self, centers, weights): + """ + Set initial centers. Should be set before calling trainOn. + """ self._model = StreamingKMeansModel(centers, weights) return self From 7722d16d28453e6eb57bb8fc38023b0f9a327aaa Mon Sep 17 00:00:00 2001 From: MechCoder Date: Fri, 19 Jun 2015 12:13:58 +0530 Subject: [PATCH 11/11] minor style fixes --- python/pyspark/mllib/tests.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 6c67b3489577a..744dc112d9209 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -893,16 +893,16 @@ def test_model_params(self): self.assertIsNone(stkm.latestModel()) self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0]) - stkm.setInitialCenters([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0]) + stkm.setInitialCenters( + centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0]) self.assertEquals( stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]]) self.assertEquals(stkm.latestModel().clusterWeights, [1.0, 1.0]) def test_accuracy_for_single_center(self): - """Test that the parameters obtained are correct for a single center.""" - numBatches, numPoints, k, d, r, seed = 5, 5, 1, 5, 0.1, 0 + """Test that parameters obtained are correct for a single center.""" centers, batches = self.streamingKMeansDataGenerator( - numBatches, numPoints, k, d, r, seed) + batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0) stkm = StreamingKMeans(1) stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.]) input_stream = self.ssc.queueStream( @@ -914,7 +914,7 @@ def test_accuracy_for_single_center(self): self._ssc_wait(t, 10.0, 0.01) self.assertEquals(stkm.latestModel().clusterWeights, [25.0]) realCenters = array_sum(array(centers), axis=0) - for i in range(d): + for i in range(5): modelCenters = stkm.latestModel().centers[0][i] self.assertAlmostEqual(centers[0][i], modelCenters, 1) self.assertAlmostEqual(realCenters[i], modelCenters, 1) @@ -934,8 +934,8 @@ def test_trainOn_model(self): """Test the model on toy data with four clusters.""" stkm = StreamingKMeans() initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] - weights = [1.0, 1.0, 1.0, 1.0] - stkm.setInitialCenters(initCenters, weights) + stkm.setInitialCenters( + centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0]) # Create a 
toy dataset by setting a tiny offest for each point. offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]] @@ -958,10 +958,10 @@ def test_trainOn_model(self): def test_predictOn_model(self): """Test that the model predicts correctly on toy data.""" - initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]] - weights = [1.0, 1.0, 1.0, 1.0] stkm = StreamingKMeans() - stkm._model = StreamingKMeansModel(initCenters, weights) + stkm._model = StreamingKMeansModel( + clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]], + clusterWeights=[1.0, 1.0, 1.0, 1.0]) predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]] predict_data = [sc.parallelize(batch, 1) for batch in predict_data]
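
# A minimal NumPy sketch of the centroid update rule described in the
# StreamingKMeansModel docstring above. This is illustrative only -- it is not
# the Scala implementation that `updateStreamingKMeansModel` delegates to --
# and it assumes the old weight is discounted by the decay factor before the
# blend, which is the reading consistent with the doctest outputs (with
# decayFactor=0.0 the old centroids are completely forgotten). The helper
# name `update_centers` is hypothetical.
#
#   c_{t+1} = (c_t * n_t * a + x_t * m_t) / (n_t * a + m_t)
#   n_{t+1} = n_t * a + m_t
import numpy as np


def update_centers(centers, weights, batch, decayFactor, timeUnit="batches"):
    centers = np.array(centers, dtype=float)
    weights = np.array(weights, dtype=float)
    batch = np.array(batch, dtype=float)
    if timeUnit == "points":
        # The decay factor is raised to the power of the number of new points.
        decayFactor = decayFactor ** len(batch)
    # Assign every new point to its closest current centroid.
    distances = ((batch[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
    labels = distances.argmin(axis=1)
    for j in range(len(centers)):
        points = batch[labels == j]
        n = weights[j] * decayFactor
        if len(points) == 0:
            weights[j] = n
            continue
        m = float(len(points))
        x = points.mean(axis=0)
        centers[j] = (centers[j] * n + x * m) / (n + m)
        weights[j] = n + m
    return centers, weights


# Reproduces the first doctest: centers stay at [0, 0] and [1, 1], weights 3.0.
c, w = update_centers([[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0],
                      [[-0.1, -0.1], [0.1, 0.1], [0.9, 0.9], [1.1, 1.1]], 1.0)
print(c)  # [[0. 0.] [1. 1.]]
print(w)  # [3. 3.]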
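
# setHalfLife(halfLife, timeUnit) in this patch sets
# decayFactor = exp(log(0.5) / halfLife), so after `halfLife` batches (or
# points) the weight carried over from old data is halved. A quick check of
# that identity, just to make the relationship concrete:
from math import exp, log

halfLife = 5.0
decayFactor = exp(log(0.5) / halfLife)
assert abs(decayFactor ** halfLife - 0.5) < 1e-12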
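
# A self-contained local run of the new API, mirroring the documentation
# example and the queueStream-based tests in this patch series. It assumes a
# pyspark build that already includes these StreamingKMeans bindings; the
# sleep-based wait and the synthetic batches are for illustration only.
from time import sleep

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans

sc = SparkContext("local[2]", "streaming-kmeans-example")
ssc = StreamingContext(sc, 1)

# Two small batches of 2-D points clustered around (0, 0) and (1, 1).
batches = [
    sc.parallelize([[0.0, 0.1], [0.1, 0.0], [1.0, 1.1], [1.1, 1.0]], 1),
    sc.parallelize([[-0.1, 0.0], [0.0, -0.1], [0.9, 1.0], [1.0, 0.9]], 1),
]
trainingStream = ssc.queueStream(batches)

stkm = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(2, 1.0, 0)
stkm.trainOn(trainingStream)
stkm.predictOn(trainingStream).pprint()

ssc.start()
sleep(5)          # give the two batches time to be processed, as in the tests
ssc.stop(False)   # keep the SparkContext alive, as the tests' tearDown does
print(stkm.latestModel().centers)
print(stkm.latestModel().clusterWeights)
sc.stop()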