diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala
new file mode 100644
index 0000000000000..ed1ffb643cc58
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater}
+import org.apache.spark.mllib.regression.BiweightRobustRegressionWithSGD
+import org.apache.spark.mllib.util.MLUtils
+
+/**
+ * An example app for Biweight Robust regression. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.BiweightRobustRegression
+ * }}}
+ * A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object BiweightRobustRegression extends App {
+
+ object RegType extends Enumeration {
+ type RegType = Value
+ val NONE, L1, L2 = Value
+ }
+
+ import RegType._
+
+ case class Params(
+ input: String = null,
+ numIterations: Int = 5000,
+ stepSize: Double = 1.0,
+ regType: RegType = L2,
+ regParam: Double = 0.1)
+
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("BiweightRobustRegression") {
+ head("BiweightRobustRegression: an example app for Biweight M-Estimation Robust regression.")
+ opt[Int]("numIterations")
+ .text("number of iterations")
+ .action((x, c) => c.copy(numIterations = x))
+ opt[Double]("stepSize")
+ .text(s"initial step size, default: ${defaultParams.stepSize}")
+ .action((x, c) => c.copy(stepSize = x))
+ opt[String]("regType")
+ .text(s"regularization type (${RegType.values.mkString(",")}), " +
+ s"default: ${defaultParams.regType}")
+ .action((x, c) => c.copy(regType = RegType.withName(x)))
+ opt[Double]("regParam")
+ .text(s"regularization parameter, default: ${defaultParams.regParam}")
+ arg[String]("")
+ .required()
+ .text("input paths to labeled examples in LIBSVM format")
+ .action((x, c) => c.copy(input = x))
+ note(
+ """
+ |For example, the following command runs this app on a synthetic dataset:
+ |
+ | bin/spark-submit --class org.apache.spark.examples.mllib.BiweightRobustRegression \
+ | examples/target/scala-*/spark-examples-*.jar \
+ | data/mllib/sample_linear_regression_data.txt
+ """.stripMargin)
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"BiweightRobustRegression with $params")
+ val sc = new SparkContext(conf)
+
+ Logger.getRootLogger.setLevel(Level.WARN)
+
+ val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
+
+ val splits = examples.randomSplit(Array(0.8, 0.2))
+ val training = splits(0).cache()
+ val test = splits(1).cache()
+
+ val numTraining = training.count()
+ val numTest = test.count()
+ println(s"Training: $numTraining, test: $numTest.")
+
+ examples.unpersist(blocking = false)
+
+ val updater = params.regType match {
+ case NONE => new SimpleUpdater()
+ case L1 => new L1Updater()
+ case L2 => new SquaredL2Updater()
+ }
+
+ val algorithm = new BiweightRobustRegressionWithSGD()
+ algorithm.optimizer
+ .setNumIterations(params.numIterations)
+ .setStepSize(params.stepSize)
+ .setUpdater(updater)
+ .setRegParam(params.regParam)
+
+ val model = algorithm.run(training)
+
+ val prediction = model.predict(test.map(_.features))
+ val predictionAndLabel = prediction.zip(test.map(_.label))
+
+ val loss = predictionAndLabel.map { case (p, l) =>
+ val err = p - l
+ err * err
+ }.reduce(_ + _)
+ val rmse = math.sqrt(loss / numTest)
+
+ println(s"Test RMSE = $rmse.")
+
+ sc.stop()
+ }
+}
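For readers who want to try the estimator interactively rather than through `spark-submit`, a minimal spark-shell sketch of the same flow follows (the Huber app below is identical in structure). It assumes this patch is applied, that `sc` is the shell's SparkContext, and that the synthetic dataset mentioned above is available; the regularization choices mirror the app's defaults.

```scala
// Minimal sketch of the example app's core flow; assumes this patch is applied.
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.regression.BiweightRobustRegressionWithSGD
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_linear_regression_data.txt")
val Array(training, test) = data.randomSplit(Array(0.8, 0.2))

val algorithm = new BiweightRobustRegressionWithSGD()
algorithm.optimizer
  .setNumIterations(5000)              // the example app's default for the biweight loss
  .setStepSize(1.0)
  .setUpdater(new SquaredL2Updater())  // the app's default --regType is L2
  .setRegParam(0.1)
val model = algorithm.run(training.cache())

// Root-mean-squared error on the held-out split.
val rmse = math.sqrt(test.map { p =>
  val err = model.predict(p.features) - p.label
  err * err
}.mean())
println(s"Test RMSE = $rmse")
```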
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala
new file mode 100644
index 0000000000000..6d7c4520202fc
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater}
+import org.apache.spark.mllib.regression.HuberRobustRegressionWithSGD
+import org.apache.spark.mllib.util.MLUtils
+
+/**
+ * An example app for Huber Robust regression. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.HuberRobustRegression
+ * }}}
+ * A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object HuberRobustRegression extends App {
+
+ object RegType extends Enumeration {
+ type RegType = Value
+ val NONE, L1, L2 = Value
+ }
+
+ import RegType._
+
+ case class Params(
+ input: String = null,
+ numIterations: Int = 100,
+ stepSize: Double = 1.0,
+ regType: RegType = L2,
+ regParam: Double = 0.1)
+
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("HuberRobustRegression") {
+ head("HuberRobustRegression: an example app for Huber M-Estimation Robust regression.")
+ opt[Int]("numIterations")
+ .text("number of iterations")
+ .action((x, c) => c.copy(numIterations = x))
+ opt[Double]("stepSize")
+ .text(s"initial step size, default: ${defaultParams.stepSize}")
+ .action((x, c) => c.copy(stepSize = x))
+ opt[String]("regType")
+ .text(s"regularization type (${RegType.values.mkString(",")}), " +
+ s"default: ${defaultParams.regType}")
+ .action((x, c) => c.copy(regType = RegType.withName(x)))
+ opt[Double]("regParam")
+ .text(s"regularization parameter, default: ${defaultParams.regParam}")
+ arg[String]("")
+ .required()
+ .text("input paths to labeled examples in LIBSVM format")
+ .action((x, c) => c.copy(input = x))
+ note(
+ """
+ |For example, the following command runs this app on a synthetic dataset:
+ |
+ | bin/spark-submit --class org.apache.spark.examples.mllib.HuberRobustRegression \
+ | examples/target/scala-*/spark-examples-*.jar \
+ | data/mllib/sample_linear_regression_data.txt
+ """.stripMargin)
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"HuberRobustRegression with $params")
+ val sc = new SparkContext(conf)
+
+ Logger.getRootLogger.setLevel(Level.WARN)
+
+ val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
+
+ val splits = examples.randomSplit(Array(0.8, 0.2))
+ val training = splits(0).cache()
+ val test = splits(1).cache()
+
+ val numTraining = training.count()
+ val numTest = test.count()
+ println(s"Training: $numTraining, test: $numTest.")
+
+ examples.unpersist(blocking = false)
+
+ val updater = params.regType match {
+ case NONE => new SimpleUpdater()
+ case L1 => new L1Updater()
+ case L2 => new SquaredL2Updater()
+ }
+
+ val algorithm = new HuberRobustRegressionWithSGD()
+ algorithm.optimizer
+ .setNumIterations(params.numIterations)
+ .setStepSize(params.stepSize)
+ .setUpdater(updater)
+ .setRegParam(params.regParam)
+
+ val model = algorithm.run(training)
+
+ val prediction = model.predict(test.map(_.features))
+ val predictionAndLabel = prediction.zip(test.map(_.label))
+
+ val loss = predictionAndLabel.map { case (p, l) =>
+ val err = p - l
+ err * err
+ }.reduce(_ + _)
+ val rmse = math.sqrt(loss / numTest)
+
+ println(s"Test RMSE = $rmse.")
+
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index fdd67160114ca..0cb090155733d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.optimization
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal}
+import scala.math.pow
/**
* :: DeveloperApi ::
@@ -118,6 +119,104 @@ class LeastSquaresGradient extends Gradient {
}
}
+/**
+ * :: DeveloperApi ::
+ * Compute gradient and loss for the Huber objective function, as used in robust regression.
+ * The Huber M-estimator corresponds to a probability distribution for the errors which is normal
+ * in the centre but like a double exponential distribution in the tails (Hogg 1979: 109).
+ *   L = 1/2 ||A weights-y||^2         if |A weights-y| <= k
+ *   L = k |A weights-y| - 1/2 k^2     if |A weights-y| > k
+ * where k = 1.345, which produces 95% efficiency when the errors are normal and
+ * substantial resistance to outliers otherwise.
+ * See also the documentation for the precise formulation.
+ */
+@DeveloperApi
+class HuberRobustGradient extends Gradient {
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val diff = dot(data, weights) - label
+    // The tuning constant k is generally picked to give reasonably high efficiency in the
+    // normal case. Smaller values produce more resistance to outliers, at the expense of
+    // lower efficiency when the errors are normally distributed.
+    val k = 1.345
+    val gradient = data.copy
+    if (diff < -k) {
+      scal(-k, gradient)
+      (gradient, -k * diff - 1.0 / 2.0 * pow(k, 2))
+    } else if (diff <= k) {
+      scal(diff, gradient)
+      (gradient, 1.0 / 2.0 * diff * diff)
+    } else {
+      scal(k, gradient)
+      (gradient, k * diff - 1.0 / 2.0 * pow(k, 2))
+    }
+  }
+
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val diff = dot(data, weights) - label
+    val k = 1.345
+    if (diff < -k) {
+      axpy(-k, data, cumGradient)
+      -k * diff - 1.0 / 2.0 * pow(k, 2)
+    } else if (diff <= k) {
+      axpy(diff, data, cumGradient)
+      1.0 / 2.0 * diff * diff
+    } else {
+      axpy(k, data, cumGradient)
+      k * diff - 1.0 / 2.0 * pow(k, 2)
+    }
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ * Compute gradient and loss for the Tukey bisquare (biweight) function, as used in robust
+ * regression. The biweight function produces an M-estimator that is more resistant to regression
+ * outliers than the Huber M-estimator, especially in the extreme tails (Andersen 2008: 19).
+ *   L = k^2 / 6 * (1 - (1 - ||A weights-y||^2 / k^2)^3)   if |A weights-y| <= k
+ *   L = k^2 / 6                                           if |A weights-y| > k
+ * where k = 4.685, which produces 95% efficiency when the errors are normal and
+ * substantial resistance to outliers otherwise.
+ * See also the documentation for the precise formulation.
+ */
+@DeveloperApi
+class BiweightRobustGradient extends Gradient {
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val diff = dot(data, weights) - label
+    // The tuning constant k is generally picked to give reasonably high efficiency in the
+    // normal case. Smaller values produce more resistance to outliers, at the expense of
+    // lower efficiency when the errors are normally distributed.
+    val k = 4.685
+    val u = diff * diff / pow(k, 2)
+    if (diff >= -k && diff <= k) {
+      val gradient = data.copy
+      scal(pow(1.0 - u, 2) * diff, gradient)
+      (gradient, pow(k, 2) / 6.0 * (1.0 - pow(1.0 - u, 3)))
+    } else {
+      (Vectors.sparse(weights.size, Array.empty, Array.empty), pow(k, 2) / 6.0)
+    }
+  }
+
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val diff = dot(data, weights) - label
+    val k = 4.685
+    val u = diff * diff / pow(k, 2)
+    if (diff >= -k && diff <= k) {
+      axpy(pow(1.0 - u, 2) * diff, data, cumGradient)
+      pow(k, 2) / 6.0 * (1.0 - pow(1.0 - u, 3))
+    } else {
+      pow(k, 2) / 6.0
+    }
+  }
+}
+
/**
* :: DeveloperApi ::
* Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
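The piecewise formulas in the doc comments above are easy to sanity-check in isolation. The following self-contained sketch is not part of the patch; it simply evaluates the two scalar losses at a few residuals to make the behavior concrete:

```scala
// Standalone check of the Huber and Tukey bisquare losses described above,
// written as functions of a single residual r.
import scala.math.{abs, pow}

object RobustLossCheck {
  def huberLoss(r: Double, k: Double = 1.345): Double =
    if (abs(r) <= k) 0.5 * r * r else k * abs(r) - 0.5 * k * k

  def tukeyLoss(r: Double, k: Double = 4.685): Double =
    if (abs(r) <= k) k * k / 6.0 * (1.0 - pow(1.0 - r * r / (k * k), 3))
    else k * k / 6.0

  def main(args: Array[String]): Unit = {
    println(huberLoss(1.0))   // 0.5: quadratic inside the band, like least squares
    println(huberLoss(10.0))  // ~12.55: linear outside the band, so outliers grow slowly
    println(tukeyLoss(10.0))  // ~3.66: capped at k^2 / 6, so outliers stop growing entirely
  }
}
```

The contrast between the linearly growing Huber tail and the bounded Tukey tail is what makes the biweight estimator more resistant to extreme outliers, and is consistent with the larger default iteration count chosen for `BiweightRobustRegressionWithSGD` below.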
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala
new file mode 100644
index 0000000000000..061b6da75c6c7
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.optimization._
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Regression model trained using Huber M-Estimation RobustRegression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
+ */
+class HuberRobustRegressionModel (
+ override val weights: Vector,
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
+
+ override protected def predictPoint(
+ dataMatrix: Vector,
+ weightMatrix: Vector,
+ intercept: Double): Double = {
+ weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+ }
+}
+
+/**
+ * Train a Huber Robust regression model with no regularization using Stochastic Gradient Descent.
+ * This solves the Huber objective function
+ *   f(weights) = 1/2 ||A weights-y||^2       if |A weights-y| <= k
+ *   f(weights) = k |A weights-y| - 1/2 k^2   if |A weights-y| > k
+ * where k = 1.345, which produces 95% efficiency when the errors are normal and
+ * substantial resistance to outliers otherwise.
+ * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
+ * its corresponding right hand side label y.
+ * See also the documentation for the precise formulation.
+ */
+class HuberRobustRegressionWithSGD private[mllib] (
+ private var stepSize: Double,
+ private var numIterations: Int,
+ private var miniBatchFraction: Double)
+ extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable {
+
+ private val gradient = new HuberRobustGradient()
+ private val updater = new SimpleUpdater()
+ override val optimizer = new GradientDescent(gradient, updater)
+ .setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setMiniBatchFraction(miniBatchFraction)
+
+ /**
+   * Construct a Huber M-Estimation RobustRegression object with default parameters:
+   * {stepSize: 1.0, numIterations: 100, miniBatchFraction: 1.0}.
+ */
+ def this() = this(1.0, 100, 1.0)
+
+ override protected def createModel(weights: Vector, intercept: Double) = {
+ new HuberRobustRegressionModel(weights, intercept)
+ }
+}
+
+/**
+ * Top-level methods for calling HuberRobustRegression.
+ */
+object HuberRobustRegressionWithSGD {
+
+ /**
+   * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used
+ * in gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+ * matrix A as well as the corresponding right hand side label y
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double,
+ initialWeights: Vector): HuberRobustRegressionModel = {
+ new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+ .run(input, initialWeights)
+ }
+
+ /**
+ * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate a stochastic gradient.
+ *
+ * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+ * matrix A as well as the corresponding right hand side label y
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double): HuberRobustRegressionModel = {
+ new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
+ }
+
+ /**
+ * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. We use the entire
+ * data set to compute the true gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+ * matrix A as well as the corresponding right hand side label y
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a HuberRobustRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double): HuberRobustRegressionModel = {
+ train(input, numIterations, stepSize, 1.0)
+ }
+
+ /**
+ * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using a step size of 1.0. We use the entire data
+ * set to compute the true gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+ * matrix A as well as the corresponding right hand side label y
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a HuberRobustRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int): HuberRobustRegressionModel = {
+ train(input, numIterations, 1.0, 1.0)
+ }
+}
+
+/**
+ * Regression model trained using Tukey bisquare (Biweight) M-Estimation RobustRegression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
+ */
+class BiweightRobustRegressionModel (
+ override val weights: Vector,
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
+
+ override protected def predictPoint(
+ dataMatrix: Vector,
+ weightMatrix: Vector,
+ intercept: Double): Double = {
+ weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+ }
+}
+
+/**
+ * Train a Biweight Robust regression model with no regularization using SGD.
+ * This solves the Biweight objective function
+ *   L = k^2 / 6 * (1 - (1 - ||A weights-y||^2 / k^2)^3)   if |A weights-y| <= k
+ *   L = k^2 / 6                                           if |A weights-y| > k
+ * where k = 4.685, which produces 95% efficiency when the errors are normal and
+ * substantial resistance to outliers otherwise.
+ * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
+ * its corresponding right hand side label y.
+ * See also the documentation for the precise formulation.
+ */
+class BiweightRobustRegressionWithSGD private[mllib] (
+ private var stepSize: Double,
+ private var numIterations: Int,
+ private var miniBatchFraction: Double)
+ extends GeneralizedLinearAlgorithm[BiweightRobustRegressionModel] with Serializable {
+
+ private val gradient = new BiweightRobustGradient()
+ private val updater = new SimpleUpdater()
+ override val optimizer = new GradientDescent(gradient, updater)
+ .setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setMiniBatchFraction(miniBatchFraction)
+
+ /**
+ * Construct a Biweight M-Estimation RobustRegression object with default parameters:
+ * {stepSize: 1.0, numIterations: 5000, miniBatchFraction: 1.0}.
+ */
+ def this() = this(1.0, 5000, 1.0)
+
+ override protected def createModel(weights: Vector, intercept: Double) = {
+ new BiweightRobustRegressionModel(weights, intercept)
+ }
+}
+
+/**
+ * Top-level methods for calling BiweightRobustRegression.
+ */
+object BiweightRobustRegressionWithSGD {
+
+ /**
+   * Train a BiweightRobustRegression model given an RDD of (label, features) pairs. We run a
+ * fixed number of iterations of gradient descent using the specified step size. Each iteration
+ * uses `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights
+ * used in gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+ * matrix A as well as the corresponding right hand side label y
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double,
+ initialWeights: Vector): BiweightRobustRegressionModel = {
+ new BiweightRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+ .run(input, initialWeights)
+ }
+
+ /**
+   * Train a BiweightRobustRegression model given an RDD of (label, features) pairs. We run a
+ * number of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate a stochastic gradient.
+ *
+ * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+ * matrix A as well as the corresponding right hand side label y
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double): BiweightRobustRegressionModel = {
+ new BiweightRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
+ }
+
+ /**
+   * Train a BiweightRobustRegression model given an RDD of (label, features) pairs. We run a
+ * number of iterations of gradient descent using the specified step size. We use the entire
+ * data set to compute the true gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+ * matrix A as well as the corresponding right hand side label y
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a BiweightRobustRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double): BiweightRobustRegressionModel = {
+ train(input, numIterations, stepSize, 1.0)
+ }
+
+ /**
+   * Train a BiweightRobustRegression model given an RDD of (label, features) pairs. We run a
+ * number of iterations of gradient descent using a step size of 1.0. We use the entire data
+ * set to compute the true gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+ * matrix A as well as the corresponding right hand side label y
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a BiweightRobustRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int): BiweightRobustRegressionModel = {
+ train(input, numIterations, 1.0, 1.0)
+ }
+}
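As a quick illustration of the convenience methods on the companion objects, here is a hedged sketch assuming an existing SparkContext `sc`; the data values are made up for illustration only.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{HuberRobustRegressionWithSGD, LabeledPoint}

// Tiny illustrative dataset; the third label is a gross outlier.
val points = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.1, 0.2)),
  LabeledPoint(2.0, Vectors.dense(0.2, 0.4)),
  LabeledPoint(50.0, Vectors.dense(0.15, 0.3))
))

// Full-batch gradient descent (miniBatchFraction defaults to 1.0) with step size 1.0.
val model = HuberRobustRegressionWithSGD.train(points, 200, 1.0)
println(model.predict(Vectors.dense(0.4, 0.8)))
```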
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala
new file mode 100644
index 0000000000000..3362c5abae17c
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
+
+class RobustRegressionSuite extends FunSuite with LocalSparkContext {
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
+ // A prediction is off if the prediction is more than 0.5 away from expected value.
+ math.abs(prediction - expected.label) > 0.5
+ }
+ // At least 80% of the predictions should be on.
+ assert(numOffPredictions < input.length / 5)
+ }
+
+ // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2
+ test("Huber Robust regression") {
+ val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput(
+ 3.0, Array(10.0, 10.0), 100, 42), 2).cache()
+ val linReg = new HuberRobustRegressionWithSGD().setIntercept(true)
+ linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+ val model = linReg.run(testRDD)
+ assert(model.intercept >= 2.5 && model.intercept <= 3.5)
+
+ val weights = model.weights
+ assert(weights.size === 2)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(1) >= 9.0 && weights(1) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(
+ 3.0, Array(10.0, 10.0), 100, 17)
+ val validationRDD = sc.parallelize(validationData, 2).cache()
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+
+ // Test if we can correctly learn Y = 10*X1 + 10*X2
+ test("Huber Robust regression without intercept") {
+ val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput(
+ 0.0, Array(10.0, 10.0), 100, 42), 2).cache()
+ val linReg = new HuberRobustRegressionWithSGD().setIntercept(false)
+ linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+ val model = linReg.run(testRDD)
+
+ assert(model.intercept === 0.0)
+
+ val weights = model.weights
+ assert(weights.size === 2)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(1) >= 9.0 && weights(1) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(
+ 0.0, Array(10.0, 10.0), 100, 17)
+ val validationRDD = sc.parallelize(validationData, 2).cache()
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+
+ // Test if we can correctly learn Y = 10*X1 + 10*X10000
+ test("sparse Huber Robust regression without intercept") {
+ val denseRDD = sc.parallelize(
+ LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2)
+ val sparseRDD = denseRDD.map { case LabeledPoint(label, v) =>
+ val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+ LabeledPoint(label, sv)
+ }.cache()
+ val linReg = new HuberRobustRegressionWithSGD().setIntercept(false)
+ linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+ val model = linReg.run(sparseRDD)
+
+ assert(model.intercept === 0.0)
+
+ val weights = model.weights
+ assert(weights.size === 10000)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(9999) >= 9.0 && weights(9999) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17)
+ val sparseValidationData = validationData.map { case LabeledPoint(label, v) =>
+ val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+ LabeledPoint(label, sv)
+ }
+ val sparseValidationRDD = sc.parallelize(sparseValidationData, 2)
+
+ // Test prediction on RDD.
+ validatePrediction(
+ model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData)
+
+ // Test prediction on Array.
+ validatePrediction(
+ sparseValidationData.map(row => model.predict(row.features)), sparseValidationData)
+ }
+
+ // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2
+ test("Biweight Robust regression") {
+ val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput(
+ 3.0, Array(10.0, 10.0), 100, 42), 2).cache()
+ val linReg = new BiweightRobustRegressionWithSGD().setIntercept(true)
+    linReg.optimizer.setNumIterations(3000).setStepSize(1.0)  // Default numIterations: 5000
+
+ val model = linReg.run(testRDD)
+ assert(model.intercept >= 2.5 && model.intercept <= 3.5)
+
+ val weights = model.weights
+ assert(weights.size === 2)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(1) >= 9.0 && weights(1) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(
+ 3.0, Array(10.0, 10.0), 100, 17)
+ val validationRDD = sc.parallelize(validationData, 2).cache()
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+
+ // Test if we can correctly learn Y = 10*X1 + 10*X2
+ test("Biweight Robust regression without intercept") {
+ val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput(
+ 0.0, Array(10.0, 10.0), 100, 42), 2).cache()
+ val linReg = new BiweightRobustRegressionWithSGD().setIntercept(false)
+    linReg.optimizer.setNumIterations(4000).setStepSize(1.0)  // Default numIterations: 5000
+
+ val model = linReg.run(testRDD)
+
+ assert(model.intercept === 0.0)
+
+ val weights = model.weights
+ assert(weights.size === 2)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(1) >= 9.0 && weights(1) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(
+ 0.0, Array(10.0, 10.0), 100, 17)
+ val validationRDD = sc.parallelize(validationData, 2).cache()
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+
+ // Test if we can correctly learn Y = 10*X1 + 10*X10000
+ test("sparse Biweight Robust regression without intercept") {
+ val denseRDD = sc.parallelize(
+ LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2)
+ val sparseRDD = denseRDD.map { case LabeledPoint(label, v) =>
+ val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+ LabeledPoint(label, sv)
+ }.cache()
+ val linReg = new BiweightRobustRegressionWithSGD().setIntercept(false)
+    linReg.optimizer.setNumIterations(4000).setStepSize(1.0)  // Default numIterations: 5000
+
+ val model = linReg.run(sparseRDD)
+
+ assert(model.intercept === 0.0)
+
+ val weights = model.weights
+ assert(weights.size === 10000)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(9999) >= 9.0 && weights(9999) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17)
+ val sparseValidationData = validationData.map { case LabeledPoint(label, v) =>
+ val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+ LabeledPoint(label, sv)
+ }
+ val sparseValidationRDD = sc.parallelize(sparseValidationData, 2)
+
+ // Test prediction on RDD.
+ validatePrediction(
+ model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData)
+
+ // Test prediction on Array.
+ validatePrediction(
+ sparseValidationData.map(row => model.predict(row.features)), sparseValidationData)
+ }
+}
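The tests above verify recovery on clean data; the practical motivation for these estimators is corrupted labels. A hedged spark-shell sketch, not part of the suite, that contrasts Huber with ordinary least squares on partially corrupted data:

```scala
import org.apache.spark.mllib.regression.{HuberRobustRegressionWithSGD, LinearRegressionWithSGD}
import org.apache.spark.mllib.util.LinearDataGenerator

// Same generator as the tests, but with 5% of the labels grossly corrupted.
val clean = LinearDataGenerator.generateLinearInput(3.0, Array(10.0, 10.0), 100, 42)
val corrupted = clean.zipWithIndex.map { case (p, i) =>
  if (i % 20 == 0) p.copy(label = p.label + 100.0) else p
}
val rdd = sc.parallelize(corrupted, 2).cache()

val ols = LinearRegressionWithSGD.train(rdd, 1000)
val huber = HuberRobustRegressionWithSGD.train(rdd, 1000)

// Expect the Huber weights to stay near (10, 10) while the least-squares
// fit is pulled toward the corrupted labels.
println(s"OLS weights:   ${ols.weights}")
println(s"Huber weights: ${huber.weights}")
```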