From f7534621562fc8a39a9015e9e41cc9257b12cb40 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 28 Jan 2020 21:15:13 +0800 Subject: [PATCH 1/7] init init init --- .../classification/LogisticRegression.scala | 60 ++-- .../optim/aggregator/LogisticAggregator.scala | 300 ++++++++++++++++-- .../classification/LogisticRegression.scala | 4 +- .../LogisticRegressionSuite.scala | 4 +- .../aggregator/LogisticAggregatorSuite.scala | 56 +++- python/pyspark/ml/classification.py | 22 +- 6 files changed, 373 insertions(+), 73 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 50c14d086957..4815407e2018 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator.LogisticAggregator import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} @@ -50,7 +50,8 @@ import org.apache.spark.util.VersionUtils */ private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol - with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth { + with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth + with HasBlockSize { import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames @@ -430,6 +431,15 @@ class LogisticRegression @Since("1.2.0") ( @Since("2.2.0") def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value) + /** + * Set block size for stacking input data in matrices. + * Default is 4096. 
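+ * Stacking instances into matrices lets the aggregator use level-2/3 BLAS routines
+ * (gemv/gemm) instead of per-row dot products; larger blocks trade additional
+ * per-task memory for arithmetic throughput.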
+ * + * @group expertSetParam + */ + @Since("3.0.0") + def setBlockSize(value: Int): this.type = set(blockSize, value) + private def assertBoundConstrainedOptimizationParamsValid( numCoefficientSets: Int, numFeatures: Int): Unit = { @@ -482,24 +492,17 @@ class LogisticRegression @Since("1.2.0") ( this } - override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - train(dataset, handlePersistence) - } - - protected[spark] def train( - dataset: Dataset[_], - handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr => - val instances = extractInstances(dataset) - - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - + override protected[spark] def train( + dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr => instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol, fitIntercept) + val sc = dataset.sparkSession.sparkContext + val instances = extractInstances(dataset) + val (summarizer, labelSummarizer) = instances.treeAggregate( (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))( seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) => @@ -582,8 +585,9 @@ class LogisticRegression @Since("1.2.0") ( s"dangerous ground, so the algorithm may not converge.") } - val featuresMean = summarizer.mean.toArray - val featuresStd = summarizer.std.toArray + val featuresMean = summarizer.mean.compressed + val featuresStd = summarizer.std.compressed + val bcFeaturesStd = sc.broadcast(featuresStd) if (!$(fitIntercept) && (0 until numFeatures).exists { i => featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { @@ -595,9 +599,8 @@ class LogisticRegression @Since("1.2.0") ( val regParamL1 = $(elasticNetParam) * $(regParam) val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) - val bcFeaturesStd = instances.context.broadcast(featuresStd) - val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), - multinomial = isMultinomial)(_) + val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept), + multinomial = isMultinomial, $(blockSize))(_) val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) { featuresStd(j / numCoefficientSets) } else { @@ -612,7 +615,21 @@ class LogisticRegression @Since("1.2.0") ( None } - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, + val standardized = instances.map { + case Instance(label, weight, features) => + val featuresStd = bcFeaturesStd.value + val array = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = featuresStd(i) + if (std != 0) array(i) = v / std + } + Instance(label, weight, Vectors.dense(array)) + } + val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + .persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training dataset (blockSize=${$(blockSize)})") + + val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets @@ -806,6 +823,7 @@ class LogisticRegression @Since("1.2.0") ( state = states.next() arrayBuilder += state.adjustedValue } + blocks.unpersist() bcFeaturesStd.destroy() if (state == null) { @@ -875,8 +893,6 
@@ class LogisticRegression @Since("1.2.0") ( } } - if (handlePersistence) instances.unpersist() - val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector, numClasses, isMultinomial)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index f2b3566f8f09..af1713585da5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -18,8 +18,8 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance -import org.apache.spark.ml.linalg.{DenseVector, Vector} +import org.apache.spark.ml.feature.{Instance, InstanceBlock} +import org.apache.spark.ml.linalg._ import org.apache.spark.mllib.util.MLUtils /** @@ -171,7 +171,6 @@ import org.apache.spark.mllib.util.MLUtils * * * @param bcCoefficients The broadcast coefficients corresponding to the features. - * @param bcFeaturesStd The broadcast standard deviation values of the features. * @param numClasses the number of possible outcomes for k classes classification problem in * Multinomial Logistic Regression. * @param fitIntercept Whether to fit an intercept term. @@ -183,13 +182,13 @@ import org.apache.spark.mllib.util.MLUtils * since this form is optimal for the matrix operations used for prediction. */ private[ml] class LogisticAggregator( - bcFeaturesStd: Broadcast[Array[Double]], + numFeatures: Int, numClasses: Int, fitIntercept: Boolean, - multinomial: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[Instance, LogisticAggregator] with Logging { + multinomial: Boolean, + blockSize: Int = 4096)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[InstanceBlock, LogisticAggregator] with Logging { - private val numFeatures = bcFeaturesStd.value.length private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures private val coefficientSize = bcCoefficients.value.size protected override val dim: Int = coefficientSize @@ -209,6 +208,70 @@ private[ml] class LogisticAggregator( s"got type ${bcCoefficients.value.getClass}.)") } + // Helper vectors and matrices for binary: + @transient private lazy val binaryLinear = { + if (!multinomial) { + if (fitIntercept) { + new DenseVector(coefficientsArray.take(numFeatures)) + } else { + new DenseVector(coefficientsArray) + } + } else { + null + } + } + + @transient private lazy val binaryIntercept = + if (!multinomial && fitIntercept) bcCoefficients.value(numFeatures) else 0.0 + + @transient private lazy val binaryLinearGradSumVec = { + if (!multinomial && fitIntercept) { + new DenseVector(Array.ofDim[Double](numFeatures)) + } else { + null + } + } + + @transient private lazy val auxiliaryVec = + new DenseVector(Array.ofDim[Double](blockSize)) + + // Helper vectors and matrices for multinomial + @transient private lazy val multinomialLinear = { + if (multinomial) { + if (fitIntercept) { + new DenseMatrix(numClasses, numFeatures, coefficientsArray.take(numClasses * numFeatures)) + } else { + new DenseMatrix(numClasses, numFeatures, coefficientsArray) + } + } else { + null + } + } + + @transient private lazy val multinomialIntercept = { + if (multinomial && fitIntercept) { + new DenseVector(coefficientsArray.takeRight(numClasses)) 
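+ // the intercepts occupy the last numClasses entries of coefficientsArray,
+ // after the numClasses x numFeatures linear block, hence takeRight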
+ } else { + null + } + } + + @transient private lazy val multinomialLinearGradSumMat = { + if (multinomial) { + new DenseMatrix(numFeatures, numClasses, Array.ofDim[Double](numClasses * numFeatures)) + } else { + null + } + } + + @transient private lazy val multinomialAuxiliaryMat = { + if (multinomial) { + new DenseMatrix(blockSize, numClasses, Array.ofDim[Double](blockSize * numClasses)) + } else { + null + } + } + if (multinomial && numClasses <= 2) { logInfo(s"Multinomial logistic regression for binary classification yields separate " + s"coefficients for positive and negative classes. When no regularization is applied, the" + @@ -219,15 +282,12 @@ private[ml] class LogisticAggregator( /** Update gradient and loss using binary loss function. */ private def binaryUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { - val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray val margin = - { var sum = 0.0 features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - sum += localCoefficients(index) * value / localFeaturesStd(index) - } + sum += localCoefficients(index) * value } if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) sum @@ -236,9 +296,7 @@ private[ml] class LogisticAggregator( val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - localGradientArray(index) += multiplier * value / localFeaturesStd(index) - } + localGradientArray(index) += multiplier * value } if (fitIntercept) { @@ -253,6 +311,63 @@ private[ml] class LogisticAggregator( } } + /** Update gradient and loss using binary loss function. */ + private def binaryUpdateInPlace(block: InstanceBlock): Unit = { + val size = block.size + val localGradientSumArray = gradientSumArray + + // vec here represents margins or negative dotProducts + val vec = if (size == blockSize) { + auxiliaryVec + } else { + // the last block within one partition may be of size less than blockSize + new DenseVector(Array.ofDim[Double](size)) + } + + if (fitIntercept) { + var i = 0 + while (i < size) { + vec.values(i) = binaryIntercept + i += 1 + } + BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec) + } else { + BLAS.gemv(-1.0, block.matrix, binaryLinear, 0.0, vec) + } + + // in-place convert margins to multiplier + // then, vec represents multiplier + var i = 0 + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + val label = block.getLabel(i) + val margin = vec(i) + if (label > 0) { + // The following is equivalent to log(1 + exp(margin)) but more numerically stable. + lossSum += weight * MLUtils.log1pExp(margin) + } else { + lossSum += weight * (MLUtils.log1pExp(margin) - margin) + } + val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) + vec.values(i) = multiplier + } else { + vec.values(i) = 0.0 + } + i += 1 + } + + if (fitIntercept) { + BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, binaryLinearGradSumVec) + binaryLinearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } + localGradientSumArray(numFeatures) += vec.values.sum + } else { + val gradSumVec = new DenseVector(localGradientSumArray) + BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec) + } + } + /** Update gradient and loss using multinomial (softmax) loss function. 
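* (this is the existing per-instance path; the blocked variant added later in this
* patch computes a whole block's margins with a single gemm call)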
*/ private def multinomialUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { // TODO: use level 2 BLAS operations @@ -260,7 +375,6 @@ private[ml] class LogisticAggregator( Note: this can still be used when numClasses = 2 for binary logistic regression without pivoting. */ - val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray @@ -270,13 +384,10 @@ private[ml] class LogisticAggregator( val margins = new Array[Double](numClasses) features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - val stdValue = value / localFeaturesStd(index) - var j = 0 - while (j < numClasses) { - margins(j) += localCoefficients(index * numClasses + j) * stdValue - j += 1 - } + var j = 0 + while (j < numClasses) { + margins(j) += localCoefficients(index * numClasses + j) * value + j += 1 } } var i = 0 @@ -314,13 +425,10 @@ private[ml] class LogisticAggregator( multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) } features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - val stdValue = value / localFeaturesStd(index) - var j = 0 - while (j < numClasses) { - localGradientArray(index * numClasses + j) += weight * multipliers(j) * stdValue - j += 1 - } + var j = 0 + while (j < numClasses) { + localGradientArray(index * numClasses + j) += weight * multipliers(j) * value + j += 1 } } if (fitIntercept) { @@ -339,6 +447,116 @@ private[ml] class LogisticAggregator( lossSum += weight * loss } + /** Update gradient and loss using multinomial (softmax) loss function. */ + private def multinomialUpdateInPlace(block: InstanceBlock): Unit = { + val size = block.size + val localGradientSumArray = gradientSumArray + + // mat here represents margins, shape: S X C + val mat = if (size == blockSize) { + multinomialAuxiliaryMat + } else { + // the last block within one partition may be of size less than blockSize + new DenseMatrix(size, numClasses, Array.ofDim[Double](size * numClasses)) + } + + if (fitIntercept) { + var i = 0 + while (i < size) { + var j = 0 + while (j < numClasses) { + mat.update(i, j, multinomialIntercept(j)) + j += 1 + } + i += 1 + } + BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 1.0, mat) + } else { + BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 0.0, mat) + } + + // in-place convert margins to multipliers + // then, mat represents multipliers + var i = 0 + val tmp = Array.ofDim[Double](numClasses) + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + val label = block.getLabel(i) + + var maxMargin = Double.NegativeInfinity + var j = 0 + while (j < numClasses) { + tmp(j) = mat(i, j) + maxMargin = math.max(maxMargin, tmp(j)) + j += 1 + } + + // marginOfLabel is margins(label) in the formula + val marginOfLabel = tmp(label.toInt) + + var sum = 0.0 + j = 0 + while (j < numClasses) { + if (maxMargin > 0) tmp(j) -= maxMargin + val exp = math.exp(tmp(j)) + sum += exp + tmp(j) = exp + j += 1 + } + + j = 0 + while (j < numClasses) { + val multiplier = weight * (tmp(j) / sum - (if (label == j) 1.0 else 0.0)) + mat.update(i, j, multiplier) + j += 1 + } + + if (maxMargin > 0) { + lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin) + } else { + lossSum += weight * (math.log(sum) - marginOfLabel) + } + } else { + var j = 0 + while (j < numClasses) { + mat.update(i, j, 0.0) + j += 1 + } + } + i += 1 + } + + // block.matrix: S X F, unknown type + // mat (multipliers): S X C, dense 
+ // multinomialLinearGradSumMat: F X C, dense + // gradSumMat(gradientSumArray): C X FPI (numFeaturesPlusIntercept), dense + block.matrix match { + case dm: DenseMatrix if !fitIntercept => + // If fitIntercept==false, gradientSumArray += mat.T X matrix + // GEMM requires block.matrix is dense + val gradSumMat = new DenseMatrix(numClasses, numFeatures, localGradientSumArray) + BLAS.gemm(1.0, mat.transpose, dm, 1.0, gradSumMat) + + case _ => + // Otherwise, use multinomialLinearGradSumMat as a temp matrix: + // multinomialLinearGradSumMat = matrix.T X mat + BLAS.gemm(1.0, block.matrix.transpose, mat, + 0.0, multinomialLinearGradSumMat) + multinomialLinearGradSumMat.foreachActive { (i, j, v) => + if (v != 0) localGradientSumArray(i * numClasses + j) += v + } + + if (fitIntercept) { + val start = numClasses * numFeatures + mat.foreachActive { (i, j, v) => + if (v != 0) localGradientSumArray(start + j) += v + } + } + } + } + /** * Add a new training instance to this LogisticAggregator, and update the loss and gradient * of the objective function. @@ -363,4 +581,28 @@ private[ml] class LogisticAggregator( this } } + + /** + * Add a new training instance block to this LogisticAggregator, and update the loss and gradient + * of the objective function. + * + * @param block The instance block of data point to be added. + * @return This LogisticAggregator object. + */ + def add(block: InstanceBlock): this.type = { + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. Expecting $numFeatures but got ${block.numFeatures}.") + require(block.weightIter.forall(_ >= 0), + s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (block.weightIter.forall(_ == 0)) return this + + if (multinomial) { + multinomialUpdateInPlace(block) + } else { + binaryUpdateInPlace(block) + } + + this + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 21eb17dfaacb..f88f3fce61b3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -339,10 +339,8 @@ class LogisticRegressionWithLBFGS // Convert our input into a DataFrame val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) - // Determine if we should cache the DF - val handlePersistence = input.getStorageLevel == StorageLevel.NONE // Train our model - val mlLogisticRegressionModel = lr.train(df, handlePersistence) + val mlLogisticRegressionModel = lr.train(df) // convert the model val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray) createModel(weights, mlLogisticRegressionModel.intercept) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 6d31e6efc7e1..9e359ba098bf 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -542,7 +542,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { test("sparse coefficients in LogisticAggregator") { val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) 
val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val binaryAgg = new LogisticAggregator(bcFeaturesStd, 2, + val binaryAgg = new LogisticAggregator(1, 2, fitIntercept = true, multinomial = false)(bcCoefficientsBinary) val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { @@ -552,7 +552,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { assert(thrownBinary.getMessage.contains("coefficients only supports dense")) val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0))) - val multinomialAgg = new LogisticAggregator(bcFeaturesStd, 3, + val multinomialAgg = new LogisticAggregator(1, 3, fitIntercept = true, multinomial = true)(bcCoefficientsMulti) val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala index e699adcc14c0..83718076dde7 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -32,21 +32,21 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { override def beforeAll(): Unit = { super.beforeAll() - instances = Array( + instances = standardize(Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), Instance(2.0, 0.3, Vectors.dense(4.0, 0.5)) - ) - instancesConstantFeature = Array( + )) + instancesConstantFeature = standardize(Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), Instance(2.0, 0.3, Vectors.dense(1.0, 0.5)) - ) - instancesConstantFeatureFiltered = Array( + )) + instancesConstantFeatureFiltered = standardize(Array( Instance(0.0, 0.1, Vectors.dense(2.0)), Instance(1.0, 0.5, Vectors.dense(1.0)), Instance(2.0, 0.3, Vectors.dense(0.5)) - ) + )) } /** Get summary statistics for some data and create a new LogisticAggregator. 
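* (the shared test instances are pre-standardized by the standardize helper in
* beforeAll, so the aggregator no longer needs a feature standard-deviation vector)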
*/ @@ -55,13 +55,27 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { coefficients: Vector, fitIntercept: Boolean, isMultinomial: Boolean): LogisticAggregator = { - val (featuresSummarizer, ySummarizer) = + val (_, ySummarizer) = DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) val numClasses = ySummarizer.histogram.length - val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) - val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) + val numFeatures = instances.head.features.size val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients) + new LogisticAggregator(numFeatures, numClasses, fitIntercept, isMultinomial)(bcCoefficients) + } + + private def standardize(instances: Array[Instance]): Array[Instance] = { + val (featuresSummarizer, _) = + DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) + val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt) + val numFeatures = stdArray.length + instances.map { case Instance(label, weight, features) => + val standardized = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = stdArray(i) + if (std != 0) standardized(i) = v / std + } + Instance(label, weight, Vectors.dense(standardized).compressed) + } } test("aggregator add method input size") { @@ -277,4 +291,24 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { validateGradient(aggConstantFeatureBinary.gradient, aggConstantFeatureBinaryFiltered.gradient, 1) } + + test("add instance block") { + val binaryInstances = instances.map { instance => + if (instance.label <= 1.0) instance else Instance(0.0, instance.weight, instance.features) + } + val coefArray = Array(1.0, 2.0) + val intercept = 1.0 + + val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true, isMultinomial = false) + binaryInstances.foreach(agg.add) + + val agg2 = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true, isMultinomial = false) + val block = InstanceBlock.fromInstances(binaryInstances) + agg2.add(block) + + assert(agg.loss ~== agg2.loss relTol 1e-8) + assert(agg.gradient ~== agg2.gradient relTol 1e-8) + } } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 89d27fbfa316..cd67d4f8ce7e 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -388,7 +388,7 @@ def intercept(self): class _LogisticRegressionParams(_JavaProbabilisticClassifierParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, HasAggregationDepth, - HasThreshold): + HasThreshold, HasBlockSize): """ Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`. @@ -570,6 +570,8 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams, 10 >>> blor.clear(blor.maxIter) >>> blorModel = blor.fit(bdf) + >>> blorModel.getBlockSize() + 4096 >>> blorModel.setFeaturesCol("features") LogisticRegressionModel... 
>>> blorModel.setProbabilityCol("newProbability") @@ -638,7 +640,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=4096): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ @@ -647,13 +649,14 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=4096): If the threshold and thresholds Params are both set, they must be equivalent. """ super(LogisticRegression, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LogisticRegression", self.uid) - self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto") + self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto", + blockSize=4096) kwargs = self._input_kwargs self.setParams(**kwargs) self._checkThresholdConsistency() @@ -666,7 +669,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=4096): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ @@ -674,7 +677,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=4096): Sets params for logistic regression. If the threshold and thresholds Params are both set, they must be equivalent. """ @@ -769,6 +772,13 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) + @since("3.0.0") + def setBlockSize(self, value): + """ + Sets the value of :py:attr:`blockSize`. 
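+ Stacking rows into blocks only changes how the objective is evaluated;
+ the fitted model is unchanged up to floating-point rounding.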
+ """ + return self._set(blockSize=value) + class LogisticRegressionModel(JavaProbabilisticClassificationModel, _LogisticRegressionParams, JavaMLWritable, JavaMLReadable, HasTrainingSummary): From 3e0a8e740a5f28fec92f06813676da5fee99f6be Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 29 Jan 2020 10:30:19 +0800 Subject: [PATCH 2/7] use local var --- .../ml/optim/aggregator/HingeAggregator.scala | 25 ++---- .../optim/aggregator/LogisticAggregator.scala | 78 ++++--------------- 2 files changed, 22 insertions(+), 81 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 25f7c9ddab42..9d9180a91004 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -55,20 +55,6 @@ private[ml] class HingeAggregator( } } - @transient private lazy val intercept = - if (fitIntercept) coefficientsArray(numFeatures) else 0.0 - - @transient private lazy val linearGradSumVec = { - if (fitIntercept) { - new DenseVector(Array.ofDim[Double](numFeatures)) - } else { - null - } - } - - @transient private lazy val auxiliaryVec = - new DenseVector(Array.ofDim[Double](blockSize)) - /** * Add a new training instance to this HingeAggregator, and update the loss and gradient @@ -138,14 +124,10 @@ private[ml] class HingeAggregator( val localGradientSumArray = gradientSumArray // vec here represents dotProducts - val vec = if (size == blockSize) { - auxiliaryVec - } else { - // the last block within one partition may be of size less than blockSize - new DenseVector(Array.ofDim[Double](size)) - } + val vec = new DenseVector(Array.ofDim[Double](size)) if (fitIntercept) { + val intercept = coefficientsArray.last var i = 0 while (i < size) { vec.values(i) = intercept @@ -185,6 +167,9 @@ private[ml] class HingeAggregator( if (vec.values.forall(_ == 0)) return this if (fitIntercept) { + // localGradientSumArray is of size numFeatures+1, so can not + // be directly used as the output of BLAS.gemv + val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } localGradientSumArray(numFeatures) += vec.values.sum diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index af1713585da5..52af815a4b52 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -208,7 +208,6 @@ private[ml] class LogisticAggregator( s"got type ${bcCoefficients.value.getClass}.)") } - // Helper vectors and matrices for binary: @transient private lazy val binaryLinear = { if (!multinomial) { if (fitIntercept) { @@ -221,21 +220,6 @@ private[ml] class LogisticAggregator( } } - @transient private lazy val binaryIntercept = - if (!multinomial && fitIntercept) bcCoefficients.value(numFeatures) else 0.0 - - @transient private lazy val binaryLinearGradSumVec = { - if (!multinomial && fitIntercept) { - new DenseVector(Array.ofDim[Double](numFeatures)) - } else { - null - } - } - - @transient private lazy val auxiliaryVec = - new DenseVector(Array.ofDim[Double](blockSize)) - - // Helper vectors and 
matrices for multinomial @transient private lazy val multinomialLinear = { if (multinomial) { if (fitIntercept) { @@ -248,29 +232,6 @@ private[ml] class LogisticAggregator( } } - @transient private lazy val multinomialIntercept = { - if (multinomial && fitIntercept) { - new DenseVector(coefficientsArray.takeRight(numClasses)) - } else { - null - } - } - - @transient private lazy val multinomialLinearGradSumMat = { - if (multinomial) { - new DenseMatrix(numFeatures, numClasses, Array.ofDim[Double](numClasses * numFeatures)) - } else { - null - } - } - - @transient private lazy val multinomialAuxiliaryMat = { - if (multinomial) { - new DenseMatrix(blockSize, numClasses, Array.ofDim[Double](blockSize * numClasses)) - } else { - null - } - } if (multinomial && numClasses <= 2) { logInfo(s"Multinomial logistic regression for binary classification yields separate " + @@ -317,17 +278,13 @@ private[ml] class LogisticAggregator( val localGradientSumArray = gradientSumArray // vec here represents margins or negative dotProducts - val vec = if (size == blockSize) { - auxiliaryVec - } else { - // the last block within one partition may be of size less than blockSize - new DenseVector(Array.ofDim[Double](size)) - } + val vec = new DenseVector(Array.ofDim[Double](size)) if (fitIntercept) { + val intercept = coefficientsArray.last var i = 0 while (i < size) { - vec.values(i) = binaryIntercept + vec.values(i) = intercept i += 1 } BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec) @@ -359,8 +316,11 @@ private[ml] class LogisticAggregator( } if (fitIntercept) { - BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, binaryLinearGradSumVec) - binaryLinearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } + // localGradientSumArray is of size numFeatures+1, so can not + // be directly used as the output of BLAS.gemv + val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures)) + BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) + linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } localGradientSumArray(numFeatures) += vec.values.sum } else { val gradSumVec = new DenseVector(localGradientSumArray) @@ -453,19 +413,15 @@ private[ml] class LogisticAggregator( val localGradientSumArray = gradientSumArray // mat here represents margins, shape: S X C - val mat = if (size == blockSize) { - multinomialAuxiliaryMat - } else { - // the last block within one partition may be of size less than blockSize - new DenseMatrix(size, numClasses, Array.ofDim[Double](size * numClasses)) - } + val mat = new DenseMatrix(size, numClasses, Array.ofDim[Double](size * numClasses)) if (fitIntercept) { + val intercept = coefficientsArray.takeRight(numClasses) var i = 0 while (i < size) { var j = 0 while (j < numClasses) { - mat.update(i, j, multinomialIntercept(j)) + mat.update(i, j, intercept(j)) j += 1 } i += 1 @@ -530,7 +486,6 @@ private[ml] class LogisticAggregator( // block.matrix: S X F, unknown type // mat (multipliers): S X C, dense - // multinomialLinearGradSumMat: F X C, dense // gradSumMat(gradientSumArray): C X FPI (numFeaturesPlusIntercept), dense block.matrix match { case dm: DenseMatrix if !fitIntercept => @@ -540,11 +495,12 @@ private[ml] class LogisticAggregator( BLAS.gemm(1.0, mat.transpose, dm, 1.0, gradSumMat) case _ => - // Otherwise, use multinomialLinearGradSumMat as a temp matrix: - // multinomialLinearGradSumMat = matrix.T X mat - BLAS.gemm(1.0, block.matrix.transpose, mat, - 0.0, multinomialLinearGradSumMat) - 
multinomialLinearGradSumMat.foreachActive { (i, j, v) => + // Otherwise, use linearGradSumVec (F X C) as a temp matrix: + // linearGradSumVec = matrix.T X mat + val linearGradSumMat = new DenseMatrix(numFeatures, numClasses, + Array.ofDim[Double](numFeatures * numClasses)) + BLAS.gemm(1.0, block.matrix.transpose, mat, 0.0, linearGradSumMat) + linearGradSumMat.foreachActive { (i, j, v) => if (v != 0) localGradientSumArray(i * numClasses + j) += v } From 36245b6aeaf1d87d94a01afbe0693f8778a2bc1a Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 29 Jan 2020 10:31:43 +0800 Subject: [PATCH 3/7] nit --- .../apache/spark/ml/optim/aggregator/LogisticAggregator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index 52af815a4b52..5e9e33161172 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -495,8 +495,8 @@ private[ml] class LogisticAggregator( BLAS.gemm(1.0, mat.transpose, dm, 1.0, gradSumMat) case _ => - // Otherwise, use linearGradSumVec (F X C) as a temp matrix: - // linearGradSumVec = matrix.T X mat + // Otherwise, use linearGradSumMat (F X C) as a temp matrix: + // linearGradSumMat = matrix.T X mat val linearGradSumMat = new DenseMatrix(numFeatures, numClasses, Array.ofDim[Double](numFeatures * numClasses)) BLAS.gemm(1.0, block.matrix.transpose, mat, 0.0, linearGradSumMat) From f0e2e40f1ea7123f9ac942b77b379c31486792b6 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 29 Jan 2020 16:34:02 +0800 Subject: [PATCH 4/7] update default blocksize=1024 --- .../spark/ml/classification/LinearSVC.scala | 2 +- .../classification/LogisticRegression.scala | 2 +- .../ml/optim/aggregator/HingeAggregator.scala | 2 +- .../optim/aggregator/LogisticAggregator.scala | 2 +- .../ml/param/shared/SharedParamsCodeGen.scala | 4 ++-- .../spark/ml/param/shared/sharedParams.scala | 12 +++++------ python/pyspark/ml/classification.py | 20 +++++++++---------- .../ml/param/_shared_params_code_gen.py | 2 +- python/pyspark/ml/param/shared.py | 2 +- 9 files changed, 24 insertions(+), 24 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 6b1cdd8ad396..7631247ddecf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -157,7 +157,7 @@ class LinearSVC @Since("2.2.0") ( /** * Set block size for stacking input data in matrices. - * Default is 4096. + * Default is 1024. * * @group expertSetParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 4815407e2018..1c8160b4e883 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -433,7 +433,7 @@ class LogisticRegression @Since("1.2.0") ( /** * Set block size for stacking input data in matrices. - * Default is 4096. + * Default is 1024. 
* * @group expertSetParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 9d9180a91004..e3b28bffe42f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -36,7 +36,7 @@ import org.apache.spark.ml.linalg._ private[ml] class HingeAggregator( numFeatures: Int, fitIntercept: Boolean, - blockSize: Int = 4096)(bcCoefficients: Broadcast[Vector]) + blockSize: Int = 1024)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] { private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index 5e9e33161172..f22d0e7cf303 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -186,7 +186,7 @@ private[ml] class LogisticAggregator( numClasses: Int, fitIntercept: Boolean, multinomial: Boolean, - blockSize: Int = 4096)(bcCoefficients: Broadcast[Vector]) + blockSize: Int = 1024)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, LogisticAggregator] with Logging { private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index eee75e7f5b72..3d1fab8692af 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -104,10 +104,10 @@ private[shared] object SharedParamsCodeGen { isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"), ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " + "each row is for training or for validation. False indicates training; true indicates " + - "validation."), + "validation"), ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " + "stacked within partitions. If block size is more than remaining data in a partition " + - "then it is adjusted to the size of this data.", Some("4096"), + "then it is adjusted to the size of this data", Some("1024"), isValid = "ParamValidators.gt(0)", isExpertParam = true) ) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 3d1c55a5eb42..7fe8ccd973a7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -570,29 +570,29 @@ trait HasDistanceMeasure extends Params { trait HasValidationIndicatorCol extends Params { /** - * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.. + * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation. 
* @group param */ - final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.") + final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation") /** @group getParam */ final def getValidationIndicatorCol: String = $(validationIndicatorCol) } /** - * Trait for shared param blockSize (default: 4096). This trait may be changed or + * Trait for shared param blockSize (default: 1024). This trait may be changed or * removed between minor versions. */ @DeveloperApi trait HasBlockSize extends Params { /** - * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.. + * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. * @group expertParam */ - final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0)) + final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data", ParamValidators.gt(0)) - setDefault(blockSize, 4096) + setDefault(blockSize, 1024) /** @group expertGetParam */ final def getBlockSize: Int = $(blockSize) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index cd67d4f8ce7e..7221b0f81ae0 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -255,19 +255,19 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSize=4096): + aggregationDepth=2, blockSize=1024): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSize=4096): + aggregationDepth=2, blockSize=1024): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LinearSVC", self.uid) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, threshold=0.0, aggregationDepth=2, - blockSize=4096) + blockSize=1024) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -276,12 +276,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, 
standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2, blockSize=4096): + aggregationDepth=2, blockSize=1024): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2, blockSize=4096): + aggregationDepth=2, blockSize=1024): Sets params for Linear SVM Classifier. """ kwargs = self._input_kwargs @@ -640,7 +640,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=4096): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ @@ -649,14 +649,14 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=4096): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): If the threshold and thresholds Params are both set, they must be equivalent. """ super(LogisticRegression, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LogisticRegression", self.uid) self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto", - blockSize=4096) + blockSize=1024) kwargs = self._input_kwargs self.setParams(**kwargs) self._checkThresholdConsistency() @@ -669,7 +669,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=4096): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ @@ -677,7 +677,7 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=4096): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024): Sets params for logistic regression. If the threshold and thresholds Params are both set, they must be equivalent. """ diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 3994625c05f1..fb4d55d57a2d 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -167,7 +167,7 @@ def get$Name(self): None, "TypeConverters.toString"), ("blockSize", "block size for stacking input data in matrices. Data is stacked within " "partitions. 
If block size is more than remaining data in a partition then it is " - "adjusted to the size of this data.", "4096", "TypeConverters.toInt")] + "adjusted to the size of this data.", "1024", "TypeConverters.toInt")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 41ba7b9dc552..456463580878 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -591,7 +591,7 @@ class HasBlockSize(Params): def __init__(self): super(HasBlockSize, self).__init__() - self._setDefault(blockSize=4096) + self._setDefault(blockSize=1024) def getBlockSize(self): """ From 8a6015f85e784f04b164c34b02e1f87ab469d440 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 29 Jan 2020 18:31:17 +0800 Subject: [PATCH 5/7] del unused blockSize in agg --- .../scala/org/apache/spark/ml/classification/LinearSVC.scala | 2 +- .../apache/spark/ml/classification/LogisticRegression.scala | 2 +- .../org/apache/spark/ml/optim/aggregator/HingeAggregator.scala | 3 +-- .../apache/spark/ml/optim/aggregator/LogisticAggregator.scala | 3 +-- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 7631247ddecf..f16648d2abee 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -240,7 +240,7 @@ class LinearSVC @Since("2.2.0") ( .persist(StorageLevel.MEMORY_AND_DISK) .setName(s"training dataset (blockSize=${$(blockSize)})") - val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept), $(blockSize))(_) + val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_) val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 1c8160b4e883..9b5b36257a58 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -600,7 +600,7 @@ class LogisticRegression @Since("1.2.0") ( val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept), - multinomial = isMultinomial, $(blockSize))(_) + multinomial = isMultinomial)(_) val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) { featuresStd(j / numCoefficientSets) } else { diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index e3b28bffe42f..7fc8f320e189 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -35,8 +35,7 @@ import org.apache.spark.ml.linalg._ */ private[ml] class HingeAggregator( numFeatures: Int, - fitIntercept: Boolean, - blockSize: Int = 1024)(bcCoefficients: Broadcast[Vector]) + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] { private val numFeaturesPlusIntercept: Int = if 
(fitIntercept) numFeatures + 1 else numFeatures diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index f22d0e7cf303..93c94459d52e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -185,8 +185,7 @@ private[ml] class LogisticAggregator( numFeatures: Int, numClasses: Int, fitIntercept: Boolean, - multinomial: Boolean, - blockSize: Int = 1024)(bcCoefficients: Broadcast[Vector]) + multinomial: Boolean)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, LogisticAggregator] with Logging { private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures From a249fdfd262843fede6b58777c58045a49c50165 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 29 Jan 2020 18:33:19 +0800 Subject: [PATCH 6/7] fix py --- python/pyspark/ml/classification.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 7221b0f81ae0..bb9cd034808f 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -216,7 +216,7 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable >>> model.getThreshold() 0.5 >>> model.getBlockSize() - 4096 + 1024 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) >>> model.intercept @@ -571,7 +571,7 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams, >>> blor.clear(blor.maxIter) >>> blorModel = blor.fit(bdf) >>> blorModel.getBlockSize() - 4096 + 1024 >>> blorModel.setFeaturesCol("features") LogisticRegressionModel... 
>>> blorModel.setProbabilityCol("newProbability") From c49b379fe697ffb69bb8565eea6644528992f572 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Thu, 30 Jan 2020 11:59:34 +0800 Subject: [PATCH 7/7] use Array.fill --- .../spark/ml/optim/aggregator/HingeAggregator.scala | 13 ++++++------- .../ml/optim/aggregator/LogisticAggregator.scala | 13 ++++++------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 7fc8f320e189..292187b3e146 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -123,15 +123,14 @@ private[ml] class HingeAggregator( val localGradientSumArray = gradientSumArray // vec here represents dotProducts - val vec = new DenseVector(Array.ofDim[Double](size)) + val vec = if (fitIntercept && coefficientsArray.last != 0) { + val intercept = coefficientsArray.last + new DenseVector(Array.fill(size)(intercept)) + } else { + new DenseVector(Array.ofDim[Double](size)) + } if (fitIntercept) { - val intercept = coefficientsArray.last - var i = 0 - while (i < size) { - vec.values(i) = intercept - i += 1 - } BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) } else { BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index 93c94459d52e..76d21995a2c5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -277,15 +277,14 @@ private[ml] class LogisticAggregator( val localGradientSumArray = gradientSumArray // vec here represents margins or negative dotProducts - val vec = new DenseVector(Array.ofDim[Double](size)) + val vec = if (fitIntercept && coefficientsArray.last != 0) { + val intercept = coefficientsArray.last + new DenseVector(Array.fill(size)(intercept)) + } else { + new DenseVector(Array.ofDim[Double](size)) + } if (fitIntercept) { - val intercept = coefficientsArray.last - var i = 0 - while (i < size) { - vec.values(i) = intercept - i += 1 - } BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec) } else { BLAS.gemv(-1.0, block.matrix, binaryLinear, 0.0, vec)
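
Note on the technique (illustrative, not part of the patch series): the blocked binary
update above computes every margin of a stacked block with one matrix-vector product,
converts the margins to per-row loss terms and multipliers in a single pass, and
accumulates the gradient with a second product against the transposed block. A minimal
standalone sketch of the same arithmetic, using plain Scala arrays in place of Spark's
InstanceBlock and BLAS (all names below are made up for illustration):

object BlockBinaryUpdateSketch {
  // Numerically stable log(1 + exp(m)), mirroring MLUtils.log1pExp.
  private def log1pExp(m: Double): Double =
    if (m > 0) m + math.log1p(math.exp(-m)) else math.log1p(math.exp(m))

  def main(args: Array[String]): Unit = {
    // One "block" of three stacked instances with two features each
    // (features assumed already standardized, as in the patch).
    val rows      = Array(Array(1.0, 2.0), Array(1.5, 1.0), Array(4.0, 0.5))
    val labels    = Array(0.0, 1.0, 0.0)
    val weights   = Array(0.1, 0.5, 0.3)
    val coef      = Array(1.0, 2.0)
    val intercept = 1.0

    // Step 1: margins(i) = -(x_i . coef + intercept) for the whole block;
    // in the patch this is BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec).
    val margins = rows.map { x =>
      -(x.zip(coef).map { case (v, c) => v * c }.sum + intercept)
    }

    // Step 2: one pass over the block turning margins into loss and multipliers.
    var lossSum = 0.0
    val multipliers = Array.ofDim[Double](rows.length)
    for (i <- rows.indices) {
      val m = margins(i)
      lossSum += weights(i) * (log1pExp(m) - (if (labels(i) > 0) 0.0 else m))
      multipliers(i) = weights(i) * (1.0 / (1.0 + math.exp(m)) - labels(i))
    }

    // Step 3: gradient += block.matrix.T * multipliers (the second gemv in the
    // patch); the trailing slot holds the intercept gradient.
    val grad = Array.ofDim[Double](coef.length + 1)
    for (i <- rows.indices; j <- coef.indices) grad(j) += multipliers(i) * rows(i)(j)
    grad(coef.length) = multipliers.sum

    println(s"loss = $lossSum, gradient = ${grad.mkString("[", ", ", "]")}")
  }
}

The per-instance path walks each row's non-zeros and accumulates the same sums one dot
product at a time; the blocked path trades that for two dense BLAS calls per block,
which is the point of the blockSize parameter introduced in this series.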