diff --git a/docs/mllib-dimensionality-reduction.md b/docs/mllib-dimensionality-reduction.md index cceddce9f79a..b1b11610ddff 100644 --- a/docs/mllib-dimensionality-reduction.md +++ b/docs/mllib-dimensionality-reduction.md @@ -83,6 +83,25 @@ Applications](quick-start.html#self-contained-applications) section of the Spark quick-start guide. Be sure to also include *spark-mllib* to your build file as a dependency. + +
+{% highlight python %} +from pyspark.mllib.linalg.distributed import RowMatrix +from numpy.random import RandomState + +# Generate random data with 50 samples and 30 features. +rng = RandomState(0) +mat = RowMatrix(sc.parallelize(rng.randn(50, 30))) + +# Compute the top 20 singular values and corresponding singular vectors. +svd = mat.computeSVD(20, computeU=True) +u = svd.U # The U factor is a RowMatrix. +s = svd.s # The singular values are stored in a local dense vector. +V = svd.V # The V factor is a local dense matrix. +{% endhighlight %} + +The same code applies to `IndexedRowMatrix` if `U` is defined as an +`IndexedRowMatrix`.
@@ -124,6 +143,29 @@ Refer to the [`RowMatrix` Java docs](api/java/org/apache/spark/mllib/linalg/dist {% include_example java/org/apache/spark/examples/mllib/JavaPCAExample.java %} + + +
+ +The following code demonstrates how to compute principal components on a `RowMatrix` +and use them to project the vectors into a low-dimensional space. + +{% highlight python %} +from pyspark.mllib.linalg.distributed import RowMatrix +from numpy.random import RandomState + +# Generate random data with 50 samples and 30 features. +rng = RandomState(0) +data = sc.parallelize(rng.randn(50, 30)) +mat = RowMatrix(data) + +# Compute the top 10 principal components stored in a local dense matrix. +pc = rm.computePrincipalComponents(10) + +# Project the rows to the linear space spanned by the top 10 principal components. +projected = rm.multiply(pc) +{% endhighlight %} +
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py index ea4f27cf4ffe..4a2b53c73179 100644 --- a/python/pyspark/mllib/linalg/distributed.py +++ b/python/pyspark/mllib/linalg/distributed.py @@ -28,7 +28,7 @@ from pyspark import RDD, since from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper -from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition +from pyspark.mllib.linalg import _convert_to_vector, DenseMatrix, Matrix, QRDecomposition from pyspark.mllib.stat import MultivariateStatisticalSummary from pyspark.storagelevel import StorageLevel @@ -303,6 +303,121 @@ def tallSkinnyQR(self, computeQ=False): R = decomp.call("R") return QRDecomposition(Q, R) + def computeSVD(self, k, computeU=False, rCond=1e-9): + """ + Computes the singular value decomposition of the RowMatrix. + + The given row matrix A of dimension (m X n) is decomposed into + U * s * V'T where + + * U: (m X k) (left singular vectors) is a RowMatrix whose + columns are the eigenvectors of (A X A') + * s: DenseVector consisting of square root of the eigenvalues + (singular values) in descending order. + * v: (n X k) (right singular vectors) is a Matrix whose columns + are the eigenvectors of (A' X A) + + For more specific details on implementation, please refer + the scala documentation. + + :param k: Set the number of singular values to keep. + :param computeU: Whether or not to compute U. If set to be + True, then U is computed by A * V * s^-1 + :param rCond: Reciprocal condition number. All singular values + smaller than rCond * s[0] are treated as zero + where s[0] is the largest singular value. + :returns: SingularValueDecomposition object + + >>> data = [(3, 1, 1), (-1, 3, 1)] + >>> rm = RowMatrix(sc.parallelize(data)) + >>> svd_model = rm.computeSVD(2, True) + >>> svd_model.U.rows.collect() + [DenseVector([-0.7071, 0.7071]), DenseVector([-0.7071, -0.7071])] + >>> svd_model.s + DenseVector([3.4641, 3.1623]) + >>> svd_model.V + DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, 0.0], 0) + """ + j_model = self._java_matrix_wrapper.call( + "computeSVD", int(k), bool(computeU), float(rCond)) + return SingularValueDecomposition(j_model) + + def computePrincipalComponents(self, k): + """ + Computes the k principal components of the given row matrix + + :param k: Number of principal components to keep. + :returns: DenseMatrix + + >>> data = sc.parallelize([[1, 2, 3], [2, 4, 5], [3, 6, 1]]) + >>> rm = RowMatrix(data) + + >>> # Returns the two principal components of rm + >>> pca = rm.computePrincipalComponents(2) + >>> pca + DenseMatrix(3, 2, [-0.349, -0.6981, 0.6252, -0.2796, -0.5592, -0.7805], 0) + + >>> # Transform into new dimensions with the greatest variance. + >>> rm.multiply(pca).rows.collect() # doctest: +NORMALIZE_WHITESPACE + [DenseVector([0.1305, -3.7394]), DenseVector([-0.3642, -6.6983]), \ + DenseVector([-4.6102, -4.9745])] + """ + return self._java_matrix_wrapper.call("computePrincipalComponents", k) + + def multiply(self, matrix): + """ + Multiplies the given RowMatrix with another matrix. + + :param matrix: Matrix to multiply with. + :returns: RowMatrix + + >>> rm = RowMatrix(sc.parallelize([[0, 1], [2, 3]])) + >>> rm.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect() + [DenseVector([2.0, 3.0]), DenseVector([6.0, 11.0])] + """ + if not isinstance(matrix, DenseMatrix): + raise ValueError("Only multiplication with DenseMatrix " + "is supported.") + j_model = self._java_matrix_wrapper.call("multiply", matrix) + return RowMatrix(j_model) + + +class SingularValueDecomposition(JavaModelWrapper): + """Wrapper around the SingularValueDecomposition scala case class""" + + @property + def U(self): + """ + Returns a distributed matrix whose columns are the left + singular vectors of the SingularValueDecomposition if + computeU was set to be True. + """ + u = self.call("U") + if u is not None: + mat_name = u.getClass().getSimpleName() + if mat_name == "RowMatrix": + return RowMatrix(u) + elif mat_name == "IndexedRowMatrix": + return IndexedRowMatrix(u) + else: + raise TypeError("Expected RowMatrix/IndexedRowMatrix got %s" % mat_name) + + @property + def s(self): + """ + Returns a DenseVector with singular values in + descending order. + """ + return self.call("s") + + @property + def V(self): + """ + Returns a DenseMatrix whose columns are the right singular + vectors of the SingularValueDecomposition. + """ + return self.call("V") + class IndexedRow(object): """ @@ -533,6 +648,62 @@ def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): colsPerBlock) return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock) + def computeSVD(self, k, computeU=False, rCond=1e-9): + """ + Computes the singular value decomposition of the IndexedRowMatrix. + + The given row matrix A of dimension (m X n) is decomposed into + U * s * V'T where + + * U: (m X k) (left singular vectors) is a IndexedRowMatrix + whose columns are the eigenvectors of (A X A') + * s: DenseVector consisting of square root of the eigenvalues + (singular values) in descending order. + * v: (n X k) (right singular vectors) is a Matrix whose columns + are the eigenvectors of (A' X A) + + For more specific details on implementation, please refer + the scala documentation. + + :param k: Set the number of singular values to keep. + :param computeU: Whether or not to compute U. If set to be + True, then U is computed by A * V * s^-1 + :param rCond: Reciprocal condition number. All singular values + smaller than rCond * s[0] are treated as zero + where s[0] is the largest singular value. + :returns: SingularValueDecomposition object + + >>> data = [(0, (3, 1, 1)), (1, (-1, 3, 1))] + >>> irm = IndexedRowMatrix(sc.parallelize(data)) + >>> svd_model = irm.computeSVD(2, True) + >>> svd_model.U.rows.collect() # doctest: +NORMALIZE_WHITESPACE + [IndexedRow(0, [-0.707106781187,0.707106781187]),\ + IndexedRow(1, [-0.707106781187,-0.707106781187])] + >>> svd_model.s + DenseVector([3.4641, 3.1623]) + >>> svd_model.V + DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, 0.0], 0) + """ + j_model = self._java_matrix_wrapper.call( + "computeSVD", int(k), bool(computeU), float(rCond)) + return SingularValueDecomposition(j_model) + + def multiply(self, matrix): + """ + Multiplies the given IndexedRowMatrix with another matrix. + + :param matrix: Matrix to multiply with. + :returns: IndexedRowMatrix + + >>> mat = IndexedRowMatrix(sc.parallelize([(0, (0, 1)), (1, (2, 3))])) + >>> mat.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect() + [IndexedRow(0, [2.0,3.0]), IndexedRow(1, [6.0,11.0])] + """ + if not isinstance(matrix, DenseMatrix): + raise ValueError("Only multiplication with DenseMatrix " + "is supported.") + return IndexedRowMatrix(self._java_matrix_wrapper.call("multiply", matrix)) + class MatrixEntry(object): """ diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 74cf7bb8eaf9..3219619bdc17 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -23,6 +23,7 @@ import sys import tempfile import array as pyarray +from math import sqrt from time import time, sleep from shutil import rmtree @@ -53,6 +54,7 @@ from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\ DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT +from pyspark.mllib.linalg.distributed import RowMatrix from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD from pyspark.mllib.recommendation import Rating from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD @@ -1608,6 +1610,67 @@ def test_binary_term_freqs(self): ": expected " + str(expected[i]) + ", got " + str(output[i])) +class DimensionalityReductionTests(MLlibTestCase): + + denseData = [ + Vectors.dense([0.0, 1.0, 2.0]), + Vectors.dense([3.0, 4.0, 5.0]), + Vectors.dense([6.0, 7.0, 8.0]), + Vectors.dense([9.0, 0.0, 1.0]) + ] + sparseData = [ + Vectors.sparse(3, [(1, 1.0), (2, 2.0)]), + Vectors.sparse(3, [(0, 3.0), (1, 4.0), (2, 5.0)]), + Vectors.sparse(3, [(0, 6.0), (1, 7.0), (2, 8.0)]), + Vectors.sparse(3, [(0, 9.0), (2, 1.0)]) + ] + + def assertEqualUpToSign(self, vecA, vecB): + eq1 = vecA - vecB + eq2 = vecA + vecB + self.assertTrue(sum(abs(eq1)) < 1e-6 or sum(abs(eq2)) < 1e-6) + + def test_svd(self): + denseMat = RowMatrix(self.sc.parallelize(self.denseData)) + sparseMat = RowMatrix(self.sc.parallelize(self.sparseData)) + m = 4 + n = 3 + for mat in [denseMat, sparseMat]: + for k in range(1, 4): + rm = mat.computeSVD(k, computeU=True) + self.assertEqual(rm.s.size, k) + self.assertEqual(rm.U.numRows(), m) + self.assertEqual(rm.U.numCols(), k) + self.assertEqual(rm.V.numRows, n) + self.assertEqual(rm.V.numCols, k) + + # Test that U returned is None if computeU is set to False. + self.assertEqual(mat.computeSVD(1).U, None) + + # Test that low rank matrices cannot have number of singular values + # greater than a limit. + rm = RowMatrix(self.sc.parallelize(tile([1, 2, 3], (3, 1)))) + self.assertEqual(rm.computeSVD(3, False, 1e-6).s.size, 1) + + def test_pca(self): + expected_pcs = array([ + [0.0, 1.0, 0.0], + [sqrt(2.0) / 2.0, 0.0, sqrt(2.0) / 2.0], + [sqrt(2.0) / 2.0, 0.0, -sqrt(2.0) / 2.0] + ]) + n = 3 + denseMat = RowMatrix(self.sc.parallelize(self.denseData)) + sparseMat = RowMatrix(self.sc.parallelize(self.sparseData)) + for mat in [denseMat, sparseMat]: + for k in range(1, 4): + pcs = mat.computePrincipalComponents(k) + self.assertEqual(pcs.numRows, n) + self.assertEqual(pcs.numCols, k) + + # We can just test the updated principal component for equality. + self.assertEqualUpToSign(pcs.toArray()[:, k - 1], expected_pcs[:, k - 1]) + + if __name__ == "__main__": from pyspark.mllib.tests import * if not _have_scipy: