52 changes: 52 additions & 0 deletions docs/mllib-linear-methods.md
@@ -768,6 +768,58 @@ will get better!

</div>

<div data-lang="python" markdown="1">

First, we import the necessary classes for parsing our input data and creating the model.

{% highlight python %}
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD
{% endhighlight %}

Then we create input streams for training and testing data. We assume a StreamingContext `ssc`
has already been created; see the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing)
for more info. For this example, we use labeled points in both the training and testing streams,
but in practice you will likely want to use unlabeled vectors for test data.

{% highlight python %}
def parse(lp):
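    # e.g. "(1.0,[0.5,0.2,0.1])" -> label 1.0, features [0.5, 0.2, 0.1]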
label = float(lp[lp.find('(') + 1: lp.find(',')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)

trainingData = ssc.textFileStream("/training/data/dir").map(parse).cache()
testData = ssc.textFileStream("/testing/data/dir").map(parse)
{% endhighlight %}

We create our model by initializing the weights to zero.

{% highlight python %}
numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0] * numFeatures)
{% endhighlight %}

Now we register the streams for training and testing and start the job.

{% highlight python %}
model.trainOn(trainingData)
model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}

We can now save text files with data to the training or testing folders.
Each line should be a data point formatted as `(y,[x1,x2,x3])`, where `y` is the label
and `x1,x2,x3` are the features. Any time a text file is placed in `/training/data/dir`,
the model will update; any time a text file is placed in `/testing/data/dir`, you will see predictions.
As you feed more data to the training directory, the predictions
will get better!
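
For instance, each line of a training file would look like `(1.2,[0.4,0.1,0.3])`. Note
that `textFileStream` only picks up files that appear in the watched directory atomically,
so a common pattern is to write the file elsewhere and then move it in. A minimal sketch
(the file names and values here are purely illustrative):

{% highlight python %}
import shutil

# Write a batch of points in the "(y,[x1,x2,x3])" format to a temporary
# location, then move the finished file into the watched directory so the
# stream sees it exactly once.
with open("/tmp/batch-0001.txt", "w") as f:
    f.write("(1.2,[0.4,0.1,0.3])\n")
    f.write("(0.5,[0.2,0.0,0.1])\n")
shutil.move("/tmp/batch-0001.txt", "/training/data/dir/batch-0001.txt")
{% endhighlight %}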

</div>

</div>


50 changes: 5 additions & 45 deletions python/pyspark/mllib/classification.py
@@ -24,7 +24,9 @@
from pyspark.streaming import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
from pyspark.mllib.regression import (
LabeledPoint, LinearModel, _regression_train_wrapper,
StreamingLinearAlgorithm)
from pyspark.mllib.util import Saveable, Loader, inherit_doc


@@ -585,55 +587,13 @@ def train(cls, data, lambda_=1.0):
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))


class StreamingLinearAlgorithm(object):
"""
Base class that has to be inherited by any StreamingLinearAlgorithm.

Prevents reimplementation of methods predictOn and predictOnValues.
"""
def __init__(self, model):
self._model = model

def latestModel(self):
"""
Returns the latest model.
"""
return self._model

def _validate(self, dstream):
if not isinstance(dstream, DStream):
raise TypeError(
"dstream should be a DStream object, got %s" % type(dstream))
if not self._model:
raise ValueError(
"Model must be intialized using setInitialWeights")

def predictOn(self, dstream):
"""
Make predictions on a dstream.

:return: Transformed dstream object.
"""
self._validate(dstream)
return dstream.map(lambda x: self._model.predict(x))

def predictOnValues(self, dstream):
"""
Make predictions on a keyed dstream.

:return: Transformed dstream object.
"""
self._validate(dstream)
return dstream.mapValues(lambda x: self._model.predict(x))


@inherit_doc
class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
"""
Run LogisticRegression with SGD on a stream of data.
Run LogisticRegression with SGD on a batch of data.

The weights obtained at the end of training a stream are used as initial
weights for the next stream.
weights for the next batch.

:param stepSize: Step size for each iteration of gradient descent.
:param numIterations: Number of iterations run for each batch of data.
90 changes: 90 additions & 0 deletions python/pyspark/mllib/regression.py
@@ -19,6 +19,7 @@
from numpy import array

from pyspark import RDD
from pyspark.streaming.dstream import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader
@@ -570,6 +571,95 @@ def train(cls, data, isotonic=True):
return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)


class StreamingLinearAlgorithm(object):
"""
Base class that has to be inherited by any StreamingLinearAlgorithm.

Prevents reimplementation of methods predictOn and predictOnValues.
"""
def __init__(self, model):
self._model = model

def latestModel(self):
"""
Returns the latest model.
"""
return self._model

def _validate(self, dstream):
if not isinstance(dstream, DStream):
raise TypeError(
"dstream should be a DStream object, got %s" % type(dstream))
if not self._model:
raise ValueError(
"Model must be intialized using setInitialWeights")

def predictOn(self, dstream):
"""
Make predictions on a dstream.

:return: Transformed dstream object.
"""
self._validate(dstream)
return dstream.map(lambda x: self._model.predict(x))

def predictOnValues(self, dstream):
"""
Make predictions on a keyed dstream.

:return: Transformed dstream object.
"""
self._validate(dstream)
return dstream.mapValues(lambda x: self._model.predict(x))
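
To illustrate the two prediction helpers, here is a minimal sketch (assuming an
initialized streaming model `model`, a DStream `features` of feature vectors, and a
DStream `labeledPairs` of `(label, vector)` tuples; all of these names are illustrative):

{% highlight python %}
# predictOn maps each feature vector directly to a prediction.
predictions = model.predictOn(features)

# predictOnValues keeps the key (here, the true label) and predicts on the
# value, yielding (label, prediction) pairs that are handy for error analysis.
labelsAndPredictions = model.predictOnValues(labeledPairs)
{% endhighlight %}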


@inherit_doc
class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
"""
Run LinearRegression with SGD on a batch of data.

The problem minimized is (1 / n_samples) * (y - weights'X)**2.
After training on a batch of data, the weights obtained at the end of
training are used as initial weights for the next batch.

    :param stepSize: Step size for each iteration of gradient descent.
    :param numIterations: Total number of iterations run.
    :param miniBatchFraction: Fraction of data on which SGD is run for each
      iteration.
"""
def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0):
self.stepSize = stepSize
self.numIterations = numIterations
self.miniBatchFraction = miniBatchFraction
self._model = None
super(StreamingLinearRegressionWithSGD, self).__init__(
model=self._model)

def setInitialWeights(self, initialWeights):
"""
Set the initial value of weights.

        This must be set before running trainOn and predictOn.
"""
initialWeights = _convert_to_vector(initialWeights)
self._model = LinearRegressionModel(initialWeights, 0)
return self
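
Since `setInitialWeights` returns `self`, construction and weight initialization can
be chained; a small sketch:

{% highlight python %}
# Equivalent to constructing the model first and then setting the weights.
model = StreamingLinearRegressionWithSGD(stepSize=0.1).setInitialWeights([0.0, 0.0])
{% endhighlight %}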

def trainOn(self, dstream):
"""Train the model on the incoming dstream."""
self._validate(dstream)

        def update(rdd):
            # LinearRegressionWithSGD.train raises an error for an empty RDD,
            # so skip empty batches.
            if not rdd.isEmpty():
                # Keyword arguments make it explicit that the latest weights
                # are carried over as the starting point for this batch.
                self._model = LinearRegressionWithSGD.train(
                    rdd, iterations=self.numIterations, step=self.stepSize,
                    miniBatchFraction=self.miniBatchFraction,
                    initialWeights=self._model.weights)

        # Run `update` on every batch; each run rebinds self._model so
        # subsequent predictions use the newest weights.
        dstream.foreachRDD(update)


def _test():
import doctest
from pyspark import SparkContext
124 changes: 122 additions & 2 deletions python/pyspark/mllib/tests.py
@@ -27,8 +27,9 @@
from shutil import rmtree

from numpy import (
array, array_equal, zeros, inf, random, exp, dot, all, mean)
array, array_equal, zeros, inf, random, exp, dot, all, mean, abs)
from numpy import sum as array_sum

from py4j.protocol import Py4JJavaError

if sys.version_info[:2] <= (2, 6):
@@ -45,8 +46,8 @@
from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.stat import Statistics
from pyspark.mllib.feature import Word2Vec
@@ -56,6 +57,7 @@
from pyspark.serializers import PickleSerializer
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext

_have_scipy = False
try:
@@ -1170,6 +1172,124 @@ def collect_errors(rdd):
self.assertTrue(errors[1] - errors[-1] > 0.3)


class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):

    def assertArrayAlmostEqual(self, array1, array2, dec):
        # Compare the two arrays element-wise to `dec` decimal places.
        for i, j in zip(array1, array2):
            self.assertAlmostEqual(i, j, dec)

def test_parameter_accuracy(self):
"""Test that coefs are predicted accurately by fitting on toy data."""

# Test that fitting (10*X1 + 10*X2), (X1, X2) gives coefficients
# (10, 10)
slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
slr.setInitialWeights([0.0, 0.0])
xMean = [0.0, 0.0]
xVariance = [1.0 / 3.0, 1.0 / 3.0]

# Create ten batches with 100 sample points in each.
batches = []
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
batches.append(sc.parallelize(batch))

input_stream = self.ssc.queueStream(batches)
t = time()
slr.trainOn(input_stream)
self.ssc.start()
self._ssc_wait(t, 10, 0.01)
self.assertArrayAlmostEqual(
slr.latestModel().weights.array, [10., 10.], 1)
self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)

def test_parameter_convergence(self):
"""Test that the model parameters improve with streaming data."""
slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
slr.setInitialWeights([0.0])

# Create ten batches with 100 sample points in each.
batches = []
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
batches.append(sc.parallelize(batch))

model_weights = []
input_stream = self.ssc.queueStream(batches)
input_stream.foreachRDD(
lambda x: model_weights.append(slr.latestModel().weights[0]))
t = time()
slr.trainOn(input_stream)
self.ssc.start()
self._ssc_wait(t, 10, 0.01)

model_weights = array(model_weights)
diff = model_weights[1:] - model_weights[:-1]
self.assertTrue(all(diff >= -0.1))

def test_prediction(self):
"""Test prediction on a model with weights already set."""
        # Create a model with initial weights equal to the true coefficients.
slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
slr.setInitialWeights([10.0, 10.0])

# Create ten batches with 100 sample points in each.
batches = []
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
100, 42 + i, 0.1)
batches.append(
sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))

input_stream = self.ssc.queueStream(batches)
t = time()
output_stream = slr.predictOnValues(input_stream)
samples = []
output_stream.foreachRDD(lambda x: samples.append(x.collect()))

self.ssc.start()
self._ssc_wait(t, 5, 0.01)

# Test that mean absolute error on each batch is less than 0.1
for batch in samples:
true, predicted = zip(*batch)
self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1)

def test_train_prediction(self):
"""Test that error on test data improves as model is trained."""
slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
slr.setInitialWeights([0.0])

# Create ten batches with 100 sample points in each.
batches = []
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
batches.append(sc.parallelize(batch))

predict_batches = [
b.map(lambda lp: (lp.label, lp.features)) for b in batches]
mean_absolute_errors = []

        def func(rdd):
            true, predicted = zip(*rdd.collect())
            # Mean absolute error for this batch: mean(|y_true - y_pred|).
            mean_absolute_errors.append(mean(abs(array(true) - array(predicted))))

input_stream = self.ssc.queueStream(batches)
output_stream = self.ssc.queueStream(predict_batches)
t = time()
slr.trainOn(input_stream)
output_stream = slr.predictOnValues(output_stream)
output_stream.foreachRDD(func)
self.ssc.start()
self._ssc_wait(t, 10, 0.01)
self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)


if __name__ == "__main__":
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")