96 changes: 95 additions & 1 deletion python/pyspark/mllib/classification.py
@@ -21,14 +21,16 @@
from numpy import array

from pyspark import RDD
from pyspark.streaming import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
from pyspark.mllib.util import Saveable, Loader, inherit_doc


__all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'LogisticRegressionWithLBFGS',
           'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes',
           'StreamingLogisticRegressionWithSGD']


class LinearClassificationModel(LinearModel):
@@ -583,6 +585,98 @@ def train(cls, data, lambda_=1.0):
        return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))


class StreamingLinearAlgorithm(object):
    """
    Base class that every streaming linear algorithm must inherit.

    It provides predictOn and predictOnValues, so subclasses do not
    have to reimplement them.
    """
    def __init__(self, model):
        self._model = model
    def latestModel(self):
        """
        Returns the latest model.
        """
        return self._model

[Review thread]
Contributor: Should it be a @property? Maybe we should update lastModel in streaming k-means as well.
Author: I followed the Scala convention, where it is a method.

    def _validate(self, dstream):
        if not isinstance(dstream, DStream):
            raise TypeError(
                "dstream should be a DStream object, got %s" % type(dstream))
        if not self._model:
            raise ValueError(
                "Model must be initialized using setInitialWeights")

    def predictOn(self, dstream):
        """
        Make predictions on a dstream.

        :return: DStream containing the predictions.
        """
        self._validate(dstream)
        return dstream.map(lambda x: self._model.predict(x))

    def predictOnValues(self, dstream):
        """
        Make predictions on a keyed dstream.

        :return: DStream containing the (key, prediction) pairs.
        """
        self._validate(dstream)
        return dstream.mapValues(lambda x: self._model.predict(x))
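
Taken together, the two helpers mean a subclass only has to supply the model update; prediction comes for free. A minimal sketch of the difference between the two entry points (hypothetical snippet, assuming `sc`, `ssc`, and an already-initialized streaming model `slr` exist):

    # predictOn consumes a DStream of bare feature vectors ...
    features = ssc.queueStream(
        [sc.parallelize([Vectors.dense([1.0]), Vectors.dense([-2.0])])])
    labels = slr.predictOn(features)  # DStream of 0/1 predictions

    # ... while predictOnValues consumes (key, vector) pairs and keeps the keys.
    keyed = ssc.queueStream([sc.parallelize([("a", Vectors.dense([1.0]))])])
    keyed_labels = slr.predictOnValues(keyed)  # DStream of (key, prediction) pairs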


@inherit_doc
class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
    """
    Run LogisticRegression with SGD on a stream of data.

    The weights obtained after training on each batch of data are used
    as the initial weights for the next batch.

    :param stepSize: Step size for each iteration of gradient descent.
    :param numIterations: Number of iterations run for each batch of data.
    :param miniBatchFraction: Fraction of data on which SGD is run for
                              each iteration.
    :param regParam: L2 regularization parameter.
    """
    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01):
        self.stepSize = stepSize
        self.numIterations = numIterations
        self.regParam = regParam
        self.miniBatchFraction = miniBatchFraction
        self._model = None
        super(StreamingLogisticRegressionWithSGD, self).__init__(
            model=self._model)

    def setInitialWeights(self, initialWeights):
        """
        Set the initial value of the weights.

        This must be set before calling trainOn or predictOn.
        """
        initialWeights = _convert_to_vector(initialWeights)

        # LogisticRegressionWithSGD performs only binary classification.
        self._model = LogisticRegressionModel(
            initialWeights, 0, initialWeights.size, 2)
        return self
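
Since setInitialWeights returns self, construction and initialization can be chained in one expression (illustrative):

    slr = StreamingLogisticRegressionWithSGD(stepSize=0.1).setInitialWeights([0.0, 0.0])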

    def trainOn(self, dstream):
        """Train the model on the incoming dstream."""
        self._validate(dstream)

        def update(rdd):
            # LogisticRegressionWithSGD.train raises an error for an empty RDD.
            if not rdd.isEmpty():
                self._model = LogisticRegressionWithSGD.train(
                    rdd, self.numIterations, self.stepSize,
                    self.miniBatchFraction, self._model.weights)

        dstream.foreachRDD(update)
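
For context, here is a minimal sketch of how the new class is meant to be driven end to end; the queue-backed stream is purely illustrative and not part of this diff:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint

    sc = SparkContext(appName="streaming-lr-sketch")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    # A queue of RDDs stands in for a live stream of LabeledPoints.
    training = ssc.queueStream([
        sc.parallelize([LabeledPoint(1.0, [1.2]), LabeledPoint(0.0, [-0.8])])])

    slr = StreamingLogisticRegressionWithSGD(stepSize=0.2, numIterations=25)
    slr.setInitialWeights([0.0])  # required before trainOn/predictOn
    slr.trainOn(training)         # each batch refines slr.latestModel()

    ssc.start()
    ssc.awaitTermination(10)      # run for ~10 seconds, then stop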


def _test():
    import doctest
    from pyspark import SparkContext
135 changes: 134 additions & 1 deletion python/pyspark/mllib/tests.py
@@ -26,7 +26,8 @@
from time import time, sleep
from shutil import rmtree

from numpy import (
    array, array_equal, zeros, inf, random, exp, dot, all, mean)
from numpy import sum as array_sum
from py4j.protocol import Py4JJavaError

@@ -45,6 +46,7 @@
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
    DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.stat import Statistics
from pyspark.mllib.feature import Word2Vec
@@ -1037,6 +1039,137 @@ def test_dim(self):
        self.assertEqual(len(point.features), 2)


class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):

    @staticmethod
    def generateLogisticInput(offset, scale, nPoints, seed):
        """
        Generate labels from the sigmoid 1 / (1 + exp(-(x * scale + offset))),
        where x is randomly distributed and the label of each sample is set
        to 1.0 when a uniform random threshold falls at or below the sigmoid
        value, and 0.0 otherwise.
        """
        rng = random.RandomState(seed)
        x = rng.randn(nPoints)
        sigmoid = 1. / (1 + exp(-(dot(x, scale) + offset)))
        y_p = rng.rand(nPoints)
        cut_off = y_p <= sigmoid
        y_p[cut_off] = 1.0
        y_p[~cut_off] = 0.0
        return [
            LabeledPoint(y_p[i], Vectors.dense([x[i]]))
            for i in range(nPoints)]

[Review thread]
Contributor: Please update the data generation to use #6715.
Author: That is used to generate linear data with continuous labels (which is used in StreamingLinearRegressionWithSGD) in this PR (#6744).
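
As a quick sanity check on the generator (an illustrative snippet, not part of the test suite): with offset 0 the sigmoid is centred at 0.5, so roughly half of the generated labels should be positive.

    points = StreamingLogisticRegressionWithSGDTests.generateLogisticInput(0, 1.5, 1000, 42)
    positive_fraction = sum(p.label for p in points) / float(len(points))
    assert 0.4 < positive_fraction < 0.6  # labels balanced around the 0.5 threshold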

    def test_parameter_accuracy(self):
        """
        Test that the final value of weights is close to the desired value.
        """
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(20)]
        input_stream = self.ssc.queueStream(input_batches)

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])
        slr.trainOn(input_stream)

        t = time()
        self.ssc.start()
        self._ssc_wait(t, 20.0, 0.01)
        rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
        self.assertAlmostEqual(rel, 0.1, 1)

    def test_convergence(self):
        """
        Test that weights converge to the required value on toy data.
        """
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(20)]
        input_stream = self.ssc.queueStream(input_batches)
        models = []

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])
        slr.trainOn(input_stream)
        input_stream.foreachRDD(
            lambda x: models.append(slr.latestModel().weights[0]))

        t = time()
        self.ssc.start()
        self._ssc_wait(t, 15.0, 0.01)
        t_models = array(models)
        diff = t_models[1:] - t_models[:-1]

        # Test that the weights improve, within a small tolerance.
        self.assertTrue(all(diff >= -0.1))
        self.assertTrue(array_sum(diff > 0) > 1)

    @staticmethod
    def calculate_accuracy_error(true, predicted):
        return sum(abs(array(true) - array(predicted))) / len(true)
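
For instance (hypothetical check), one mismatch out of three labels yields an error of 1/3:

    err = StreamingLogisticRegressionWithSGDTests.calculate_accuracy_error(
        [1.0, 0.0, 1.0], [1.0, 1.0, 1.0])
    assert abs(err - 1.0 / 3.0) < 1e-9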

    def test_predictions(self):
        """Test predicted values on a toy model."""
        input_batches = []
        for i in range(20):
            batch = self.sc.parallelize(
                self.generateLogisticInput(0, 1.5, 100, 42 + i))
            input_batches.append(batch.map(lambda x: (x.label, x.features)))
        input_stream = self.ssc.queueStream(input_batches)

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        slr.setInitialWeights([1.5])
        predict_stream = slr.predictOnValues(input_stream)
        true_predicted = []
        predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
        t = time()
        self.ssc.start()
        self._ssc_wait(t, 5.0, 0.01)

        # Test that the accuracy error is no more than 0.4 on each batch.
        for batch in true_predicted:
            true, predicted = zip(*batch)
            self.assertTrue(
                self.calculate_accuracy_error(true, predicted) < 0.4)

    def test_training_and_prediction(self):
        """Test that the model improves on toy data as the number of batches increases."""
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(20)]
        predict_batches = [
            b.map(lambda lp: (lp.label, lp.features)) for b in input_batches]

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.01, numIterations=25)
        slr.setInitialWeights([-0.1])
        errors = []

        def collect_errors(rdd):
            true, predicted = zip(*rdd.collect())
            errors.append(self.calculate_accuracy_error(true, predicted))

        true_predicted = []
        input_stream = self.ssc.queueStream(input_batches)
        predict_stream = self.ssc.queueStream(predict_batches)
        slr.trainOn(input_stream)
        ps = slr.predictOnValues(predict_stream)
        ps.foreachRDD(lambda x: collect_errors(x))

        t = time()
        self.ssc.start()
        self._ssc_wait(t, 20.0, 0.01)

        # Test that the improvement in error is at least 0.3.
        self.assertTrue(errors[1] - errors[-1] > 0.3)
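
These tests lean on MLLibStreamingTestCase and its _ssc_wait helper, which are defined elsewhere in tests.py and not shown in this diff. A plausible sketch of the waiting logic, assuming it simply polls until `timeout` seconds have elapsed since `start_time` so the streaming batches have time to run:

    from time import time, sleep

    @staticmethod
    def _ssc_wait(start_time, timeout, sleep_time):
        # Sketch only: block until `timeout` seconds have passed since
        # `start_time`, napping in `sleep_time` increments.
        while time() - start_time < timeout:
            sleep(sleep_time)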


if __name__ == "__main__":
    if not _have_scipy:
        print("NOTE: Skipping SciPy tests as it does not seem to be installed")