@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
@@ -340,8 +341,9 @@ class LogisticRegression @Since("1.2.0") (
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

val bcFeaturesStd = instances.context.broadcast(featuresStd)
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
$(standardization), featuresStd, featuresMean, regParamL2)
$(standardization), bcFeaturesStd, regParamL2)

val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -436,6 +438,7 @@ class LogisticRegression @Since("1.2.0") (
rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
i += 1
}
bcFeaturesStd.destroy(blocking = false)

if ($(fitIntercept)) {
(Vectors.dense(rawCoefficients.dropRight(1)).compressed, rawCoefficients.last,
@@ -932,11 +935,15 @@ class BinaryLogisticRegressionSummary private[classification] (
* Two LogisticAggregator can be merged together to have a summary of loss and gradient of
* the corresponding joint dataset.
*
* @param bcCoefficients The broadcast coefficients corresponding to the features.
* @param bcFeaturesStd The broadcast standard deviation values of the features.
* @param numClasses the number of possible outcomes for k classes classification problem in
* Multinomial Logistic Regression.
* @param fitIntercept Whether to fit an intercept term.
*/
private class LogisticAggregator(
val bcCoefficients: Broadcast[Vector],
val bcFeaturesStd: Broadcast[Array[Double]],
private val numFeatures: Int,
numClasses: Int,
fitIntercept: Boolean) extends Serializable {
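
Background for the hunks above and below: the Javadoc says two LogisticAggregator instances can be merged into a summary of loss and gradient over the joint dataset, which is what lets the cost function drive them through treeAggregate with add as the seqOp and merge as the combOp. The standalone sketch below is not the Spark source: the class name, the plain Array[Double] coefficients, and the omission of standardization and intercept handling are illustrative assumptions; it only shows the add/merge contract for the binary case.

```scala
// Minimal sketch of the add/merge contract for binary logistic loss.
// Not the Spark implementation: no standardization, no intercept, dense features only.
class BinaryLogisticAggregatorSketch(coefficients: Array[Double]) extends Serializable {
  private var weightSum = 0.0
  private var lossSum = 0.0
  private val gradientSum = Array.ofDim[Double](coefficients.length)

  // Numerically stable log(1 + exp(x)).
  private def log1pExp(x: Double): Double =
    if (x > 0.0) x + math.log1p(math.exp(-x)) else math.log1p(math.exp(x))

  /** Fold one weighted instance (label in {0, 1}) into the running loss and gradient. */
  def add(label: Double, weight: Double, features: Array[Double]): this.type = {
    require(features.length == coefficients.length, "dimension mismatch")
    if (weight == 0.0) return this
    var margin = 0.0
    var i = 0
    while (i < features.length) { margin -= coefficients(i) * features(i); i += 1 }
    // multiplier = weight * (sigmoid(w dot x) - label)
    val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
    i = 0
    while (i < features.length) { gradientSum(i) += multiplier * features(i); i += 1 }
    if (label > 0.0) lossSum += weight * log1pExp(margin)
    else lossSum += weight * (log1pExp(margin) - margin)
    weightSum += weight
    this
  }

  /** Combine another partial aggregator computed on a disjoint slice of the data. */
  def merge(other: BinaryLogisticAggregatorSketch): this.type = {
    weightSum += other.weightSum
    lossSum += other.lossSum
    var i = 0
    while (i < gradientSum.length) { gradientSum(i) += other.gradientSum(i); i += 1 }
    this
  }

  def loss: Double = lossSum / weightSum
  def gradient: Array[Double] = gradientSum.map(_ / weightSum)
}
```

In the actual patch the coefficients and featuresStd read inside add come from the broadcast variables held by the aggregator rather than from plain arrays, so each executor receives a single copy instead of one per task closure.
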
@@ -952,29 +959,26 @@ private class LogisticAggregator(
* of the objective function.
*
* @param instance The instance of data point to be added.
* @param coefficients The coefficients corresponding to the features.
* @param featuresStd The standard deviation values of the features.
* @return This LogisticAggregator object.
*/
def add(
instance: Instance,
coefficients: Vector,
featuresStd: Array[Double]): this.type = {
def add(instance: Instance): this.type = {
instance match { case Instance(label, weight, features) =>
require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
s" Expecting $numFeatures but got ${features.size}.")
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

if (weight == 0.0) return this

val coefficientsArray = coefficients match {
val coefficientsArray = bcCoefficients.value match {
case dv: DenseVector => dv.values
case _ =>
throw new IllegalArgumentException(
s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
"coefficients only supports dense vector " +
s"but got type ${bcCoefficients.value.getClass}.")
}
val localGradientSumArray = gradientSumArray

val featuresStd = bcFeaturesStd.value
numClasses match {
case 2 =>
// For Binary Logistic Regression.
@@ -1071,24 +1075,23 @@ private class LogisticCostFun(
numClasses: Int,
fitIntercept: Boolean,
standardization: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double],
bcFeaturesStd: Broadcast[Array[Double]],
regParamL2: Double) extends DiffFunction[BDV[Double]] {

val featuresStd = bcFeaturesStd.value

override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val numFeatures = featuresStd.length
val coeffs = Vectors.fromBreeze(coefficients)
val bcCoeffs = instances.context.broadcast(coeffs)
Review comment (Contributor): We should explicitly destroy bcCoeffs at the end of calculate by bcCoeffs.destroy(blocking = false) for each iteration. (A sketch of this broadcast lifecycle follows the diff below.)

val n = coeffs.size
val localFeaturesStd = featuresStd


val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) =>
c.add(instance, coeffs, localFeaturesStd)
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)

instances.treeAggregate(
new LogisticAggregator(numFeatures, numClasses, fitIntercept)
new LogisticAggregator(bcCoeffs, bcFeaturesStd, numFeatures, numClasses, fitIntercept)
)(seqOp, combOp)
}

@@ -1128,6 +1131,7 @@ private class LogisticCostFun(
}
0.5 * regParamL2 * sum
}
bcCoeffs.destroy(blocking = false)

(logisticAggregator.loss + regVal, new BDV(totalGradientArray))
}
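
Taken together, the change sets up two broadcast lifecycles: featuresStd is broadcast once per fit and destroyed after the optimizer finishes, while the coefficients are re-broadcast on every calculate call and, per the review comment above, destroyed before calculate returns. The sketch below is not the Spark source: the object and class names and the toy dot-product objective are assumptions standing in for LogisticCostFun and LogisticAggregator, and it calls the public blocking destroy() rather than the spark-internal destroy(blocking = false) overload the patch can use from inside the org.apache.spark package.

```scala
// Sketch of the per-iteration broadcast lifecycle: create, read .value on executors, destroy.
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object BroadcastLifecycleSketch {
  // Toy aggregator standing in for LogisticAggregator: it holds the broadcast and
  // dereferences it only inside add(), which runs on the executors.
  class DotAggregator(bcWeights: Broadcast[Array[Double]]) extends Serializable {
    var sum = 0.0
    def add(x: Array[Double]): this.type = {
      val w = bcWeights.value
      var i = 0
      while (i < x.length) { sum += w(i) * x(i); i += 1 }
      this
    }
    def merge(other: DotAggregator): this.type = { sum += other.sum; this }
  }

  // Stand-in for LogisticCostFun.calculate: one broadcast per call, destroyed before returning.
  def calculate(data: RDD[Array[Double]], weights: Array[Double]): Double = {
    val bcWeights = data.sparkContext.broadcast(weights)
    val agg = data.treeAggregate(new DotAggregator(bcWeights))(
      (a, x) => a.add(x),
      (a, b) => a.merge(b))
    bcWeights.destroy()  // release driver and executor copies once the aggregate is back
    agg.sum
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-sketch").getOrCreate()
    val data = spark.sparkContext.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)))
    // Each iteration of an outer optimizer (L-BFGS/OWLQN) would call calculate with new weights.
    println(calculate(data, Array(0.5, -0.5)))
    spark.stop()
  }
}
```

Without the per-call destroy, every L-BFGS/OWLQN iteration would leave another copy of the coefficient vector cached on the executors until the context shuts down or the GC-driven cleaner eventually reclaims it.
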