diff --git a/docs/mllib-isotonic-regression.md b/docs/mllib-isotonic-regression.md index 95be32a819e8a..711e828bd809e 100644 --- a/docs/mllib-isotonic-regression.md +++ b/docs/mllib-isotonic-regression.md @@ -43,7 +43,14 @@ best fitting the original data points. which uses an approach to [parallelizing isotonic regression](https://doi.org/10.1007/978-3-642-99789-1_10). The training input is an RDD of tuples of three double values that represent -label, feature and weight in this order. Additionally, IsotonicRegression algorithm has one +label, feature and weight in this order. In case there are multiple tuples with +the same feature then these tuples are aggregated into a single tuple as follows: + +* Aggregated label is the weighted average of all labels. +* Aggregated feature is the unique feature value. +* Aggregated weight is the sum of all weights. + +Additionally, IsotonicRegression algorithm has one optional parameter called $isotonic$ defaulting to true. This argument specifies if the isotonic regression is isotonic (monotonically increasing) or antitonic (monotonically decreasing). @@ -53,17 +60,12 @@ labels for both known and unknown features. The result of isotonic regression is treated as piecewise linear function. The rules for prediction therefore are: * If the prediction input exactly matches a training feature - then associated prediction is returned. In case there are multiple predictions with the same - feature then one of them is returned. Which one is undefined - (same as java.util.Arrays.binarySearch). + then associated prediction is returned. * If the prediction input is lower or higher than all training features then prediction with lowest or highest feature is returned respectively. - In case there are multiple predictions with the same feature - then the lowest or highest is returned respectively. * If the prediction input falls between two training features then prediction is treated as piecewise linear function and interpolated value is calculated from the - predictions of the two closest features. In case there are multiple values - with the same feature then the same rules as in previous point are used. + predictions of the two closest features. ### Examples diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 0b2bf14750168..fbf0dc9c35788 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -23,7 +23,6 @@ import java.util.Arrays.binarySearch import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.commons.math3.util.Precision import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -272,8 +271,8 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * @param input RDD of tuples (label, feature, weight) where label is dependent variable * for which we calculate isotonic regression, feature is independent variable * and weight represents number of measures with default 1. - * If multiple labels share the same feature value then they are ordered before - * the algorithm is executed. + * If multiple labels share the same feature value then they are aggregated using + * the weighted average before the algorithm is executed. * @return Isotonic regression model. */ @Since("1.3.0") @@ -298,8 +297,8 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable * for which we calculate isotonic regression, feature is independent variable * and weight represents number of measures with default 1. - * If multiple labels share the same feature value then they are ordered before - * the algorithm is executed. + * If multiple labels share the same feature value then they are aggregated using + * the weighted average before the algorithm is executed. * @return Isotonic regression model. */ @Since("1.3.0") @@ -310,21 +309,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali /** * Aggregates points of duplicate feature values into a single point using as label the weighted * average of the labels of the points with duplicate feature values. All points for a unique - * feature values are aggregated as: + * feature value are aggregated as: * - * - Aggregated label is the weighted average of all labels - * - Aggregated feature is the weighted average of all equal features[1] - * - Aggregated weight is the sum of all weights + * - Aggregated label is the weighted average of all labels. + * - Aggregated feature is the unique feature value. + * - Aggregated weight is the sum of all weights. * - * [1] Note: It is possible that feature values to be equal up to a resolution due to - * representation errors, since we cannot know which feature value to use in that case, we - * compute the weighted average of the features. Ideally, all feature values will be equal and - * the weighted average is just the value at any point. - * - * @param input - * Input data of tuples (label, feature, weight). Weights must be non-negative. - * @return - * Points with unique feature values. + * @param input Input data of tuples (label, feature, weight). Weights must be non-negative. + * @return Points with unique feature values. */ private[regression] def makeUnique( input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { @@ -339,28 +331,28 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali if (cleanInput.length <= 1) { cleanInput } else { - // whether or not two double features are equal up to a precision - @inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b) - val pointsAccumulator = new IsotonicRegression.PointsAccumulator - var (_, prevFeature, _) = cleanInput.head - - // Go through input points, merging all points with approximately equal feature values into - // a single point. Equality of features is defined by areEqual method. The label of the - // accumulated points is the weighted average of the labels of all points of equal feature - // value. It is possible that feature values to be equal up to a resolution due to - // representation errors, since we cannot know which feature value to use in that case, - // we compute the weighted average of the features. - cleanInput.foreach { case point @ (_, feature, _) => - if (areEqual(feature, prevFeature)) { + + // Go through input points, merging all points with equal feature values into a single point. + // Equality of features is defined by shouldAccumulate method. The label of the accumulated + // points is the weighted average of the labels of all points of equal feature value. + + // Initialize with first point + pointsAccumulator := cleanInput.head + // Accumulate the rest + cleanInput.tail.foreach { case point @ (_, feature, _) => + if (pointsAccumulator.shouldAccumulate(feature)) { + // Still on a duplicate feature, accumulate pointsAccumulator += point } else { + // A new unique feature encountered: + // - append the last accumulated point to unique features output pointsAccumulator.appendToOutput() + // - and reset pointsAccumulator := point } - prevFeature = feature } - // Append the last accumulated point + // Append the last accumulated point to unique features output pointsAccumulator.appendToOutput() pointsAccumulator.getOutput } @@ -488,14 +480,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali // Points with same or adjacent features must collocate within the same partition. .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput)) .values - // Lexicographically sort points by features then labels. - .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1)))) + // Lexicographically sort points by features. + .mapPartitions(p => Iterator(p.toArray.sortBy(_._2))) // Aggregate points with equal features into a single point. .map(makeUnique) .flatMap(poolAdjacentViolators) .collect() // Sort again because collect() doesn't promise ordering. - .sortBy(x => (x._2, x._1)) + .sortBy(_._2) poolAdjacentViolators(parallelStepResult) } } @@ -511,30 +503,32 @@ object IsotonicRegression { private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) = (0d, 0d, 0d) + /** Whether or not this feature exactly equals the current accumulated feature. */ + @inline def shouldAccumulate(feature: Double): Boolean = currentFeature == feature + /** Resets the current value of the point accumulator using the provided point. */ - def :=(point: (Double, Double, Double)): Unit = { + @inline def :=(point: (Double, Double, Double)): Unit = { val (label, feature, weight) = point currentLabel = label * weight - currentFeature = feature * weight + currentFeature = feature currentWeight = weight } /** Accumulates the provided point into the current value of the point accumulator. */ - def +=(point: (Double, Double, Double)): Unit = { - val (label, feature, weight) = point + @inline def +=(point: (Double, Double, Double)): Unit = { + val (label, _, weight) = point currentLabel += label * weight - currentFeature += feature * weight currentWeight += weight } /** Appends the current value of the point accumulator to the output. */ - def appendToOutput(): Unit = + @inline def appendToOutput(): Unit = output += (( currentLabel / currentWeight, - currentFeature / currentWeight, + currentFeature, currentWeight)) /** Returns all accumulated points so far. */ - def getOutput: Array[(Double, Double, Double)] = output.toArray + @inline def getOutput: Array[(Double, Double, Double)] = output.toArray } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index b59d16be6cd0a..a206e922e5fc4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.mllib.regression -import org.apache.commons.math3.util.Precision import org.scalatest.matchers.must.Matchers import org.apache.spark.{SparkException, SparkFunSuite} @@ -225,12 +224,18 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w test("SPARK-41008 isotonic regression with duplicate features differs from sklearn") { val model = runIsotonicRegressionOnInput( - Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), + Seq((1, 0.6, 1), (0, 0.6, 1), + (0, 1.0 / 3, 1), (1, 1.0 / 3, 1), (0, 1.0 / 3, 1), + (1, 0.2, 1), (0, 0.2, 1), (0, 0.2, 1), (0, 0.2, 1)), true, 2) - assert(model.boundaries === Array(1.0, 3.0)) - assert(model.predictions === Array(0.75, 0.75)) + assert(model.boundaries === Array(0.2, 1.0 / 3, 0.6)) + assert(model.predictions === Array(0.25, 1.0 / 3, 0.5)) + + assert(model.predict(0.6) === 0.5) + assert(model.predict(1.0 / 3) === 1.0 / 3) + assert(model.predict(0.2) === 0.25) } test("isotonic regression prediction") { @@ -327,9 +332,8 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w test("makeUnique: handle duplicate features") { val regressor = new IsotonicRegression() import regressor.makeUnique - import Precision.EPSILON - // Note: input must be lexicographically sorted by (feature, label) + // Note: input must be lexicographically sorted by feature // empty assert(makeUnique(Array.empty) === Array.empty) @@ -373,9 +377,14 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w (10.0 * 0.3 + 20.0 * 0.3 + 30.0 * 0.4, 2.0, 1.0), (10.0, 3.0, 1.0))) - // duplicate up to resolution error - assert( - makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0 - EPSILON, 1.0), (1.0, 1.0 + EPSILON, 1.0))) === - Array((1.0, 1.0, 3.0))) + // don't handle tiny representation errors + // e.g. infinitely adjacent doubles are already unique + val adjacentDoubles = { + // i-th next representable double to 1.0 is java.lang.Double.longBitsToDouble(base + i) + val base = java.lang.Double.doubleToRawLongBits(1.0) + (0 until 10).map(i => java.lang.Double.longBitsToDouble(base + i)) + .map((1.0, _, 1.0)).toArray + } + assert(makeUnique(adjacentDoubles) === adjacentDoubles) } }