42 changes: 42 additions & 0 deletions docs/mllib-dimensionality-reduction.md
@@ -83,6 +83,25 @@ Applications](quick-start.html#self-contained-applications) section of the Spark
quick-start guide. Be sure to also include *spark-mllib* in your build file as
a dependency.

</div>
<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.mllib.linalg.distributed import RowMatrix
Contributor: Nowadays we tend to write new examples separately and then use the `include_example` syntax to bring them in.

from numpy.random import RandomState

# Generate random data with 50 samples and 30 features.
rng = RandomState(0)
mat = RowMatrix(sc.parallelize(rng.randn(50, 30)))

# Compute the top 20 singular values and corresponding singular vectors.
svd = mat.computeSVD(20, computeU=True)
u = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
{% endhighlight %}

The same code applies to `IndexedRowMatrix` if `U` is defined as an
`IndexedRowMatrix`.
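
For illustration, here is a minimal sketch of the `IndexedRowMatrix` variant
(assuming the same `sc` and `rng` as above; attaching indices with
`zipWithIndex` is just one way to construct the rows):

{% highlight python %}
from pyspark.mllib.linalg.distributed import IndexedRowMatrix

# Build an IndexedRowMatrix from (index, vector) pairs.
rows = sc.parallelize(rng.randn(50, 30)).zipWithIndex().map(lambda x: (x[1], x[0]))
indexed_mat = IndexedRowMatrix(rows)

svd = indexed_mat.computeSVD(20, computeU=True)
u = svd.U  # The U factor is now an IndexedRowMatrix.
{% endhighlight %}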
</div>
</div>

@@ -124,6 +143,29 @@ Refer to the [`RowMatrix` Java docs](api/java/org/apache/spark/mllib/linalg/dist

{% include_example java/org/apache/spark/examples/mllib/JavaPCAExample.java %}

</div>

<div data-lang="python" markdown="1">

The following code demonstrates how to compute principal components on a `RowMatrix`
and use them to project the vectors into a low-dimensional space.

{% highlight python %}
Contributor: Same as above.

from pyspark.mllib.linalg.distributed import RowMatrix
from numpy.random import RandomState

# Generate random data with 50 samples and 30 features.
rng = RandomState(0)
data = sc.parallelize(rng.randn(50, 30))
mat = RowMatrix(data)

# Compute the top 10 principal components stored in a local dense matrix.
pc = mat.computePrincipalComponents(10)

# Project the rows to the linear space spanned by the top 10 principal components.
projected = mat.multiply(pc)
{% endhighlight %}
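
The result `projected` is itself a `RowMatrix`, now with 10 columns. As a
quick sanity check (a sketch, not part of the example above):

{% highlight python %}
# One coordinate per retained principal component.
assert projected.numCols() == 10
assert projected.numRows() == 50
{% endhighlight %}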

</div>
</div>

173 changes: 172 additions & 1 deletion python/pyspark/mllib/linalg/distributed.py
@@ -28,7 +28,7 @@

from pyspark import RDD, since
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
from pyspark.mllib.linalg import _convert_to_vector, DenseMatrix, Matrix, QRDecomposition
from pyspark.mllib.stat import MultivariateStatisticalSummary
from pyspark.storagelevel import StorageLevel

@@ -303,6 +303,121 @@ def tallSkinnyQR(self, computeQ=False):
R = decomp.call("R")
return QRDecomposition(Q, R)

def computeSVD(self, k, computeU=False, rCond=1e-9):
Contributor: Would be good to add a since annotation here.

"""
Computes the singular value decomposition of the RowMatrix.

The given row matrix A of dimension (m X n) is decomposed into
U * s * V', where

* U: (m X k) (left singular vectors) is a RowMatrix whose
columns are the eigenvectors of (A X A')
* s: DenseVector consisting of the square roots of the eigenvalues
(singular values) in descending order.
* V: (n X k) (right singular vectors) is a Matrix whose columns
are the eigenvectors of (A' X A)

For more specific details on the implementation, please refer to
the Scala documentation.

:param k: Number of leading singular values to keep (0 < k <= n).
It might return less than k if there are numerically zero
singular values or there are not enough Ritz values converged
before the maximum number of Arnoldi update iterations is
reached (in case that matrix A is ill-conditioned).
Contributor: It might be good to copy the longer description from RowMatrix for the k param.

:param computeU: Whether or not to compute U. If set to
True, then U is computed as A * V * s^-1.
:param rCond: Reciprocal condition number. All singular values
smaller than rCond * s[0] are treated as zero
where s[0] is the largest singular value.
:returns: SingularValueDecomposition object
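
Note that fewer than k singular values may be returned if rCond
truncates singular values that are numerically zero (e.g., for a
rank-deficient input matrix).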

>>> data = [(3, 1, 1), (-1, 3, 1)]
>>> rm = RowMatrix(sc.parallelize(data))
>>> svd_model = rm.computeSVD(2, True)
>>> svd_model.U.rows.collect()
[DenseVector([-0.7071, 0.7071]), DenseVector([-0.7071, -0.7071])]
>>> svd_model.s
DenseVector([3.4641, 3.1623])
>>> svd_model.V
DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, 0.0], 0)
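>>> # When computeU is False (the default), no U factor is available.
>>> rm.computeSVD(2).U is None
True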
"""
j_model = self._java_matrix_wrapper.call(
"computeSVD", int(k), bool(computeU), float(rCond))
return SingularValueDecomposition(j_model)

def computePrincipalComponents(self, k):
"""
Computes the k principal components of the given row matrix.
Contributor: It might be good to copy the warnings from RowMatrix here as well.


:param k: Number of principal components to keep.
:returns: DenseMatrix
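
Note (per the Scala RowMatrix docs): this cannot be computed on
matrices with more than 65535 columns, since the covariance
matrix is materialized locally.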

>>> data = sc.parallelize([[1, 2, 3], [2, 4, 5], [3, 6, 1]])
>>> rm = RowMatrix(data)

>>> # Returns the two principal components of rm
>>> pca = rm.computePrincipalComponents(2)
>>> pca
DenseMatrix(3, 2, [-0.349, -0.6981, 0.6252, -0.2796, -0.5592, -0.7805], 0)

>>> # Transform into new dimensions with the greatest variance.
>>> rm.multiply(pca).rows.collect() # doctest: +NORMALIZE_WHITESPACE
[DenseVector([0.1305, -3.7394]), DenseVector([-0.3642, -6.6983]), \
DenseVector([-4.6102, -4.9745])]
"""
return self._java_matrix_wrapper.call("computePrincipalComponents", k)

def multiply(self, matrix):
"""
Multiplies the given RowMatrix with another matrix.

:param matrix: Matrix to multiply with.
:returns: RowMatrix

>>> rm = RowMatrix(sc.parallelize([[0, 1], [2, 3]]))
>>> rm.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect()
[DenseVector([2.0, 3.0]), DenseVector([6.0, 11.0])]
"""
if not isinstance(matrix, DenseMatrix):
raise ValueError("Only multiplication with DenseMatrix "
"is supported.")
j_model = self._java_matrix_wrapper.call("multiply", matrix)
return RowMatrix(j_model)


class SingularValueDecomposition(JavaModelWrapper):
"""Wrapper around the SingularValueDecomposition scala case class"""
Contributor: Probably add a versionAdded.


@property
def U(self):
"""
Returns a distributed matrix whose columns are the left
singular vectors of the SingularValueDecomposition if
computeU was set to True.
"""
u = self.call("U")
if u is not None:
mat_name = u.getClass().getSimpleName()
if mat_name == "RowMatrix":
return RowMatrix(u)
elif mat_name == "IndexedRowMatrix":
return IndexedRowMatrix(u)
else:
raise TypeError("Expected RowMatrix/IndexedRowMatrix got %s" % mat_name)

@property
def s(self):
"""
Returns a DenseVector with singular values in
descending order.
"""
return self.call("s")

@property
def V(self):
"""
Returns a DenseMatrix whose columns are the right singular
vectors of the SingularValueDecomposition.
"""
return self.call("V")


class IndexedRow(object):
"""
@@ -533,6 +648,62 @@ def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
colsPerBlock)
return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)

def computeSVD(self, k, computeU=False, rCond=1e-9):
"""
Computes the singular value decomposition of the IndexedRowMatrix.

The given row matrix A of dimension (m X n) is decomposed into
U * s * V', where

* U: (m X k) (left singular vectors) is an IndexedRowMatrix
whose columns are the eigenvectors of (A X A')
* s: DenseVector consisting of the square roots of the eigenvalues
(singular values) in descending order.
* V: (n X k) (right singular vectors) is a Matrix whose columns
are the eigenvectors of (A' X A)

For more specific details on the implementation, please refer to
the Scala documentation.

:param k: Number of leading singular values to keep (0 < k <= n).
It might return less than k if there are numerically zero
singular values or there are not enough Ritz values converged
before the maximum number of Arnoldi update iterations is
reached (in case that matrix A is ill-conditioned).
:param computeU: Whether or not to compute U. If set to
True, then U is computed as A * V * s^-1.
:param rCond: Reciprocal condition number. All singular values
smaller than rCond * s[0] are treated as zero
where s[0] is the largest singular value.
:returns: SingularValueDecomposition object

>>> data = [(0, (3, 1, 1)), (1, (-1, 3, 1))]
>>> irm = IndexedRowMatrix(sc.parallelize(data))
>>> svd_model = irm.computeSVD(2, True)
>>> svd_model.U.rows.collect() # doctest: +NORMALIZE_WHITESPACE
[IndexedRow(0, [-0.707106781187,0.707106781187]),\
IndexedRow(1, [-0.707106781187,-0.707106781187])]
>>> svd_model.s
DenseVector([3.4641, 3.1623])
>>> svd_model.V
DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, 0.0], 0)
"""
j_model = self._java_matrix_wrapper.call(
"computeSVD", int(k), bool(computeU), float(rCond))
return SingularValueDecomposition(j_model)

def multiply(self, matrix):
"""
Multiplies the given IndexedRowMatrix with another matrix.

:param matrix: Matrix to multiply with.
:returns: IndexedRowMatrix

>>> mat = IndexedRowMatrix(sc.parallelize([(0, (0, 1)), (1, (2, 3))]))
>>> mat.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect()
[IndexedRow(0, [2.0,3.0]), IndexedRow(1, [6.0,11.0])]
"""
if not isinstance(matrix, DenseMatrix):
raise ValueError("Only multiplication with DenseMatrix "
"is supported.")
return IndexedRowMatrix(self._java_matrix_wrapper.call("multiply", matrix))


class MatrixEntry(object):
"""
63 changes: 63 additions & 0 deletions python/pyspark/mllib/tests.py
@@ -23,6 +23,7 @@
import sys
import tempfile
import array as pyarray
from math import sqrt
from time import time, sleep
from shutil import rmtree

@@ -53,6 +54,7 @@
from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
Expand Down Expand Up @@ -1608,6 +1610,67 @@ def test_binary_term_freqs(self):
": expected " + str(expected[i]) + ", got " + str(output[i]))


class DimensionalityReductionTests(MLlibTestCase):

denseData = [
Vectors.dense([0.0, 1.0, 2.0]),
Vectors.dense([3.0, 4.0, 5.0]),
Vectors.dense([6.0, 7.0, 8.0]),
Vectors.dense([9.0, 0.0, 1.0])
]
sparseData = [
Vectors.sparse(3, [(1, 1.0), (2, 2.0)]),
Vectors.sparse(3, [(0, 3.0), (1, 4.0), (2, 5.0)]),
Vectors.sparse(3, [(0, 6.0), (1, 7.0), (2, 8.0)]),
Vectors.sparse(3, [(0, 9.0), (2, 1.0)])
]

def assertEqualUpToSign(self, vecA, vecB):
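# Singular vectors and principal components are only determined up
# to sign, so accept vecA if it matches either vecB or -vecB.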
eq1 = vecA - vecB
eq2 = vecA + vecB
self.assertTrue(sum(abs(eq1)) < 1e-6 or sum(abs(eq2)) < 1e-6)

def test_svd(self):
denseMat = RowMatrix(self.sc.parallelize(self.denseData))
sparseMat = RowMatrix(self.sc.parallelize(self.sparseData))
m = 4
n = 3
for mat in [denseMat, sparseMat]:
for k in range(1, 4):
rm = mat.computeSVD(k, computeU=True)
self.assertEqual(rm.s.size, k)
self.assertEqual(rm.U.numRows(), m)
self.assertEqual(rm.U.numCols(), k)
self.assertEqual(rm.V.numRows, n)
self.assertEqual(rm.V.numCols, k)

# Test that U returned is None if computeU is set to False.
self.assertEqual(mat.computeSVD(1).U, None)

# Test that low rank matrices cannot have number of singular values
# greater than a limit.
rm = RowMatrix(self.sc.parallelize(tile([1, 2, 3], (3, 1))))
self.assertEqual(rm.computeSVD(3, False, 1e-6).s.size, 1)

def test_pca(self):
expected_pcs = array([
[0.0, 1.0, 0.0],
[sqrt(2.0) / 2.0, 0.0, sqrt(2.0) / 2.0],
[sqrt(2.0) / 2.0, 0.0, -sqrt(2.0) / 2.0]
])
n = 3
denseMat = RowMatrix(self.sc.parallelize(self.denseData))
sparseMat = RowMatrix(self.sc.parallelize(self.sparseData))
for mat in [denseMat, sparseMat]:
for k in range(1, 4):
pcs = mat.computePrincipalComponents(k)
self.assertEqual(pcs.numRows, n)
self.assertEqual(pcs.numCols, k)

# We only need to check the newly added principal component
# (column k - 1) for equality, up to sign.
self.assertEqualUpToSign(pcs.toArray()[:, k - 1], expected_pcs[:, k - 1])


if __name__ == "__main__":
from pyspark.mllib.tests import *
if not _have_scipy: