diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index cb53b561b5b4..905789090d62 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -32,7 +32,7 @@ import org.apache.spark.ml.optim.aggregator.HingeAggregator
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
-import org.apache.spark.ml.stat.SummaryBuilderImpl._
+import org.apache.spark.ml.stat._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.sql.{Dataset, Row}
@@ -170,7 +170,7 @@ class LinearSVC @Since("2.2.0") (
       regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
 
     val (summarizer, labelSummarizer) = instances.treeAggregate(
-      (createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
+      (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
       seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
         (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)),
       combOp = (c1: (SummarizerBuffer, MultiClassSummarizer),
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index f44dffa16028..6f7b92c96b36 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -34,7 +34,7 @@ import org.apache.spark.ml.optim.aggregator.LogisticAggregator
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
-import org.apache.spark.ml.stat.SummaryBuilderImpl._
+import org.apache.spark.ml.stat._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
@@ -501,7 +501,7 @@ class LogisticRegression @Since("1.2.0") (
       fitIntercept)
 
     val (summarizer, labelSummarizer) = instances.treeAggregate(
-      (createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
+      (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
       seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
         (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)),
       combOp = (c1: (SummarizerBuffer, MultiClassSummarizer),
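Note for review: the two classifier changes above share one shape — the trainers now request only the metrics they need ("mean", "std", "count") instead of aggregating a full summarizer. A minimal sketch of that call pattern, stripped of the label summarizer (Spark-internal code, since createSummarizerBuffer is private[spark]; summarizeFeatures is a hypothetical helper, not part of the patch):

    import org.apache.spark.ml.feature.Instance
    import org.apache.spark.ml.stat.{Summarizer, SummarizerBuffer}
    import org.apache.spark.rdd.RDD

    def summarizeFeatures(instances: RDD[Instance]): SummarizerBuffer = {
      // Only the mean/std/count buffers are allocated; max, min and L1 arrays never are.
      instances.treeAggregate(Summarizer.createSummarizerBuffer("mean", "std", "count"))(
        seqOp = (buf, inst) => buf.add(inst.features, inst.weight),
        combOp = (b1, b2) => b1.merge(b2),
        depth = 2)
    }
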
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 5e3cb25e02a3..811287ff5db3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -31,7 +31,7 @@ import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
-import org.apache.spark.ml.stat.SummaryBuilderImpl._
+import org.apache.spark.ml.stat._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.util.MLUtils
@@ -215,7 +215,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
     val featuresSummarizer = instances.treeAggregate(
-      createSummarizerBuffer("mean", "std", "count"))(
+      Summarizer.createSummarizerBuffer("mean", "std", "count"))(
       seqOp = (c: SummarizerBuffer, v: AFTPoint) => c.add(v.features),
       combOp = (c1: SummarizerBuffer, c2: SummarizerBuffer) => c1.merge(c2),
       depth = $(aggregationDepth)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 8d0cf5c9bd39..64e5e191ffd1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -36,7 +36,7 @@ import org.apache.spark.ml.optim.aggregator.{HuberAggregator, LeastSquaresAggreg
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared._
-import org.apache.spark.ml.stat.SummaryBuilderImpl._
+import org.apache.spark.ml.stat._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.evaluation.RegressionMetrics
@@ -358,8 +358,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
     val (featuresSummarizer, ySummarizer) = instances.treeAggregate(
-      (createSummarizerBuffer("mean", "std"),
-        createSummarizerBuffer("mean", "std", "count")))(
+      (Summarizer.createSummarizerBuffer("mean", "std"),
+        Summarizer.createSummarizerBuffer("mean", "std", "count")))(
       seqOp = (c: (SummarizerBuffer, SummarizerBuffer), instance: Instance) =>
         (c._1.add(instance.features, instance.weight),
           c._2.add(Vectors.dense(instance.label), instance.weight)),
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
index bdc722a03b49..64a03347f061 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
@@ -197,6 +197,11 @@ object Summarizer extends Logging {
     val c1 = metrics(metric).summary(col, weightCol)
     c1.getField(metric).as(s"$metric($col)")
   }
+
+  private[spark] def createSummarizerBuffer(requested: String*): SummarizerBuffer = {
+    val (metrics, computeMetrics) = getRelevantMetrics(requested)
+    new SummarizerBuffer(metrics, computeMetrics)
+  }
 }
 
 private[ml] class SummaryBuilderImpl(
@@ -218,7 +223,7 @@ private[ml] class SummaryBuilderImpl(
   }
 }
 
-private[ml] object SummaryBuilderImpl extends Logging {
+private[spark] object SummaryBuilderImpl extends Logging {
 
   def implementedMetrics: Seq[String] = allMetrics.map(_._1).sorted
 
@@ -248,11 +253,6 @@ private[spark] object SummaryBuilderImpl extends Logging {
     StructType(fields)
   }
 
-  private[ml] def createSummarizerBuffer(requested: String*): SummarizerBuffer = {
-    val (metrics, computeMetrics) = getRelevantMetrics(requested)
-    new SummarizerBuffer(metrics, computeMetrics)
-  }
-
   private val vectorUDT = new VectorUDT
 
   /**
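Aside on the two hunks above: createSummarizerBuffer moves verbatim from SummaryBuilderImpl to the public Summarizer object (it stays private[spark]), so mllib callers no longer need the wildcard import of SummaryBuilderImpl members. Its behaviour is easiest to see off-cluster — a small local sketch, assuming Spark-internal scope:

    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.stat.Summarizer

    val buf = Summarizer.createSummarizerBuffer("mean", "count")
    buf.add(Vectors.dense(1.0, 2.0))
    buf.add(Vectors.dense(3.0, 4.0))
    assert(buf.count == 2L)
    assert(buf.mean == Vectors.dense(2.0, 3.0))  // column-wise mean
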
@@ -304,63 +304,154 @@ private[spark] object SummaryBuilderImpl extends Logging {
   private[stat] case object ComputeMax extends ComputeMetric
   private[stat] case object ComputeMin extends ComputeMetric
 
-  private[ml] class SummarizerBuffer(
+
+  private case class MetricsAggregate(
     requestedMetrics: Seq[Metric],
-    requestedCompMetrics: Seq[ComputeMetric]
-  ) extends Serializable {
-
-    private var n = 0
-    private var currMean: Array[Double] = null
-    private var currM2n: Array[Double] = null
-    private var currM2: Array[Double] = null
-    private var currL1: Array[Double] = null
-    private var totalCnt: Long = 0
-    private var totalWeightSum: Double = 0.0
-    private var weightSquareSum: Double = 0.0
-    private var currWeightSum: Array[Double] = null
-    private var nnz: Array[Long] = null
-    private var currMax: Array[Double] = null
-    private var currMin: Array[Double] = null
-
-    def this() {
-      this(
-        Seq(Mean, Sum, Variance, Std, Count, NumNonZeros,
-          Max, Min, NormL2, NormL1),
-        Seq(ComputeMean, ComputeM2n, ComputeM2, ComputeL1,
-          ComputeWeightSum, ComputeNNZ, ComputeMax, ComputeMin)
-      )
+    requestedComputeMetrics: Seq[ComputeMetric],
+    featuresExpr: Expression,
+    weightExpr: Expression,
+    mutableAggBufferOffset: Int,
+    inputAggBufferOffset: Int)
+    extends TypedImperativeAggregate[SummarizerBuffer] with ImplicitCastInputTypes {
+
+    override def eval(state: SummarizerBuffer): Any = {
+      val metrics = requestedMetrics.map {
+        case Mean => vectorUDT.serialize(state.mean)
+        case Sum => vectorUDT.serialize(state.sum)
+        case Variance => vectorUDT.serialize(state.variance)
+        case Std => vectorUDT.serialize(state.std)
+        case Count => state.count
+        case NumNonZeros => vectorUDT.serialize(state.numNonzeros)
+        case Max => vectorUDT.serialize(state.max)
+        case Min => vectorUDT.serialize(state.min)
+        case NormL2 => vectorUDT.serialize(state.normL2)
+        case NormL1 => vectorUDT.serialize(state.normL1)
+      }
+      InternalRow.apply(metrics: _*)
     }
 
-    /**
-     * Add a new sample to this summarizer, and update the statistical summary.
-     */
-    def add(instance: Vector, weight: Double): this.type = {
-      require(weight >= 0.0, s"sample weight, $weight has to be >= 0.0")
-      if (weight == 0.0) return this
-
-      if (n == 0) {
-        require(instance.size > 0, s"Vector should have dimension larger than zero.")
-        n = instance.size
-
-        if (requestedCompMetrics.contains(ComputeMean)) { currMean = Array.ofDim[Double](n) }
-        if (requestedCompMetrics.contains(ComputeM2n)) { currM2n = Array.ofDim[Double](n) }
-        if (requestedCompMetrics.contains(ComputeM2)) { currM2 = Array.ofDim[Double](n) }
-        if (requestedCompMetrics.contains(ComputeL1)) { currL1 = Array.ofDim[Double](n) }
-        if (requestedCompMetrics.contains(ComputeWeightSum)) {
-          currWeightSum = Array.ofDim[Double](n)
-        }
-        if (requestedCompMetrics.contains(ComputeNNZ)) { nnz = Array.ofDim[Long](n) }
-        if (requestedCompMetrics.contains(ComputeMax)) {
-          currMax = Array.fill[Double](n)(Double.MinValue)
-        }
-        if (requestedCompMetrics.contains(ComputeMin)) {
-          currMin = Array.fill[Double](n)(Double.MaxValue)
-        }
+    override def inputTypes: Seq[DataType] = vectorUDT :: DoubleType :: Nil
+
+    override def children: Seq[Expression] = featuresExpr :: weightExpr :: Nil
+
+    override def update(state: SummarizerBuffer, row: InternalRow): SummarizerBuffer = {
+      val features = vectorUDT.deserialize(featuresExpr.eval(row))
+      val weight = weightExpr.eval(row).asInstanceOf[Double]
+      state.add(features, weight)
+      state
+    }
+
+    override def merge(state: SummarizerBuffer,
+      other: SummarizerBuffer): SummarizerBuffer = {
+      state.merge(other)
+    }
+
+    override def nullable: Boolean = false
+
+    override def createAggregationBuffer(): SummarizerBuffer
+      = new SummarizerBuffer(requestedMetrics, requestedComputeMetrics)
+
+    override def serialize(state: SummarizerBuffer): Array[Byte] = {
+      // TODO: Use ByteBuffer to optimize
+      val bos = new ByteArrayOutputStream()
+      val oos = new ObjectOutputStream(bos)
+      oos.writeObject(state)
+      bos.toByteArray
+    }
+
+    override def deserialize(bytes: Array[Byte]): SummarizerBuffer = {
+      // TODO: Use ByteBuffer to optimize
+      val bis = new ByteArrayInputStream(bytes)
+      val ois = new ObjectInputStream(bis)
+      ois.readObject().asInstanceOf[SummarizerBuffer]
+    }
+
+    override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): MetricsAggregate = {
+      copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+    }
+
+    override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): MetricsAggregate = {
+      copy(inputAggBufferOffset = newInputAggBufferOffset)
+    }
+
+    override lazy val dataType: DataType = structureForMetrics(requestedMetrics)
+
+    override def prettyName: String = "aggregate_metrics"
+
+  }
+}
+
+private[spark] class SummarizerBuffer(
+    requestedMetrics: Seq[SummaryBuilderImpl.Metric],
+    requestedCompMetrics: Seq[SummaryBuilderImpl.ComputeMetric]) extends Serializable {
+  import SummaryBuilderImpl._
+
+  private var n = 0
+  private var currMean: Array[Double] = null
+  private var currM2n: Array[Double] = null
+  private var currM2: Array[Double] = null
+  private var currL1: Array[Double] = null
+  private var totalCnt: Long = 0
+  private var totalWeightSum: Double = 0.0
+  private var weightSquareSum: Double = 0.0
+  private var currWeightSum: Array[Double] = null
+  private var nnz: Array[Long] = null
+  private var currMax: Array[Double] = null
+  private var currMin: Array[Double] = null
+
+  def this() {
+    this(
+      Seq(
+        SummaryBuilderImpl.Mean,
+        SummaryBuilderImpl.Sum,
+        SummaryBuilderImpl.Variance,
+        SummaryBuilderImpl.Std,
+        SummaryBuilderImpl.Count,
+        SummaryBuilderImpl.NumNonZeros,
+        SummaryBuilderImpl.Max,
+        SummaryBuilderImpl.Min,
+        SummaryBuilderImpl.NormL2,
+        SummaryBuilderImpl.NormL1),
+      Seq(
+        SummaryBuilderImpl.ComputeMean,
+        SummaryBuilderImpl.ComputeM2n,
+        SummaryBuilderImpl.ComputeM2,
+        SummaryBuilderImpl.ComputeL1,
+        SummaryBuilderImpl.ComputeWeightSum,
+        SummaryBuilderImpl.ComputeNNZ,
+        SummaryBuilderImpl.ComputeMax,
+        SummaryBuilderImpl.ComputeMin)
+    )
+  }
+
+  def add(nonZeroIterator: Iterator[(Int, Double)], size: Int, weight: Double): this.type = {
+    require(weight >= 0.0, s"sample weight, $weight has to be >= 0.0")
+    if (weight == 0.0) return this
+
+    if (n == 0) {
+      require(size > 0, s"Vector should have dimension larger than zero.")
+      n = size
+
+      if (requestedCompMetrics.contains(ComputeMean)) { currMean = Array.ofDim[Double](n) }
+      if (requestedCompMetrics.contains(ComputeM2n)) { currM2n = Array.ofDim[Double](n) }
+      if (requestedCompMetrics.contains(ComputeM2)) { currM2 = Array.ofDim[Double](n) }
+      if (requestedCompMetrics.contains(ComputeL1)) { currL1 = Array.ofDim[Double](n) }
+      if (requestedCompMetrics.contains(ComputeWeightSum)) {
+        currWeightSum = Array.ofDim[Double](n)
+      }
+      if (requestedCompMetrics.contains(ComputeNNZ)) { nnz = Array.ofDim[Long](n) }
+      if (requestedCompMetrics.contains(ComputeMax)) {
+        currMax = Array.fill[Double](n)(Double.MinValue)
+      }
+      if (requestedCompMetrics.contains(ComputeMin)) {
+        currMin = Array.fill[Double](n)(Double.MaxValue)
+      }
+    }
 
-      require(n == instance.size, s"Dimensions mismatch when adding new sample." +
-        s" Expecting $n but got ${instance.size}.")
+    require(n == size, s"Dimensions mismatch when adding new sample." +
+      s" Expecting $n but got $size.")
 
+    if (nonZeroIterator.nonEmpty) {
       val localCurrMean = currMean
       val localCurrM2n = currM2n
       val localCurrM2 = currM2
@@ -369,7 +460,7 @@ private[spark] object SummaryBuilderImpl extends Logging {
       val localCurrL1 = currL1
       val localWeightSum = currWeightSum
       val localNumNonzeros = nnz
-      instance.foreachNonZero { (index, value) =>
+      nonZeroIterator.foreach { case (index, value) =>
         if (localCurrMax != null && localCurrMax(index) < value) {
           localCurrMax(index) = value
         }
@@ -402,303 +493,234 @@ private[spark] object SummaryBuilderImpl extends Logging {
         localNumNonzeros(index) += 1
       }
     }
-
-      totalWeightSum += weight
-      weightSquareSum += weight * weight
-      totalCnt += 1
-      this
-    }
-
-    def add(instance: Vector): this.type = add(instance, 1.0)
-
-    /**
-     * Merge another SummarizerBuffer, and update the statistical summary.
-     * (Note that it's in place merging; as a result, `this` object will be modified.)
-     *
-     * @param other The other MultivariateOnlineSummarizer to be merged.
-     */
-    def merge(other: SummarizerBuffer): this.type = {
-      if (this.totalWeightSum != 0.0 && other.totalWeightSum != 0.0) {
" + - s"Expecting $n but got ${other.n}.") - totalCnt += other.totalCnt - totalWeightSum += other.totalWeightSum - weightSquareSum += other.weightSquareSum - var i = 0 - while (i < n) { - if (currWeightSum != null) { - val thisWeightSum = currWeightSum(i) - val otherWeightSum = other.currWeightSum(i) - val totalWeightSum = thisWeightSum + otherWeightSum - - if (totalWeightSum != 0.0) { - if (currMean != null) { - val deltaMean = other.currMean(i) - currMean(i) - // merge mean together - currMean(i) += deltaMean * otherWeightSum / totalWeightSum - - if (currM2n != null) { - // merge m2n together - currM2n(i) += other.currM2n(i) + - deltaMean * deltaMean * thisWeightSum * otherWeightSum / totalWeightSum - } - } - } - currWeightSum(i) = totalWeightSum - } + totalWeightSum += weight + weightSquareSum += weight * weight + totalCnt += 1 + this + } - // merge m2 together - if (currM2 != null) { currM2(i) += other.currM2(i) } - // merge l1 together - if (currL1 != null) { currL1(i) += other.currL1(i) } - // merge max and min - if (currMax != null) { currMax(i) = math.max(currMax(i), other.currMax(i)) } - if (currMin != null) { currMin(i) = math.min(currMin(i), other.currMin(i)) } - if (nnz != null) { nnz(i) = nnz(i) + other.nnz(i) } - i += 1 - } - } else if (totalWeightSum == 0.0 && other.totalWeightSum != 0.0) { - this.n = other.n - if (other.currMean != null) { this.currMean = other.currMean.clone() } - if (other.currM2n != null) { this.currM2n = other.currM2n.clone() } - if (other.currM2 != null) { this.currM2 = other.currM2.clone() } - if (other.currL1 != null) { this.currL1 = other.currL1.clone() } - this.totalCnt = other.totalCnt - this.totalWeightSum = other.totalWeightSum - this.weightSquareSum = other.weightSquareSum - if (other.currWeightSum != null) { this.currWeightSum = other.currWeightSum.clone() } - if (other.nnz != null) { this.nnz = other.nnz.clone() } - if (other.currMax != null) { this.currMax = other.currMax.clone() } - if (other.currMin != null) { this.currMin = other.currMin.clone() } - } - this - } + /** + * Add a new sample to this summarizer, and update the statistical summary. + */ + def add(instance: Vector, weight: Double): this.type = + add(instance.nonZeroIterator, instance.size, weight) - /** - * Sample mean of each dimension. - */ - def mean: Vector = { - require(requestedMetrics.contains(Mean)) - require(totalWeightSum > 0, s"Nothing has been added to this summarizer.") + def add(instance: Vector): this.type = add(instance, 1.0) - val realMean = Array.ofDim[Double](n) + /** + * Merge another SummarizerBuffer, and update the statistical summary. + * (Note that it's in place merging; as a result, `this` object will be modified.) + * + * @param other The other MultivariateOnlineSummarizer to be merged. + */ + def merge(other: SummarizerBuffer): this.type = { + if (this.totalWeightSum != 0.0 && other.totalWeightSum != 0.0) { + require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " + + s"Expecting $n but got ${other.n}.") + totalCnt += other.totalCnt + totalWeightSum += other.totalWeightSum + weightSquareSum += other.weightSquareSum var i = 0 while (i < n) { - realMean(i) = currMean(i) * (currWeightSum(i) / totalWeightSum) - i += 1 - } - Vectors.dense(realMean) - } - - /** - * Sum of each dimension. 
-     */
-    def sum: Vector = {
-      require(requestedMetrics.contains(Sum))
-      require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
+        if (currWeightSum != null) {
+          val thisWeightSum = currWeightSum(i)
+          val otherWeightSum = other.currWeightSum(i)
+          val totalWeightSum = thisWeightSum + otherWeightSum
+
+          if (totalWeightSum != 0.0) {
+            if (currMean != null) {
+              val deltaMean = other.currMean(i) - currMean(i)
+              // merge mean together
+              currMean(i) += deltaMean * otherWeightSum / totalWeightSum
+
+              if (currM2n != null) {
+                // merge m2n together
+                currM2n(i) += other.currM2n(i) +
+                  deltaMean * deltaMean * thisWeightSum * otherWeightSum / totalWeightSum
+              }
+            }
+          }
+          currWeightSum(i) = totalWeightSum
+        }
 
-      val realSum = Array.ofDim[Double](n)
-      var i = 0
-      while (i < n) {
-        realSum(i) = currMean(i) * currWeightSum(i)
+        // merge m2 together
+        if (currM2 != null) { currM2(i) += other.currM2(i) }
+        // merge l1 together
+        if (currL1 != null) { currL1(i) += other.currL1(i) }
+        // merge max and min
+        if (currMax != null) { currMax(i) = math.max(currMax(i), other.currMax(i)) }
+        if (currMin != null) { currMin(i) = math.min(currMin(i), other.currMin(i)) }
+        if (nnz != null) { nnz(i) = nnz(i) + other.nnz(i) }
         i += 1
       }
-      Vectors.dense(realSum)
-    }
-
-    /**
-     * Unbiased estimate of sample variance of each dimension.
-     */
-    def variance: Vector = {
-      require(requestedMetrics.contains(Variance))
-      require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
-
-      val realVariance = computeVariance
-      Vectors.dense(realVariance)
-    }
-
-    /**
-     * Unbiased estimate of standard deviation of each dimension.
-     */
-    def std: Vector = {
-      require(requestedMetrics.contains(Std))
-      require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
-
-      val realVariance = computeVariance
-      Vectors.dense(realVariance.map(math.sqrt))
+    } else if (totalWeightSum == 0.0 && other.totalWeightSum != 0.0) {
+      this.n = other.n
+      if (other.currMean != null) { this.currMean = other.currMean.clone() }
+      if (other.currM2n != null) { this.currM2n = other.currM2n.clone() }
+      if (other.currM2 != null) { this.currM2 = other.currM2.clone() }
+      if (other.currL1 != null) { this.currL1 = other.currL1.clone() }
+      this.totalCnt = other.totalCnt
+      this.totalWeightSum = other.totalWeightSum
+      this.weightSquareSum = other.weightSquareSum
+      if (other.currWeightSum != null) { this.currWeightSum = other.currWeightSum.clone() }
+      if (other.nnz != null) { this.nnz = other.nnz.clone() }
+      if (other.currMax != null) { this.currMax = other.currMax.clone() }
+      if (other.currMin != null) { this.currMin = other.currMin.clone() }
     }
+    this
+  }
 
-    private def computeVariance: Array[Double] = {
-      val realVariance = Array.ofDim[Double](n)
-      val denominator = totalWeightSum - (weightSquareSum / totalWeightSum)
-
-      // Sample variance is computed, if the denominator is less than 0, the variance is just 0.
-      if (denominator > 0.0) {
-        val deltaMean = currMean
-        var i = 0
-        val len = currM2n.length
-        while (i < len) {
-          // We prevent variance from negative value caused by numerical error.
-          realVariance(i) = math.max((currM2n(i) + deltaMean(i) * deltaMean(i) * currWeightSum(i) *
-            (totalWeightSum - currWeightSum(i)) / totalWeightSum) / denominator, 0.0)
-          i += 1
-        }
-      }
-      realVariance
-    }
+  /**
+   * Sample mean of each dimension.
+   */
+  def mean: Vector = {
+    require(requestedMetrics.contains(Mean))
+    require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
+
+    val realMean = Array.ofDim[Double](n)
+    var i = 0
+    while (i < n) {
+      realMean(i) = currMean(i) * (currWeightSum(i) / totalWeightSum)
+      i += 1
+    }
+    Vectors.dense(realMean)
+  }
 
-    /**
-     * Sample size.
-     */
-    def count: Long = totalCnt
-
-    /**
-     * Sum of weights.
-     */
-    def weightSum: Double = totalWeightSum
-
-    /**
-     * Number of nonzero elements in each dimension.
-     *
-     */
-    def numNonzeros: Vector = {
-      require(requestedMetrics.contains(NumNonZeros))
-      require(totalCnt > 0, s"Nothing has been added to this summarizer.")
-
-      Vectors.dense(nnz.map(_.toDouble))
+  /**
+   * Sum of each dimension.
+   */
+  def sum: Vector = {
+    require(requestedMetrics.contains(Sum))
+    require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
+
+    val realSum = Array.ofDim[Double](n)
+    var i = 0
+    while (i < n) {
+      realSum(i) = currMean(i) * currWeightSum(i)
+      i += 1
     }
+    Vectors.dense(realSum)
+  }
 
-    /**
-     * Maximum value of each dimension.
-     */
-    def max: Vector = {
-      require(requestedMetrics.contains(Max))
-      require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
-
-      var i = 0
-      while (i < n) {
-        if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0
-        i += 1
-      }
-      Vectors.dense(currMax)
-    }
+  /**
+   * Unbiased estimate of sample variance of each dimension.
+   */
+  def variance: Vector = {
+    require(requestedMetrics.contains(Variance))
+    require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
 
-    /**
-     * Minimum value of each dimension.
-     */
-    def min: Vector = {
-      require(requestedMetrics.contains(Min))
-      require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
+    val realVariance = computeVariance
+    Vectors.dense(realVariance)
+  }
 
-      var i = 0
-      while (i < n) {
-        if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0
-        i += 1
-      }
-      Vectors.dense(currMin)
-    }
+  /**
+   * Unbiased estimate of standard deviation of each dimension.
+   */
+  def std: Vector = {
+    require(requestedMetrics.contains(Std))
+    require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
 
-    /**
-     * L2 (Euclidean) norm of each dimension.
-     */
-    def normL2: Vector = {
-      require(requestedMetrics.contains(NormL2))
-      require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
+    val realVariance = computeVariance
+    Vectors.dense(realVariance.map(math.sqrt))
+  }
 
-      val realMagnitude = Array.ofDim[Double](n)
+  private def computeVariance: Array[Double] = {
+    val realVariance = Array.ofDim[Double](n)
+    val denominator = totalWeightSum - (weightSquareSum / totalWeightSum)
 
+    // Sample variance is computed; if the denominator is not greater than 0, the variance is just 0.
+    if (denominator > 0.0) {
+      val deltaMean = currMean
       var i = 0
-      val len = currM2.length
+      val len = currM2n.length
       while (i < len) {
-        realMagnitude(i) = math.sqrt(currM2(i))
+        // We prevent variance from negative value caused by numerical error.
+        realVariance(i) = math.max((currM2n(i) + deltaMean(i) * deltaMean(i) * currWeightSum(i) *
+          (totalWeightSum - currWeightSum(i)) / totalWeightSum) / denominator, 0.0)
         i += 1
       }
-      Vectors.dense(realMagnitude)
-    }
-
-    /**
-     * L1 norm of each dimension.
-     */
-    def normL1: Vector = {
-      require(requestedMetrics.contains(NormL1))
-      require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
-
-      Vectors.dense(currL1)
     }
+    realVariance
   }
 
-  private case class MetricsAggregate(
-    requestedMetrics: Seq[Metric],
-    requestedComputeMetrics: Seq[ComputeMetric],
-    featuresExpr: Expression,
-    weightExpr: Expression,
-    mutableAggBufferOffset: Int,
-    inputAggBufferOffset: Int)
-    extends TypedImperativeAggregate[SummarizerBuffer] with ImplicitCastInputTypes {
-
-    override def eval(state: SummarizerBuffer): Any = {
-      val metrics = requestedMetrics.map {
-        case Mean => vectorUDT.serialize(state.mean)
-        case Sum => vectorUDT.serialize(state.sum)
-        case Variance => vectorUDT.serialize(state.variance)
-        case Std => vectorUDT.serialize(state.std)
-        case Count => state.count
-        case NumNonZeros => vectorUDT.serialize(state.numNonzeros)
-        case Max => vectorUDT.serialize(state.max)
-        case Min => vectorUDT.serialize(state.min)
-        case NormL2 => vectorUDT.serialize(state.normL2)
-        case NormL1 => vectorUDT.serialize(state.normL1)
-      }
-      InternalRow.apply(metrics: _*)
-    }
+  /**
+   * Sample size.
+   */
+  def count: Long = totalCnt
 
-    override def inputTypes: Seq[DataType] = vectorUDT :: DoubleType :: Nil
+  /**
+   * Sum of weights.
+   */
+  def weightSum: Double = totalWeightSum
 
-    override def children: Seq[Expression] = featuresExpr :: weightExpr :: Nil
+  /**
+   * Number of nonzero elements in each dimension.
+   *
+   */
+  def numNonzeros: Vector = {
+    require(requestedMetrics.contains(NumNonZeros))
+    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
 
-    override def update(state: SummarizerBuffer, row: InternalRow): SummarizerBuffer = {
-      val features = vectorUDT.deserialize(featuresExpr.eval(row))
-      val weight = weightExpr.eval(row).asInstanceOf[Double]
-      state.add(features, weight)
-      state
-    }
+    Vectors.dense(nnz.map(_.toDouble))
+  }
 
-    override def merge(state: SummarizerBuffer,
-      other: SummarizerBuffer): SummarizerBuffer = {
-      state.merge(other)
+  /**
+   * Maximum value of each dimension.
+   */
+  def max: Vector = {
+    require(requestedMetrics.contains(Max))
+    require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
+
+    var i = 0
+    while (i < n) {
+      if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0
+      i += 1
     }
+    Vectors.dense(currMax)
+  }
 
-    override def nullable: Boolean = false
-
-    override def createAggregationBuffer(): SummarizerBuffer
-      = new SummarizerBuffer(requestedMetrics, requestedComputeMetrics)
-
-    override def serialize(state: SummarizerBuffer): Array[Byte] = {
-      // TODO: Use ByteBuffer to optimize
-      val bos = new ByteArrayOutputStream()
-      val oos = new ObjectOutputStream(bos)
-      oos.writeObject(state)
-      bos.toByteArray
+  /**
+   * Minimum value of each dimension.
+   */
+  def min: Vector = {
+    require(requestedMetrics.contains(Min))
+    require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
+
+    var i = 0
+    while (i < n) {
+      if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0
+      i += 1
     }
+    Vectors.dense(currMin)
+  }
 
-    override def deserialize(bytes: Array[Byte]): SummarizerBuffer = {
-      // TODO: Use ByteBuffer to optimize
-      val bis = new ByteArrayInputStream(bytes)
-      val ois = new ObjectInputStream(bis)
-      ois.readObject().asInstanceOf[SummarizerBuffer]
-    }
+  /**
+   * L2 (Euclidean) norm of each dimension.
+   */
+  def normL2: Vector = {
+    require(requestedMetrics.contains(NormL2))
+    require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
 
-    override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): MetricsAggregate = {
-      copy(mutableAggBufferOffset = newMutableAggBufferOffset)
-    }
+    val realMagnitude = Array.ofDim[Double](n)
 
-    override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): MetricsAggregate = {
-      copy(inputAggBufferOffset = newInputAggBufferOffset)
+    var i = 0
+    val len = currM2.length
+    while (i < len) {
+      realMagnitude(i) = math.sqrt(currM2(i))
+      i += 1
     }
+    Vectors.dense(realMagnitude)
+  }
 
-    override lazy val dataType: DataType = structureForMetrics(requestedMetrics)
-
-    override def prettyName: String = "aggregate_metrics"
+  /**
+   * L1 norm of each dimension.
+   */
+  def normL1: Vector = {
+    require(requestedMetrics.contains(NormL1))
+    require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
 
+    Vectors.dense(currL1)
   }
 }
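Net effect of this file: SummarizerBuffer is hoisted out of SummaryBuilderImpl as a top-level private[spark] class so mllib code can reach it, while MetricsAggregate only moves within the object. The partial-aggregation contract the buffer must keep — merging partial buffers equals one buffer over all rows — can be checked in isolation; a sketch (Spark-internal scope assumed, values arbitrary):

    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.stat.Summarizer

    val v1 = Vectors.dense(1.0, 0.0)
    val v2 = Vectors.sparse(2, Array(1), Array(5.0))  // sparse input exercises nonZeroIterator

    val whole = Summarizer.createSummarizerBuffer("mean").add(v1).add(v2)
    val left = Summarizer.createSummarizerBuffer("mean").add(v1)
    val right = Summarizer.createSummarizerBuffer("mean").add(v2)
    assert(left.merge(right).mean == whole.mean)  // (0.5, 2.5) either way
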
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
index e5c2d0d85149..b697d2746ce7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.evaluation
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
+import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}
@@ -57,18 +57,17 @@ class RegressionMetrics @Since("2.0.0") (
     })
 
   /**
-   * Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors.
+   * Use SummarizerBuffer to calculate summary statistics of observations and errors.
   */
-  private lazy val summary: MultivariateStatisticalSummary = {
-    predictionAndObservations.map {
+  private lazy val summary = {
+    val weightedVectors = predictionAndObservations.map {
       case (prediction: Double, observation: Double, weight: Double) =>
         (Vectors.dense(observation, observation - prediction, prediction), weight)
       case (prediction: Double, observation: Double) =>
         (Vectors.dense(observation, observation - prediction, prediction), 1.0)
-    }.treeAggregate(new MultivariateOnlineSummarizer())(
-      (summary, sample) => summary.add(sample._1, sample._2),
-      (sum1, sum2) => sum1.merge(sum2)
-    )
+    }
+    Statistics.colStats(weightedVectors,
+      Seq("mean", "normL1", "normL2", "variance"))
   }
 
   private lazy val SSy = math.pow(summary.normL2(0), 2)
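The summarized columns stay (observation, error, prediction), so the downstream SS* terms keep reading fixed indices — normL2(0) for SSy, normL2(1) for SSerr, and so on. Public behaviour is unchanged; a quick external check with hypothetical values:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.evaluation.RegressionMetrics

    def checkRmse(sc: SparkContext): Unit = {
      // (prediction, observation) pairs
      val predObs = sc.parallelize(Seq((2.5, 3.0), (0.0, -0.5), (2.0, 2.0)))
      val metrics = new RegressionMetrics(predObs)
      println(metrics.rootMeanSquaredError)  // sqrt((0.5^2 + 0.5^2 + 0.0^2) / 3)
    }
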
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
index 78a783a78772..356ed48e9938 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -46,9 +46,11 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
       s"source vector size $numFeatures must be no less than k=$k")
 
     val mat = if (numFeatures > 65535) {
-      val meanVector = Statistics.colStats(sources).mean.asBreeze
-      val meanCentredRdd = sources.map { rowVector =>
-        Vectors.fromBreeze(rowVector.asBreeze - meanVector)
+      val summary = Statistics.colStats(sources.map((_, 1.0)), Seq("mean"))
+      val mean = Vectors.fromML(summary.mean)
+      val meanCentredRdd = sources.map { row =>
+        BLAS.axpy(-1, mean, row)
+        row
       }
       new RowMatrix(meanCentredRdd)
     } else {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
index 7286733934ad..21e01ef36814 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.feature.{StandardScalerModel => NewStandardScalerModel}
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.rdd.RDD
 
 /**
@@ -55,12 +55,11 @@ class StandardScaler @Since("1.1.0") (withMean: Boolean, withStd: Boolean) exten
   @Since("1.1.0")
   def fit(data: RDD[Vector]): StandardScalerModel = {
     // TODO: skip computation if both withMean and withStd are false
-    val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
-      (aggregator, data) => aggregator.add(data),
-      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
+    val summary = Statistics.colStats(data.map((_, 1.0)), Seq("mean", "std"))
+
     new StandardScalerModel(
-      Vectors.dense(summary.variance.toArray.map(v => math.sqrt(v))),
-      summary.mean,
+      Vectors.fromML(summary.std),
+      Vectors.fromML(summary.mean),
       withStd,
       withMean)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 89d9ce852d51..20e26cee9e0d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.mllib.linalg._
-import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
+import org.apache.spark.mllib.stat._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.random.XORShiftRandom
@@ -433,11 +433,11 @@ class RowMatrix @Since("1.0.0") (
     val n = numCols().toInt
     checkNumColumns(n)
 
-    val summary = computeColumnSummaryStatistics()
+    val summary = Statistics.colStats(rows.map((_, 1.0)), Seq("count", "mean"))
     val m = summary.count
     require(m > 1, s"RowMatrix.computeCovariance called on matrix with only $m rows." +
       " Cannot compute the covariance of a RowMatrix with <= 1 row.")
-    val mean = summary.mean
+    val mean = Vectors.fromML(summary.mean)
 
     if (rows.first().isInstanceOf[DenseVector]) {
       computeDenseVectorCovariance(mean, n, m)
@@ -616,7 +616,8 @@ class RowMatrix @Since("1.0.0") (
       10 * math.log(numCols()) / threshold
     }
 
-    columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
+    val summary = Statistics.colStats(rows.map((_, 1.0)), Seq("normL2"))
+    columnSimilaritiesDIMSUM(summary.normL2.toArray, gamma)
   }
 
   /**
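All three mllib call sites (PCA, StandardScaler, RowMatrix) wrap their unweighted RDD[Vector] as RDD[(Vector, Double)] with unit weights, which makes the weighted path reproduce the old unweighted summary; note the PCA branch also now subtracts the mean in place with BLAS.axpy instead of allocating a Breeze copy per row. The StandardScaler case as a standalone sketch (private[mllib] scope assumed for colStats; meanAndStd is a hypothetical helper):

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.stat.Statistics
    import org.apache.spark.rdd.RDD

    def meanAndStd(data: RDD[Vector]): (Vector, Vector) = {
      // A weight of 1.0 per row degenerates to the old unweighted behaviour.
      val summary = Statistics.colStats(data.map((_, 1.0)), Seq("mean", "std"))
      (Vectors.fromML(summary.mean), Vectors.fromML(summary.std))
    }
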
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index 5ebbfb2b6298..d5f34c2630a4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -21,6 +21,7 @@ import scala.annotation.varargs
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
+import org.apache.spark.ml.stat._
 import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -46,6 +47,21 @@ object Statistics {
     new RowMatrix(X).computeColumnSummaryStatistics()
   }
 
+  /**
+   * Computes required column-wise summary statistics for the input RDD[(Vector, Double)].
+   *
+   * @param X an RDD containing vectors and weights for which column-wise summary statistics
+   *          are to be computed.
+   * @return [[SummarizerBuffer]] object containing column-wise summary statistics.
+   */
+  private[mllib] def colStats(X: RDD[(Vector, Double)], requested: Seq[String]) = {
+    X.treeAggregate(Summarizer.createSummarizerBuffer(requested: _*))(
+      seqOp = { case (c, (v, w)) => c.add(v.nonZeroIterator, v.size, w) },
+      combOp = { case (c1, c2) => c1.merge(c2) },
+      depth = 2
+    )
+  }
+
   /**
    * Compute the Pearson correlation matrix for the input RDD of Vectors.
    * Columns with 0 covariance produce NaN entries in the correlation matrix.
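This new private[mllib] overload is the single funnel that all the mllib call sites above go through. A usage sketch with explicit weights (Spark-internal scope; sc is an assumed SparkContext):

    import org.apache.spark.ml.linalg.{Vectors => NewVectors}
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.Statistics

    val weighted = sc.parallelize(Seq(
      (Vectors.dense(1.0, 10.0), 2.0),   // weight 2.0: counts double in the mean
      (Vectors.dense(4.0, 40.0), 1.0)))
    val summary = Statistics.colStats(weighted, Seq("mean", "count"))
    assert(summary.count == 2L)                           // counts rows, not total weight
    assert(summary.mean == NewVectors.dense(2.0, 20.0))  // (2*1 + 4)/3 and (2*10 + 40)/3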