diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index 70e23033c875..6aeeb5773b8f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -18,13 +18,17 @@ package org.apache.spark.mllib.linalg import com.github.fommil.netlib.{BLAS => NetlibBLAS, F2jBLAS} +import com.github.fommil.netlib.BLAS.{getInstance => NativeBLAS} + +import org.apache.spark.Logging /** * BLAS routines for MLlib's vectors and matrices. */ -private[mllib] object BLAS extends Serializable { +object BLAS extends Serializable with Logging { @transient private var _f2jBLAS: NetlibBLAS = _ + @transient private var _nativeBLAS: NetlibBLAS = _ // For level-1 routines, we use Java implementation. private def f2jBLAS: NetlibBLAS = { @@ -197,4 +201,463 @@ private[mllib] object BLAS extends Serializable { throw new IllegalArgumentException(s"scal doesn't support vector type ${x.getClass}.") } } + + // For level-3 routines, we use the native BLAS. + private def nativeBLAS: NetlibBLAS = { + if (_nativeBLAS == null) { + _nativeBLAS = NativeBLAS + } + _nativeBLAS + } + + /** + * C := alpha * A * B + beta * C + * @param transA whether to use the transpose of matrix A (true), or A itself (false). + * @param transB whether to use the transpose of matrix B (true), or B itself (false). + * @param alpha a scalar to scale the multiplication A * B. + * @param A the matrix A that will be left multiplied to B. Size of m x k. + * @param B the matrix B that will be left multiplied by A. Size of k x n. + * @param beta a scalar that can be used to scale matrix C. + * @param C the resulting matrix C. Size of m x n. + */ + def gemm( + transA: Boolean, + transB: Boolean, + alpha: Double, + A: Matrix, + B: Matrix, + beta: Double, + C: DenseMatrix): Unit = { + if (alpha == 0.0) { + logDebug("gemm: alpha is equal to 0. Returning C.") + } else { + A match { + case sparse: SparseMatrix => + B match { + case dB: DenseMatrix => gemm(transA, transB, alpha, sparse, dB, beta, C) + case sB: SparseMatrix => + throw new IllegalArgumentException(s"gemm doesn't support sparse-sparse matrix " + + s"multiplication") + case _ => + throw new IllegalArgumentException(s"gemm doesn't support matrix type ${B.getClass}.") + } + case dense: DenseMatrix => + B match { + case dB: DenseMatrix => gemm(transA, transB, alpha, dense, dB, beta, C) + case sB: SparseMatrix => gemm(transA, transB, alpha, dense, sB, beta, C) + case _ => + throw new IllegalArgumentException(s"gemm doesn't support matrix type ${B.getClass}.") + } + case _ => + throw new IllegalArgumentException(s"gemm doesn't support matrix type ${A.getClass}.") + } + } + } + + /** + * C := alpha * A * B + beta * C + * + * @param alpha a scalar to scale the multiplication A * B. + * @param A the matrix A that will be left multiplied to B. Size of m x k. + * @param B the matrix B that will be left multiplied by A. Size of k x n. + * @param beta a scalar that can be used to scale matrix C. + * @param C the resulting matrix C. Size of m x n. + */ + def gemm( + alpha: Double, + A: Matrix, + B: Matrix, + beta: Double, + C: DenseMatrix): Unit = { + gemm(false, false, alpha, A, B, beta, C) + } + + /** + * C := alpha * A * B + beta * C + * For `DenseMatrix` A. 
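+   *
+   * An illustrative call through the public `gemm` entry point above (values are made up for the
+   * sketch; for dense `A` and `B` the dispatch lands in this routine):
+   * {{{
+   *   val A = Matrices.dense(2, 2, Array(1.0, 3.0, 2.0, 4.0))   // [[1.0 2.0], [3.0 4.0]]
+   *   val B = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))   // identity
+   *   val C = DenseMatrix.zeros(2, 2)
+   *   BLAS.gemm(false, false, 1.0, A, B, 0.0, C)                // C := 1.0 * A * B + 0.0 * C = A
+   * }}}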
+ */ + private def gemm( + transA: Boolean, + transB: Boolean, + alpha: Double, + A: DenseMatrix, + B: DenseMatrix, + beta: Double, + C: DenseMatrix): Unit = { + val mA: Int = if (!transA) A.numRows else A.numCols + val nB: Int = if (!transB) B.numCols else B.numRows + val kA: Int = if (!transA) A.numCols else A.numRows + val kB: Int = if (!transB) B.numRows else B.numCols + val tAstr = if (!transA) "N" else "T" + val tBstr = if (!transB) "N" else "T" + + require(kA == kB, s"The columns of A don't match the rows of B. A: $kA, B: $kB") + require(mA == C.numRows, s"The rows of C don't match the rows of A. C: ${C.numRows}, A: $mA") + require(nB == C.numCols, + s"The columns of C don't match the columns of B. C: ${C.numCols}, A: $nB") + + nativeBLAS.dgemm(tAstr, tBstr, mA, nB, kA, alpha, A.values, A.numRows, B.values, B.numRows, + beta, C.values, C.numRows) + } + + /** + * C := alpha * A * B + beta * C + * For `SparseMatrix` A. + */ + private def gemm( + transA: Boolean, + transB: Boolean, + alpha: Double, + A: SparseMatrix, + B: DenseMatrix, + beta: Double, + C: DenseMatrix): Unit = { + val mA: Int = if (!transA) A.numRows else A.numCols + val nB: Int = if (!transB) B.numCols else B.numRows + val kA: Int = if (!transA) A.numCols else A.numRows + val kB: Int = if (!transB) B.numRows else B.numCols + + require(kA == kB, s"The columns of A don't match the rows of B. A: $kA, B: $kB") + require(mA == C.numRows, s"The rows of C don't match the rows of A. C: ${C.numRows}, A: $mA") + require(nB == C.numCols, + s"The columns of C don't match the columns of B. C: ${C.numCols}, A: $nB") + + val Avals = A.values + val Arows = if (!transA) A.rowIndices else A.colPtrs + val Acols = if (!transA) A.colPtrs else A.rowIndices + + if (transA) { + // Naive matrix multiplication by using only non-zeros in A. This is the optimal + // multiplication setting for sparse matrices. + var colCounterForB = 0 + if (!transB) { // Expensive to put the check inside the loop + // Loop through columns of B as you multiply each column with the rows of A + while (colCounterForB < nB) { + var rowCounterForA = 0 + val Cstart = colCounterForB * mA + val Bstart = colCounterForB * kA + while (rowCounterForA < mA) { + var i = Arows(rowCounterForA) + val indEnd = Arows(rowCounterForA + 1) + var sum = 0.0 + while (i < indEnd) { + sum += Avals(i) * B.values(Bstart + Acols(i)) + i += 1 + } + val Cindex = Cstart + rowCounterForA + C.values(Cindex) = beta * C.values(Cindex) + sum * alpha + rowCounterForA += 1 + } + colCounterForB += 1 + } + } else { + // Loop through columns of B as you multiply each column with the rows of A. Not as + // efficient as you have to jump through the values of B, because B is column major. + while (colCounterForB < nB) { + var rowCounter = 0 + val Cstart = colCounterForB * mA + while (rowCounter < mA) { + var i = Arows(rowCounter) + val indEnd = Arows(rowCounter + 1) + var sum = 0.0 + while (i < indEnd) { + sum += Avals(i) * B(colCounterForB, Acols(i)) + i += 1 + } + val Cindex = Cstart + rowCounter + C.values(Cindex) = beta * C.values(Cindex) + sum * alpha + rowCounter += 1 + } + colCounterForB += 1 + } + } + } else { + // Perform matrix multiplication and add to C. The rows of A are multiplied by the columns of + // B, and added to C. Each value of C gets updated multiple times therefore scale C first. 
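+      // When beta is non-zero, scale all of C by beta once up front; the loops below then only
+      // need to accumulate alpha * A * B into C.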
+ if (beta != 0.0) { + f2jBLAS.dscal(C.values.length, beta, C.values, 1) + } + + var colCounterForB = 0 // the column to be updated in C + if (!transB) { // Expensive to put the check inside the loop + while (colCounterForB < nB) { + var colCounterForA = 0 // The column of A to multiply with the row of B + val Bstart = colCounterForB * kB + val Cstart = colCounterForB * mA + while (colCounterForA < kA) { + var i = Acols(colCounterForA) + val indEnd = Acols(colCounterForA + 1) + val Bval = B.values(Bstart + colCounterForA) * alpha + while (i < indEnd) { + C.values(Cstart + Arows(i)) += Avals(i) * Bval + i += 1 + } + colCounterForA += 1 + } + colCounterForB += 1 + } + } else { + while (colCounterForB < nB) { + var colCounterForA = 0 // The column of A to multiply with the row of B + val Cstart = colCounterForB * mA + while (colCounterForA < kA) { + var i = Acols(colCounterForA) + val indEnd = Acols(colCounterForA + 1) + val Bval = B(colCounterForB, colCounterForA) * alpha + while (i < indEnd) { + C.values(Cstart + Arows(i)) += Avals(i) * Bval + i += 1 + } + colCounterForA += 1 + } + colCounterForB += 1 + } + } + } + } + + /** + * C := alpha * A * B + beta * C + * For `DenseMatrix` A and `SparseMatrix` B. + */ + private def gemm( + transA: Boolean, + transB: Boolean, + alpha: Double, + A: DenseMatrix, + B: SparseMatrix, + beta: Double, + C: DenseMatrix): Unit = { + val mA: Int = if (!transA) A.numRows else A.numCols + val nB: Int = if (!transB) B.numCols else B.numRows + val kA: Int = if (!transA) A.numCols else A.numRows + val kB: Int = if (!transB) B.numRows else B.numCols + + require(kA == kB, s"The columns of A don't match the rows of B. A: $kA, B: $kB") + require(mA == C.numRows, s"The rows of C don't match the rows of A. C: ${C.numRows}, A: $mA") + require(nB == C.numCols, + s"The columns of C don't match the columns of B. C: ${C.numCols}, A: $nB") + + val Bvals = B.values + val Brows = if (!transB) B.rowIndices else B.colPtrs + val Bcols = if (!transB) B.colPtrs else B.rowIndices + + if (transA) { + // Easy to loop over rows of A, since A was column major and we are using its transpose + var colCounterForB = 0 + if (!transB) { // Expensive to put the check inside the loop + // Naive matrix multiplication using only non-zero elements in B + while (colCounterForB < nB) { + var rowCounterForA = 0 + val Cstart = colCounterForB * mA + val indEnd = Bcols(colCounterForB + 1) + while (rowCounterForA < mA) { + var i = Bcols(colCounterForB) + val Astart = rowCounterForA * kA + var sum = 0.0 + while (i < indEnd) { + sum += Bvals(i) * A.values(Astart + Brows(i)) + i += 1 + } + val Cindex = Cstart + rowCounterForA + C.values(Cindex) = beta * C.values(Cindex) + sum * alpha + rowCounterForA += 1 + } + colCounterForB += 1 + } + } else { + // Scale matrix first if `beta` is not equal to 0.0 as we update values of C multiple times. + if (beta != 0.0) { + nativeBLAS.dscal(C.values.length, beta, C.values, 1) + } + // Loop over values of A as you pick the non-zero values in B and add it to C. 
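+        // With transB == true, Brows aliases B.colPtrs and Bcols aliases B.rowIndices, so each pass
+        // over colCounterForA walks the non-zeros stored for one column of B and scatters the
+        // products into column Bcols(i) of C.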
+ var rowCounterForA = 0 + while (rowCounterForA < mA) { + var colCounterForA = 0 + val Astart = rowCounterForA * kA + while (colCounterForA < kA) { + var i = Brows(colCounterForA) + val indEnd = Brows(colCounterForA + 1) + while (i < indEnd) { + val Cindex = Bcols(i) * mA + rowCounterForA + C.values(Cindex) += A.values(Astart + colCounterForA) * Bvals(i) * alpha + i += 1 + } + colCounterForA += 1 + } + rowCounterForA += 1 + } + } + } else { + // Scale matrix first if `beta` is not equal to 0.0 + if (beta != 0.0) { + nativeBLAS.dscal(C.values.length, beta, C.values, 1) + } + if (!transB) { // Expensive to put the check inside the loop + // Loop over the columns of B, pick non-zero row in B, select corresponding column in A, + // and update the whole column in C by looping over rows in A. + var colCounterForB = 0 // the column to be updated in C + while (colCounterForB < nB) { + var i = Bcols(colCounterForB) + val indEnd = Bcols(colCounterForB + 1) + while (i < indEnd) { + var rowCounterForA = 0 + val Bval = Bvals(i) + val Cstart = colCounterForB * mA + val Astart = mA * Brows(i) + while (rowCounterForA < mA) { + C.values(Cstart + rowCounterForA) += A.values(Astart + rowCounterForA) * Bval * alpha + rowCounterForA += 1 + } + i += 1 + } + colCounterForB += 1 + } + } else { + // Multiply columns of A with the rows of B and add to C. + var colCounterForA = 0 + while (colCounterForA < kA) { + var rowCounterForA = 0 + val Astart = mA * colCounterForA + val indEnd = Brows(colCounterForA + 1) + while (rowCounterForA < mA) { + var i = Brows(colCounterForA) + while (i < indEnd) { + val Cindex = Bcols(i) * mA + rowCounterForA + C.values(Cindex) += A.values(Astart + rowCounterForA) * Bvals(i) * alpha + i += 1 + } + rowCounterForA += 1 + } + colCounterForA += 1 + } + } + } + } + + /** + * y := alpha * A * x + beta * y + * @param trans whether to use the transpose of matrix A (true), or A itself (false). + * @param alpha a scalar to scale the multiplication A * x. + * @param A the matrix A that will be left multiplied to x. Size of m x n. + * @param x the vector x that will be left multiplied by A. Size of n x 1. + * @param beta a scalar that can be used to scale vector y. + * @param y the resulting vector y. Size of m x 1. + */ + def gemv( + trans: Boolean, + alpha: Double, + A: Matrix, + x: DenseVector, + beta: Double, + y: DenseVector): Unit = { + + val mA: Int = if (!trans) A.numRows else A.numCols + val nx: Int = x.size + val nA: Int = if (!trans) A.numCols else A.numRows + + require(nA == nx, s"The columns of A don't match the number of elements of x. A: $nA, x: $nx") + require(mA == y.size, + s"The rows of A don't match the number of elements of y. A: $mA, y:${y.size}}") + if (alpha == 0.0) { + logDebug("gemv: alpha is equal to 0. Returning y.") + } else { + A match { + case sparse: SparseMatrix => + gemv(trans, alpha, sparse, x, beta, y) + case dense: DenseMatrix => + gemv(trans, alpha, dense, x, beta, y) + case _ => + throw new IllegalArgumentException(s"gemv doesn't support matrix type ${A.getClass}.") + } + } + } + + /** + * y := alpha * A * x + beta * y + * + * @param alpha a scalar to scale the multiplication A * x. + * @param A the matrix A that will be left multiplied to x. Size of m x n. + * @param x the vector x that will be left multiplied by A. Size of n x 1. + * @param beta a scalar that can be used to scale vector y. + * @param y the resulting vector y. Size of m x 1. 
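+   *
+   * A minimal usage sketch (illustrative values):
+   * {{{
+   *   val A = Matrices.dense(2, 2, Array(1.0, 3.0, 2.0, 4.0))   // [[1.0 2.0], [3.0 4.0]]
+   *   val x = new DenseVector(Array(1.0, 1.0))
+   *   val y = new DenseVector(Array(0.0, 0.0))
+   *   BLAS.gemv(1.0, A, x, 0.0, y)                              // y := A * x = [3.0, 7.0]
+   * }}}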
+ */ + def gemv( + alpha: Double, + A: Matrix, + x: DenseVector, + beta: Double, + y: DenseVector): Unit = { + gemv(false, alpha, A, x, beta, y) + } + + /** + * y := alpha * A * x + beta * y + * For `DenseMatrix` A. + */ + private def gemv( + trans: Boolean, + alpha: Double, + A: DenseMatrix, + x: DenseVector, + beta: Double, + y: DenseVector): Unit = { + val tStrA = if (!trans) "N" else "T" + nativeBLAS.dgemv(tStrA, A.numRows, A.numCols, alpha, A.values, A.numRows, x.values, 1, beta, + y.values, 1) + } + + /** + * y := alpha * A * x + beta * y + * For `SparseMatrix` A. + */ + private def gemv( + trans: Boolean, + alpha: Double, + A: SparseMatrix, + x: DenseVector, + beta: Double, + y: DenseVector): Unit = { + + val mA: Int = if(!trans) A.numRows else A.numCols + val nA: Int = if(!trans) A.numCols else A.numRows + + val Avals = A.values + val Arows = if (!trans) A.rowIndices else A.colPtrs + val Acols = if (!trans) A.colPtrs else A.rowIndices + + if (trans) { + // Since A is column majored, the transpose allows easy access to the column indices in A'. + var rowCounter = 0 + while (rowCounter < mA) { + var i = Arows(rowCounter) + val indEnd = Arows(rowCounter + 1) + var sum = 0.0 + while(i < indEnd) { + sum += Avals(i) * x.values(Acols(i)) + i += 1 + } + y.values(rowCounter) = beta * y.values(rowCounter) + sum * alpha + rowCounter += 1 + } + } else { + // Scale vector first if `beta` is not equal to 0.0 as values in y are updated multiple times + if (beta != 0.0) { + scal(beta, y) + } + // Perform matrix-vector multiplication and add to y + var colCounterForA = 0 + while (colCounterForA < nA) { + var i = Acols(colCounterForA) + val indEnd = Acols(colCounterForA + 1) + val xVal = x.values(colCounterForA) * alpha + while (i < indEnd) { + val rowIndex = Arows(i) + y.values(rowIndex) += Avals(i) * xVal + i += 1 + } + colCounterForA += 1 + } + } + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index b11ba5d30fbd..732d88aad329 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -17,12 +17,19 @@ package org.apache.spark.mllib.linalg -import breeze.linalg.{Matrix => BM, DenseMatrix => BDM} +import breeze.linalg.{Matrix => BM, DenseMatrix => BDM, CSCMatrix => BSM} + +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.util.Utils + +import scala.collection.mutable.ArrayBuffer +import java.util.Arrays /** * Trait for a local matrix. */ -trait Matrix extends Serializable { +sealed trait Matrix extends Serializable { /** Number of rows. */ def numRows: Int @@ -37,9 +44,169 @@ trait Matrix extends Serializable { private[mllib] def toBreeze: BM[Double] /** Gets the (i, j)-th element. */ - private[mllib] def apply(i: Int, j: Int): Double = toBreeze(i, j) + private[mllib] def apply(i: Int, j: Int): Double + + /** Return the index for the (i, j)-th element in the backing array. */ + private[mllib] def index(i: Int, j: Int): Int + + /** Update element at (i, j) */ + private[mllib] def update(i: Int, j: Int, v: Double): Unit + + /** Get a deep copy of the matrix. */ + def copy: Matrix + + /** Convenience method for `Matrix`-`Matrix` multiplication. 
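+   * A short sketch (illustrative values):
+   * {{{
+   *   val A = Matrices.eye(2)
+   *   val B = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))
+   *   val C: DenseMatrix = A.multiply(B)   // same values as B
+   * }}}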
+ * Note: `SparseMatrix`-`SparseMatrix` multiplication is not supported */ + def multiply(y: Matrix): DenseMatrix = { + val C: DenseMatrix = DenseMatrix.zeros(numRows, y.numCols) + BLAS.gemm(false, false, 1.0, this, y, 0.0, C) + C + } + + /** Convenience method for `Matrix`-`DenseVector` multiplication. */ + def multiply(y: DenseVector): DenseVector = { + val output = new DenseVector(new Array[Double](numRows)) + BLAS.gemv(1.0, this, y, 0.0, output) + output + } + + /** Convenience method for `Matrix`^T^-`Matrix` multiplication. + * Note: `SparseMatrix`-`SparseMatrix` multiplication is not supported */ + def transposeMultiply(y: Matrix): DenseMatrix = { + val C: DenseMatrix = DenseMatrix.zeros(numCols, y.numCols) + BLAS.gemm(true, false, 1.0, this, y, 0.0, C) + C + } + + /** Convenience method for `Matrix`^T^-`DenseVector` multiplication. */ + def transposeMultiply(y: DenseVector): DenseVector = { + val output = new DenseVector(new Array[Double](numCols)) + BLAS.gemv(true, 1.0, this, y, 0.0, output) + output + } + /** A human readable representation of the matrix */ override def toString: String = toBreeze.toString() + + /** Maps all the values in the matrix. Preserves shape. Generates a new matrix. */ + private[mllib] def map(f: Double => Double): Matrix + + /** Updates all the values in the matrix. Preserves shape. In Place. */ + private[mllib] def update(f: Double => Double): Matrix + + /** Applies an operator element-wise with another matrix on all the columns of the calling matrix. + * The number of rows of the calling matrix and `y` should be equal. Operation is in place. */ + private[mllib] def elementWiseOperateOnColumnsInPlace(f: (Double, Double) => Double, + y: Matrix): Matrix + + /** Applies an operator element-wise with another matrix on all the rows of the calling matrix. + * The number of columns of the calling matrix and `y` should be equal. Operation is in place. */ + private[mllib] def elementWiseOperateOnRowsInPlace(f: (Double, Double) => Double, + y: Matrix): Matrix + + /** Applies an operator element-wise with another matrix. The shape of the calling matrix and `y` + * should be equal. Operation is in place. */ + private[mllib] def elementWiseOperateInPlace(f: (Double, Double) => Double, y: Matrix): Matrix + + /** Applies an operator element-wise with a scalar. Operation is in place. */ + private[mllib] def elementWiseOperateScalarInPlace(f: (Double, Double) => Double, + y: Double): Matrix + + /** Applies an operator element-wise with another matrix. Operation is in place. */ + private[mllib] def operateInPlace(f: (Double, Double) => Double, y: Matrix): Matrix + +/** Applies an operator element-wise with another matrix on all the columns of the calling matrix. + * The number of rows of the calling matrix and `y` should be equal. Returns new matrix. */ + private[mllib] def elementWiseOperateOnColumns(f: (Double, Double) => Double, y: Matrix): Matrix + + /** Applies an operator element-wise with another matrix on all the rows of the calling matrix. + * The number of columns of the calling matrix and `y` should be equal. Returns new matrix. */ + private[mllib] def elementWiseOperateOnRows(f: (Double, Double) => Double, y: Matrix): Matrix + + /** Applies an operator element-wise with another matrix. The shape of the calling matrix and `y` + * should be equal. Returns new matrix. */ + private[mllib] def elementWiseOperate(f: (Double, Double) => Double, y: Matrix): Matrix + + /** Applies an operator element-wise with a scalar. Returns new matrix. 
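+   * For instance (within `mllib`), `m.elementWiseOperateScalar(_ + _, 1.0)` returns a copy of `m`
+   * with 1.0 added to every element; the scalar arithmetic operators defined below delegate to
+   * this method and its in-place counterpart.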
*/ + private[mllib] def elementWiseOperateScalar(f: (Double, Double) => Double, y: Double): Matrix + + /** Applies an operator element-wise with another matrix. Returns new matrix. */ + private[mllib] def operate(f: (Double, Double) => Double, y: Matrix): Matrix + + /** Element-wise multiplication of the calling matrix with `Matrix` y. Operation is in place. */ + private[mllib] def *=(y: Matrix) = operateInPlace(_ * _, y) + + /** Element-wise multiplication of the calling matrix with `Matrix` y. Returns new matrix. */ + private[mllib] def *(y: Matrix) = operate(_ * _, y) + + /** Element-wise addition of the calling matrix with `Matrix` y. Operation is in place. */ + private[mllib] def +=(y: Matrix) = operateInPlace(_ + _, y) + + /** Element-wise addition of the calling matrix with `Matrix` y. Returns new matrix. */ + private[mllib] def +(y: Matrix) = operate(_ + _, y) + + /** Element-wise subtraction of `Matrix` y from the calling matrix. Operation is in place. */ + private[mllib] def -=(y: Matrix) = operateInPlace(_ - _, y) + + /** Element-wise subtraction of `Matrix` y from the calling matrix. Returns new matrix. */ + private[mllib] def -(y: Matrix) = operate(_ - _, y) + + /** Element-wise division of the calling matrix by `Matrix` y. Operation is in place. */ + private[mllib] def /=(y: Matrix) = operateInPlace(_ / _, y) + + /** Element-wise division of the calling matrix by `Matrix` y. Returns new matrix. */ + private[mllib] def /(y: Matrix) = operate(_ / _, y) + + /** Element-wise multiplication of the calling matrix with the scalar y. Operation is in place. */ + private[mllib] def *=(y: Double) = elementWiseOperateScalarInPlace(_ * _, y) + + /** Element-wise multiplication of the calling matrix with the scalar y. Returns new matrix. */ + private[mllib] def *(y: Double) = elementWiseOperateScalar(_ * _, y) + + /** Element-wise addition of the scalar y on the calling matrix. Operation is in place. */ + private[mllib] def +=(y: Double) = elementWiseOperateScalarInPlace(_ + _, y) + + /** Element-wise addition of the scalar y on the calling matrix. Returns new matrix. */ + private[mllib] def +(y: Double) = elementWiseOperateScalar(_ + _, y) + + /** Element-wise subtraction of the scalar y from the calling matrix. Operation is in place. */ + private[mllib] def -=(y: Double) = elementWiseOperateScalarInPlace(_ - _, y) + + /** Element-wise subtraction of the scalar y from the calling matrix. Returns new matrix. */ + private[mllib] def -(y: Double) = elementWiseOperateScalar(_ - _, y) + + /** Element-wise division of the calling matrix with the scalar y. Operation is in place. */ + private[mllib] def /=(y: Double) = elementWiseOperateScalarInPlace(_ / _, y) + + /** Element-wise division of the calling matrix with the scalar y. Returns new matrix. */ + private[mllib] def /(y: Double) = elementWiseOperateScalar(_ / _, y) + + /** Multiply all elements with -1. Returns new matrix. */ + private[mllib] def neg: Matrix + + /** Multiply all elements with -1 in place. */ + private[mllib] def negInPlace: Matrix + + /** Compare elements in this matrix with `v`, such as less than or greater than. Outputs binary + * `DenseMatrix` */ + private[mllib] def compare(v: Double, f: (Double, Double) => Boolean): DenseMatrix + + /** Returns the p-th norm for each column */ + private[mllib] def colNorms(p: Double): DenseMatrix + + /** Sum the columns in this matrix. */ + private[mllib] def colSums: DenseMatrix = colSums(false) + + /** Sum the columns in this matrix. 
Can specify rows to skip using a binary matrix and + * whether to take absolute values of elements. */ + private[mllib] def colSums(absolute: Boolean, skipRows: DenseMatrix = null): DenseMatrix + + /** Sum the rows in this matrix. */ + private[mllib] def rowSums: Vector = rowSums(false) + + /** Sum the rows in this matrix. Can specify columns to skip using a binary matrix and + * whether to take absolute values of elements. */ + private[mllib] def rowSums(absolute: Boolean, skipCols: DenseVector = null): Vector } /** @@ -57,13 +224,850 @@ trait Matrix extends Serializable { * @param numCols number of columns * @param values matrix entries in column major */ -class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix { +class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) extends Matrix with Serializable { - require(values.length == numRows * numCols) + require(values.length == numRows * numCols, "The number of values supplied doesn't match the " + + s"size of the matrix! values.length: ${values.length}, numRows * numCols: ${numRows * numCols}") override def toArray: Array[Double] = values - private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values) + private[mllib] def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values) + + private[mllib] def apply(i: Int): Double = values(i) + + private[mllib] def apply(i: Int, j: Int): Double = values(index(i, j)) + + private[mllib] def index(i: Int, j: Int): Int = i + numRows * j + + private[mllib] def update(i: Int, j: Int, v: Double): Unit = { + values(index(i, j)) = v + } + + override def copy = new DenseMatrix(numRows, numCols, values.clone()) + + private[mllib] def elementWiseOperateOnColumnsInPlace( + f: (Double, Double) => Double, + y: Matrix): DenseMatrix = { + val y_vals = y.toArray + val len = y_vals.length + require(y_vals.length == numRows) + var j = 0 + while (j < numCols) { + var i = 0 + while (i < len) { + val idx = index(i, j) + values(idx) = f(values(idx), y_vals(i)) + i += 1 + } + j += 1 + } + this + } + + private[mllib] def elementWiseOperateOnRowsInPlace( + f: (Double, Double) => Double, + y: Matrix): DenseMatrix = { + val y_vals = y.toArray + require(y_vals.length == numCols) + var j = 0 + while (j < numCols) { + var i = 0 + while (i < numRows) { + val idx = index(i, j) + values(idx) = f(values(idx), y_vals(j)) + i += 1 + } + j += 1 + } + this + } + + private[mllib] def elementWiseOperateInPlace( + f: (Double, Double) => Double, + y: Matrix): DenseMatrix = { + val y_val = y.toArray + val len = values.length + require(y_val.length == values.length) + var j = 0 + while (j < len) { + values(j) = f(values(j), y_val(j)) + j += 1 + } + this + } + + private[mllib] def elementWiseOperateScalarInPlace( + f: (Double, Double) => Double, + y: Double): DenseMatrix = { + var j = 0 + val len = values.length + while (j < len) { + values(j) = f(values(j), y) + j += 1 + } + this + } + + private[mllib] def operateInPlace(f: (Double, Double) => Double, y: Matrix): DenseMatrix = { + if (y.numCols==1 || y.numRows == 1) { + require(numCols != numRows, "Operation is ambiguous. 
Please use elementWiseOperateOnRows " + + "or elementWiseOperateOnColumns instead") + } + if (y.numCols == 1 && y.numRows == 1) { + elementWiseOperateScalarInPlace(f, y.toArray(0)) + } else { + if (y.numCols==1) { + elementWiseOperateOnColumnsInPlace(f, y) + } else if (y.numRows==1) { + elementWiseOperateOnRowsInPlace(f, y) + } else{ + elementWiseOperateInPlace(f, y) + } + } + } + + private[mllib] def elementWiseOperateOnColumns( + f: (Double, Double) => Double, + y: Matrix): DenseMatrix = { + val dup = this.copy + dup.elementWiseOperateOnColumnsInPlace(f, y) + } + + private[mllib] def elementWiseOperateOnRows( + f: (Double, Double) => Double, + y: Matrix): DenseMatrix = { + val dup = this.copy + dup.elementWiseOperateOnRowsInPlace(f, y) + } + + private[mllib] def elementWiseOperate(f: (Double, Double) => Double, y: Matrix): DenseMatrix = { + val dup = this.copy + dup.elementWiseOperateInPlace(f, y) + } + + private[mllib] def elementWiseOperateScalar( + f: (Double, Double) => Double, + y: Double): DenseMatrix = { + val dup = this.copy + dup.elementWiseOperateScalarInPlace(f, y) + } + + private[mllib] def operate(f: (Double, Double) => Double, y: Matrix): DenseMatrix = { + val dup = this.copy + dup.operateInPlace(f, y) + } + + def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f)) + + def update(f: Double => Double): DenseMatrix = { + val len = values.length + var i = 0 + while (i < len) { + values(i) = f(values(i)) + i += 1 + } + this + } + + private[mllib] def colSums(absolute: Boolean, skipRows: DenseMatrix = null): DenseMatrix = { + val sums = new DenseMatrix(1, numCols, new Array[Double](numCols)) + var j = 0 + while (j < numCols) { + var i = 0 + while (i < numRows) { + if (skipRows == null) { + var v = values(index(i, j)) + if (absolute) v = math.abs(v) + sums.values(j) += v + } else { + if (skipRows(i) != 1.0) { + var v = values(index(i, j)) + if (absolute) v = math.abs(v) + sums.values(j) += v + } + } + i += 1 + } + j += 1 + } + sums + } + + /** Sum the rows in this matrix. Can specify columns to skip using a binary matrix and + * whether to take absolute values of elements. 
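+   * In the implementation below a column `j` is accumulated only when `skipCols(j)` is non-zero.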
*/ + private[mllib] def rowSums(absolute: Boolean, skipCols: DenseVector = null): DenseVector = { + val sums = new DenseVector(new Array[Double](numRows)) + var j = 0 + while (j < numCols) { + if (skipCols(j) != 0.0) { + var i = 0 + while (i < numRows) { + var v = values(index(i, j)) + if (absolute) v = math.abs(v) + sums.values(i) += v + i += 1 + } + } + j += 1 + } + sums + } + + def colNorms(p: Double): DenseMatrix = { + if (p == 1.0) return colSums(true) + val sums = new DenseMatrix(1, numCols, new Array[Double](numCols)) + var j = 0 + while (j < numCols) { + var i = 0 + while (i < numRows) { + val idx = index(i, j) + val power = if (p == 2.0) values(idx) * values(idx) else math.pow(values(idx), p) + sums.values(j) += power + i += 1 + } + j += 1 + } + if (p == 2.0) sums.update(math.sqrt) else sums.update(math.pow(_, 1 / p)) + sums + } + + private[mllib] def negInPlace: DenseMatrix = { + var j = 0 + val len = values.length + while (j < len) { + values(j) *= -1 + j += 1 + } + this + } + + private[mllib] def neg: DenseMatrix = { + val copy = new DenseMatrix(numRows, numCols, values.clone()) + copy.negInPlace + } + + private[mllib] def compareInPlace(v: Double, f: (Double, Double) => Boolean): DenseMatrix = { + var j = 0 + val len = values.length + while (j < len) { + values(j) = if (f(values(j), v)) 1.0 else 0.0 + j += 1 + } + this + } + + private[mllib] def compare(v: Double, f: (Double, Double) => Boolean): DenseMatrix = { + val copy = new DenseMatrix(numRows, numCols, values.clone()) + copy.compareInPlace(v, f) + } + + private[mllib] def multiplyInPlace(y: Matrix): DenseMatrix = { + val copy = this multiply y + BLAS.copy(Vectors.dense(copy.values), Vectors.dense(values)) + this + } +} + +object DenseMatrix { + + /** + * Generate a `DenseMatrix` consisting of zeros. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values of zeros + */ + def zeros(numRows: Int, numCols: Int): DenseMatrix = + new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(0.0)) + + /** + * Generate a `DenseMatrix` consisting of ones. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values of ones + */ + def ones(numRows: Int, numCols: Int): DenseMatrix = + new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0)) + + /** + * Generate an Identity Matrix in `DenseMatrix` format. + * @param n number of rows and columns of the matrix + * @return `DenseMatrix` with size `n` x `n` and values of ones on the diagonal + */ + def eye(n: Int): DenseMatrix = { + val identity = DenseMatrix.zeros(n, n) + var i = 0 + while (i < n) { + identity.update(i, i, 1.0) + i += 1 + } + identity + } + + /** + * Generate a `DenseMatrix` consisting of i.i.d. uniform random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def rand(numRows: Int, numCols: Int): DenseMatrix = { + val rand = new XORShiftRandom + new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rand.nextDouble())) + } + + /** + * Generate a `DenseMatrix` consisting of i.i.d. gaussian random numbers. 
+ * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def randn(numRows: Int, numCols: Int): DenseMatrix = { + val rand = new XORShiftRandom + new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rand.nextGaussian())) + } + + /** + * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. + * @param vector a `Vector` that will form the values on the diagonal of the matrix + * @return Square `DenseMatrix` with size `values.length` x `values.length` and `values` + * on the diagonal + */ + def diag(vector: Vector): DenseMatrix = { + val n = vector.size + val matrix = DenseMatrix.eye(n) + val values = vector.toArray + var i = 0 + while (i < n) { + matrix.update(i, i, values(i)) + i += 1 + } + matrix + } +} + +/** + * Column-majored sparse matrix. + * The entry values are stored in Compressed Sparse Column (CSC) format. + * For example, the following matrix + * {{{ + * 1.0 0.0 4.0 + * 0.0 3.0 5.0 + * 2.0 0.0 6.0 + * }}} + * is stored as `values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]`, + * `rowIndices=[0, 2, 1, 0, 1, 2]`, `colPointers=[0, 2, 3, 6]`. + * + * @param numRows number of rows + * @param numCols number of columns + * @param colPtrs the index corresponding to the start of a new column + * @param rowIndices the row index of the entry. They must be in strictly increasing order for each + * column + * @param values non-zero matrix entries in column major + */ +class SparseMatrix( + val numRows: Int, + val numCols: Int, + val colPtrs: Array[Int], + val rowIndices: Array[Int], + val values: Array[Double]) extends Matrix with Serializable { + + require(values.length == rowIndices.length, "The number of row indices and values don't match! " + + s"values.length: ${values.length}, rowIndices.length: ${rowIndices.length}") + require(colPtrs.length == numCols + 1, "The length of the column indices should be the " + + s"number of columns + 1. Currently, colPointers.length: ${colPtrs.length}, " + + s"numCols: $numCols") + + override def toArray: Array[Double] = { + val arr = new Array[Double](numRows * numCols) + var j = 0 + while (j < numCols) { + var i = colPtrs(j) + val indEnd = colPtrs(j + 1) + val offset = j * numRows + while (i < indEnd) { + val rowIndex = rowIndices(i) + arr(offset + rowIndex) = values(i) + i += 1 + } + j += 1 + } + arr + } + + private[mllib] def toBreeze: BM[Double] = + new BSM[Double](values, numRows, numCols, colPtrs, rowIndices) + + private[mllib] def apply(i: Int, j: Int): Double = { + val ind = index(i, j) + if (ind < 0) 0.0 else values(ind) + } + + private[mllib] def index(i: Int, j: Int): Int = { + Arrays.binarySearch(rowIndices, colPtrs(j), colPtrs(j + 1), i) + } + + private[mllib] def update(i: Int, j: Int, v: Double): Unit = { + val ind = index(i, j) + if (ind == -1) { + throw new NoSuchElementException("The given row and column indices correspond to a zero " + + "value. Only non-zero elements in Sparse Matrices can be updated.") + } else { + values(index(i, j)) = v + } + } + + override def copy = new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.clone()) + + /** Note: If `f` is not a multiplication or division operation, the operation will not be in place + * and a `DenseMatrix` will be returned. 
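+   * (Multiplication and division leave implicit zeros at zero, so the existing sparsity pattern
+   * can be updated in place; any other operator could turn zeros into non-zeros, which the CSC
+   * arrays cannot hold without reallocation, hence the fallback to a dense copy.)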
*/ + private[mllib] def elementWiseOperateOnColumnsInPlace( + f: (Double, Double) => Double, + y: Matrix): Matrix = { + if (isMultiplication(f) || isDivision(f)) { + val y_vals = y.toArray + require(y_vals.length == numRows) + var j = 0 + while (j < numCols) { + var i = colPtrs(j) + val indEnd = colPtrs(j + 1) + while (i < indEnd) { + values(i) = f(values(i), y_vals(rowIndices(i))) + i += 1 + } + j += 1 + } + this + } else { + val dup = this.toDense + dup.elementWiseOperateOnColumnsInPlace(f, y) + } + } + + /** Note: If `f` is not a multiplication or division operation, the operation will not be in place + * and a `DenseMatrix` will be returned. */ + private[mllib] def elementWiseOperateOnRowsInPlace( + f: (Double, Double) => Double, + y: Matrix): Matrix = { + if (isMultiplication(f) || isDivision(f)) { + val y_vals = y.toArray + require(y_vals.length == numCols) + var j = 0 + while (j < numCols) { + var i = colPtrs(j) + val indEnd = colPtrs(j + 1) + while (i < indEnd) { + values(i) = f(values(i), y_vals(j)) + i += 1 + } + j += 1 + } + this + } else { + val dup = this.toDense + dup.elementWiseOperateOnRowsInPlace(f, y) + } + } + + /** Note: If `f` is not a multiplication or division operation, the operation will not be in place + * and a `DenseMatrix` will be returned. */ + private[mllib] def elementWiseOperateInPlace( + f: (Double, Double) => Double, + y: Matrix): Matrix = { + require(y.numCols == numCols) + require(y.numRows == numRows) + if (isMultiplication(f) || isDivision(f)) { + var j = 0 + while (j < numCols) { + var i = colPtrs(j) + val indEnd = colPtrs(j + 1) + while (i < indEnd) { + values(i) = f(values(i), y(rowIndices(i), j)) + i += 1 + } + j += 1 + } + this + } else { + val dup = this.toDense + dup.elementWiseOperateInPlace(f, y) + } + } + + /** Note: If `f` is not a multiplication or division operation, the operation will not be in place + * and a `DenseMatrix` will be returned. */ + private[mllib] def elementWiseOperateScalarInPlace( + f: (Double, Double) => Double, + y: Double): Matrix = { + if (isMultiplication(f) || isDivision(f)) { + var j = 0 + val len = values.length + while (j < len) { + values(j) = f(values(j), y) + j += 1 + } + this + } else { + val dup = this.toDense + dup.elementWiseOperateScalarInPlace(f, y) + } + } + + private def isMultiplication(f: (Double, Double) => Double): Boolean = { + if (f(2, 9) != 18) return false + if (f(3, 7) != 21) return false + if (f(8, 9) != 72) return false + true + } + + private def isDivision(f: (Double, Double) => Double): Boolean = { + if (f(12, 3) != 4) return false + if (f(72, 4) != 18) return false + if (f(72, 9) != 8) return false + true + } + + /** Note: If `f` is not a multiplication or division operation, the operation will not be in place + * and a `DenseMatrix` will be returned. */ + private[mllib] def operateInPlace(f: (Double, Double) => Double, y: Matrix): Matrix = { + if (y.numCols==1 || y.numRows == 1) { + require(numCols != numRows, "Operation is ambiguous. 
Please use elementWiseMultiplyRows " + + "or elementWiseMultiplyColumns instead") + } + if (y.numCols == 1 && y.numRows == 1) { + elementWiseOperateScalarInPlace(f, y.toArray(0)) + } else { + if (y.numCols == 1) { + elementWiseOperateOnColumnsInPlace(f, y) + } else if (y.numRows == 1) { + elementWiseOperateOnRowsInPlace(f, y) + } else{ + elementWiseOperateInPlace(f, y) + } + } + } + + private[mllib] def elementWiseOperateOnColumns( + f: (Double, Double) => Double, + y: Matrix): Matrix = { + if (isMultiplication(f) || isDivision(f)) { + val dup = this.copy + dup.elementWiseOperateOnColumnsInPlace(f, y) + } else { + val dup = this.toDense + dup.elementWiseOperateOnColumnsInPlace(f, y) + } + } + + private[mllib] def elementWiseOperateOnRows( + f: (Double, Double) => Double, + y: Matrix): Matrix = { + if (isMultiplication(f) || isDivision(f)) { + val dup = this.copy + dup.elementWiseOperateOnRowsInPlace(f, y) + } else { + val dup = this.toDense + dup.elementWiseOperateOnRowsInPlace(f, y) + } + } + + private[mllib] def elementWiseOperate(f: (Double, Double) => Double, y: Matrix): Matrix = { + if (isMultiplication(f) || isDivision(f)) { + val dup = this.copy + dup.elementWiseOperateInPlace(f, y) + } else { + val dup = this.toDense + dup.elementWiseOperateInPlace(f, y) + } + } + + private[mllib] def elementWiseOperateScalar(f: (Double, Double) => Double, y: Double): Matrix = { + if (isMultiplication(f) || isDivision(f)) { + val dup = this.copy + dup.elementWiseOperateScalarInPlace(f, y) + } else { + val dup = this.toDense + dup.elementWiseOperateScalarInPlace(f, y) + } + } + + private[mllib] def operate(f: (Double, Double) => Double, y: Matrix): Matrix = { + if (isMultiplication(f) || isDivision(f)) { + val dup = this.copy + dup.operateInPlace(f, y) + } else { + val dup = this.toDense + dup.operateInPlace(f, y) + } + } + + def map(f: Double => Double): SparseMatrix = + new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f)) + + def update(f: Double => Double): SparseMatrix = { + val len = values.length + var i = 0 + while (i < len) { + values(i) = f(values(i)) + i += 1 + } + this + } + + private[mllib] def colSums(absolute: Boolean, skipRows: DenseMatrix = null): DenseMatrix = { + val sums = new DenseMatrix(1, numCols, new Array[Double](numCols)) + var j = 0 + if (skipRows == null) { + while (j < numCols) { + var i = colPtrs(j) + val indEnd = colPtrs(j + 1) + while (i < indEnd) { + var v = values(i) + if (absolute) v = math.abs(v) + sums.values(j) += v + i += 1 + } + j += 1 + } + } else { + while (j < numCols) { + var i = colPtrs(j) + val indEnd = colPtrs(j + 1) + while (i < indEnd) { + if (skipRows(rowIndices(i)) != 0) { + var v = values(i) + if (absolute) v = math.abs(v) + sums.values(j) += v + } + i += 1 + } + j += 1 + } + } + sums + } + + /** Sum the rows in this matrix. Can specify columns to skip using a binary matrix and + * whether to take absolute values of elements. 
*/ + private[mllib] def rowSums(absolute: Boolean, skipCols: DenseVector = null): DenseVector = { + val sums = new DenseVector(new Array[Double](numRows)) + var j = 0 + while (j < numCols) { + if (skipCols(j) != 0.0) { + var i = colPtrs(j) + val indEnd = colPtrs(j + 1) + while (i < indEnd) { + var v = values(i) + if (absolute) v = math.abs(v) + sums.values(rowIndices(i)) += v + i += 1 + } + } + j += 1 + } + sums + } + + def colNorms(p: Double): DenseMatrix = { + if (p == 1.0) return colSums(true) + val sums = new DenseMatrix(1, numCols, new Array[Double](numCols)) + var j = 0 + while (j < numCols) { + var i = colPtrs(j) + val indEnd = colPtrs(j + 1) + while (i < indEnd) { + val power = if (p == 2.0) values(i) * values(i) else math.pow(values(i), p) + sums.values(j) += power + i += 1 + } + j += 1 + } + if (p == 2.0) sums.update(math.sqrt) else sums.update(math.pow(_, 1 / p)) + sums + } + + private[mllib] def negInPlace: SparseMatrix = { + var j = 0 + val len = values.length + while (j < len) { + values(j) *= -1 + j += 1 + } + this + } + + private[mllib] def neg: SparseMatrix = { + val copy = this.copy + copy.negInPlace + } + + private[mllib] def compare(v: Double, f: (Double, Double) => Boolean): DenseMatrix = { + val copy = new DenseMatrix(numRows, numCols, this.toArray) + copy.compareInPlace(v, f) + } + + /** Generate a dense copy of this matrix */ + def toDense: DenseMatrix = new DenseMatrix(numRows, numCols, this.toArray) +} + +object SparseMatrix { + + /** + * Generate an Identity Matrix in `SparseMatrix` format. + * @param n number of rows and columns of the matrix + * @return `SparseMatrix` with size `n` x `n` and values of ones on the diagonal + */ + def speye(n: Int): SparseMatrix = { + new SparseMatrix(n, n, (0 to n).toArray, (0 until n).toArray, Array.fill(n)(1.0)) + } + + /** + * Given the values of a column majored `DenseMatrix`, output the sparse version + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param raw the values of the `DenseMatrix` + * @param nonZero number of non-zero elements in `raw` + * @return the sparsified version of the dense matrix + */ + private def sparsifyFromDenseArray( + numRows: Int, + numCols: Int, + raw: Array[Double], + nonZero: Int): SparseMatrix = { + val sparseA: ArrayBuffer[Double] = new ArrayBuffer(nonZero) + + val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1) + val sRows: ArrayBuffer[Int] = new ArrayBuffer(nonZero) + + var i = 0 + var nnz = 0 + var lastCol = -1 + + raw.foreach { v => + val r = i % numRows + val c = (i - r) / numRows + if ( v != 0.0) { + sRows.append(r) + sparseA.append(v) + while (c != lastCol) { + sCols.append(nnz) + lastCol += 1 + } + nnz += 1 + } + i += 1 + } + sCols.append(sparseA.length) + new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray) + } + + /** + * Generate a `SparseMatrix` consisting of i.i.d. uniform random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @param seed the seed for the random generator + * @return `SparseMatrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def sprand( + numRows: Int, + numCols: Int, + density: Double, + seed: Long = Utils.random.nextLong()): SparseMatrix = { + + require(density > 0.0 && density < 1.0, "density must be a double in the range " + + s"0.0 < d < 1.0. 
Currently, density: $density") + val rand = new XORShiftRandom(seed) + val length = numRows * numCols + val rawA = Array.fill(length)(0.0) + var nnz = 0 + for (i <- 0 until length) { + val p = rand.nextDouble() + if (p < density) { + rawA.update(i, rand.nextDouble()) + nnz += 1 + } + } + sparsifyFromDenseArray(numRows, numCols, rawA, nnz) + } + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @param seed the seed for the random generator + * @return `SparseMatrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def sprandn( + numRows: Int, + numCols: Int, + density: Double, + seed: Long = Utils.random.nextLong()): SparseMatrix = { + require(density > 0.0 && density < 1.0, "density must be a double in the range " + + s"0.0 < d < 1.0. Currently, density: $density") + val rand = new XORShiftRandom(seed) + val length = numRows * numCols + val rawA = Array.fill(length)(0.0) + var nnz = 0 + for (i <- 0 until length) { + val p = rand.nextDouble() + if (p < density) { + rawA.update(i, rand.nextGaussian()) + nnz += 1 + } + } + sparsifyFromDenseArray(numRows, numCols, rawA, nnz) + } + + /** + * Generate a diagonal matrix in `SparseMatrix` format from the supplied values. + * @param vector a `Vector` that will form the values on the diagonal of the matrix + * @return Square `SparseMatrix` with size `values.length` x `values.length` and non-zero `values` + * on the diagonal + */ + def diag(vector: Vector): SparseMatrix = { + val n = vector.size + vector match { + case sVec: SparseVector => + val rows = sVec.indices + val values = sVec.values + var i = 0 + var lastCol = -1 + val colPtrs = new ArrayBuffer[Int](n) + rows.foreach { r => + while (r != lastCol) { + colPtrs.append(i) + lastCol += 1 + } + i += 1 + } + colPtrs.append(n) + new SparseMatrix(n, n, colPtrs.toArray, rows, values) + case dVec: DenseVector => + val values = dVec.values + var i = 0 + var nnz = 0 + val sVals = values.filter( v => v != 0.0) + var lastCol = -1 + val colPtrs = new ArrayBuffer[Int](n + 1) + val sRows = new ArrayBuffer[Int](sVals.length) + values.foreach { v => + if (v != 0.0) { + sRows.append(i) + while (lastCol != i) { + colPtrs.append(nnz) + lastCol += 1 + } + nnz += 1 + } + i += 1 + } + while (lastCol != i) { + colPtrs.append(nnz) + lastCol += 1 + } + new SparseMatrix(n, n, colPtrs.toArray, sRows.toArray, sVals) + } + } } /** @@ -82,6 +1086,24 @@ object Matrices { new DenseMatrix(numRows, numCols, values) } + /** + * Creates a column-majored sparse matrix in Compressed Sparse Column (CSC) format. + * + * @param numRows number of rows + * @param numCols number of columns + * @param colPtrs the index corresponding to the start of a new column + * @param rowIndices the row index of the entry + * @param values non-zero matrix entries in column major + */ + def sparse( + numRows: Int, + numCols: Int, + colPtrs: Array[Int], + rowIndices: Array[Int], + values: Array[Double]): Matrix = { + new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values) + } + /** * Creates a Matrix instance from a breeze matrix. 
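   * For example (sketch), `fromBreeze(new BDM(2, 2, Array(1.0, 2.0, 3.0, 4.0)))` produces the same
   * `DenseMatrix` as `Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))`, and with the new case below
   * a Breeze `CSCMatrix` is converted to a `SparseMatrix` instead of being rejected.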
* @param breeze a breeze matrix @@ -93,9 +1115,354 @@ object Matrices { require(dm.majorStride == dm.rows, "Do not support stride size different from the number of rows.") new DenseMatrix(dm.rows, dm.cols, dm.data) + case sm: BSM[Double] => + new SparseMatrix(sm.rows, sm.cols, sm.colPtrs, sm.rowIndices, sm.data) case _ => throw new UnsupportedOperationException( s"Do not support conversion from type ${breeze.getClass.getName}.") } } + + /** + * Generate a `DenseMatrix` consisting of zeros. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `Matrix` with size `numRows` x `numCols` and values of zeros + */ + def zeros(numRows: Int, numCols: Int): Matrix = DenseMatrix.zeros(numRows, numCols) + + /** + * Generate a `DenseMatrix` consisting of ones. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `Matrix` with size `numRows` x `numCols` and values of ones + */ + def ones(numRows: Int, numCols: Int): Matrix = DenseMatrix.ones(numRows, numCols) + + /** + * Generate an Identity Matrix in `DenseMatrix` format. + * @param n number of rows and columns of the matrix + * @return `Matrix` with size `n` x `n` and values of ones on the diagonal + */ + def eye(n: Int): Matrix = DenseMatrix.eye(n) + + /** + * Generate an Identity Matrix in `SparseMatrix` format. + * @param n number of rows and columns of the matrix + * @return `Matrix` with size `n` x `n` and values of ones on the diagonal + */ + def speye(n: Int): Matrix = SparseMatrix.speye(n) + + /** + * Generate a `DenseMatrix` consisting of i.i.d. uniform random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `Matrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def rand(numRows: Int, numCols: Int): Matrix = DenseMatrix.rand(numRows, numCols) + + /** + * Generate a `DenseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `Matrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def randn(numRows: Int, numCols: Int): Matrix = DenseMatrix.randn(numRows, numCols) + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @param seed the seed for the random generator + * @return `Matrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def sprand( + numRows: Int, + numCols: Int, + density: Double, + seed: Long = Utils.random.nextLong()): Matrix = + SparseMatrix.sprand(numRows, numCols, density, seed) + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @param seed the seed for the random generator + * @return `Matrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def sprandn( + numRows: Int, + numCols: Int, + density: Double, + seed: Long = Utils.random.nextLong()): Matrix = + SparseMatrix.sprandn(numRows, numCols, density, seed) + + /** + * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. 
Use + * [[org.apache.spark.mllib.linalg.SparseMatrix.diag()]] in order to generate the matrix in + * `SparseMatrix` format. + * @param vector a `Vector` that will form the values on the diagonal of the matrix + * @return Square `Matrix` with size `values.length` x `values.length` and `values` + * on the diagonal + */ + def diag(vector: Vector): Matrix = DenseMatrix.diag(vector) + + /** + * Horizontally concatenate a sequence of matrices. The returned matrix will be in the format + * the matrices are supplied in. Supplying a mix of dense and sparse matrices is not supported. + * @param matrices sequence of matrices + * @return a single `Matrix` composed of the matrices that were horizontally concatenated + */ + private[mllib] def horzcat(matrices: Seq[Matrix]): Matrix = { + if (matrices.size == 1) { + return matrices(0) + } + val numRows = matrices(0).numRows + var rowsMatch = true + var isDense = false + var isSparse = false + for (mat <- matrices) { + if (numRows != mat.numRows) rowsMatch = false + mat match { + case sparse: SparseMatrix => isSparse = true + case dense: DenseMatrix => isDense = true + } + } + require(rowsMatch, "The number of rows of the matrices in this array, don't match!") + var numCols = 0 + matrices.foreach(numCols += _.numCols) + if (isSparse && !isDense) { + val allColPtrs: Array[Int] = Array(0) ++ matrices.flatMap { mat => + val ptr = mat.asInstanceOf[SparseMatrix].colPtrs + ptr.slice(1, ptr.length) + } + var counter = 0 + val adjustedPtrs = allColPtrs.map { p => + counter += p + counter + } + new SparseMatrix(numRows, numCols, adjustedPtrs, + matrices.flatMap(_.asInstanceOf[SparseMatrix].rowIndices).toArray, + matrices.flatMap(_.asInstanceOf[SparseMatrix].values).toArray) + } else if (!isSparse && !isDense) { + throw new IllegalArgumentException("The supplied matrices are neither in SparseMatrix or" + + " DenseMatrix format!") + } else { + new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray).toArray) + } + } + + /** + * Batches LabeledPoint examples into matrices so that efficient matrix multiplication methods + * can be used to speed up gradient calculation and weight updates in optimization methods such as + * Gradient Descent and L-BFGS. + * @param rows RDD of labels (Double) and feature vectors (Vector) + * @param partitionMetaData Map of a partition to the max number of non-zeros in that partition + * so that we can preallocate a memory efficient buffer + * @param batchSize the number of examples to be batched together + * @param buildSparseThreshold the density threshold of a matrix where either a `SparseMatrix` or + * a `DenseMatrix` is constructed + * @param generateOnTheFly whether to allocate a memory buffer so that matrices can be generated + * on the fly or to compute a cacheable RDD. 
+ * @return an RDD of the batched labels (`DenseMatrix`) and the feature vectors (`Matrix`) + */ + private[mllib] def fromRDD( + rows: RDD[(Double, Vector)], + partitionMetaData: Map[Int, Int], + batchSize : Int, + buildSparseThreshold: Double, + generateOnTheFly: Boolean = true): RDD[(DenseMatrix, Matrix)] = { + + if (!generateOnTheFly) { + rows.mapPartitions { iter => + iter.grouped(batchSize) + }.map(fromSeq(_, batchSize)) + } else { + val numFeatures = rows.first()._2.size + + rows.mapPartitionsWithIndex{ case (ind, iter) => + val nnz = partitionMetaData(ind) + val matrixBuffer = + if (nnz != -1) { + val density = nnz * 1.0 / (numFeatures * batchSize) + if (density <= buildSparseThreshold) { + (DenseMatrix.zeros(batchSize, 1), new SparseMatrix(numFeatures, batchSize, + Array.fill(batchSize + 1)(0), Array.fill(nnz)(0), Array.fill(nnz)(0.0))) + } else { + (DenseMatrix.zeros(batchSize, 1), DenseMatrix.zeros(numFeatures, batchSize)) + } + } else { + (DenseMatrix.zeros(batchSize, 1), DenseMatrix.zeros(numFeatures, batchSize)) + } + iter.grouped(batchSize).map(fromSeqIntoBuffer(_, matrixBuffer, batchSize)._2) + } + } + } + + /** + * Collects data on the maximum number of non-zero elements in a partition for each batch of + * matrices + * @param rows RDD of labels (Double) and feature vectors (Vector) + * @param batchSize the number of examples to be batched together + * @return a mapping of partitions to the maximum number of non-zero elements observed in that + * partition + */ + private[mllib] def getSparsityData( + rows: RDD[(Double, Vector)], + batchSize : Int = 64): Map[Int, Int] = { + val numFeatures = rows.first()._2.size + + val partitionMetaData = rows.mapPartitionsWithIndex { case (ind, iter) => + val matrixBuffer = + (DenseMatrix.zeros(batchSize, 1), DenseMatrix.zeros(numFeatures, batchSize)) + var partitionMaxNNZ = -1 + + iter.grouped(batchSize).foreach { r => + val (metaData, _) = fromSeqIntoBuffer(r, matrixBuffer, batchSize) + val maxNNZ = + if (metaData > partitionMaxNNZ) metaData else partitionMaxNNZ + + partitionMaxNNZ = maxNNZ + } + + Iterator((ind, partitionMaxNNZ)) + } + partitionMetaData.collect().toMap + } + + /** + * Constructs matrices from a batch of examples. Allocates memory for each batch, therefore the + * resulting RDD will be cacheable. + * @param rows Sequence of labels (Double) and feature vectors (Vector) + * @param batchSize the number of examples to be batched together + * @return a tuple of the batched labels (`DenseMatrix`) and the feature vectors (`Matrix`) + */ + private def fromSeq(rows: Seq[(Double, Vector)], batchSize: Int) : (DenseMatrix, Matrix) = { + val numExamples = rows.length + val numFeatures = rows(0)._2.size + val matrixBuffer = DenseMatrix.zeros(numExamples, numFeatures) + val labelBuffer = DenseMatrix.zeros(numExamples, 1) + flattenMatrix(rows, matrixBuffer, labelBuffer, batchSize) + + (matrixBuffer, labelBuffer) + } + + /** + * Constructs matrices from a batch of examples into a pre-allocated buffer. The RDD generated + * using this method is not cacheable. Used for on-the-fly matrix generation. 
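+   * The `buffer` argument is a preallocated (label matrix, feature matrix) pair whose contents are
+   * overwritten on each call; the returned `Int` is the number of non-zeros written into it.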
+ * @param rows Sequence of labels (Double) and feature vectors (Vector) + * @param batchSize the number of examples to be batched together + * @return a tuple of the batched labels (`DenseMatrix`) and the feature vectors (`Matrix`) + */ + private def fromSeqIntoBuffer( + rows: Seq[(Double, Vector)], + buffer: (DenseMatrix, Matrix), + batchSize: Int) : (Int, (DenseMatrix, Matrix)) = { + val labelBuffer = buffer._1 + val matrixBuffer = buffer._2 + val metadata = flattenMatrix(rows, matrixBuffer, labelBuffer, batchSize) + + (metadata, buffer) + } + + /** + * Unrolls a sequence of label, feature vector pairs into matrices. For speed, the transpose of + * the feature vectors is generated. + * @param vals Sequence of labels (Double) and feature vectors (Vector) + * @param matrixInto The `Matrix` to unroll the feature vectors into + * @param labelsInto The `DenseMatrix` to unroll the labels into + * @param batchSize The number of examples that were supposed to be batched. Usually will equal + * the size of `vals`, except maybe for the last group of examples. Required to + * clear previous data. + * @return The number of non-zeros in this batch of feature vectors + */ + private def flattenMatrix( + vals: Seq[(Double, Vector)], + matrixInto: Matrix, + labelsInto: DenseMatrix, + batchSize: Int): Int = { + val numExamples = vals.length + val numFeatures = vals(0)._2.size + var i = 0 + var nnz = 0 + matrixInto match { + case intoSparse: SparseMatrix => + for (r <- vals) { + labelsInto.values(i) = r._1 + r._2 match { + case sVec: SparseVector => + val len = sVec.indices.length + var j = 0 + intoSparse.colPtrs(i) = nnz + while (j < len) { + intoSparse.rowIndices(nnz) = sVec.indices(j) + intoSparse.values(nnz) = sVec.values(j) + nnz += 1 + j += 1 + } + case dVec: DenseVector => + var j = 0 + intoSparse.colPtrs(i) = nnz + while (j < numFeatures) { + val value = dVec.values(j) + if (value != 0.0) { + intoSparse.rowIndices(nnz) = j + intoSparse.values(nnz) = dVec.values(j) + nnz += 1 + } + j += 1 + } + } + i += 1 + } + // Make sure that existing values already in further down the array won't be used. 
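+        // For example, if the final group holds only 3 examples but batchSize is 4, the
+        // remaining column pointers are advanced to the current nnz so the unfilled columns
+        // of the reused buffer do not pick up indices written by an earlier, fuller batch.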
+ while (i < batchSize) { + intoSparse.colPtrs(i) = nnz + i += 1 + } + case intoDense: DenseMatrix => // Have to loop over all values to clear existing ones + for (r <- vals) { + labelsInto.values(i) = r._1 + val startIndex = numFeatures * i + r._2 match { + case sVec: SparseVector => + val len = sVec.indices.length + var j = 0 + var sVecCounter = 0 + while (j < numFeatures) { + intoDense.values(startIndex + j) = 0.0 + if (sVecCounter < len) { + if (j == sVec.indices(sVecCounter)) { + intoDense.values(startIndex + j) = sVec.values(sVecCounter) + nnz += 1 + sVecCounter += 1 + } + } + j += 1 + } + case dVec: DenseVector => + var j = 0 + while (j < numFeatures) { + val value = dVec.values(j) + if (value != 0.0) nnz += 1 + intoDense.values(startIndex + j) = value + j += 1 + } + } + i += 1 + } + // clear existing values if we can not fill up the matrix + if (numExamples != batchSize) { + var j = numExamples * numFeatures + val len = intoDense.values.length + while (j < len) { + intoDense.values(j) = 0.0 + j += 1 + } + } + } + nnz + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index a45781d12e41..3b04ef6da4c8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -33,7 +33,7 @@ import org.apache.spark.SparkException * * Note: Users should not implement this interface. */ -trait Vector extends Serializable { +sealed trait Vector extends Serializable { /** * Size of the vector. @@ -72,6 +72,12 @@ trait Vector extends Serializable { def copy: Vector = { throw new NotImplementedError(s"copy is not implemented for ${this.getClass}.") } + + /** Maps all the values in the vector. Generates a new vector. */ + private[mllib] def map(f: Double => Double): Vector + + /** Updates all the values in the vector. In Place. */ + private[mllib] def update(f: Double => Double): Vector } /** @@ -186,6 +192,52 @@ object Vectors { sys.error("Unsupported Breeze vector type: " + v.getClass.getName) } } + + /** + * Appends a sequence of vectors together. Preserves sparsity if and only if only vecs is composed + * of SparseVectors only. + * @param vecs sequence of vectors + * @return a single long `Vector` composed of the vectors that were appended to each other. 
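+   * A sketch of the intended behaviour, based on the accompanying `VectorsSuite` test:
+   * {{{
+   *   Vectors.append(Seq(Vectors.sparse(2, Array(0), Array(2.0)),
+   *     Vectors.sparse(5, Array(1, 4), Array(1.0, 2.0))))
+   *   // == new SparseVector(7, Array(0, 3, 6), Array(2.0, 1.0, 2.0))
+   * }}}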
+ */ + private[mllib] def append(vecs: Seq[Vector]): Vector = { + if (vecs.size == 1) { + return vecs(0) + } + var isDense = false + var isSparse = false + var totalLength = 0 + val lengths: Seq[(Int, Int)] = vecs.map { + case sparse: SparseVector => + isSparse = true + totalLength += sparse.size + (totalLength, sparse.indices.length) + case dense: DenseVector => + isDense = true + totalLength += dense.size + (totalLength, dense.size) + } + if (isSparse && !isDense) { + val allIndices: Seq[Int] = vecs.flatMap(_.asInstanceOf[SparseVector].indices) + var vectorCounter = 0 + var elementCounter = 0 // The count inside the vector + val adjustedIndices = allIndices.map { p => + // less than equal, in case Vector is empty + while (lengths(vectorCounter)._2 <= elementCounter + 1) { + vectorCounter += 1 + elementCounter = -1 + } + elementCounter += 1 + if (elementCounter != 0) lengths(vectorCounter)._1 + p else lengths(vectorCounter - 1)._2 + }.toArray + new SparseVector(totalLength, adjustedIndices, + vecs.flatMap(_.asInstanceOf[SparseMatrix].values).toArray) + } else if (!isSparse && !isDense) { + throw new IllegalArgumentException("The supplied vectors are neither in SparseVector or" + + " DenseVector format!") + } else { + new DenseVector(vecs.flatMap(_.toArray).toArray) + } + } } /** @@ -206,6 +258,19 @@ class DenseVector(val values: Array[Double]) extends Vector { override def copy: DenseVector = { new DenseVector(values.clone()) } + + private[mllib] def map(f: Double => Double): Vector = { + new DenseVector(values.map(f)) + } + + private[mllib] def update(f: Double => Double): Vector = { + var i = 0 + while (i < size) { + values(i) = f(values(i)) + i += 1 + } + this + } } /** @@ -241,4 +306,17 @@ class SparseVector( } private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size) -} + + private[mllib] def map(f: Double => Double): Vector = { + new SparseVector(size, indices, values.map(f)) + } + + private[mllib] def update(f: Double => Double): Vector = { + var i = 0 + while (i < size) { + values(i) = f(values(i)) + i += 1 + } + this + } +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index fdd67160114c..acbcea112e0f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -17,9 +17,11 @@ package org.apache.spark.mllib.optimization +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} + import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal} +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, gemm, scal} /** * :: DeveloperApi :: @@ -157,3 +159,250 @@ class HingeGradient extends Gradient { } } } + +/** + * :: DeveloperApi :: + * Class used to compute the gradient for a loss function, given a series of data points. + */ +@DeveloperApi +abstract class MultiModelGradient extends Serializable { + /** + * Compute the gradient and loss given the features of all data points. 
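+   * In this batched, multi-model setting the arguments are matrices rather than single
+   * examples: `data` is expected to be numFeatures x batchSize (as produced by
+   * `Matrices.fromRDD`), `label` batchSize x 1, and `weights` numFeatures x numModels, so
+   * every model is evaluated against the whole batch in one pass.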
+ * + * @param data features for one data point + * @param label label for this data point + * @param weights weights/coefficients corresponding to features + * + * @return (gradient: DenseMatrix, loss: Double) + */ + def compute(data: Matrix, label: DenseMatrix, + weights: DenseMatrix): (DenseMatrix, Matrix) + + /** + * Compute the gradient and loss given the features of a series of data point, + * add the gradient to a provided matrix to avoid creating new objects, and return loss. + * + * @param data features for the data points + * @param label label for the data points + * @param weights weights/coefficients corresponding to features + * @param cumGradient the computed gradient will be added to this matrix + * + * @return loss + */ + def compute(data: Matrix, label: DenseMatrix, + weights: DenseMatrix, cumGradient: DenseMatrix): Matrix +} +/* +/** + * :: DeveloperApi :: + * Compute gradient and loss for a logistic loss function, as used in binary classification. + * See also the documentation for the precise formulation. + */ +@DeveloperApi +class MultiModelLogisticGradient extends MultiModelGradient { + + private def sigmoid(p: DenseMatrix): DenseMatrix = { + def takeSigmoid(p: Double): Double = { + 1.0 / (math.exp(-p) + 1.0) + } + p.map(takeSigmoid) + } + + override def compute(data: Matrix, label: DenseMatrix, + weights: DenseMatrix): (DenseMatrix, Vector) = { + val margin = data transposeMultiply weights + val gradient = DenseMatrix.zeros(weights.numRows, weights.numCols) + + gemm(false, false, 1.0, data, sigmoid(margin).elementWiseOperateOnColumnsInPlace(_ - _, label), + 0.0, gradient) + + val negativeLabels = label.compare(0.0, _ == _) + val addMargin = margin.elementWiseOperateOnColumns(_ * _, negativeLabels) + + val loss = margin.update(v => math.log1p(math.exp(-v))). + elementWiseOperateInPlace(_ + _, addMargin) + + val lossVector = + if (data.isInstanceOf[DenseMatrix]) { + val numFeatures = data.numRows + val zeroEntries = data.compare(0.0, _ == _) + val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _) + loss.colSums(false, shouldSkip) + } else { + loss.colSums + } + (gradient, lossVector) + } + + override def compute(data: Matrix, + label: DenseMatrix, + weights: DenseMatrix, + cumGradient: DenseMatrix): Vector = { + val margin = data transposeMultiply weights + gemm(false, false, 1.0, data, sigmoid(margin).elementWiseOperateOnColumnsInPlace(_ - _, label), + 1.0, cumGradient) + + val negativeLabels = label.compare(0.0, _ == _) + val addMargin = margin.elementWiseOperateOnColumns(_ * _, negativeLabels) + + val loss = margin.update(v => math.log1p(math.exp(-v))). + elementWiseOperateInPlace(_ + _, addMargin) + + if (data.isInstanceOf[DenseMatrix]) { + val numFeatures = data.numRows + val zeroEntries = data.compare(0.0, _ == _) + val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _) + loss.colSums(false, shouldSkip) + } else { + loss.colSums + } + } +} + +/** + * :: DeveloperApi :: + * Compute gradient and loss for a Least-squared loss function, as used in linear regression. + * This is correct for the averaged least squares loss function (mean squared error) + * L = 1/n ||A weights-y||^2 + * See also the documentation for the precise formulation. 
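+ * Note: with `data` stored as features x examples, the per-batch gradient below is
+ * 2 * A^T (A w - y); the 1/n averaging happens afterwards in
+ * `MultiModelGradientDescent.runMiniBatchMMSGD`, where the summed gradient is divided by the
+ * mini-batch size.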
+ */ +@DeveloperApi +class MultiModelLeastSquaresGradient extends MultiModelGradient { + override def compute(data: Matrix, label: DenseMatrix, + weights: DenseMatrix): (DenseMatrix, Vector) = { + + val diff = (data transposeMultiply weights).elementWiseOperateOnColumnsInPlace(_ - _, label) + + val gradient = DenseMatrix.zeros(weights.numRows, weights.numCols) + + gemm(false, false, 2.0, data, diff, 0.0, gradient) + + val loss = diff.update(v => v * v) + + val lossVector = + if (data.isInstanceOf[DenseMatrix]) { + val numFeatures = data.numRows + val zeroEntries = data.compare(0.0, _ == _) + val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _) + loss.colSums(false, shouldSkip) + } else { + loss.colSums + } + (gradient, lossVector) + } + + override def compute(data: Matrix, + label: DenseMatrix, + weights: DenseMatrix, + cumGradient: DenseMatrix): Vector = { + val diff = (data transposeMultiply weights).elementWiseOperateOnColumnsInPlace(_ - _, label) + + gemm(false, false, 2.0, data, diff, 1.0, cumGradient) + val loss = diff.update(v => v * v) + + if (data.isInstanceOf[DenseMatrix]) { + val numFeatures = data.numRows + val zeroEntries = data.compare(0.0, _ == _) + val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _) + loss.colSums(false, shouldSkip) + } else { + loss.colSums + } + } +} +*/ + +/** + * :: DeveloperApi :: + * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification. + * See also the documentation for the precise formulation. + * NOTE: This assumes that the labels are {0,1} + */ +@DeveloperApi +class MultiModelHingeGradient extends MultiModelGradient { + override def compute(data: Matrix, label: DenseMatrix, + weights: DenseMatrix): (DenseMatrix, Vector) = { + + val dotProduct = (data transposeMultiply weights).toBreeze + val brzData = data.toBreeze + // Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val brzScaledLabels = new BDM[Double](1, label.numRows, label.values.map(_ * 2 - 1.0)) + + dotProduct *= brzScaledLabels + brzScaledLabels. 
+ val gradientMultiplier = data.elementWiseOperateOnRows(_ * _, labelScaled.negInPlace) + val gradient = DenseMatrix.zeros(weights.numRows, weights.numCols) + val activeExamples = dotProduct.compare(1.0, _ < _) // Examples where the hinge is active + + gemm(false, false, 1.0, gradientMultiplier, activeExamples, 1.0, gradient) + + val loss = activeExamples.elementWiseOperateInPlace(_ * _, dotProduct.update(1 - _)) + + val lossVector = + if (data.isInstanceOf[DenseMatrix]) { + val numFeatures = data.numRows + val zeroEntries = data.compare(0.0, _ == _) + val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _) + loss.colSums(false, shouldSkip) + } else { + loss.colSums + } + (gradient, lossVector) + } + + override def compute(data: Matrix, label: DenseMatrix, + weights: DenseMatrix, cumGradient: DenseMatrix): Vector = { + + val dotProduct = data transposeMultiply weights + // Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val labelScaled = new DenseMatrix(1, label.numRows, label.map(_ * 2 - 1.0).values) + dotProduct.elementWiseOperateOnColumnsInPlace(_ * _, labelScaled) + + val gradientMultiplier = data.elementWiseOperateOnRows(_ * _, labelScaled.negInPlace) + + val activeExamples = dotProduct.compare(1.0, _ < _) // Examples where the hinge is active + + gemm(false, false, 1.0, gradientMultiplier, activeExamples, 1.0, cumGradient) + + val loss = activeExamples.elementWiseOperateInPlace(_ * _, dotProduct.update(1 - _)) + + if (data.isInstanceOf[DenseMatrix]) { + val numFeatures = data.numRows + val zeroEntries = data.compare(0.0, _ == _) + val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _) + loss.colSums(false, shouldSkip) + } else { + loss.colSums + } + } +} +/* +override def compute(data: Matrix, label: DenseMatrix, + weights: DenseMatrix, cumGradient: DenseMatrix): Vector = { + + val dotProduct = data transposeMultiply weights + // Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val labelScaled = new DenseMatrix(1, label.numRows, label.map(_ * 2 - 1.0).values) + dotProduct.elementWiseOperateOnColumnsInPlace(_ * _, labelScaled) + + val gradientMultiplier = data.elementWiseOperateOnRows(_ * _, labelScaled.negInPlace) + + val activeExamples = dotProduct.compare(1.0, _ < _) // Examples where the hinge is active + + gemm(false, false, 1.0, gradientMultiplier, activeExamples, 1.0, cumGradient) + + val loss = activeExamples.elementWiseOperateInPlace(_ * _, dotProduct.update(1 - _)) + + if (data.isInstanceOf[DenseMatrix]) { + val numFeatures = data.numRows + val zeroEntries = data.compare(0.0, _ == _) + val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _) + loss.colSums(false, shouldSkip) + } else { + loss.colSums + } + } + */ \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index a6912056395d..1f578ed0d0fe 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -33,7 +33,7 @@ import org.apache.spark.mllib.rdd.RDDFunctions._ * @param updater Updater to be used to update weights after every iteration. 
*/ class GradientDescent private[mllib] (private var gradient: Gradient, private var updater: Updater) - extends Optimizer with Logging { + extends Optimizer[Vector] with Logging { private var stepSize: Double = 1.0 private var numIterations: Int = 100 @@ -181,6 +181,7 @@ object GradientDescent extends Logging { var regVal = updater.compute( weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2 + //println(s"initial:\n$weights\n\n") for (i <- 1 to numIterations) { val bcWeights = data.context.broadcast(weights) // Sample a subset (fraction miniBatchFraction) of the total data diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index d16d0daf0856..410e8249fda9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -38,7 +38,7 @@ import org.apache.spark.rdd.RDD */ @DeveloperApi class LBFGS(private var gradient: Gradient, private var updater: Updater) - extends Optimizer with Logging { + extends Optimizer[Vector] with Logging { private var numCorrections = 10 private var convergenceTol = 1E-4 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/MultiModelGradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/MultiModelGradientDescent.scala new file mode 100644 index 000000000000..c764bbc58e6e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/MultiModelGradientDescent.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import scala.collection.mutable.ArrayBuffer + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.annotation.{Experimental, DeveloperApi} +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.rdd.RDDFunctions._ + +class MultiModelGradientDescent private[mllib] ( + private var gradient: MultiModelGradient, + private var updater: Array[MultiModelUpdater]) extends Optimizer[Matrix] with Logging { + + private var stepSize: Array[Double] = Array(1.0, 0.1) + private var numIterations: Array[Int] = Array(100) + private var regParam: Array[Double] = Array(0.0, 0.1, 1.0) + private var miniBatchFraction: Double = 1.0 + + /** + * Set the initial step size of SGD for the first step. Default (1.0, 0.1). + * In subsequent steps, the step size will decrease with stepSize/sqrt(t) + */ + def setStepSize(step: Array[Double]): this.type = { + this.stepSize = step + this + } + + /** + * :: Experimental :: + * Set fraction of data to be used for each SGD iteration. 
+ * Default 1.0 (corresponding to deterministic/classical gradient descent) + */ + @Experimental + def setMiniBatchFraction(fraction: Double): this.type = { + this.miniBatchFraction = fraction + this + } + + /** + * Set the number of iterations for SGD. Default 100. + */ + def setNumIterations(iters: Array[Int]): this.type = { + this.numIterations = iters + this + } + + /** + * Set the regularization parameter. Default (0.0, 0.1, 1.0). + */ + def setRegParam(regParam: Array[Double]): this.type = { + this.regParam = regParam + this + } + + /** + * Set the gradient function (of the loss function of one single data example) + * to be used for SGD. + */ + def setGradient(gradient: MultiModelGradient): this.type = { + this.gradient = gradient + this + } + + + /** + * Set the updater function to actually perform a gradient step in a given direction. + * The updater is responsible to perform the update from the regularization term as well, + * and therefore determines what kind or regularization is used, if any. + */ + def setUpdater(updater: Array[MultiModelUpdater]): this.type = { + this.updater = updater + this + } + + /** + * :: DeveloperApi :: + * Runs gradient descent on the given training data. + * @param data training data + * @param initialWeights initial weights + * @return solution vector + */ + @DeveloperApi + def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Matrix = { + val (weights, _) = MultiModelGradientDescent.runMiniBatchMMSGD( + data, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFraction, + initialWeights) + weights + } + +} + +/** + * :: DeveloperApi :: + * Top-level method to run gradient descent. + */ +@DeveloperApi +object MultiModelGradientDescent extends Logging { + /** + * Run stochastic gradient descent (SGD) in parallel using mini batches. + * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data + * in order to compute a gradient estimate. + * Sampling, and averaging the subgradients over this subset is performed using one standard + * spark map-reduce in each iteration. + * + * @param data - Input data for SGD. RDD of the set of data examples, each of + * the form (label, [feature values]). + * @param gradient - Gradient object (used to compute the gradient of the loss function of + * one single data example) + * @param updater - Updater function to actually perform a gradient step in a given direction. + * @param stepSize - initial step size for the first step + * @param numIterations - number of iterations that SGD should be run. + * @param regParam - regularization parameter + * @param miniBatchFraction - fraction of the input data set that should be used for + * one iteration of SGD. Default value 1.0. + * + * @return A tuple containing two elements. The first element is a column matrix containing + * weights for every feature, and the second element is an array containing the + * stochastic loss computed for every iteration. 
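+   * A sketch of a typical call, adapted from the accompanying test suite (the data and
+   * initial-weight values are placeholders):
+   * {{{
+   *   val (weights, losses) = MultiModelGradientDescent.runMiniBatchMMSGD(
+   *     data,                                                   // RDD[(Double, Vector)]
+   *     new MultiModelHingeGradient(),
+   *     Array[MultiModelUpdater](new MultiModelSquaredL2Updater()),
+   *     Array(1.0, 0.1),                                        // step sizes
+   *     Array(10),                                              // iteration counts
+   *     Array(0.0, 0.1),                                        // regularization parameters
+   *     1.0,                                                    // miniBatchFraction
+   *     initialWeights)
+   * }}}
+   * Each column of the returned weight matrix corresponds to one combination of step size,
+   * regularization parameter, updater and requested iteration count, all trained together.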
+ */ + def runMiniBatchMMSGD( + data: RDD[(Double, Vector)], + gradient: MultiModelGradient, + updater: Array[MultiModelUpdater], + stepSize: Array[Double], + numIterations: Array[Int], + regParam: Array[Double], + miniBatchFraction: Double, + initialWeights: Vector, + batchSize: Int = 64, + useSparse: Boolean = true, + buildSparseThreshold: Double = 0.2): (Matrix, Array[Vector]) = { + + val maxNumIter = numIterations.max + val stochasticLossHistory = new ArrayBuffer[Vector](maxNumIter) + + val numExamples = data.count() + val miniBatchSize = numExamples * miniBatchFraction + val numModels = stepSize.length * regParam.length + val numFeatures = initialWeights.size + val numRegularizers = updater.length + val updaterCounter = 0 until numRegularizers + // Initialize weights as a column vector + var weights = updaterCounter.map { i => + new DenseMatrix(numFeatures, 1, initialWeights.toArray). + multiply(DenseMatrix.ones(1, numModels)) + } + + var finalWeights: Matrix = new DenseMatrix(numFeatures, 0, Array.empty[Double]) + + // if no data, return initial weights to avoid NaNs + if (numExamples == 0) { + + logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found") + return (Matrices.horzcat(weights), stochasticLossHistory.toArray) + + } + val stepSizeMatrix = new DenseMatrix(1, numModels, + stepSize.flatMap{ ss => + for (i <- 1 to regParam.length) yield ss + } + ) + val regMatrix = SparseMatrix.diag(Vectors.dense(stepSize.flatMap{ ss => + for (reg <- regParam) yield reg + })) + + val bcMetaData = + if (useSparse) { + data.context.broadcast(Matrices.getSparsityData(data, batchSize)) + } else { + val emptyData: Map[Int, Int] = (0 until data.partitions.length).map { i => + (i, -1)}.toMap + data.context.broadcast(emptyData) + } + val points = Matrices.fromRDD(data, bcMetaData.value, batchSize, buildSparseThreshold) + + /** + * For the first iteration, the regVal will be initialized as sum of weight squares + * if it's L2 updater; for L1 updater, the same logic is followed. + */ + val updaterWithIndex = updater.zipWithIndex + + var regVal = updaterWithIndex.map { case (u, ind) => + u.compute(weights(ind), DenseMatrix.zeros(numFeatures, numModels), + DenseMatrix.zeros(1, numModels), 1, regMatrix)._2 + } + val orderedIters = numIterations.sorted + var iterIndexCounter = 0 + for (i <- 1 to maxNumIter) { + val bcWeights = data.context.broadcast(weights) + // Sample a subset (fraction miniBatchFraction) of the total data + // compute and sum up the subgradients on this subset (this is one map-reduce) + val (gradientSum, lossSum) = points.sample(false, miniBatchFraction, 42 + i) + .treeAggregate(updaterCounter.map(j => Matrices.zeros(numFeatures, numModels)), + updaterCounter.map(j => Matrices.zeros(1, numModels)))( + seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => + val l = updaterCounter.map { j => + gradient.compute(features, label, bcWeights.value(j), + grad(j).asInstanceOf[DenseMatrix]) + } + (grad, loss.zip(l).map(r => r._1.elementWiseOperateInPlace(_ + _, r._2))) + }, + combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => + (grad1.zip(grad2).map(r => r._1.elementWiseOperateInPlace(_ + _, r._2)), + loss1.zip(loss2).map(r => r._1.elementWiseOperateInPlace(_ + _, r._2))) + }) + stochasticLossHistory.append(Vectors.dense(Matrices.horzcat(updaterCounter.map { i => + lossSum(i).elementWiseOperateScalarInPlace(_ / _, miniBatchSize). 
+ elementWiseOperateOnRowsInPlace(_ + _, regVal(i)) + }).toArray)) + val update = updaterWithIndex.map { case (u, ind) => u.compute( + weights(ind), gradientSum(ind).elementWiseOperateScalarInPlace(_ / _, miniBatchSize). + asInstanceOf[DenseMatrix], stepSizeMatrix, i, regMatrix) + } + weights = update.map(_._1).toIndexedSeq + regVal = update.map(_._2) + + if (i == orderedIters(iterIndexCounter)){ + finalWeights = Matrices.horzcat(Seq(finalWeights) ++ weights) + iterIndexCounter += 1 + } + } + + logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format( + stochasticLossHistory.takeRight(10).mkString(", "))) + + (finalWeights, stochasticLossHistory.toArray) + + } +} + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala index 7f6d94571b5e..8f7d40a1ef05 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala @@ -27,10 +27,10 @@ import org.apache.spark.mllib.linalg.Vector * Trait for optimization problem solvers. */ @DeveloperApi -trait Optimizer extends Serializable { +trait Optimizer[V] extends Serializable { /** * Solve the provided convex optimization problem. */ - def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector + def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): V } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala index 3ed3a5b9b384..57bf900c3855 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala @@ -19,10 +19,11 @@ package org.apache.spark.mllib.optimization import scala.math._ -import breeze.linalg.{norm => brzNorm, axpy => brzAxpy, Vector => BV} +import breeze.linalg.{norm => brzNorm} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.BLAS.{axpy, gemm, scal} /** * :: DeveloperApi :: @@ -68,17 +69,17 @@ abstract class Updater extends Serializable { */ @DeveloperApi class SimpleUpdater extends Updater { - override def compute( + def compute( weightsOld: Vector, gradient: Vector, stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = { val thisIterStepSize = stepSize / math.sqrt(iter) - val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector - brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights) - (Vectors.fromBreeze(brzWeights), 0) + axpy(-thisIterStepSize, gradient, weightsOld) + + (weightsOld, 0) } } @@ -103,7 +104,7 @@ class SimpleUpdater extends Updater { */ @DeveloperApi class L1Updater extends Updater { - override def compute( + def compute( weightsOld: Vector, gradient: Vector, stepSize: Double, @@ -111,18 +112,22 @@ class L1Updater extends Updater { regParam: Double): (Vector, Double) = { val thisIterStepSize = stepSize / math.sqrt(iter) // Take gradient step - val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector - brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights) + //println(s"\n$iter:") + //println(s"\n$gradient\n") + axpy(-thisIterStepSize, gradient, weightsOld) + //println(s"\n$weightsOld\n") // Apply proximal operator (soft thresholding) val shrinkageVal = regParam * thisIterStepSize + 
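+    // e.g. with regParam = 0.1 and thisIterStepSize = 0.5, shrinkageVal = 0.05: a weight of
+    // 0.03 is shrunk to 0.0, while a weight of -0.20 becomes -0.15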
//println(s"\n$shrinkageVal\n") var i = 0 - while (i < brzWeights.length) { - val wi = brzWeights(i) - brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal) + val weightValues = weightsOld.toArray + while (i < weightsOld.size) { + val wi = weightValues(i) + weightValues(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal) i += 1 } - - (Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam) + //println(s"\n$weightsOld\n") + (weightsOld, brzNorm(weightsOld.toBreeze, 1.0) * regParam) } } @@ -134,7 +139,7 @@ class L1Updater extends Updater { */ @DeveloperApi class SquaredL2Updater extends Updater { - override def compute( + def compute( weightsOld: Vector, gradient: Vector, stepSize: Double, @@ -145,12 +150,151 @@ class SquaredL2Updater extends Updater { // w' = w - thisIterStepSize * (gradient + regParam * w) // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient val thisIterStepSize = stepSize / math.sqrt(iter) - val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector - brzWeights :*= (1.0 - thisIterStepSize * regParam) - brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights) - val norm = brzNorm(brzWeights, 2.0) + scal(1.0 - thisIterStepSize * regParam, weightsOld) + axpy(-thisIterStepSize, gradient, weightsOld) + val norm = brzNorm(weightsOld.toBreeze, 2.0) - (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm) + (weightsOld, 0.5 * regParam * norm * norm) } } +/** + * :: DeveloperApi :: + * Class used to perform steps (weight update) using Gradient Descent methods. + * + * For general minimization problems, or for regularized problems of the form + * min L(w) + regParam * R(w), + * the compute function performs the actual update step, when given some + * (e.g. stochastic) gradient direction for the loss L(w), + * and a desired step-size (learning rate). + * + * The updater is responsible to also perform the update coming from the + * regularization term R(w) (if any regularization is used). + */ +@DeveloperApi +abstract class MultiModelUpdater extends Serializable { + /** + * Compute an updated value for weights given the gradient, stepSize, iteration number and + * regularization parameter. Also returns the regularization value regParam * R(w) + * computed using the *updated* weights. + * + * @param weightsOld - Column matrix of size dx1 where d is the number of features. + * @param gradient - Column matrix of size dx1 where d is the number of features. + * @param stepSize - step size across iterations + * @param iter - Iteration number + * @param regParam - Regularization parameter + * + * @return A tuple of 2 elements. The first element is a column matrix containing updated weights, + * and the second element is the regularization value computed using updated weights. + */ + def compute( + weightsOld: DenseMatrix, + gradient: DenseMatrix, + stepSize: DenseMatrix, + iter: Int, + regParam: Matrix): (DenseMatrix, Matrix) +} + +/** + * :: DeveloperApi :: + * A simple updater for gradient descent *without* any regularization. + * Uses a step-size decreasing with the square root of the number of iterations. 
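+ * Unlike the single-model `SimpleUpdater`, the step sizes arrive as a 1 x numModels matrix
+ * and (reading the implementation below) are applied per column through a diagonal matrix,
+ * i.e. each model j is updated as w_j := w_j - (stepSize_j / sqrt(iter)) * gradient_j.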
+ */ +@DeveloperApi +class MultiModelSimpleUpdater extends MultiModelUpdater { + def compute( + weightsOld: DenseMatrix, + gradient: DenseMatrix, + stepSize: DenseMatrix, + iter: Int, + regParam: Matrix): (DenseMatrix, Matrix) = { + val thisIterStepSize = + SparseMatrix.diag(Vectors.dense(stepSize.map(-_ / sqrt(iter)).toArray)) + + gemm(1.0, gradient,thisIterStepSize, 1.0, weightsOld) + + (weightsOld, Matrices.zeros(1, weightsOld.numCols)) + } +} + +/** + * :: DeveloperApi :: + * Updater for L1 regularized problems. + * R(w) = ||w||_1 + * Uses a step-size decreasing with the square root of the number of iterations. + + * Instead of subgradient of the regularizer, the proximal operator for the + * L1 regularization is applied after the gradient step. This is known to + * result in better sparsity of the intermediate solution. + * + * The corresponding proximal operator for the L1 norm is the soft-thresholding + * function. That is, each weight component is shrunk towards 0 by shrinkageVal. + * + * If w > shrinkageVal, set weight component to w-shrinkageVal. + * If w < -shrinkageVal, set weight component to w+shrinkageVal. + * If -shrinkageVal < w < shrinkageVal, set weight component to 0. + * + * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal) + */ +@DeveloperApi +class MultiModelL1Updater extends MultiModelUpdater { + def compute( + weightsOld: DenseMatrix, + gradient: DenseMatrix, + stepSize: DenseMatrix, + iter: Int, + regParam: Matrix): (DenseMatrix, Matrix) = { + val thisIterStepSize = + SparseMatrix.diag(Vectors.dense(stepSize.map(-_ / sqrt(iter)).toArray)) + + // Take gradient step + gemm(1.0, gradient,thisIterStepSize, 1.0, weightsOld) + // Apply proximal operator (soft thresholding) + val shrinkageVal = regParam.elementWiseOperate(_ * _, thisIterStepSize) + + var j = 0 + while (j < weightsOld.numCols){ + var i = 0 + while (i < weightsOld.numRows) { + val wi = weightsOld(i, j) + weightsOld.update(i, j, signum(wi) * max(0.0, abs(wi) + shrinkageVal(j, j))) + i += 1 + } + j += 1 + } + + (weightsOld, weightsOld.colNorms(1.0) multiplyInPlace regParam) + } +} +/** + * :: DeveloperApi :: + * Updater for L2 regularized problems. + * R(w) = 1/2 ||w||^2 + * Uses a step-size decreasing with the square root of the number of iterations. + */ +@DeveloperApi +class MultiModelSquaredL2Updater extends MultiModelUpdater { + def compute( + weightsOld: DenseMatrix, + gradient: DenseMatrix, + stepSize: DenseMatrix, + iter: Int, + regParam: Matrix): (DenseMatrix, Matrix) = { + // add up both updates from the gradient of the loss (= step) as well as + // the gradient of the regularizer (= regParam * weightsOld) + // w' = w - thisIterStepSize * (gradient + regParam * w) + // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient + val thisIterStepSize = + SparseMatrix.diag(Vectors.dense(stepSize.map(-_ / sqrt(iter)).toArray)) + + weightsOld multiplyInPlace thisIterStepSize.elementWiseOperate(_ * _, regParam). + elementWiseOperateInPlace(_ + _, Matrices.speye(thisIterStepSize.numRows)) + + gemm(1.0, gradient,thisIterStepSize, 1.0, weightsOld) + + val norm = weightsOld.colNorms(2.0) + + (weightsOld, (norm.elementWiseOperate(_ * _, norm) multiplyInPlace regParam). 
+ elementWiseOperateScalarInPlace(_ * _, 0.5)) + } +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 20c1fdd2269c..88e56826177e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -88,7 +88,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] protected val validators: Seq[RDD[LabeledPoint] => Boolean] = List() /** The optimizer to solve the problem. */ - def optimizer: Optimizer + def optimizer: Optimizer[Vector] /** Whether to add intercept (default: false). */ protected var addIntercept: Boolean = false diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala index 1952e6734ecf..2fdb3bc74ab2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala @@ -126,4 +126,153 @@ class BLASSuite extends FunSuite { } } } + + test("gemm") { + + val dA = + new DenseMatrix(4, 3, Array(0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 3.0)) + val sA = new SparseMatrix(4, 3, Array(0, 1, 3, 4), Array(1, 0, 2, 3), Array(1.0, 2.0, 1.0, 3.0)) + + val B = new DenseMatrix(3, 2, Array(1.0, 0.0, 0.0, 0.0, 2.0, 1.0)) + val sB = new SparseMatrix(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(1.0, 2.0, 1.0)) + val BT = new DenseMatrix(2, 3, Array(1.0, 0.0, 0.0, 2.0, 0.0, 1.0)) + val sBT = new SparseMatrix(2, 3, Array(0, 1, 2, 3), Array(0, 1, 1), Array(1.0, 2.0, 1.0)) + val expected = new DenseMatrix(4, 2, Array(0.0, 1.0, 0.0, 0.0, 4.0, 0.0, 2.0, 3.0)) + + assert(dA multiply B ~== expected absTol 1e-15) + assert(sA multiply B ~== expected absTol 1e-15) + assert(dA multiply sB ~== expected absTol 1e-15) + + val C1 = new DenseMatrix(4, 2, Array(1.0, 0.0, 2.0, 1.0, 0.0, 0.0, 1.0, 0.0)) + val C2 = C1.copy + val C3 = C1.copy + val C4 = C1.copy + val C5 = C1.copy + val C6 = C1.copy + val C7 = C1.copy + val C8 = C1.copy + val C9 = C1.copy + val C10 = C1.copy + val C11 = C1.copy + val C12 = C1.copy + val C13 = C1.copy + val C14 = C1.copy + val C15 = C1.copy + val C16 = C1.copy + val expected2 = new DenseMatrix(4, 2, Array(2.0, 2.0, 4.0, 2.0, 8.0, 0.0, 6.0, 6.0)) + + gemm(2.0, dA, B, 2.0, C1) + gemm(2.0, sA, B, 2.0, C2) + gemm(2.0, dA, sB, 2.0, C3) + gemm(false, true, 2.0, dA, BT, 2.0, C4) + gemm(false, true, 2.0, sA, BT, 2.0, C5) + gemm(false, true, 2.0, dA, sBT, 2.0, C6) + assert(C1 ~== expected2 absTol 1e-15) + assert(C2 ~== expected2 absTol 1e-15) + assert(C3 ~== expected2 absTol 1e-15) + assert(C4 ~== expected2 absTol 1e-15) + assert(C5 ~== expected2 absTol 1e-15) + assert(C6 ~== expected2 absTol 1e-15) + + withClue("columns of A don't match the rows of B") { + intercept[Exception] { + gemm(true, false, 1.0, dA, B, 2.0, C1) + } + } + + withClue("not supported") { + intercept[Exception] { + gemm(false, false, 1.0, sA, sB, 2.0, C1) + } + } + + withClue("not supported") { + intercept[Exception] { + sA multiply sB + } + } + + val dAT = + new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 3.0)) + val sAT = + new SparseMatrix(3, 4, Array(0, 1, 2, 3, 4), Array(1, 0, 1, 2), Array(2.0, 1.0, 1.0, 3.0)) + + assert(dAT transposeMultiply B ~== expected absTol 1e-15) + 
assert(sAT transposeMultiply B ~== expected absTol 1e-15) + assert(dAT transposeMultiply sB ~== expected absTol 1e-15) + + gemm(true, false, 2.0, dAT, B, 2.0, C7) + gemm(true, false, 2.0, sAT, B, 2.0, C8) + gemm(true, false, 2.0, dAT, sB, 2.0, C9) + gemm(true, true, 2.0, dAT, BT, 2.0, C10) + gemm(true, true, 2.0, sAT, BT, 2.0, C11) + gemm(true, true, 2.0, dAT, sBT, 2.0, C12) + assert(C7 ~== expected2 absTol 1e-15) + assert(C8 ~== expected2 absTol 1e-15) + assert(C9 ~== expected2 absTol 1e-15) + assert(C10 ~== expected2 absTol 1e-15) + assert(C11 ~== expected2 absTol 1e-15) + assert(C12 ~== expected2 absTol 1e-15) + + withClue("not supported") { + intercept[Exception] { + gemm(true, false, 1.0, sAT, sB, 2.0, C1) + } + } + } + + test("gemv") { + + val dA = + new DenseMatrix(4, 3, Array(0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 3.0)) + val sA = new SparseMatrix(4, 3, Array(0, 1, 3, 4), Array(1, 0, 2, 3), Array(1.0, 2.0, 1.0, 3.0)) + + val x = new DenseVector(Array(1.0, 2.0, 3.0)) + val expected = new DenseVector(Array(4.0, 1.0, 2.0, 9.0)) + + assert(dA multiply x ~== expected absTol 1e-15) + assert(sA multiply x ~== expected absTol 1e-15) + + val y1 = new DenseVector(Array(1.0, 3.0, 1.0, 0.0)) + val y2 = y1.copy + val y3 = y1.copy + val y4 = y1.copy + val y5 = y1.copy + val y6 = y1.copy + val y7 = y1.copy + val y8 = y1.copy + val expected2 = new DenseVector(Array(6.0, 7.0, 4.0, 9.0)) + val expected3 = new DenseVector(Array(10.0, 8.0, 6.0, 18.0)) + + gemv(1.0, dA, x, 2.0, y1) + gemv(1.0, sA, x, 2.0, y2) + gemv(2.0, dA, x, 2.0, y3) + gemv(2.0, sA, x, 2.0, y4) + assert(y1 ~== expected2 absTol 1e-15) + assert(y2 ~== expected2 absTol 1e-15) + assert(y3 ~== expected3 absTol 1e-15) + assert(y4 ~== expected3 absTol 1e-15) + withClue("columns of A don't match the rows of B") { + intercept[Exception] { + gemv(true, 1.0, dA, x, 2.0, y1) + } + } + + val dAT = + new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 3.0)) + val sAT = + new SparseMatrix(3, 4, Array(0, 1, 2, 3, 4), Array(1, 0, 1, 2), Array(2.0, 1.0, 1.0, 3.0)) + + assert(dAT transposeMultiply x ~== expected absTol 1e-15) + assert(sAT transposeMultiply x ~== expected absTol 1e-15) + + gemv(true, 1.0, dAT, x, 2.0, y5) + gemv(true, 1.0, sAT, x, 2.0, y6) + gemv(true, 2.0, dAT, x, 2.0, y7) + gemv(true, 2.0, sAT, x, 2.0, y8) + assert(y5 ~== expected2 absTol 1e-15) + assert(y6 ~== expected2 absTol 1e-15) + assert(y7 ~== expected3 absTol 1e-15) + assert(y8 ~== expected3 absTol 1e-15) + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala index 82d49c76ed02..73a6d3a27d86 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeMatrixConversionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg import org.scalatest.FunSuite -import breeze.linalg.{DenseMatrix => BDM} +import breeze.linalg.{DenseMatrix => BDM, CSCMatrix => BSM} class BreezeMatrixConversionSuite extends FunSuite { test("dense matrix to breeze") { @@ -37,4 +37,26 @@ class BreezeMatrixConversionSuite extends FunSuite { assert(mat.numCols === breeze.cols) assert(mat.values.eq(breeze.data), "should not copy data") } + + test("sparse matrix to breeze") { + val values = Array(1.0, 2.0, 4.0, 5.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(1, 2, 1, 2) + val mat = Matrices.sparse(3, 2, 
colPtrs, rowIndices, values) + val breeze = mat.toBreeze.asInstanceOf[BSM[Double]] + assert(breeze.rows === mat.numRows) + assert(breeze.cols === mat.numCols) + assert(breeze.data.eq(mat.asInstanceOf[SparseMatrix].values), "should not copy data") + } + + test("sparse breeze matrix to sparse matrix") { + val values = Array(1.0, 2.0, 4.0, 5.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(1, 2, 1, 2) + val breeze = new BSM[Double](values, 3, 2, colPtrs, rowIndices) + val mat = Matrices.fromBreeze(breeze).asInstanceOf[SparseMatrix] + assert(mat.numRows === breeze.rows) + assert(mat.numCols === breeze.cols) + assert(mat.values.eq(breeze.data), "should not copy data") + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index 9c66b4db9f16..40322594503c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -36,4 +36,84 @@ class MatricesSuite extends FunSuite { Matrices.dense(3, 2, Array(0.0, 1.0, 2.0)) } } + + test("sparse matrix construction") { + val m = 3 + val n = 2 + val values = Array(1.0, 2.0, 4.0, 5.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(1, 2, 1, 2) + val mat = Matrices.sparse(m, n, colPtrs, rowIndices, values).asInstanceOf[SparseMatrix] + assert(mat.numRows === m) + assert(mat.numCols === n) + assert(mat.values.eq(values), "should not copy data") + assert(mat.colPtrs.eq(colPtrs), "should not copy data") + assert(mat.rowIndices.eq(rowIndices), "should not copy data") + } + + test("sparse matrix construction with wrong number of elements") { + intercept[IllegalArgumentException] { + Matrices.sparse(3, 2, Array(0, 1), Array(1, 2, 1), Array(0.0, 1.0, 2.0)) + } + + intercept[IllegalArgumentException] { + Matrices.sparse(3, 2, Array(0, 1, 2), Array(1, 2), Array(0.0, 1.0, 2.0)) + } + } + + test("matrix copies are deep copies") { + val m = 3 + val n = 2 + + val denseMat = Matrices.dense(m, n, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) + val denseCopy = denseMat.copy + + assert(!denseMat.toArray.eq(denseCopy.toArray)) + + val values = Array(1.0, 2.0, 4.0, 5.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(1, 2, 1, 2) + val sparseMat = Matrices.sparse(m, n, colPtrs, rowIndices, values) + val sparseCopy = sparseMat.copy + + assert(!sparseMat.toArray.eq(sparseCopy.toArray)) + } + + test("matrix indexing and updating") { + val m = 3 + val n = 2 + val allValues = Array(0.0, 1.0, 2.0, 3.0, 4.0, 0.0) + + val denseMat = new DenseMatrix(m, n, allValues) + + assert(denseMat(0, 1) === 3.0) + assert(denseMat(0, 1) === denseMat.values(3)) + assert(denseMat(0, 1) === denseMat(3)) + assert(denseMat(0, 0) === 0.0) + + denseMat.update(0, 0, 10.0) + assert(denseMat(0, 0) === 10.0) + assert(denseMat.values(0) === 10.0) + + val sparseValues = Array(1.0, 2.0, 3.0, 4.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(1, 2, 0, 1) + val sparseMat = new SparseMatrix(m, n, colPtrs, rowIndices, sparseValues) + + assert(sparseMat(0, 1) === 3.0) + assert(sparseMat(0, 1) === sparseMat.values(2)) + assert(sparseMat(0, 0) === 0.0) + + intercept[NoSuchElementException] { + sparseMat.update(0, 0, 10.0) + } + + sparseMat.update(0, 1, 10.0) + assert(sparseMat(0, 1) === 10.0) + assert(sparseMat.values(2) === 10.0) + } + + test("element-wise operations") { + + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index cd651fe2d2dd..6613d7bb1e04 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -155,4 +155,15 @@ class VectorsSuite extends FunSuite { throw new RuntimeException(s"copy returned ${dvCopy.getClass} on ${dv.getClass}.") } } + + test("append") { + val sparseVecs: Seq[Vector] = Seq(new SparseVector(2, Array.empty[Int], Array.empty[Double]), + new SparseVector(1, Array.empty[Int], Array.empty[Double]), + new SparseVector(2, Array(0), Array(2.0)), + new SparseVector(3, Array.empty[Int], Array.empty[Double]), + new SparseVector(5, Array(1, 4), Array(1.0, 2.0)) + ) + val sparseCollection = Vectors.append(sparseVecs) + assert(sparseCollection === new SparseVector(13, Array(3, 9, 12), Array(2.0, 1.0, 2.0))) + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/MultiModelGradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/MultiModelGradientDescentSuite.scala new file mode 100644 index 000000000000..67bce856c156 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/MultiModelGradientDescentSuite.scala @@ -0,0 +1,444 @@ +package org.apache.spark.mllib.optimization + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.scalatest.{FunSuite, Matchers} + +import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Vectors} +import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.{LinearDataGenerator, LocalClusterSparkContext, LocalSparkContext} +import org.apache.spark.mllib.util.TestingUtils._ + +object MultiModelGradientDescentSuite { + + def generateLogisticInputAsList( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateGDInput(offset, scale, nPoints, seed)) + } + + // Generate input of the form Y = logistic(offset + scale * X) + def generateGDInput( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) + + val unifRand = new Random(45) + val rLogis = (0 until nPoints).map { i => + val u = unifRand.nextDouble() + math.log(u) - math.log(1.0-u) + } + + val y: Seq[Int] = (0 until nPoints).map { i => + val yVal = offset + scale * x1(i) + rLogis(i) + if (yVal > 0) 1 else 0 + } + + (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(x1(i)))) + } + + def generateSVMInputAsList( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateSVMInput(intercept, weights, nPoints, seed)) + } + + // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) + def generateSVMInput( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val weightsMat = new DenseMatrix(weights.length, 1, weights) + val x = Array.fill[Array[Double]](nPoints)( + Array.fill[Double](weights.length)(rnd.nextDouble() * 2.0 - 1.0)) + val y = x.map { xi => + val yD = (new DenseMatrix(1, xi.length, xi) multiply weightsMat) + + intercept + 0.01 * rnd.nextGaussian() + if (yD.toArray(0) < 0) 0.0 else 1.0 + } + y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2))) + } +} + +class MultiModelGradientDescentSuite extends FunSuite with LocalSparkContext with Matchers { + 
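+  // The correctness tests below train the same configurations one at a time with the existing
+  // GradientDescent, horizontally concatenate those single-model weights, and check that a
+  // single MultiModelGradientDescent run reproduces them within tolerance.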
test("Assert the loss is decreasing.") { + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val initialB = -1.0 + val initialWeights = Array(initialB) + + val gradient = new MultiModelLogisticGradient() + val updater: Array[MultiModelUpdater] = Array(new MultiModelSimpleUpdater()) + val stepSize = Array(1.0, 0.1) + val numIterations = Array(10) + val regParam = Array(0.0) + val miniBatchFrac = 1.0 + + // Add a extra variable consisting of all 1.0's for the intercept. + val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42) + val data = testData.map { case LabeledPoint(label, features) => + label -> Vectors.dense(1.0 +: features.toArray) + } + + val dataRDD = sc.parallelize(data, 2).cache() + val initialWeightsWithIntercept = Vectors.dense(1.0 +: initialWeights.toArray) + + val (_, loss) = MultiModelGradientDescent.runMiniBatchMMSGD( + dataRDD, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFrac, + initialWeightsWithIntercept) + + assert(loss.last(0) - loss.head(0) < 0, "loss isn't decreasing.") + + val lossDiff = loss.init.zip(loss.tail).map { case (lhs, rhs) => lhs(0) - rhs(0) } + assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8) + } + + test("Test the loss and gradient of first iteration with regularization.") { + + val gradient = new MultiModelLogisticGradient() + val updater: Array[MultiModelUpdater] = Array(new MultiModelSquaredL2Updater()) + + // Add a extra variable consisting of all 1.0's for the intercept. + val testData = GradientDescentSuite.generateGDInput(2.0, -1.5, 10000, 42) + val data = testData.map { case LabeledPoint(label, features) => + label -> Vectors.dense(1.0 +: features.toArray) + } + + val dataRDD = sc.parallelize(data, 2).cache() + + // Prepare non-zero weights + val initialWeightsWithIntercept = Vectors.dense(1.0, 0.5) + + val regParam0 = Array(0.0) + val (newWeights0, loss0) = MultiModelGradientDescent.runMiniBatchMMSGD( + dataRDD, gradient, updater, Array(1.0), Array(1), regParam0, 1.0, initialWeightsWithIntercept) + + val regParam1 = Array(1.0) + val (newWeights1, loss1) = MultiModelGradientDescent.runMiniBatchMMSGD( + dataRDD, gradient, updater, Array(1.0), Array(1), regParam1, 1.0, initialWeightsWithIntercept) + + assert( + loss1(0)(0) ~== (loss0(0)(0) + (math.pow(initialWeightsWithIntercept(0), 2) + + math.pow(initialWeightsWithIntercept(1), 2)) / 2) absTol 1E-5, + """For non-zero weights, the regVal should be \frac{1}{2}\sum_i w_i^2.""") + + assert( + (newWeights1(0, 0) ~== (newWeights0(0, 0) - initialWeightsWithIntercept(0)) absTol 1E-5) && + (newWeights1(1, 0) ~== (newWeights0(1, 0) - initialWeightsWithIntercept(1)) absTol 1E-5), + "The different between newWeights with/without regularization " + + "should be initialWeightsWithIntercept.") + } + + test("Check for correctness: LogisticRegression-(SimpleUpdater+SquaredL2Updater)") { + val nPoints = 100 + val A = 2.0 + val B = -1.5 + + val initialB = -1.0 + val initialWeights = Array(initialB) + + val gradient = new MultiModelLogisticGradient() + val updater: Array[MultiModelUpdater] = + Array(new MultiModelSimpleUpdater(), new MultiModelSquaredL2Updater()) + val stepSize = Array(1.0, 0.1) + val numIterations = Array(10) + val regParam = Array(0.0, 0.1, 1.0) + val miniBatchFrac = 1.0 + + // Add a extra variable consisting of all 1.0's for the intercept. 
+ val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42) + val data = testData.map { case LabeledPoint(label, features) => + label -> Vectors.dense(1.0 +: features.toArray) + } + val numModels = stepSize.length * regParam.length + + val dataRDD = sc.parallelize(data, 2).cache() + + val forLoop = (0 until numModels).map { i => + val (weightsGD, loss) = GradientDescent.runMiniBatchSGD( + dataRDD, + new LogisticGradient(), + new SimpleUpdater(), + stepSize(math.round(i * 1.0 / numModels).toInt), + numIterations(0), + regParam(i % regParam.length), + miniBatchFrac, + Vectors.dense(1.0 +: initialWeights.toArray.clone())) + (weightsGD, loss) + } + val forLoop2 = (0 until numModels).map { i => + val (weightsGD2, loss) = GradientDescent.runMiniBatchSGD( + dataRDD, + new LogisticGradient(), + new SquaredL2Updater(), + stepSize(math.round(i * 1.0 / numModels).toInt), + numIterations(0), + regParam(i % regParam.length), + miniBatchFrac, + Vectors.dense(1.0 +: initialWeights.toArray.clone())) + (weightsGD2, loss) + } + + val res2 = Matrices.horzcat(forLoop.map(v => new DenseMatrix(v._1.size, 1, v._1.toArray)) ++ + forLoop2.map(v => new DenseMatrix(v._1.size, 1, v._1.toArray))) + + val (weightsMMGD, mmLoss) = MultiModelGradientDescent.runMiniBatchMMSGD( + dataRDD, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFrac, + Vectors.dense(1.0 +: initialWeights.toArray)) + + assert(res2 ~== weightsMMGD absTol 1e-10) + + val gdLosses1 = forLoop.map(_._2.last) + val gdLosses2 = forLoop2.map(_._2.last) + val lastFromGD = Vectors.dense((gdLosses1 ++ gdLosses2).toArray[Double]) + + assert(lastFromGD ~== mmLoss.last absTol 1e-10) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X10000 + test("use sparse matrices instead of dense") { + val nPoints = 100 + + val denseRDD = sc.parallelize( + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42), 2) + val sparseRDD = denseRDD.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + (label, sv) + }.cache() + val gradient = new MultiModelLeastSquaresGradient() + val updater: Array[MultiModelUpdater] = Array(new MultiModelSquaredL2Updater()) + val stepSize = Array(1.0, 0.1) + val numIterations = Array(10) + val regParam = Array(0.0, 0.1, 1.0) + val miniBatchFrac = 1.0 + val initialWeights = Array.fill(10000)(0.0) + // Add a extra variable consisting of all 1.0's for the intercept. 
+ + val numModels = stepSize.length * regParam.length + + val forLoop = (0 until numModels).map { i => + val (weightsGD, loss) = GradientDescent.runMiniBatchSGD( + sparseRDD, + new LeastSquaresGradient(), + new SquaredL2Updater(), + stepSize(math.round(i * 1.0 / numModels).toInt), + numIterations(0), + regParam(i % regParam.length), + miniBatchFrac, + Vectors.dense(initialWeights.clone())) + (weightsGD, loss) + } + + val res = Matrices.horzcat(forLoop.map(v => new DenseMatrix(v._1.size, 1, v._1.toArray))) + + val (weightsMMGD, mmLoss) = MultiModelGradientDescent.runMiniBatchMMSGD( + sparseRDD, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFrac, + Vectors.dense(initialWeights)) + + assert(res ~== weightsMMGD absTol 1e-10) + + val gdLosses1 = forLoop.map(_._2.last) + val lastFromGD = Vectors.dense(gdLosses1.toArray[Double]) + + assert(lastFromGD ~== mmLoss.last absTol 1e-10) + } + + test("Check for correctness: LeastSquaresRegression-SquaredL2Updater & multiple numIterations") { + val nPoints = 100 + val numFeatures = 5 + + val initialWeights = Matrices.zeros(numFeatures, 1).toArray + + // Pick weights as random values distributed uniformly in [-0.5, 0.5] + val w = Matrices.rand(numFeatures, 1) -= 0.5 + + // Use half of data for training and other half for validation + val data = LinearDataGenerator.generateLinearInput(0.0, w.toArray, nPoints, 42, 10.0) + + val gradient = new MultiModelLeastSquaresGradient() + val updater: Array[MultiModelUpdater] = Array(new MultiModelSquaredL2Updater()) + val stepSize = Array(1.0, 0.1) + val numIterations = Array(10, 20) + val regParam = Array(0.0, 0.1, 1.0) + val miniBatchFrac = 1.0 + + val dataRDD = sc.parallelize(data, 2).map( p => (p.label, p.features)).cache() + val numModels = stepSize.length * regParam.length + + val forLoop = (0 until numModels).map { i => + val (weightsGD2, loss) = GradientDescent.runMiniBatchSGD( + dataRDD, + new LeastSquaresGradient(), + new SquaredL2Updater(), + stepSize(math.round(i * 1.0 / numModels).toInt), + numIterations(0), + regParam(i % regParam.length), + miniBatchFrac, + Vectors.dense(initialWeights.clone())) + (weightsGD2, loss) + } + val forLoop2 = (0 until numModels).map { i => + val (weightsGD2, loss) = GradientDescent.runMiniBatchSGD( + dataRDD, + new LeastSquaresGradient(), + new SquaredL2Updater(), + stepSize(math.round(i * 1.0 / numModels).toInt), + numIterations(1), + regParam(i % regParam.length), + miniBatchFrac, + Vectors.dense(initialWeights.clone())) + (weightsGD2, loss) + } + val res = Matrices.horzcat(forLoop.map( v => new DenseMatrix(v._1.size, 1, v._1.toArray)) ++ + forLoop2.map( v => new DenseMatrix(v._1.size, 1, v._1.toArray))) + + val (weightsMMGD, mmLoss) = MultiModelGradientDescent.runMiniBatchMMSGD( + dataRDD, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFrac, + Vectors.dense(initialWeights)) + + assert(res ~== weightsMMGD absTol 1e-10) + + val gdLosses1 = forLoop.map(_._2.last) + val gdLosses2 = forLoop2.map(_._2.last) + val lastFromGD = Vectors.dense((gdLosses1 ++ gdLosses2).toArray) + + val mmLossTogether = Vectors.dense(mmLoss(numIterations(0) - 1).toArray ++ + mmLoss(numIterations(1) - 1).toArray) + + assert(lastFromGD ~== mmLossTogether absTol 1e-10) + } + + test("Check for correctness: SVM-(L1Updater+SquaredL2Updater)") { + val nPoints = 100 + + val initialWeights = Array(1.0, 0.0, 0.0) + + val A = 0.01 + val B = -1.5 + val C = 1.0 + + val testData = MultiModelGradientDescentSuite. 
+      generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+    val data = testData.map { case LabeledPoint(label, features) =>
+      label -> Vectors.dense(1.0 +: features.toArray)
+    }
+
+    val gradient = new MultiModelHingeGradient()
+    val updater: Array[MultiModelUpdater] =
+      Array(new MultiModelL1Updater, new MultiModelSquaredL2Updater)
+    val stepSize = Array(1.0, 0.1)
+    val numIterations = Array(10)
+    val regParam = Array(0.0, 0.1)
+    val miniBatchFrac = 1.0
+
+    val dataRDD = sc.parallelize(data, 2).cache()
+    val numModels = stepSize.length * regParam.length
+
+    val forLoop1 = (0 until numModels).map { i =>
+      val (weightsGD2, loss) = GradientDescent.runMiniBatchSGD(
+        dataRDD,
+        new HingeGradient(),
+        new L1Updater(),
+        stepSize(math.round(i * 1.0 / numModels).toInt),
+        numIterations(0),
+        regParam(i % regParam.length),
+        miniBatchFrac,
+        Vectors.dense(initialWeights.clone()))
+      (weightsGD2, loss)
+    }
+    val forLoop2 = (0 until numModels).map { i =>
+      val (weightsGD2, loss) = GradientDescent.runMiniBatchSGD(
+        dataRDD,
+        new HingeGradient(),
+        new SquaredL2Updater(),
+        stepSize(math.round(i * 1.0 / numModels).toInt),
+        numIterations(0),
+        regParam(i % regParam.length),
+        miniBatchFrac,
+        Vectors.dense(initialWeights.clone()))
+      (weightsGD2, loss)
+    }
+
+    val res = Matrices.horzcat(forLoop1.map(v => new DenseMatrix(v._1.size, 1, v._1.toArray)) ++
+      forLoop2.map(v => new DenseMatrix(v._1.size, 1, v._1.toArray)))
+
+    val (weightsMMGD, mmLoss) = MultiModelGradientDescent.runMiniBatchMMSGD(
+      dataRDD,
+      gradient,
+      updater,
+      stepSize,
+      numIterations,
+      regParam,
+      miniBatchFrac,
+      Vectors.dense(initialWeights))
+
+    assert(res ~== weightsMMGD absTol 1e-10)
+
+    val gdLosses1 = forLoop1.map(_._2.last)
+    val gdLosses2 = forLoop2.map(_._2.last)
+    val lastFromGD = Vectors.dense((gdLosses1 ++ gdLosses2).toArray[Double])
+
+    assert(lastFromGD ~== mmLoss.last absTol 1e-10)
+  }
+}
+
+class MultiModelGradientDescentClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => (1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
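+    // Generating the rows inside mapPartitionsWithIndex keeps the training data on the
+    // executors, so it is never shipped from the driver inside the task closure.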
+    val (weights, loss) = MultiModelGradientDescent.runMiniBatchMMSGD(
+      points,
+      new MultiModelLogisticGradient,
+      Array(new MultiModelSquaredL2Updater),
+      Array(0.1),
+      Array(2),
+      Array(1.0),
+      1.0,
+      Vectors.dense(new Array[Double](n)),
+      useSparse = true)
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala
index 29cc42d8cbea..30b906aaa3ba 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.mllib.util
 
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.scalatest.exceptions.TestFailedException
 
 object TestingUtils {
@@ -169,4 +169,67 @@
+  case class CompareMatrixRightSide(
+    fun: (Matrix, Matrix, Double) => Boolean, y: Matrix, eps: Double, method: String)
+
+  /**
+   * Implicit class for comparing two matrices using relative tolerance or absolute tolerance.
+   */
+  implicit class MatrixWithAlmostEquals(val x: Matrix) {
+
+    /**
+     * When the difference of two matrices is within eps, returns true; otherwise, returns false.
+     */
+    def ~=(r: CompareMatrixRightSide): Boolean = r.fun(x, r.y, r.eps)
+
+    /**
+     * When the difference of two matrices is within eps, returns false; otherwise, returns true.
+     */
+    def !~=(r: CompareMatrixRightSide): Boolean = !r.fun(x, r.y, r.eps)
+
+    /**
+     * Throws an exception when the difference of two matrices is NOT within eps;
+     * otherwise, returns true.
+     */
+    def ~==(r: CompareMatrixRightSide): Boolean = {
+      if (!r.fun(x, r.y, r.eps)) {
+        throw new TestFailedException(
+          s"Expected \n$x\n and \n${r.y}\n to be within ${r.eps}${r.method} for all elements.", 0)
+      }
+      true
+    }
+
+    /**
+     * Throws an exception when the difference of two matrices is within eps; otherwise, returns true.
+     */
+    def !~==(r: CompareMatrixRightSide): Boolean = {
+      if (r.fun(x, r.y, r.eps)) {
+        throw new TestFailedException(
+          s"Did not expect \n$x\n and \n${r.y}\n to be within " +
+            s"${r.eps}${r.method} for all elements.", 0)
+      }
+      true
+    }
+
+    /**
+     * Comparison using absolute tolerance.
+     */
+    def absTol(eps: Double): CompareMatrixRightSide = CompareMatrixRightSide(
+      (x: Matrix, y: Matrix, eps: Double) => {
+        x.toArray.zip(y.toArray).forall(x => x._1 ~= x._2 absTol eps)
+      }, x, eps, ABS_TOL_MSG)
+
+    /**
+     * Comparison using relative tolerance. Note that comparing against a sparse matrix
+     * with zero-valued elements will raise an exception, because it involves comparing
+     * against zero.
+     */
+    def relTol(eps: Double): CompareMatrixRightSide = CompareMatrixRightSide(
+      (x: Matrix, y: Matrix, eps: Double) => {
+        x.toArray.zip(y.toArray).forall(x => x._1 ~= x._2 relTol eps)
+      }, x, eps, REL_TOL_MSG)
+
+    override def toString = x.toString
+  }
+
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 300589394b96..c0fa7d13e440 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -99,6 +99,8 @@
         ProblemFilters.exclude[IncompatibleMethTypeProblem](
           "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures")
       ) ++
+      MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
+      MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
       MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
       MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
       MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
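As a quick illustration of the matrix comparison syntax added to TestingUtils above, here is a minimal sketch of how a test can use it. The matrix values below are made up for illustration; ~== throws a TestFailedException when the comparison fails, while !~= only returns a Boolean:

    import org.apache.spark.mllib.linalg.Matrices
    import org.apache.spark.mllib.util.TestingUtils._

    // Two 2 x 2 matrices (column-major values) that differ by 1e-12 in a single entry.
    val a = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))
    val b = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0 + 1e-12))

    assert(a ~== b absTol 1e-10)  // every element-wise difference is within 1e-10
    assert(a !~= b absTol 1e-14)  // true: the matrices are NOT within 1e-14 of each other
    assert(a ~== b relTol 1e-8)   // relative comparison; all entries here are non-zero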