@@ -281,7 +281,7 @@ object StandardScalerModel extends MLReadable[StandardScalerModel] {
values
}

-private[ml] def getTransformFunc(
+private[spark] def getTransformFunc(
shift: Array[Double],
scale: Array[Double],
withShift: Boolean,
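Note: the block aggregators added below expect feature values that are already standardized. A hypothetical sketch (not part of this patch) of how the now-`private[spark]` helper could be used to pre-scale features before packing them into blocks; `instances`, `featuresStd` and `blockSize` are assumed inputs, the code is assumed to live inside the `org.apache.spark.ml` package (Instance and InstanceBlock are private[ml]), `InstanceBlock.fromInstances` is assumed available, and `scale` is assumed to carry multiplicative factors (1 / std) as in StandardScalerModel.transform:

```scala
import org.apache.spark.ml.feature.{Instance, InstanceBlock, StandardScalerModel}
import org.apache.spark.rdd.RDD

// Hypothetical pre-processing: scale features by 1 / std (no shift) and pack them into blocks.
def blockify(
    instances: RDD[Instance],
    featuresStd: Array[Double],
    blockSize: Int): RDD[InstanceBlock] = {
  val inverseStd = featuresStd.map(std => if (std != 0.0) 1.0 / std else 0.0)
  // withShift = false, withScale = true: multiply each feature value by 1 / std.
  val scaleFunc = StandardScalerModel.getTransformFunc(
    Array.emptyDoubleArray, inverseStd, false, true)
  instances
    .map { case Instance(label, weight, features) =>
      Instance(label, weight, scaleFunc(features))
    }
    .mapPartitions(_.grouped(blockSize).map(seq => InstanceBlock.fromInstances(seq)))
}
```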
@@ -17,8 +17,8 @@
package org.apache.spark.ml.optim.aggregator

import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.Instance
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.linalg._

/**
* HuberAggregator computes the gradient and loss for a huber loss function,
@@ -74,15 +74,12 @@ private[ml] class HuberAggregator(
extends DifferentiableLossAggregator[Instance, HuberAggregator] {

protected override val dim: Int = bcParameters.value.size
-private val numFeatures: Int = if (fitIntercept) dim - 2 else dim - 1
-private val sigma: Double = bcParameters.value(dim - 1)
-private val intercept: Double = if (fitIntercept) {
-  bcParameters.value(dim - 2)
-} else {
-  0.0
-}
+private val numFeatures = if (fitIntercept) dim - 2 else dim - 1
+private val sigma = bcParameters.value(dim - 1)
+private val intercept = if (fitIntercept) bcParameters.value(dim - 2) else 0.0

// make transient so we do not serialize between aggregation stages
-@transient private lazy val coefficients = bcParameters.value.toArray.slice(0, numFeatures)
+@transient private lazy val coefficients = bcParameters.value.toArray.take(numFeatures)

/**
* Add a new training instance to this HuberAggregator, and update the loss and gradient
@@ -150,3 +147,101 @@ private[ml] class HuberAggregator(
}
}
}


/**
* BlockHuberAggregator computes the gradient and loss for the Huber loss function,
* as used in linear regression for blocks in sparse or dense matrices in an online fashion.
*
* Two BlockHuberAggregators can be merged together to have a summary of loss and gradient
* of the corresponding joint dataset.
*
* NOTE: The feature values are expected to be standardized before computation.
*
* @param fitIntercept Whether to fit an intercept term.
* @param epsilon The shape parameter to control the amount of robustness.
* @param bcParameters Broadcast of the parameter vector: the feature coefficients, followed by
*                     the intercept (if fitIntercept is true) and the scale parameter sigma.
*/
private[ml] class BlockHuberAggregator(
fitIntercept: Boolean,
epsilon: Double)(bcParameters: Broadcast[Vector])
extends DifferentiableLossAggregator[InstanceBlock, BlockHuberAggregator] {

protected override val dim: Int = bcParameters.value.size
private val numFeatures = if (fitIntercept) dim - 2 else dim - 1
private val sigma = bcParameters.value(dim - 1)
private val intercept = if (fitIntercept) bcParameters.value(dim - 2) else 0.0
// make transient so we do not serialize between aggregation stages
@transient private lazy val linear = Vectors.dense(bcParameters.value.toArray.take(numFeatures))

/**
* Add a new training instance block to this BlockHuberAggregator, and update the loss and
* gradient of the objective function.
*
* @param block The instance block of data points to be added.
* @return This BlockHuberAggregator object.
*/
def add(block: InstanceBlock): BlockHuberAggregator = {
require(block.matrix.isTransposed)
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
require(block.weightIter.forall(_ >= 0),
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")

if (block.weightIter.forall(_ == 0)) return this
val size = block.size

// vec here represents margins or dotProducts
val vec = if (fitIntercept) {
Vectors.dense(Array.fill(size)(intercept)).toDense
} else {
Vectors.zeros(size).toDense
}
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)

// in-place convert margins to multipliers
// then, vec represents multipliers
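// Per instance with weight w, label y and margin m, let l = y - m (the linear loss).
// If |l| <= sigma * epsilon (quadratic branch):
//   loss             = w/2 * (sigma + l^2 / sigma)
//   d(loss)/d(m)     = -w * l / sigma              (stored into vec as the multiplier)
//   d(loss)/d(sigma) = w/2 * (1 - (l / sigma)^2)   (accumulated into sigmaGradSum)
// Otherwise (linear branch):
//   loss             = w/2 * (sigma + 2 * epsilon * |l| - sigma * epsilon^2)
//   d(loss)/d(m)     = -w * epsilon * sign(l)
//   d(loss)/d(sigma) = w/2 * (1 - epsilon^2)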
var sigmaGradSum = 0.0
var i = 0
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
weightSum += weight
val label = block.getLabel(i)
val margin = vec(i)
val linearLoss = label - margin

if (math.abs(linearLoss) <= sigma * epsilon) {
lossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma)
val linearLossDivSigma = linearLoss / sigma
val multiplier = -1.0 * weight * linearLossDivSigma
vec.values(i) = multiplier
sigmaGradSum += 0.5 * weight * (1.0 - math.pow(linearLossDivSigma, 2.0))
} else {
lossSum += 0.5 * weight *
(sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
val sign = if (linearLoss >= 0) -1.0 else 1.0
val multiplier = weight * sign * epsilon
vec.values(i) = multiplier
sigmaGradSum += 0.5 * weight * (1.0 - epsilon * epsilon)
}
} else { vec.values(i) = 0.0 }
i += 1
}

block.matrix match {
case dm: DenseMatrix =>
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
vec.values, 1, 1.0, gradientSumArray, 1)

case sm: SparseMatrix =>
val linearGradSumVec = Vectors.zeros(numFeatures).toDense
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
gradientSumArray, 1)
}
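// The branches above accumulate X^T * multipliers into the first numFeatures entries of
// gradientSumArray: the dense path calls dgemv directly on the row-major values of the
// transposed block (read as a column-major numFeatures x size matrix), while the sparse path
// goes through a temporary vector and an axpy. The sigma and intercept slots are updated
// separately below.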

gradientSumArray(dim - 1) += sigmaGradSum
if (fitIntercept) gradientSumArray(dim - 2) += vec.values.sum

this
}
}
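Note: a minimal sketch (not part of this patch) of how a block aggregator like this is typically driven over an RDD[InstanceBlock]. In Spark the wiring normally goes through RDDLossFunction, and since the aggregator is private[ml] this would have to live inside the org.apache.spark.ml package; `blocks`, `bcParameters`, `fitIntercept`, `epsilon` and `aggregationDepth` are assumed inputs.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.InstanceBlock
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical driver: fold every block into a per-partition aggregator, then merge.
def huberLossAndGradient(
    blocks: RDD[InstanceBlock],
    bcParameters: Broadcast[Vector],
    fitIntercept: Boolean,
    epsilon: Double,
    aggregationDepth: Int): (Double, Vector) = {
  val aggregated = blocks.treeAggregate(
    new BlockHuberAggregator(fitIntercept, epsilon)(bcParameters))(
    seqOp = (agg, block) => agg.add(block),
    combOp = (agg1, agg2) => agg1.merge(agg2),
    depth = aggregationDepth)
  (aggregated.loss, aggregated.gradient)
}
```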
@@ -17,8 +17,8 @@
package org.apache.spark.ml.optim.aggregator

import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.Instance
-import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.linalg._

/**
* LeastSquaresAggregator computes the gradient and loss for a Least-squared loss function,
@@ -222,3 +222,92 @@ private[ml] class LeastSquaresAggregator(
}
}
}


/**
* BlockLeastSquaresAggregator computes the gradient and loss for the least squares loss function,
* as used in linear regression for blocks in sparse or dense matrices in an online fashion.
*
* Two BlockLeastSquaresAggregators can be merged together to have a summary of loss and gradient
* of the corresponding joint dataset.
*
* NOTE: The feature values are expected to be standardized before computation.
*
* @param labelStd The standard deviation of the label column.
* @param labelMean The mean of the label column.
* @param fitIntercept Whether to fit an intercept term.
* @param bcFeaturesStd The broadcast standard deviations of the features.
* @param bcFeaturesMean The broadcast means of the features.
* @param bcCoefficients The coefficients corresponding to the features.
*/
private[ml] class BlockLeastSquaresAggregator(
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
bcFeaturesMean: Broadcast[Array[Double]])(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[InstanceBlock, BlockLeastSquaresAggregator] {
require(labelStd > 0.0, s"${this.getClass.getName} requires the label standard " +
s"deviation to be positive.")

private val numFeatures = bcFeaturesStd.value.length
protected override val dim: Int = numFeatures
// make transient so we do not serialize between aggregation stages
@transient private lazy val effectiveCoefAndOffset = {
val coefficientsArray = bcCoefficients.value.toArray.clone()
val featuresMean = bcFeaturesMean.value
val featuresStd = bcFeaturesStd.value
var sum = 0.0
var i = 0
val len = coefficientsArray.length
while (i < len) {
if (featuresStd(i) != 0.0) {
sum += coefficientsArray(i) / featuresStd(i) * featuresMean(i)
} else {
coefficientsArray(i) = 0.0
}
i += 1
}
val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
(Vectors.dense(coefficientsArray), offset)
}
// do not use tuple assignment above because it will circumvent the @transient tag
@transient private lazy val effectiveCoefficientsVec = effectiveCoefAndOffset._1
@transient private lazy val offset = effectiveCoefAndOffset._2
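// Assuming the block features have been pre-scaled by 1 / featuresStd (but not shifted), the
// margin computed in add() is sum_j coef_j * x_ij / std_j. Adding the offset
// (labelMean / labelStd - sum_j coef_j * mean_j / std_j) and subtracting label / labelStd makes
// each diff the residual in the standardized space:
//   diff_i = sum_j coef_j * (x_ij - mean_j) / std_j - (y_i - labelMean) / labelStd
// Coefficients of constant features (std == 0) are zeroed out above so they contribute nothing.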

/**
* Add a new training instance block to this BlockLeastSquaresAggregator, and update the loss
* and gradient of the objective function.
*
* @param block The instance block of data points to be added.
* @return This BlockLeastSquaresAggregator object.
*/
def add(block: InstanceBlock): BlockLeastSquaresAggregator = {
require(block.matrix.isTransposed)
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
require(block.weightIter.forall(_ >= 0),
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")

if (block.weightIter.forall(_ == 0)) return this
val size = block.size

// vec here represents diffs
val vec = new DenseVector(Array.tabulate(size)(i => offset - block.getLabel(i) / labelStd))
BLAS.gemv(1.0, block.matrix, effectiveCoefficientsVec, 1.0, vec)

// in-place convert diffs to multipliers
// then, vec represents multipliers
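// multiplier_i = w_i * diff_i is d(w_i * diff_i^2 / 2) / d(margin_i); the gemv after this loop
// accumulates X^T * multipliers (this block's gradient contribution) directly into
// gradientSumArray, since gradSumVec wraps that array without copying.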
var i = 0
while (i < size) {
val weight = block.getWeight(i)
val diff = vec(i)
lossSum += weight * diff * diff / 2
weightSum += weight
val multiplier = weight * diff
vec.values(i) = multiplier
i += 1
}

val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)

this
}
}
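Note: a small, hypothetical local example (not from this patch) of exercising BlockLeastSquaresAggregator. It assumes an active SparkContext `sc`, code living inside the org.apache.spark.ml package (Instance, InstanceBlock and the aggregator are private[ml]), features already scaled by 1 / featuresStd as the class NOTE requires, and that InstanceBlock.fromInstances is available; all numeric values are illustrative.

```scala
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg.Vectors

// Illustrative, pre-scaled instances packed into a single block.
val scaled = Seq(
  Instance(1.0, 1.0, Vectors.dense(0.5, -0.2)),
  Instance(2.0, 1.0, Vectors.dense(-1.0, 0.3)))
val block = InstanceBlock.fromInstances(scaled)

// Broadcast the feature statistics and the current coefficients (made-up values).
val bcFeaturesStd = sc.broadcast(Array(2.0, 5.0))
val bcFeaturesMean = sc.broadcast(Array(0.1, -0.3))
val bcCoefficients = sc.broadcast(Vectors.dense(0.7, -1.2))

// Arguments: labelStd = 1.5, labelMean = 1.4, fitIntercept = true.
val agg = new BlockLeastSquaresAggregator(1.5, 1.4, true, bcFeaturesStd, bcFeaturesMean)(
  bcCoefficients)
agg.add(block)
println(s"loss = ${agg.loss}, gradient = ${agg.gradient}")
```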