
Commit 12e1bba

### What changes were proposed in this pull request?
Revert #27360, #27396, #27374, and #27389.

### Why are the changes needed?
BLAS needs more performance tests, especially on sparse datasets. A performance test of LogisticRegression (#27374) on a sparse dataset shows that blockifying vectors into matrices and using BLAS causes a performance regression. LinearSVC and LinearRegression were updated in the same way as LogisticRegression, so we need to revert them as well to make sure there is no regression.

### Does this PR introduce any user-facing change?
Yes, the newly added param blockSize is removed.

### How was this patch tested?
Reverted test suites.

Closes #27487 from zhengruifeng/revert_blockify_ii.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
1 parent a7451f4 commit 12e1bba
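For context, the reverted changes stacked per-row Instance data into matrix blocks so that margins and gradients could be computed with BLAS-style matrix operations. The sketch below is a minimal, standalone illustration of that stacking idea; the names (BlockifySketch, StackedBlock, stack, margins) are hypothetical and are not the Spark InstanceBlock API.

```scala
// Hypothetical sketch of the "blockify" idea this commit reverts: stack a group of
// dense feature vectors into one row-major matrix so the margins for the whole
// block can be computed in a single matrix-vector pass (the operation a real
// implementation would hand to native BLAS as gemv).
object BlockifySketch {

  /** A block of `numRows` feature vectors laid out row-major in one array. */
  case class StackedBlock(numRows: Int, numCols: Int, values: Array[Double])

  /** Stack a batch of dense feature vectors into a single block. */
  def stack(features: Seq[Array[Double]]): StackedBlock = {
    require(features.nonEmpty, "need at least one row")
    val numCols = features.head.length
    val values = new Array[Double](features.length * numCols)
    features.zipWithIndex.foreach { case (row, i) =>
      System.arraycopy(row, 0, values, i * numCols, numCols)
    }
    StackedBlock(features.length, numCols, values)
  }

  /** margins(i) = block(i, :) dot coefficients, computed block-wise. */
  def margins(block: StackedBlock, coefficients: Array[Double]): Array[Double] = {
    val out = new Array[Double](block.numRows)
    var i = 0
    while (i < block.numRows) {
      var sum = 0.0
      var j = 0
      while (j < block.numCols) {
        sum += block.values(i * block.numCols + j) * coefficients(j)
        j += 1
      }
      out(i) = sum
      i += 1
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val block = stack(Seq(Array(1.0, 2.0), Array(3.0, 4.0)))
    println(margins(block, Array(0.5, 0.5)).mkString(", ")) // 1.5, 3.5
  }
}
```

The trade-off behind the revert is exactly this stacking: on sparse data, densifying rows into blocks can cost more than the BLAS speedup recovers, which is why the commit message flags sparse datasets in particular.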

27 files changed: +260 −1071 lines

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,6 @@ private[serializer] object KryoSerializer {
502502
"org.apache.spark.ml.attribute.NumericAttribute",
503503

504504
"org.apache.spark.ml.feature.Instance",
505-
"org.apache.spark.ml.feature.InstanceBlock",
506505
"org.apache.spark.ml.feature.LabeledPoint",
507506
"org.apache.spark.ml.feature.OffsetInstance",
508507
"org.apache.spark.ml.linalg.DenseMatrix",

mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,7 @@ private[spark] object BLAS extends Serializable {
682682

683683
val xTemp = xValues(k) * alpha
684684
while (i < indEnd) {
685+
val rowIndex = Arows(i)
685686
yValues(Arows(i)) += Avals(i) * xTemp
686687
i += 1
687688
}
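For orientation, here is a standalone sketch of the CSC-style y += alpha * A * x accumulation that this hunk touches. The array names (AcolPtrs, Arows, Avals, xValues, yValues) mirror the diff, but this is an illustration under those naming assumptions, not the actual BLAS.gemv implementation.

```scala
// Sparse (CSC) matrix-vector accumulation: for each column k, scale x(k) by alpha
// once, then scatter-add into y at the row indices stored for that column.
object SparseGemvSketch {

  def sparseGemv(
      alpha: Double,
      numCols: Int,
      AcolPtrs: Array[Int],    // column start offsets, length numCols + 1
      Arows: Array[Int],       // row index of each stored value
      Avals: Array[Double],    // stored values, column by column
      xValues: Array[Double],  // dense input vector x
      yValues: Array[Double]   // dense output vector y, updated in place
    ): Unit = {
    var k = 0
    while (k < numCols) {
      var i = AcolPtrs(k)
      val indEnd = AcolPtrs(k + 1)
      val xTemp = xValues(k) * alpha
      while (i < indEnd) {
        val rowIndex = Arows(i)  // row of the current nonzero, as in the hunk above
        yValues(rowIndex) += Avals(i) * xTemp
        i += 1
      }
      k += 1
    }
  }

  def main(args: Array[String]): Unit = {
    // 2x2 matrix [[1, 0], [2, 3]] in CSC layout, multiplied by x = (1, 1).
    val y = new Array[Double](2)
    sparseGemv(1.0, 2, Array(0, 2, 3), Array(0, 1, 1), Array(1.0, 2.0, 3.0),
      Array(1.0, 1.0), y)
    println(y.mkString(", ")) // 1.0, 5.0
  }
}
```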

mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
2626
import org.apache.spark.SparkException
2727
import org.apache.spark.annotation.Since
2828
import org.apache.spark.internal.Logging
29-
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
29+
import org.apache.spark.ml.feature.Instance
3030
import org.apache.spark.ml.linalg._
3131
import org.apache.spark.ml.optim.aggregator.HingeAggregator
3232
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
4141
/** Params for linear SVM Classifier. */
4242
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
4343
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
44-
with HasAggregationDepth with HasThreshold with HasBlockSize {
44+
with HasAggregationDepth with HasThreshold {
4545

4646
/**
4747
* Param for threshold in binary classification prediction.
@@ -155,26 +155,19 @@ class LinearSVC @Since("2.2.0") (
155155
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
156156
setDefault(aggregationDepth -> 2)
157157

158-
/**
159-
* Set block size for stacking input data in matrices.
160-
* Default is 1024.
161-
*
162-
* @group expertSetParam
163-
*/
164-
@Since("3.0.0")
165-
def setBlockSize(value: Int): this.type = set(blockSize, value)
166-
167158
@Since("2.2.0")
168159
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
169160

170161
override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
162+
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
163+
164+
val instances = extractInstances(dataset)
165+
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
166+
171167
instr.logPipelineStage(this)
172168
instr.logDataset(dataset)
173169
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
174-
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
175-
176-
val sc = dataset.sparkSession.sparkContext
177-
val instances = extractInstances(dataset)
170+
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
178171

179172
val (summarizer, labelSummarizer) = instances.treeAggregate(
180173
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
@@ -215,33 +208,20 @@ class LinearSVC @Since("2.2.0") (
215208
throw new SparkException(msg)
216209
}
217210

218-
val featuresStd = summarizer.std.compressed
219-
val bcFeaturesStd = sc.broadcast(featuresStd)
211+
val featuresStd = summarizer.std.toArray
212+
val getFeaturesStd = (j: Int) => featuresStd(j)
220213
val regParamL2 = $(regParam)
214+
val bcFeaturesStd = instances.context.broadcast(featuresStd)
221215
val regularization = if (regParamL2 != 0.0) {
222216
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
223217
Some(new L2Regularization(regParamL2, shouldApply,
224-
if ($(standardization)) None else Some(featuresStd.apply)))
218+
if ($(standardization)) None else Some(getFeaturesStd)))
225219
} else {
226220
None
227221
}
228222

229-
val standardized = instances.map {
230-
case Instance(label, weight, features) =>
231-
val featuresStd = bcFeaturesStd.value
232-
val array = Array.ofDim[Double](numFeatures)
233-
features.foreachNonZero { (i, v) =>
234-
val std = featuresStd(i)
235-
if (std != 0) array(i) = v / std
236-
}
237-
Instance(label, weight, Vectors.dense(array))
238-
}
239-
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
240-
.persist(StorageLevel.MEMORY_AND_DISK)
241-
.setName(s"training dataset (blockSize=${$(blockSize)})")
242-
243-
val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_)
244-
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
223+
val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
224+
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
245225
$(aggregationDepth))
246226

247227
def regParamL1Fun = (index: Int) => 0D
@@ -258,7 +238,6 @@ class LinearSVC @Since("2.2.0") (
258238
scaledObjectiveHistory += state.adjustedValue
259239
}
260240

261-
blocks.unpersist()
262241
bcFeaturesStd.destroy()
263242
if (state == null) {
264243
val msg = s"${optimizer.getClass.getName} failed."
@@ -289,6 +268,8 @@ class LinearSVC @Since("2.2.0") (
289268
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
290269
}
291270

271+
if (handlePersistence) instances.unpersist()
272+
292273
copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
293274
}
294275
}
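The restored code path above caches the extracted instances only when the input dataset is not already cached, and unpersists them after optimization. Below is a generic, standalone sketch of that pattern; fitWithPersistence and trainOnce are illustrative names, and the sketch checks the RDD's storage level rather than the input Dataset's as the diff does.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Cache the training data only if the caller has not already cached it,
// and release that cache once training finishes.
object PersistenceSketch {
  def fitWithPersistence[T, M](instances: RDD[T])(trainOnce: RDD[T] => M): M = {
    val handlePersistence = instances.getStorageLevel == StorageLevel.NONE
    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
    try {
      trainOnce(instances)  // the optimizer makes many passes over `instances` here
    } finally {
      if (handlePersistence) instances.unpersist()
    }
  }
}
```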

mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
2828
import org.apache.spark.SparkException
2929
import org.apache.spark.annotation.Since
3030
import org.apache.spark.internal.Logging
31-
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
31+
import org.apache.spark.ml.feature.Instance
3232
import org.apache.spark.ml.linalg._
3333
import org.apache.spark.ml.optim.aggregator.LogisticAggregator
3434
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -50,8 +50,7 @@ import org.apache.spark.util.VersionUtils
5050
*/
5151
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
5252
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
53-
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
54-
with HasBlockSize {
53+
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {
5554

5655
import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames
5756

@@ -431,15 +430,6 @@ class LogisticRegression @Since("1.2.0") (
431430
@Since("2.2.0")
432431
def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)
433432

434-
/**
435-
* Set block size for stacking input data in matrices.
436-
* Default is 1024.
437-
*
438-
* @group expertSetParam
439-
*/
440-
@Since("3.0.0")
441-
def setBlockSize(value: Int): this.type = set(blockSize, value)
442-
443433
private def assertBoundConstrainedOptimizationParamsValid(
444434
numCoefficientSets: Int,
445435
numFeatures: Int): Unit = {
@@ -492,17 +482,24 @@ class LogisticRegression @Since("1.2.0") (
492482
this
493483
}
494484

495-
override protected[spark] def train(
496-
dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
485+
override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
486+
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
487+
train(dataset, handlePersistence)
488+
}
489+
490+
protected[spark] def train(
491+
dataset: Dataset[_],
492+
handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
493+
val instances = extractInstances(dataset)
494+
495+
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
496+
497497
instr.logPipelineStage(this)
498498
instr.logDataset(dataset)
499499
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
500500
probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol,
501501
fitIntercept)
502502

503-
val sc = dataset.sparkSession.sparkContext
504-
val instances = extractInstances(dataset)
505-
506503
val (summarizer, labelSummarizer) = instances.treeAggregate(
507504
(Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
508505
seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
@@ -585,9 +582,8 @@ class LogisticRegression @Since("1.2.0") (
585582
s"dangerous ground, so the algorithm may not converge.")
586583
}
587584

588-
val featuresMean = summarizer.mean.compressed
589-
val featuresStd = summarizer.std.compressed
590-
val bcFeaturesStd = sc.broadcast(featuresStd)
585+
val featuresMean = summarizer.mean.toArray
586+
val featuresStd = summarizer.std.toArray
591587

592588
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
593589
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -599,7 +595,8 @@ class LogisticRegression @Since("1.2.0") (
599595
val regParamL1 = $(elasticNetParam) * $(regParam)
600596
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
601597

602-
val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept),
598+
val bcFeaturesStd = instances.context.broadcast(featuresStd)
599+
val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
603600
multinomial = isMultinomial)(_)
604601
val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) {
605602
featuresStd(j / numCoefficientSets)
@@ -615,21 +612,7 @@ class LogisticRegression @Since("1.2.0") (
615612
None
616613
}
617614

618-
val standardized = instances.map {
619-
case Instance(label, weight, features) =>
620-
val featuresStd = bcFeaturesStd.value
621-
val array = Array.ofDim[Double](numFeatures)
622-
features.foreachNonZero { (i, v) =>
623-
val std = featuresStd(i)
624-
if (std != 0) array(i) = v / std
625-
}
626-
Instance(label, weight, Vectors.dense(array))
627-
}
628-
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
629-
.persist(StorageLevel.MEMORY_AND_DISK)
630-
.setName(s"training dataset (blockSize=${$(blockSize)})")
631-
632-
val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
615+
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
633616
$(aggregationDepth))
634617

635618
val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets
@@ -823,7 +806,6 @@ class LogisticRegression @Since("1.2.0") (
823806
state = states.next()
824807
arrayBuilder += state.adjustedValue
825808
}
826-
blocks.unpersist()
827809
bcFeaturesStd.destroy()
828810

829811
if (state == null) {
@@ -893,6 +875,8 @@ class LogisticRegression @Since("1.2.0") (
893875
}
894876
}
895877

878+
if (handlePersistence) instances.unpersist()
879+
896880
val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
897881
numClasses, isMultinomial))
898882

mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
3434

3535
/** Params for Multilayer Perceptron. */
3636
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
37-
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
37+
with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
3838

3939
import MultilayerPerceptronClassifier._
4040

@@ -54,6 +54,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
5454
@Since("1.5.0")
5555
final def getLayers: Array[Int] = $(layers)
5656

57+
/**
58+
* Block size for stacking input data in matrices to speed up the computation.
59+
* Data is stacked within partitions. If block size is more than remaining data in
60+
* a partition then it is adjusted to the size of this data.
61+
* Recommended size is between 10 and 1000.
62+
* Default: 128
63+
*
64+
* @group expertParam
65+
*/
66+
@Since("1.5.0")
67+
final val blockSize: IntParam = new IntParam(this, "blockSize",
68+
"Block size for stacking input data in matrices. Data is stacked within partitions." +
69+
" If block size is more than remaining data in a partition then " +
70+
"it is adjusted to the size of this data. Recommended size is between 10 and 1000",
71+
ParamValidators.gt(0))
72+
73+
/** @group expertGetParam */
74+
@Since("1.5.0")
75+
final def getBlockSize: Int = $(blockSize)
76+
5777
/**
5878
* The solver algorithm for optimization.
5979
* Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
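With the revert, blockSize lives directly on the MultilayerPerceptronParams trait again instead of the shared HasBlockSize mixin. Below is a minimal usage sketch that sets the param on the estimator; it assumes Spark is on the classpath, and the object name and toy data are illustrative.

```scala
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object MlpBlockSizeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("mlp-blocksize").getOrCreate()
    import spark.implicits._

    // Tiny toy dataset: 2 features, binary label.
    val train = Seq(
      (0.0, Vectors.dense(0.0, 0.0)),
      (1.0, Vectors.dense(1.0, 1.0)),
      (0.0, Vectors.dense(0.1, 0.0)),
      (1.0, Vectors.dense(0.9, 1.0))
    ).toDF("label", "features")

    val mlp = new MultilayerPerceptronClassifier()
      .setLayers(Array(2, 4, 2))  // input, hidden, and output layer sizes
      .setBlockSize(128)          // the stacking block size documented above; default 128
      .setMaxIter(10)

    val model = mlp.fit(train)
    model.transform(train).show()
    spark.stop()
  }
}
```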
