diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
index 8a1a41b2950c..f83621506500 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.Instance
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.linalg._
 
 /**
  * HuberAggregator computes the gradient and loss for a huber loss function,
@@ -62,19 +62,17 @@ import org.apache.spark.ml.linalg.Vector
  *
  * @param fitIntercept Whether to fit an intercept term.
  * @param epsilon The shape parameter to control the amount of robustness.
- * @param bcFeaturesStd The broadcast standard deviation values of the features.
  * @param bcParameters including three parts: the regression coefficients corresponding
  *                     to the features, the intercept (if fitIntercept is ture)
  *                     and the scale parameter (sigma).
  */
 private[ml] class HuberAggregator(
+    numFeatures: Int,
     fitIntercept: Boolean,
-    epsilon: Double,
-    bcFeaturesStd: Broadcast[Array[Double]])(bcParameters: Broadcast[Vector])
-  extends DifferentiableLossAggregator[Instance, HuberAggregator] {
+    epsilon: Double)(bcParameters: Broadcast[Vector])
+  extends DifferentiableLossAggregator[InstanceBlock, HuberAggregator] {
 
   protected override val dim: Int = bcParameters.value.size
-  private val numFeatures: Int = if (fitIntercept) dim - 2 else dim - 1
   private val sigma: Double = bcParameters.value(dim - 1)
   private val intercept: Double = if (fitIntercept) {
     bcParameters.value(dim - 2)
@@ -82,7 +80,8 @@ private[ml] class HuberAggregator(
     0.0
   }
   // make transient so we do not serialize between aggregation stages
-  @transient private lazy val coefficients = bcParameters.value.toArray.slice(0, numFeatures)
+  @transient private lazy val linear =
+    new DenseVector(bcParameters.value.toArray.take(numFeatures))
 
   /**
    * Add a new training instance to this HuberAggregator, and update the loss and gradient
@@ -98,16 +97,13 @@ private[ml] class HuberAggregator(
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
 
       if (weight == 0.0) return this
-      val localFeaturesStd = bcFeaturesStd.value
-      val localCoefficients = coefficients
+      val localCoefficients = linear.values
       val localGradientSumArray = gradientSumArray
 
       val margin = {
         var sum = 0.0
         features.foreachNonZero { (index, value) =>
-          if (localFeaturesStd(index) != 0.0) {
-            sum += localCoefficients(index) * (value / localFeaturesStd(index))
-          }
+          sum += localCoefficients(index) * value
         }
         if (fitIntercept) sum += intercept
         sum
@@ -119,10 +115,7 @@ private[ml] class HuberAggregator(
         val linearLossDivSigma = linearLoss / sigma
 
         features.foreachNonZero { (index, value) =>
-          if (localFeaturesStd(index) != 0.0) {
-            localGradientSumArray(index) +=
-              -1.0 * weight * linearLossDivSigma * (value / localFeaturesStd(index))
-          }
+          localGradientSumArray(index) -= weight * linearLossDivSigma * value
         }
         if (fitIntercept) {
           localGradientSumArray(dim - 2) += -1.0 * weight * linearLossDivSigma
@@ -134,10 +127,7 @@ private[ml] class HuberAggregator(
           (sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
 
         features.foreachNonZero { (index, value) =>
-          if (localFeaturesStd(index) != 0.0) {
-            localGradientSumArray(index) +=
-              weight * sign * epsilon * (value / localFeaturesStd(index))
-          }
+          localGradientSumArray(index) += weight * sign * epsilon * value
         }
         if (fitIntercept) {
           localGradientSumArray(dim - 2) += weight * sign * epsilon
@@ -149,4 +139,75 @@ private[ml] class HuberAggregator(
       this
     }
   }
+
+  /**
+   * Add a new training instance block to this HuberAggregator, and update the loss and gradient
+   * of the objective function.
+   *
+   * @param block The instance block of data point to be added.
+   * @return This HuberAggregator object.
+   */
+  def add(block: InstanceBlock): HuberAggregator = {
+    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
+      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
+    require(block.weightIter.forall(_ >= 0),
+      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
+
+    if (block.weightIter.forall(_ == 0)) return this
+    val size = block.size
+    val localGradientSumArray = gradientSumArray
+
+    // vec here represents margins or dotProducts
+    val vec = if (fitIntercept && intercept != 0) {
+      new DenseVector(Array.fill(size)(intercept))
+    } else {
+      new DenseVector(Array.ofDim[Double](size))
+    }
+
+    if (fitIntercept) {
+      BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    } else {
+      BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
+    }
+
+    // in-place convert margins to multipliers
+    // then, vec represents multipliers
+    var i = 0
+    while (i < size) {
+      val weight = block.getWeight(i)
+      if (weight > 0) {
+        weightSum += weight
+        val label = block.getLabel(i)
+        val margin = vec(i)
+        val linearLoss = label - margin
+
+        if (math.abs(linearLoss) <= sigma * epsilon) {
+          lossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma)
+          val linearLossDivSigma = linearLoss / sigma
+          val multiplier = -1.0 * weight * linearLossDivSigma
+          vec.values(i) = multiplier
+          localGradientSumArray(dim - 1) += 0.5 * weight * (1.0 - math.pow(linearLossDivSigma, 2.0))
+        } else {
+          lossSum += 0.5 * weight *
+            (sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
+          val sign = if (linearLoss >= 0) -1.0 else 1.0
+          val multiplier = weight * sign * epsilon
+          vec.values(i) = multiplier
+          localGradientSumArray(dim - 1) += 0.5 * weight * (1.0 - epsilon * epsilon)
+        }
+      } else {
+        vec.values(i) = 0.0
+      }
+      i += 1
+    }
+
+    val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
+    BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
+    linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
+    if (fitIntercept) {
+      localGradientSumArray(dim - 2) += vec.values.sum
+    }
+
+    this
+  }
 }
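Editorial note, not part of the patch: because each instance is now standardized before it is stacked into an InstanceBlock, the per-feature `localFeaturesStd` checks above disappear, and the margins of a whole block collapse into a single matrix-vector product, which is what the `BLAS.gemv(1.0, block.matrix, linear, ...)` call computes. A minimal, self-contained sketch of that equivalence, using the public `Matrix.multiply` in place of the package-private `BLAS.gemv` and made-up numbers:

```scala
import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector}

object BlockMarginSketch {
  def main(args: Array[String]): Unit = {
    // Two already-standardized rows with three features each (values are column-major).
    val block = new DenseMatrix(2, 3, Array(1.0, 0.0, 2.0, 1.0, 0.5, 3.0))
    val linear = new DenseVector(Array(0.1, -0.2, 0.3))
    val intercept = 0.5

    // One matrix-vector product yields every row's margin at once.
    val margins = block.multiply(linear).values.map(_ + intercept)

    // The same margins computed row by row, as the per-instance add(Instance) does.
    val rowWise = (0 until 2).map { i =>
      (0 until 3).map(j => block(i, j) * linear(j)).sum + intercept
    }
    println(margins.toSeq)  // identical to rowWise
    println(rowWise)
  }
}
```

The second `gemv` against `block.matrix.transpose` plays the symmetric role for the gradient; see the sketch after the LeastSquaresAggregator diff below.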
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
index 7a5806dc24ae..a8bda9ca5d24 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.Instance
-import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.linalg._
 
 /**
  * LeastSquaresAggregator computes the gradient and loss for a Least-squared loss function,
@@ -157,26 +157,25 @@ private[ml] class LeastSquaresAggregator(
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    bcFeaturesStd: Broadcast[Array[Double]],
-    bcFeaturesMean: Broadcast[Array[Double]])(bcCoefficients: Broadcast[Vector])
-  extends DifferentiableLossAggregator[Instance, LeastSquaresAggregator] {
+    bcFeaturesStd: Broadcast[Vector],
+    bcFeaturesMean: Broadcast[Vector])(bcCoefficients: Broadcast[Vector])
+  extends DifferentiableLossAggregator[InstanceBlock, LeastSquaresAggregator] {
 
   require(labelStd > 0.0, s"${this.getClass.getName} requires the label standard " +
     s"deviation to be positive.")
 
-  private val numFeatures = bcFeaturesStd.value.length
+  private val numFeatures = bcFeaturesStd.value.size
   protected override val dim: Int = numFeatures
   // make transient so we do not serialize between aggregation stages
-  @transient private lazy val featuresStd = bcFeaturesStd.value
   @transient private lazy val effectiveCoefAndOffset = {
     val coefficientsArray = bcCoefficients.value.toArray.clone()
     val featuresMean = bcFeaturesMean.value
+    val featuresStd = bcFeaturesStd.value
     var sum = 0.0
     var i = 0
     val len = coefficientsArray.length
     while (i < len) {
       if (featuresStd(i) != 0.0) {
-        coefficientsArray(i) /= featuresStd(i)
-        sum += coefficientsArray(i) * featuresMean(i)
+        sum += coefficientsArray(i) / featuresStd(i) * featuresMean(i)
       } else {
         coefficientsArray(i) = 0.0
       }
@@ -186,7 +185,7 @@ private[ml] class LeastSquaresAggregator(
     (Vectors.dense(coefficientsArray), offset)
   }
   // do not use tuple assignment above because it will circumvent the @transient tag
-  @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
+  @transient private lazy val effectiveCoefficientsVec = effectiveCoefAndOffset._1
   @transient private lazy val offset = effectiveCoefAndOffset._2
 
   /**
@@ -204,16 +203,20 @@ private[ml] class LeastSquaresAggregator(
 
       if (weight == 0.0) return this
 
-      val diff = BLAS.dot(features, effectiveCoefficientsVector) - label / labelStd + offset
+      val localEffectiveCoefficientsVec = effectiveCoefficientsVec
+
+      val diff = {
+        var dot = 0.0
+        features.foreachNonZero { (index, value) =>
+          dot += localEffectiveCoefficientsVec(index) * value
+        }
+        dot - label / labelStd + offset
+      }
 
       if (diff != 0) {
         val localGradientSumArray = gradientSumArray
-        val localFeaturesStd = featuresStd
         features.foreachNonZero { (index, value) =>
-          val fStd = localFeaturesStd(index)
-          if (fStd != 0.0) {
-            localGradientSumArray(index) += weight * diff * value / fStd
-          }
+          localGradientSumArray(index) += weight * diff * value
         }
         lossSum += weight * diff * diff / 2.0
       }
@@ -221,4 +224,43 @@ private[ml] class LeastSquaresAggregator(
       this
     }
   }
+
+  /**
+   * Add a new training instance block to this LeastSquaresAggregator, and update the loss
+   * and gradient of the objective function.
+   *
+   * @param block The instance block of data point to be added.
+   * @return This LeastSquaresAggregator object.
+   */
+  def add(block: InstanceBlock): LeastSquaresAggregator = {
+    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
+      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
+    require(block.weightIter.forall(_ >= 0),
+      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
+
+    if (block.weightIter.forall(_ == 0)) return this
+    val size = block.size
+
+    // vec here represents diffs
+    val vec = new DenseVector(Array.tabulate(size)(i => offset - block.getLabel(i) / labelStd))
+    BLAS.gemv(1.0, block.matrix, effectiveCoefficientsVec, 1.0, vec)
+
+    // in-place convert diffs to multipliers
+    // then, vec represents multipliers
+    var i = 0
+    while (i < size) {
+      val weight = block.getWeight(i)
+      val diff = vec(i)
+      lossSum += weight * diff * diff / 2
+      weightSum += weight
+      val multiplier = weight * diff
+      vec.values(i) = multiplier
+      i += 1
+    }
+
+    val gradSumVec = new DenseVector(gradientSumArray)
+    BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
+
+    this
+  }
 }
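Editorial note, not part of the patch: the while-loop above rewrites `vec` in place from diffs to multipliers `weight * diff`, so the final `BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)` accumulates the whole sum of `weight(i) * diff(i) * x(i)` over the block into the gradient array in one call. A small sketch of that identity with made-up numbers, again via the public `Matrix` API rather than the package-private `BLAS`:

```scala
import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector}

object BlockGradientSketch {
  def main(args: Array[String]): Unit = {
    // Two standardized rows with three features each (values are column-major).
    val block = new DenseMatrix(2, 3, Array(1.0, 0.5, 2.0, 1.0, 0.0, 3.0))
    val weights = Array(0.1, 0.5)
    val diffs = Array(0.4, -0.2)

    // The in-place loop turns each diff into the multiplier weight(i) * diff(i).
    val multipliers = new DenseVector(weights.zip(diffs).map { case (w, d) => w * d })

    // One product with the transposed block accumulates every feature's gradient term.
    val viaTransposedProduct = block.transpose.multiply(multipliers).values.toSeq

    // The same sum written instance by instance, like the old add(Instance) path.
    val rowWise = (0 until 3).map { j =>
      (0 until 2).map(i => weights(i) * diffs(i) * block(i, j)).sum
    }
    println(viaTransposedProduct)  // identical to rowWise
    println(rowWise)
  }
}
```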
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 64e5e191ffd1..fc59da8a9c16 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{PipelineStage, PredictorParams}
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.linalg.BLAS._
 import org.apache.spark.ml.optim.WeightedLeastSquares
@@ -55,7 +55,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 private[regression] trait LinearRegressionParams extends PredictorParams
     with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
     with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver
-    with HasAggregationDepth with HasLoss {
+    with HasAggregationDepth with HasLoss with HasBlockSize {
 
   import LinearRegression._
 
@@ -316,6 +316,15 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
   def setEpsilon(value: Double): this.type = set(epsilon, value)
   setDefault(epsilon -> 1.35)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * Default is 1024.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   override protected def train(dataset: Dataset[_]): LinearRegressionModel = instrumented { instr =>
     // Extract the number of features before deciding optimization solver.
     val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol))
@@ -354,9 +363,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       return lrModel.setSummary(Some(trainingSummary))
     }
 
-    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
-
     val (featuresSummarizer, ySummarizer) = instances.treeAggregate(
       (Summarizer.createSummarizerBuffer("mean", "std"),
         Summarizer.createSummarizerBuffer("mean", "std", "count")))(
@@ -392,7 +398,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
           s"will be zeros and the intercept will be the mean of the label; as a result, " +
           s"training is not needed.")
       }
-      if (handlePersistence) instances.unpersist()
       val coefficients = Vectors.sparse(numFeatures, Seq.empty)
       val intercept = yMean
 
@@ -421,8 +426,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     // if y is constant (rawYStd is zero), then y cannot be scaled. In this case
     // setting yStd=abs(yMean) ensures that y is not scaled anymore in l-bfgs algorithm.
     val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean)
-    val featuresMean = featuresSummarizer.mean.toArray
-    val featuresStd = featuresSummarizer.std.toArray
+    val featuresMean = featuresSummarizer.mean.compressed
+    val featuresStd = featuresSummarizer.std.compressed
     val bcFeaturesMean = instances.context.broadcast(featuresMean)
     val bcFeaturesStd = instances.context.broadcast(featuresStd)
 
@@ -442,23 +447,36 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
     val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
 
-    val getFeaturesStd = (j: Int) => if (j >= 0 && j < numFeatures) featuresStd(j) else 0.0
     val regularization = if (effectiveL2RegParam != 0.0) {
       val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
       Some(new L2Regularization(effectiveL2RegParam, shouldApply,
-        if ($(standardization)) None else Some(getFeaturesStd)))
+        if ($(standardization)) None else Some(featuresStd.apply)))
     } else {
       None
     }
 
+    val standardized = instances.map {
+      case Instance(label, weight, features) =>
+        val featuresStd = bcFeaturesStd.value
+        val array = Array.ofDim[Double](numFeatures)
+        features.foreachNonZero { (i, v) =>
+          val std = featuresStd(i)
+          if (std != 0) array(i) = v / std
+        }
+        Instance(label, weight, Vectors.dense(array))
+    }
+    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+      .persist(StorageLevel.MEMORY_AND_DISK)
+      .setName(s"training dataset (blockSize=${$(blockSize)})")
+
     val costFun = $(loss) match {
       case SquaredError =>
         val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept),
           bcFeaturesStd, bcFeaturesMean)(_)
-        new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth))
+        new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
       case Huber =>
-        val getAggregatorFunc = new HuberAggregator($(fitIntercept), $(epsilon), bcFeaturesStd)(_)
-        new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth))
+        val getAggregatorFunc = new HuberAggregator(numFeatures, $(fitIntercept), $(epsilon))(_)
+        new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
     }
 
     val optimizer = $(loss) match {
@@ -524,6 +542,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
         throw new SparkException(msg)
       }
 
+      blocks.unpersist()
       bcFeaturesMean.destroy()
       bcFeaturesStd.destroy()
 
@@ -557,7 +576,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
            after the coefficients are converged. See the following discussion for detail.
            http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
            */
-          yMean - dot(Vectors.dense(rawCoefficients), Vectors.dense(featuresMean))
+          yMean - dot(Vectors.dense(rawCoefficients), featuresMean)
         case Huber => parameters(numFeatures)
       }
     } else {
@@ -572,8 +591,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       (Vectors.dense(rawCoefficients).compressed, interceptValue, scaleValue, arrayBuilder.result())
     }
 
-    if (handlePersistence) instances.unpersist()
-
     val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept, scale))
     // Handle possible missing or invalid prediction columns
     val (summaryModel, predictionColName) = model.findSummaryModelAndPredictionCol()
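Editorial note, not part of the patch: training now standardizes every instance once, up front, and only then stacks rows into `InstanceBlock`s of `$(blockSize)` rows, which is why the aggregators above no longer divide by the feature standard deviation. A stand-alone restatement of that mapping; the function name and values are illustrative, and the plain while-loop stands in for the `foreachNonZero` call used in the patch:

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

object StandardizeSketch {
  // Mirror of the `standardized` map in train(): divide feature j by its standard
  // deviation and leave zero-variance features at 0.0, so no division by zero occurs
  // and constant columns contribute nothing downstream.
  def standardize(features: Vector, featuresStd: Vector): Vector = {
    val arr = features.toArray
    var j = 0
    while (j < arr.length) {
      val std = featuresStd(j)
      arr(j) = if (std != 0.0) arr(j) / std else 0.0
      j += 1
    }
    Vectors.dense(arr)
  }

  def main(args: Array[String]): Unit = {
    val std = Vectors.dense(2.0, 0.0, 0.5)
    println(standardize(Vectors.dense(4.0, 3.0, 1.0), std))  // [2.0,0.0,2.0]
  }
}
```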
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
index 718ffa230a74..7c544e99f88b 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = Array(
+    instances = standardize(Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
-    )
-    instancesConstantFeature = Array(
+    ))
+    instancesConstantFeature = standardize(Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
-    )
-    instancesConstantFeatureFiltered = Array(
+    ))
+    instancesConstantFeatureFiltered = standardize(Array(
       Instance(0.0, 0.1, Vectors.dense(2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
-    )
+    ))
   }
 
   /** Get summary statistics for some data and create a new HuberAggregator. */
@@ -56,10 +56,28 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       fitIntercept: Boolean,
       epsilon: Double): HuberAggregator = {
     val (featuresSummarizer, _) = getRegressionSummarizers(instances)
-    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
-    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
+    val numFeatures = featuresSummarizer.variance.size
     val bcParameters = spark.sparkContext.broadcast(parameters)
-    new HuberAggregator(fitIntercept, epsilon, bcFeaturesStd)(bcParameters)
+    new HuberAggregator(numFeatures, fitIntercept, epsilon)(bcParameters)
+  }
+
+  private def standardize(
+      instances: Array[Instance],
+      std: Array[Double] = null): Array[Instance] = {
+    val stdArray = if (std == null) {
+      getRegressionSummarizers(instances)._1.variance.toArray.map(math.sqrt)
+    } else {
+      std
+    }
+    val numFeatures = stdArray.length
+    instances.map { case Instance(label, weight, features) =>
+      val standardized = Array.ofDim[Double](numFeatures)
+      features.foreachNonZero { (i, v) =>
+        val std = stdArray(i)
+        if (std != 0) standardized(i) = v / std
+      }
+      Instance(label, weight, Vectors.dense(standardized).compressed)
+    }
   }
 
   test("aggregator add method should check input size") {
@@ -155,9 +173,15 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     val parametersFiltered = Vectors.dense(2.0, 3.0, 4.0)
     val aggConstantFeature = getNewAggregator(instancesConstantFeature, parameters,
       fitIntercept = true, epsilon = 1.35)
+    // std of instancesConstantFeature
+    val stdConstantFeature = getRegressionSummarizers(instancesConstantFeature)
+      ._1.variance.toArray.map(math.sqrt)
+    // Since 3.0.0, we start to standardize input outside of gradient computation,
+    // so here we use std of instancesConstantFeature to standardize instances
+    standardize(instances, stdConstantFeature).foreach(aggConstantFeature.add)
+
     val aggConstantFeatureFiltered = getNewAggregator(instancesConstantFeatureFiltered,
       parametersFiltered, fitIntercept = true, epsilon = 1.35)
-    instances.foreach(aggConstantFeature.add)
     instancesConstantFeatureFiltered.foreach(aggConstantFeatureFiltered.add)
     // constant features should not affect gradient
     def validateGradient(grad: Vector, gradFiltered: Vector): Unit = {
@@ -167,4 +191,19 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     validateGradient(aggConstantFeature.gradient, aggConstantFeatureFiltered.gradient)
   }
+
+  test("add instance block") {
+    val paramWithIntercept = Vectors.dense(1.0, 2.0, 3.0, 4.0)
+    val agg1 = getNewAggregator(instances, paramWithIntercept,
+      fitIntercept = true, epsilon = 1.35)
+    instances.foreach(agg1.add)
+
+    val agg2 = getNewAggregator(instances, paramWithIntercept,
+      fitIntercept = true, epsilon = 1.35)
+    val block = InstanceBlock.fromInstances(instances)
+    agg2.add(block)
+
+    assert(agg1.loss ~== agg2.loss relTol 1e-8)
+    assert(agg1.gradient ~== agg2.gradient relTol 1e-8)
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
index 35b694462470..5eb4e41c5826 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = Array(
+    instances = standardize(Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
-    )
-    instancesConstantFeature = Array(
+    ))
+    instancesConstantFeature = standardize(Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
-    )
-    instancesConstantLabel = Array(
+    ))
+    instancesConstantLabel = standardize(Array(
       Instance(1.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(1.0, 0.3, Vectors.dense(4.0, 0.5))
-    )
+    ))
   }
 
   /** Get summary statistics for some data and create a new LeastSquaresAggregator. */
@@ -57,15 +57,34 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
     val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances)
     val yStd = math.sqrt(ySummarizer.variance(0))
     val yMean = ySummarizer.mean(0)
-    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val featuresStd = Vectors.dense(featuresSummarizer.variance.toArray.map(math.sqrt))
     val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
-    val featuresMean = featuresSummarizer.mean
-    val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.toArray)
-    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+    val featuresMean = featuresSummarizer.mean.asML
+    val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.compressed)
+    val bcCoefficients = spark.sparkContext.broadcast(coefficients.compressed)
     new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd,
       bcFeaturesMean)(bcCoefficients)
   }
 
+  private def standardize(
+      instances: Array[Instance],
+      std: Array[Double] = null): Array[Instance] = {
+    val stdArray = if (std == null) {
+      getRegressionSummarizers(instances)._1.variance.toArray.map(math.sqrt)
+    } else {
+      std
+    }
+    val numFeatures = stdArray.length
+    instances.map { case Instance(label, weight, features) =>
+      val standardized = Array.ofDim[Double](numFeatures)
+      features.foreachNonZero { (i, v) =>
+        val std = stdArray(i)
+        if (std != 0) standardized(i) = v / std
+      }
+      Instance(label, weight, Vectors.dense(standardized).compressed)
+    }
+  }
+
   test("aggregator add method input size") {
     val coefficients = Vectors.dense(1.0, 2.0)
     val agg = getNewAggregator(instances, coefficients, fitIntercept = true)
@@ -145,9 +164,15 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
 
   test("check with zero standard deviation") {
     val coefficients = Vectors.dense(1.0, 2.0)
+    // aggConstantFeature contains std of instancesConstantFeature, and the std of dim=0 is 0
     val aggConstantFeature = getNewAggregator(instancesConstantFeature, coefficients,
       fitIntercept = true)
-    instances.foreach(aggConstantFeature.add)
+    // std of instancesConstantFeature
+    val stdConstantFeature = getRegressionSummarizers(instancesConstantFeature)
+      ._1.variance.toArray.map(math.sqrt)
+    // Since 3.0.0, we start to standardize input outside of gradient computation,
+    // so here we use std of instancesConstantFeature to standardize instances
+    standardize(instances, stdConstantFeature).foreach(aggConstantFeature.add)
 
     // constant features should not affect gradient
     assert(aggConstantFeature.gradient(0) === 0.0)
@@ -157,4 +182,17 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
       }
     }
   }
+
+  test("add instance block") {
+    val coefficients = Vectors.dense(1.0, 2.0)
+    val agg1 = getNewAggregator(instances, coefficients, fitIntercept = true)
+    instances.foreach(agg1.add)
+
+    val agg2 = getNewAggregator(instances, coefficients, fitIntercept = true)
+    val block = InstanceBlock.fromInstances(instances)
+    agg2.add(block)
+
+    assert(agg1.loss ~== agg2.loss relTol 1e-8)
+    assert(agg1.gradient ~== agg2.gradient relTol 1e-8)
+  }
 }
diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py
index 481271f52315..464ef3879d94 100644
--- a/python/pyspark/ml/regression.py
+++ b/python/pyspark/ml/regression.py
@@ -62,7 +62,7 @@ class JavaRegressionModel(JavaPredictionModel, _JavaPredictorParams):
 
 class _LinearRegressionParams(_JavaPredictorParams, HasRegParam, HasElasticNetParam, HasMaxIter,
                               HasTol, HasFitIntercept, HasStandardization, HasWeightCol, HasSolver,
-                              HasAggregationDepth, HasLoss):
+                              HasAggregationDepth, HasLoss, HasBlockSize):
     """
     Params for :py:class:`LinearRegression` and :py:class:`LinearRegressionModel`.
 
@@ -124,6 +124,8 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
     >>> lr.setRegParam(0.0)
     LinearRegression...
     >>> model = lr.fit(df)
+    >>> model.getBlockSize()
+    1024
     >>> model.setFeaturesCol("features")
     LinearRegressionModel...
     >>> model.setPredictionCol("newPrediction")
@@ -169,17 +171,18 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
     def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                  maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
                  standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
-                 loss="squaredError", epsilon=1.35):
+                 loss="squaredError", epsilon=1.35, blockSize=1024):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
                  standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
                  loss="squaredError", epsilon=1.35)
+                 loss="squaredError", epsilon=1.35, blockSize=1024)
         """
         super(LinearRegression, self).__init__()
         self._java_obj = self._new_java_obj(
             "org.apache.spark.ml.regression.LinearRegression", self.uid)
-        self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35)
+        self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35,
+                         blockSize=1024)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -188,12 +191,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
                   standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
-                  loss="squaredError", epsilon=1.35):
+                  loss="squaredError", epsilon=1.35, blockSize=1024):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
                   standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
-                  loss="squaredError", epsilon=1.35)
+                  loss="squaredError", epsilon=1.35, blockSize=1024)
         Sets params for linear regression.
         """
         kwargs = self._input_kwargs
@@ -269,6 +272,13 @@ def setLoss(self, value):
         """
         return self._set(lossType=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
 
 class LinearRegressionModel(JavaRegressionModel, _LinearRegressionParams, GeneralJavaMLWritable,
                             JavaMLReadable, HasTrainingSummary):
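Editorial note, not part of the patch: from the caller's side the only visible change is the new expert parameter, shown here from the Scala API (mirroring the Python doctest above). The data path is illustrative; any DataFrame with label and features columns works, and blockSize only controls how many rows are stacked per matrix, not the objective being optimized:

```scala
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession

object BlockSizeUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("BlockSizeUsageSketch").getOrCreate()
    val training = spark.read.format("libsvm")
      .load("data/mllib/sample_linear_regression_data.txt")  // illustrative path

    val lr = new LinearRegression()
      .setMaxIter(50)
      .setRegParam(0.1)
      .setLoss("huber")    // exercises the HuberAggregator block path
      .setBlockSize(4096)  // expert knob added in this patch; default stays 1024

    val model = lr.fit(training)
    println(s"coefficients=${model.coefficients} intercept=${model.intercept}")
    spark.stop()
  }
}
```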