@@ -231,6 +231,184 @@ class LinearRegression(override val uid: String)
override def copy(extra: ParamMap): LinearRegression = defaultCopy(extra)
}

/**
* :: Experimental ::
* Robust regression.
*
* The learning objective is to minimize the Huber loss, with regularization.
* For a residual a = prediction - label, the loss is:
*   L(a) = 1/2 * a^2               if |a| <= k
*   L(a) = k * |a| - 1/2 * k^2     if |a| > k
* where k = 1.345, which produces 95% efficiency when the errors are normal and
* substantial resistance to outliers otherwise.
*/
@Experimental
class RobustRegression(override val uid: String)
extends Regressor[Vector, RobustRegression, LinearRegressionModel]
with LinearRegressionParams with Logging {

def this() = this(Identifiable.randomUID("robReg"))

/**
* Set the regularization parameter.
* Default is 0.0.
* @group setParam
*/
def setRegParam(value: Double): this.type = set(regParam, value)
setDefault(regParam -> 0.0)

/**
* Set if we should fit the intercept.
* Default is true.
* @group setParam
*/
def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
setDefault(fitIntercept -> true)

/**
* Set the ElasticNet mixing parameter.
* For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
* For 0 < alpha < 1, the penalty is a combination of L1 and L2.
* Default is 0.0 which is an L2 penalty.
* @group setParam
*/
def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value)
setDefault(elasticNetParam -> 0.0)

/**
* Set the maximum number of iterations.
* Default is 100.
* @group setParam
*/
def setMaxIter(value: Int): this.type = set(maxIter, value)
setDefault(maxIter -> 100)

/**
* Set the convergence tolerance of iterations.
* A smaller value will lead to higher accuracy at the cost of more iterations.
* Default is 1E-6.
* @group setParam
*/
def setTol(value: Double): this.type = set(tol, value)
setDefault(tol -> 1E-6)
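
/*
* A minimal usage sketch (hypothetical: the DataFrame `training` and its default
* "label" and "features" columns are assumptions, not part of this patch):
*
*   val rr = new RobustRegression()
*     .setRegParam(0.1)
*     .setElasticNetParam(0.0)
*     .setMaxIter(200)
*   val model = rr.fit(training)
*/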

override protected def train(dataset: DataFrame): LinearRegressionModel = {
// Extract columns from data. If dataset is persisted, do not persist instances.
val instances = extractLabeledPoints(dataset).map {
case LabeledPoint(label: Double, features: Vector) => (label, features)
}
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

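// One pass over the data computes both the per-feature statistics (mean, variance
// via MultivariateOnlineSummarizer) and the label statistics (via StatCounter).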
val (summarizer, statCounter) = instances.treeAggregate(
(new MultivariateOnlineSummarizer, new StatCounter))(
seqOp = (c, v) => (c, v) match {
case ((summarizer: MultivariateOnlineSummarizer, statCounter: StatCounter),
(label: Double, features: Vector)) =>
(summarizer.add(features), statCounter.merge(label))
},
combOp = (c1, c2) => (c1, c2) match {
case ((summarizer1: MultivariateOnlineSummarizer, statCounter1: StatCounter),
(summarizer2: MultivariateOnlineSummarizer, statCounter2: StatCounter)) =>
(summarizer1.merge(summarizer2), statCounter1.merge(statCounter2))
})

val numFeatures = summarizer.mean.size
val yMean = statCounter.mean
val yStd = math.sqrt(statCounter.variance)

// If the yStd is zero, then the intercept is yMean with zero weights;
// as a result, training is not needed.
if (yStd == 0.0) {
logWarning(s"The standard deviation of the label is zero, so the weights will be zeros " +
s"and the intercept will be the mean of the label; as a result, training is not needed.")
if (handlePersistence) instances.unpersist()
val weights = Vectors.sparse(numFeatures, Seq())
val intercept = yMean

val model = new LinearRegressionModel(uid, weights, intercept)
val trainingSummary = new LinearRegressionTrainingSummary(
model.transform(dataset).select($(predictionCol), $(labelCol)),
$(predictionCol),
$(labelCol),
Array(0D))
return copyValues(model.setSummary(trainingSummary))
}

val featuresMean = summarizer.mean.toArray
val featuresStd = summarizer.variance.toArray.map(math.sqrt)

// Since we implicitly scale the features when computing the cost function
// (to improve convergence), the effective regParam changes accordingly.
val effectiveRegParam = $(regParam) / yStd
val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
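// For example (illustrative numbers, not defaults): regParam = 1.0, yStd = 2.0 and
// elasticNetParam = 0.5 give effectiveRegParam = 0.5, so the effective L1 and L2
// parameters are each 0.25.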

val costFun = new HuberCostFun(instances, yStd, yMean, $(fitIntercept),
featuresStd, featuresMean, effectiveL2RegParam)

val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
} else {
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, effectiveL1RegParam, $(tol))
}
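// LBFGS suffices when the objective is smooth (no L1 term); OWLQN is the LBFGS
// variant that handles the non-differentiable L1 penalty when elasticNetParam > 0.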

val initialWeights = Vectors.zeros(numFeatures)
val states = optimizer.iterations(new CachedDiffFunction(costFun),
initialWeights.toBreeze.toDenseVector)

val (weights, objectiveHistory) = {
/*
Note that the objective history (loss + regularization) returned from the
optimizer is computed in the scaled space, given by the following formula:
{{{
L = 1/(2n) * ||\sum_i w_i (x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2 + regTerms
}}}
*/
val arrayBuilder = mutable.ArrayBuilder.make[Double]
var state: optimizer.State = null
while (states.hasNext) {
state = states.next()
arrayBuilder += state.adjustedValue
}
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
logError(msg)
throw new SparkException(msg)
}

/*
The weights are trained in the scaled space; we're converting them back to
the original space.
*/
val rawWeights = state.x.toArray.clone()
var i = 0
val len = rawWeights.length
while (i < len) {
rawWeights(i) *= { if (featuresStd(i) != 0.0) yStd / featuresStd(i) else 0.0 }
i += 1
}
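// E.g. (illustrative numbers): a weight of 0.5 learned in the scaled space, with
// featuresStd(i) = 2.0 and yStd = 4.0, maps back to 0.5 * 4.0 / 2.0 = 1.0.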

(Vectors.dense(rawWeights).compressed, arrayBuilder.result())
}

/*
The intercept in R's GLMNET is computed using closed form after the coefficients are
converged. See the following discussion for detail.
http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
*/
val intercept = if ($(fitIntercept)) yMean - dot(weights, Vectors.dense(featuresMean)) else 0.0
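// E.g. (illustrative numbers): yMean = 10.0, weights = [2.0], featuresMean = [3.0]
// give intercept = 10.0 - 2.0 * 3.0 = 4.0.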

if (handlePersistence) instances.unpersist()

val model = copyValues(new LinearRegressionModel(uid, weights, intercept))
val trainingSummary = new LinearRegressionTrainingSummary(
model.transform(dataset).select($(predictionCol), $(labelCol)),
$(predictionCol),
$(labelCol),
objectiveHistory)
model.setSummary(trainingSummary)
}

override def copy(extra: ParamMap): RobustRegression = defaultCopy(extra)
}

/**
* :: Experimental ::
* Model produced by [[LinearRegression]].
@@ -591,3 +769,56 @@ private class LeastSquaresCostFun(
(loss, gradient.toBreeze.asInstanceOf[BDV[Double]])
}
}

/**
* HuberCostFun implements Breeze's DiffFunction[T] for the Huber cost as used in robust
* regression. The Huber M-estimator corresponds to a probability distribution for the errors
* which is normal in the centre but like a double exponential distribution in the tails
* (Hogg 1979: 109). For a residual a = A * weights - y, the loss is:
*   L(a) = 1/2 * a^2               if |a| <= k
*   L(a) = k * |a| - 1/2 * k^2     if |a| > k
* where k = 1.345, which produces 95% efficiency when the errors are normal and
* substantial resistance to outliers otherwise.
* It is used in Breeze's convex optimization routines.
*/
private class HuberCostFun(
data: RDD[(Double, Vector)],
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double],
effectiveL2RegParam: Double) extends DiffFunction[BDV[Double]] {

override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
val w = Vectors.fromBreeze(weights)

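// Accumulate the data-fit loss and gradient over the RDD; this patch reuses the
// least-squares aggregator for the smooth part of the objective.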
val leastSquaresAggregator = data.treeAggregate(new LeastSquaresAggregator(w, labelStd,
labelMean, fitIntercept, featuresStd, featuresMean))(
seqOp = (c, v) => (c, v) match {
case (aggregator, (label, features)) => aggregator.add(label, features)
},
combOp = (c1, c2) => (c1, c2) match {
case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
})

val k = 1.345
// Huber-style piecewise regularization term; `diff` compares the squared weight
// norm (w . w) against the label mean.
val diff = dot(w, w) - labelMean
val norm = brzNorm(weights, 2.0)
val regVal = if (diff < -k) {
-k * 0.5 * effectiveL2RegParam * diff - 0.5 * k * k
} else if (diff <= k) {
0.25 * effectiveL2RegParam * norm * norm
} else {
k * 0.5 * effectiveL2RegParam * diff - 0.5 * k * k
}

val loss = leastSquaresAggregator.loss + regVal
val gradient = leastSquaresAggregator.gradient
// Add the L2 penalty gradient: gradient += effectiveL2RegParam * w.
axpy(effectiveL2RegParam, w, gradient)

(loss, gradient.toBreeze.asInstanceOf[BDV[Double]])
}
}
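
/*
* For reference, a self-contained sketch of the pointwise Huber loss and its gradient,
* matching the piecewise definition in the HuberCostFun doc comment above. This is an
* illustration of the formula only (the object name and the plain-Array API are
* assumptions), not the aggregation path the cost function above actually uses.
*/
private object HuberLossSketch {
  val k = 1.345

  /** Huber loss of a single residual a = prediction - label. */
  def loss(a: Double): Double =
    if (math.abs(a) <= k) 0.5 * a * a
    else k * math.abs(a) - 0.5 * k * k

  /**
   * Derivative of the Huber loss w.r.t. the residual: linear in the centre, clipped
   * to +/- k in the tails, which is what bounds the influence of outliers.
   */
  def dLoss(a: Double): Double =
    if (math.abs(a) <= k) a
    else k * math.signum(a)

  /** Total loss and gradient over a small dense dataset, for illustration. */
  def lossAndGradient(
      weights: Array[Double],
      xs: Array[Array[Double]],
      ys: Array[Double]): (Double, Array[Double]) = {
    val grad = new Array[Double](weights.length)
    var totalLoss = 0.0
    var i = 0
    while (i < xs.length) {
      // prediction = x . w, residual a = prediction - label
      var pred = 0.0
      var j = 0
      while (j < weights.length) {
        pred += xs(i)(j) * weights(j)
        j += 1
      }
      val a = pred - ys(i)
      totalLoss += loss(a)
      val d = dLoss(a)
      j = 0
      while (j < weights.length) {
        grad(j) += d * xs(i)(j) // chain rule: dL/dw_j = dL/da * x_j
        j += 1
      }
      i += 1
    }
    (totalLoss, grad)
  }
}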