Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
465 changes: 464 additions & 1 deletion mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala

Large diffs are not rendered by default.

1,379 changes: 1,373 additions & 6 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala

Large diffs are not rendered by default.

82 changes: 80 additions & 2 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.SparkException
*
* Note: Users should not implement this interface.
*/
trait Vector extends Serializable {
sealed trait Vector extends Serializable {

/**
* Size of the vector.
Expand Down Expand Up @@ -72,6 +72,12 @@ trait Vector extends Serializable {
def copy: Vector = {
throw new NotImplementedError(s"copy is not implemented for ${this.getClass}.")
}

/** Maps all the values in the vector. Generates a new vector. */
private[mllib] def map(f: Double => Double): Vector

/** Updates all the values in the vector. In Place. */
private[mllib] def update(f: Double => Double): Vector
}

/**
Expand Down Expand Up @@ -186,6 +192,52 @@ object Vectors {
sys.error("Unsupported Breeze vector type: " + v.getClass.getName)
}
}

/**
* Appends a sequence of vectors together. Preserves sparsity if and only if only vecs is composed
* of SparseVectors only.
* @param vecs sequence of vectors
* @return a single long `Vector` composed of the vectors that were appended to each other.
*/
private[mllib] def append(vecs: Seq[Vector]): Vector = {
if (vecs.size == 1) {
return vecs(0)
}
var isDense = false
var isSparse = false
var totalLength = 0
val lengths: Seq[(Int, Int)] = vecs.map {
case sparse: SparseVector =>
isSparse = true
totalLength += sparse.size
(totalLength, sparse.indices.length)
case dense: DenseVector =>
isDense = true
totalLength += dense.size
(totalLength, dense.size)
}
if (isSparse && !isDense) {
val allIndices: Seq[Int] = vecs.flatMap(_.asInstanceOf[SparseVector].indices)
var vectorCounter = 0
var elementCounter = 0 // The count inside the vector
val adjustedIndices = allIndices.map { p =>
// less than equal, in case Vector is empty
while (lengths(vectorCounter)._2 <= elementCounter + 1) {
vectorCounter += 1
elementCounter = -1
}
elementCounter += 1
if (elementCounter != 0) lengths(vectorCounter)._1 + p else lengths(vectorCounter - 1)._2
}.toArray
new SparseVector(totalLength, adjustedIndices,
vecs.flatMap(_.asInstanceOf[SparseMatrix].values).toArray)
} else if (!isSparse && !isDense) {
throw new IllegalArgumentException("The supplied vectors are neither in SparseVector or" +
" DenseVector format!")
} else {
new DenseVector(vecs.flatMap(_.toArray).toArray)
}
}
}

/**
Expand All @@ -206,6 +258,19 @@ class DenseVector(val values: Array[Double]) extends Vector {
override def copy: DenseVector = {
new DenseVector(values.clone())
}

private[mllib] def map(f: Double => Double): Vector = {
new DenseVector(values.map(f))
}

private[mllib] def update(f: Double => Double): Vector = {
var i = 0
while (i < size) {
values(i) = f(values(i))
i += 1
}
this
}
}

/**
Expand Down Expand Up @@ -241,4 +306,17 @@ class SparseVector(
}

private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size)
}

private[mllib] def map(f: Double => Double): Vector = {
new SparseVector(size, indices, values.map(f))
}

private[mllib] def update(f: Double => Double): Vector = {
var i = 0
while (i < size) {
values(i) = f(values(i))
i += 1
}
this
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newline?

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.mllib.optimization

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, gemm, scal}

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -157,3 +159,250 @@ class HingeGradient extends Gradient {
}
}
}

/**
* :: DeveloperApi ::
* Class used to compute the gradient for a loss function, given a series of data points.
*/
@DeveloperApi
abstract class MultiModelGradient extends Serializable {
/**
* Compute the gradient and loss given the features of all data points.
*
* @param data features for one data point
* @param label label for this data point
* @param weights weights/coefficients corresponding to features
*
* @return (gradient: DenseMatrix, loss: Double)
*/
def compute(data: Matrix, label: DenseMatrix,
weights: DenseMatrix): (DenseMatrix, Matrix)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should fit on 1 line. Also check methods below


/**
* Compute the gradient and loss given the features of a series of data point,
* add the gradient to a provided matrix to avoid creating new objects, and return loss.
*
* @param data features for the data points
* @param label label for the data points
* @param weights weights/coefficients corresponding to features
* @param cumGradient the computed gradient will be added to this matrix
*
* @return loss
*/
def compute(data: Matrix, label: DenseMatrix,
weights: DenseMatrix, cumGradient: DenseMatrix): Matrix
}
/*
/**
* :: DeveloperApi ::
* Compute gradient and loss for a logistic loss function, as used in binary classification.
* See also the documentation for the precise formulation.
*/
@DeveloperApi
class MultiModelLogisticGradient extends MultiModelGradient {

private def sigmoid(p: DenseMatrix): DenseMatrix = {
def takeSigmoid(p: Double): Double = {
1.0 / (math.exp(-p) + 1.0)
}
p.map(takeSigmoid)
}

override def compute(data: Matrix, label: DenseMatrix,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be implemented using the below compute method to avoid code duplication?

weights: DenseMatrix): (DenseMatrix, Vector) = {
val margin = data transposeMultiply weights
val gradient = DenseMatrix.zeros(weights.numRows, weights.numCols)

gemm(false, false, 1.0, data, sigmoid(margin).elementWiseOperateOnColumnsInPlace(_ - _, label),
0.0, gradient)

val negativeLabels = label.compare(0.0, _ == _)
val addMargin = margin.elementWiseOperateOnColumns(_ * _, negativeLabels)

val loss = margin.update(v => math.log1p(math.exp(-v))).
elementWiseOperateInPlace(_ + _, addMargin)

val lossVector =
if (data.isInstanceOf[DenseMatrix]) {
val numFeatures = data.numRows
val zeroEntries = data.compare(0.0, _ == _)
val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _)
loss.colSums(false, shouldSkip)
} else {
loss.colSums
}
(gradient, lossVector)
}

override def compute(data: Matrix,
label: DenseMatrix,
weights: DenseMatrix,
cumGradient: DenseMatrix): Vector = {
val margin = data transposeMultiply weights
gemm(false, false, 1.0, data, sigmoid(margin).elementWiseOperateOnColumnsInPlace(_ - _, label),
1.0, cumGradient)

val negativeLabels = label.compare(0.0, _ == _)
val addMargin = margin.elementWiseOperateOnColumns(_ * _, negativeLabels)

val loss = margin.update(v => math.log1p(math.exp(-v))).
elementWiseOperateInPlace(_ + _, addMargin)

if (data.isInstanceOf[DenseMatrix]) {
val numFeatures = data.numRows
val zeroEntries = data.compare(0.0, _ == _)
val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really worthwhile? Computation is still linear in the size of the data, and the computation for colSums is pretty light.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies elsewhere too, but I won't repeat the comment.

loss.colSums(false, shouldSkip)
} else {
loss.colSums
}
}
}

/**
* :: DeveloperApi ::
* Compute gradient and loss for a Least-squared loss function, as used in linear regression.
* This is correct for the averaged least squares loss function (mean squared error)
* L = 1/n ||A weights-y||^2
* See also the documentation for the precise formulation.
*/
@DeveloperApi
class MultiModelLeastSquaresGradient extends MultiModelGradient {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point, we should rename this to SquaredError (not LeastSquares).

override def compute(data: Matrix, label: DenseMatrix,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line formatting

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto about computing in terms of below compute() method.

weights: DenseMatrix): (DenseMatrix, Vector) = {

val diff = (data transposeMultiply weights).elementWiseOperateOnColumnsInPlace(_ - _, label)

val gradient = DenseMatrix.zeros(weights.numRows, weights.numCols)

gemm(false, false, 2.0, data, diff, 0.0, gradient)

val loss = diff.update(v => v * v)

val lossVector =
if (data.isInstanceOf[DenseMatrix]) {
val numFeatures = data.numRows
val zeroEntries = data.compare(0.0, _ == _)
val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _)
loss.colSums(false, shouldSkip)
} else {
loss.colSums
}
(gradient, lossVector)
}

override def compute(data: Matrix,
label: DenseMatrix,
weights: DenseMatrix,
cumGradient: DenseMatrix): Vector = {
val diff = (data transposeMultiply weights).elementWiseOperateOnColumnsInPlace(_ - _, label)

gemm(false, false, 2.0, data, diff, 1.0, cumGradient)
val loss = diff.update(v => v * v)

if (data.isInstanceOf[DenseMatrix]) {
val numFeatures = data.numRows
val zeroEntries = data.compare(0.0, _ == _)
val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _)
loss.colSums(false, shouldSkip)
} else {
loss.colSums
}
}
}
*/

/**
* :: DeveloperApi ::
* Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
* See also the documentation for the precise formulation.
* NOTE: This assumes that the labels are {0,1}
*/
@DeveloperApi
class MultiModelHingeGradient extends MultiModelGradient {
override def compute(data: Matrix, label: DenseMatrix,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto about implementing this in terms of the below compute() method.

weights: DenseMatrix): (DenseMatrix, Vector) = {

val dotProduct = (data transposeMultiply weights).toBreeze
val brzData = data.toBreeze
// Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val brzScaledLabels = new BDM[Double](1, label.numRows, label.values.map(_ * 2 - 1.0))

dotProduct *= brzScaledLabels
brzScaledLabels.
val gradientMultiplier = data.elementWiseOperateOnRows(_ * _, labelScaled.negInPlace)
val gradient = DenseMatrix.zeros(weights.numRows, weights.numCols)
val activeExamples = dotProduct.compare(1.0, _ < _) // Examples where the hinge is active

gemm(false, false, 1.0, gradientMultiplier, activeExamples, 1.0, gradient)

val loss = activeExamples.elementWiseOperateInPlace(_ * _, dotProduct.update(1 - _))

val lossVector =
if (data.isInstanceOf[DenseMatrix]) {
val numFeatures = data.numRows
val zeroEntries = data.compare(0.0, _ == _)
val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _)
loss.colSums(false, shouldSkip)
} else {
loss.colSums
}
(gradient, lossVector)
}

override def compute(data: Matrix, label: DenseMatrix,
weights: DenseMatrix, cumGradient: DenseMatrix): Vector = {

val dotProduct = data transposeMultiply weights
// Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val labelScaled = new DenseMatrix(1, label.numRows, label.map(_ * 2 - 1.0).values)
dotProduct.elementWiseOperateOnColumnsInPlace(_ * _, labelScaled)

val gradientMultiplier = data.elementWiseOperateOnRows(_ * _, labelScaled.negInPlace)

val activeExamples = dotProduct.compare(1.0, _ < _) // Examples where the hinge is active

gemm(false, false, 1.0, gradientMultiplier, activeExamples, 1.0, cumGradient)

val loss = activeExamples.elementWiseOperateInPlace(_ * _, dotProduct.update(1 - _))

if (data.isInstanceOf[DenseMatrix]) {
val numFeatures = data.numRows
val zeroEntries = data.compare(0.0, _ == _)
val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _)
loss.colSums(false, shouldSkip)
} else {
loss.colSums
}
}
}
/*
override def compute(data: Matrix, label: DenseMatrix,
weights: DenseMatrix, cumGradient: DenseMatrix): Vector = {

val dotProduct = data transposeMultiply weights
// Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val labelScaled = new DenseMatrix(1, label.numRows, label.map(_ * 2 - 1.0).values)
dotProduct.elementWiseOperateOnColumnsInPlace(_ * _, labelScaled)

val gradientMultiplier = data.elementWiseOperateOnRows(_ * _, labelScaled.negInPlace)

val activeExamples = dotProduct.compare(1.0, _ < _) // Examples where the hinge is active

gemm(false, false, 1.0, gradientMultiplier, activeExamples, 1.0, cumGradient)

val loss = activeExamples.elementWiseOperateInPlace(_ * _, dotProduct.update(1 - _))

if (data.isInstanceOf[DenseMatrix]) {
val numFeatures = data.numRows
val zeroEntries = data.compare(0.0, _ == _)
val shouldSkip = zeroEntries.colSums.compareInPlace(numFeatures, _ == _)
loss.colSums(false, shouldSkip)
} else {
loss.colSums
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.mllib.rdd.RDDFunctions._
* @param updater Updater to be used to update weights after every iteration.
*/
class GradientDescent private[mllib] (private var gradient: Gradient, private var updater: Updater)
extends Optimizer with Logging {
extends Optimizer[Vector] with Logging {

private var stepSize: Double = 1.0
private var numIterations: Int = 100
Expand Down Expand Up @@ -181,6 +181,7 @@ object GradientDescent extends Logging {
var regVal = updater.compute(
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

//println(s"initial:\n$weights\n\n")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
Expand Down
Loading