From 6441f929f2e302b1c11cc53aaef10598c7397deb Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 12 Nov 2014 23:08:13 -0800 Subject: [PATCH 1/6] Finished SPARK-4431 --- .../apache/spark/mllib/linalg/Vectors.scala | 98 +++++++++++++++++++ .../stat/MultivariateOnlineSummarizer.scala | 45 +++------ .../spark/mllib/linalg/VectorsSuite.scala | 76 ++++++++++++++ 3 files changed, 189 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 60ab2aaa8f27a..26bd004424858 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -76,6 +76,22 @@ sealed trait Vector extends Serializable { def copy: Vector = { throw new NotImplementedError(s"copy is not implemented for ${this.getClass}.") } + + /** + * It will return the iterator for the active elements of dense and sparse vector as + * (index, value) pair. Note that foreach method can be overridden for better performance + * in different vector implementation. + * + * @param skippingZeros Skipping zero elements explicitly if true. It will be useful when we + * iterator through dense vector having lots of zero elements which + * we want to skip. Default is false. + * @return Iterator[(Int, Double)] where the first element in the tuple is the index, + * and the second element is the corresponding value. 
+ */ + private[spark] def activeIterator(skippingZeros: Boolean): Iterator[(Int, Double)] + + private[spark] def activeIterator: Iterator[(Int, Double)] = activeIterator(false) + } /** @@ -273,6 +289,47 @@ class DenseVector(val values: Array[Double]) extends Vector { override def copy: DenseVector = { new DenseVector(values.clone()) } + + private[spark] override def activeIterator(skippingZeros: Boolean) = new Iterator[(Int, Double)] { + private var i = 0 + private val valuesSize = values.size + + // If zeros are asked to be explicitly skipped, the parent `size` method is called to count + // the number of nonzero elements using `hasNext` and `next` methods. + override lazy val size: Int = if (skippingZeros) super.size else valuesSize + + override def hasNext = { + if (skippingZeros) { + var found = false + while (!found && i < valuesSize) if (values(i) != 0.0) found = true else i += 1 + } + i < valuesSize + } + + override def next = { + val result = (i, values(i)) + i += 1 + result + } + + override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) { + var i = 0 + if (skippingZeros) { + while (i < valuesSize) { + if (values(i) != 0.0) { + f(i, values(i)) + } + i += 1 + } + } else { + while (i < valuesSize) { + f(i, values(i)) + i += 1 + } + } + } + } + } /** @@ -309,4 +366,45 @@ class SparseVector( } private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size) + + private[spark] override def activeIterator(skippingZeros: Boolean) = new Iterator[(Int, Double)] { + private var i = 0 + private val valuesSize = values.size + + // If zeros are asked to be explicitly skipped, the parent `size` method is called to count + // the number of nonzero elements using `hasNext` and `next` methods. 
+ override lazy val size: Int = if (skippingZeros) super.size else valuesSize + + def hasNext = { + if (skippingZeros) { + var found = false + while (!found && i < valuesSize) if (values(i) != 0.0) found = true else i += 1 + } + i < valuesSize + } + + def next = { + val result = (indices(i), values(i)) + i += 1 + result + } + + override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) { + var i = 0 + if (skippingZeros) { + while (i < valuesSize) { + if (values(i) != 0.0) { + f(indices(i), values(i)) + } + i += 1 + } + } else { + while (i < valuesSize) { + f(indices(i), values(i)) + i += 1 + } + } + } + } + } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 654479ac2dd4f..55f93bc1b52f4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -53,23 +53,21 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S * Adds input value to position i. 
*/ private[this] def add(i: Int, value: Double) = { - if (value != 0.0) { - if (currMax(i) < value) { - currMax(i) = value - } - if (currMin(i) > value) { - currMin(i) = value - } + if (currMax(i) < value) { + currMax(i) = value + } + if (currMin(i) > value) { + currMin(i) = value + } - val prevMean = currMean(i) - val diff = value - prevMean - currMean(i) = prevMean + diff / (nnz(i) + 1.0) - currM2n(i) += (value - currMean(i)) * diff - currM2(i) += value * value - currL1(i) += math.abs(value) + val prevMean = currMean(i) + val diff = value - prevMean + currMean(i) = prevMean + diff / (nnz(i) + 1.0) + currM2n(i) += (value - currMean(i)) * diff + currM2(i) += value * value + currL1(i) += math.abs(value) - nnz(i) += 1.0 - } + nnz(i) += 1.0 } /** @@ -95,21 +93,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S require(n == sample.size, s"Dimensions mismatch when adding new sample." + s" Expecting $n but got ${sample.size}.") - sample match { - case dv: DenseVector => { - var j = 0 - while (j < dv.size) { - add(j, dv.values(j)) - j += 1 - } - } - case sv: SparseVector => - var j = 0 - while (j < sv.indices.size) { - add(sv.indices(j), sv.values(j)) - j += 1 - } - case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) + sample.activeIterator(true).foreach { + case (index, value) => add(index, value) } totalCnt += 1 diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 59cd85eab27d0..0b8bbc670b7cd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -173,4 +173,80 @@ class VectorsSuite extends FunSuite { val v = Vectors.fromBreeze(x(::, 0)) assert(v.size === x.rows) } + + test("activeIterator") { + val dv = Vectors.dense(0.0, 1.2, 3.1, 0.0) + val sv = Vectors.sparse(4, Seq((1, 1.2), (2, 
3.1), (3, 0.0))) + + // Testing if the size of iterator is correct when the zeros are explicitly skipped. + // The default setting will not skip any zero explicitly. + assert(dv.activeIterator.size === 4) + assert(dv.activeIterator(false).size === 4) + assert(dv.activeIterator(true).size === 2) + + assert(sv.activeIterator.size === 3) + assert(sv.activeIterator(false).size === 3) + assert(sv.activeIterator(true).size === 2) + + // Testing `hasNext` and `next` + val dvIter1 = dv.activeIterator(false) + assert(dvIter1.hasNext === true && dvIter1.next === (0, 0.0)) + assert(dvIter1.hasNext === true && dvIter1.next === (1, 1.2)) + assert(dvIter1.hasNext === true && dvIter1.next === (2, 3.1)) + assert(dvIter1.hasNext === true && dvIter1.next === (3, 0.0)) + assert(dvIter1.hasNext === false) + + val dvIter2 = dv.activeIterator(true) + assert(dvIter2.hasNext === true && dvIter2.next === (1, 1.2)) + assert(dvIter2.hasNext === true && dvIter2.next === (2, 3.1)) + assert(dvIter2.hasNext === false) + + val svIter1 = sv.activeIterator(false) + assert(svIter1.hasNext === true && svIter1.next === (1, 1.2)) + assert(svIter1.hasNext === true && svIter1.next === (2, 3.1)) + assert(svIter1.hasNext === true && svIter1.next === (3, 0.0)) + assert(svIter1.hasNext === false) + + val svIter2 = sv.activeIterator(true) + assert(svIter2.hasNext === true && svIter2.next === (1, 1.2)) + assert(svIter2.hasNext === true && svIter2.next === (2, 3.1)) + assert(svIter2.hasNext === false) + + // Testing `foreach` + val dvMap1 = scala.collection.mutable.Map[Int, Double]() + dvIter1.foreach{ + case (index, value) => dvMap1.put(index, value) + } + assert(dvMap1.size === 4) + assert(dvMap1.get(0) === Some(0.0)) + assert(dvMap1.get(1) === Some(1.2)) + assert(dvMap1.get(2) === Some(3.1)) + assert(dvMap1.get(3) === Some(0.0)) + + val dvMap2 = scala.collection.mutable.Map[Int, Double]() + dvIter2.foreach{ + case (index, value) => dvMap2.put(index, value) + } + assert(dvMap2.size === 2) + 
assert(dvMap2.get(1) === Some(1.2)) + assert(dvMap2.get(2) === Some(3.1)) + + val svMap1 = scala.collection.mutable.Map[Int, Double]() + dvIter1.foreach{ + case (index, value) => svMap1.put(index, value) + } + assert(svMap1.size === 4) + assert(svMap1.get(1) === Some(1.2)) + assert(svMap1.get(2) === Some(3.1)) + assert(svMap1.get(3) === Some(0.0)) + + val svMap2 = scala.collection.mutable.Map[Int, Double]() + svIter2.foreach{ + case (index, value) => svMap2.put(index, value) + } + assert(svMap2.size === 2) + assert(svMap2.get(1) === Some(1.2)) + assert(svMap2.get(2) === Some(3.1)) + + } } From c0cbd5a26c099a3d74534268fcb6514b2fe0fcfd Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Sun, 16 Nov 2014 22:52:10 -0800 Subject: [PATCH 2/6] fix a bug --- .../scala/org/apache/spark/mllib/linalg/VectorsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 0b8bbc670b7cd..2d7c13b7acbb4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -232,10 +232,10 @@ class VectorsSuite extends FunSuite { assert(dvMap2.get(2) === Some(3.1)) val svMap1 = scala.collection.mutable.Map[Int, Double]() - dvIter1.foreach{ + svIter1.foreach{ case (index, value) => svMap1.put(index, value) } - assert(svMap1.size === 4) + assert(svMap1.size === 3) assert(svMap1.get(1) === Some(1.2)) assert(svMap1.get(2) === Some(3.1)) assert(svMap1.get(3) === Some(0.0)) From 98448bbcbbab628c2722496fed065229ca984640 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Tue, 18 Nov 2014 15:28:21 -0800 Subject: [PATCH 3/6] Made the override final, and had a local copy of variables which made the accessing a single step operation. 
--- .../apache/spark/mllib/linalg/Vectors.scala | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 26bd004424858..a7d2624a817f2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -292,38 +292,40 @@ class DenseVector(val values: Array[Double]) extends Vector { private[spark] override def activeIterator(skippingZeros: Boolean) = new Iterator[(Int, Double)] { private var i = 0 - private val valuesSize = values.size // If zeros are asked to be explicitly skipped, the parent `size` method is called to count // the number of nonzero elements using `hasNext` and `next` methods. - override lazy val size: Int = if (skippingZeros) super.size else valuesSize + final override lazy val size: Int = if (skippingZeros) super.size else values.size - override def hasNext = { + final override def hasNext = { if (skippingZeros) { var found = false - while (!found && i < valuesSize) if (values(i) != 0.0) found = true else i += 1 + while (!found && i < values.size) if (values(i) != 0.0) found = true else i += 1 } - i < valuesSize + i < values.size } - override def next = { + final override def next = { val result = (i, values(i)) i += 1 result } - override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) { + final override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) { var i = 0 + val localValuesSize = values.size + val localValues = values + if (skippingZeros) { - while (i < valuesSize) { - if (values(i) != 0.0) { - f(i, values(i)) + while (i < localValuesSize) { + if (localValues(i) != 0.0) { + f(i, localValues(i)) } i += 1 } } else { - while (i < valuesSize) { - f(i, values(i)) + while (i < localValuesSize) { + f(i, localValues(i)) i += 1 } } @@ -369,38 +371,41 @@ class SparseVector( 
private[spark] override def activeIterator(skippingZeros: Boolean) = new Iterator[(Int, Double)] { private var i = 0 - private val valuesSize = values.size // If zeros are asked to be explicitly skipped, the parent `size` method is called to count // the number of nonzero elements using `hasNext` and `next` methods. - override lazy val size: Int = if (skippingZeros) super.size else valuesSize + final override lazy val size: Int = if (skippingZeros) super.size else values.size - def hasNext = { + final override def hasNext = { if (skippingZeros) { var found = false - while (!found && i < valuesSize) if (values(i) != 0.0) found = true else i += 1 + while (!found && i < values.size) if (values(i) != 0.0) found = true else i += 1 } - i < valuesSize + i < values.size } - def next = { + final override def next = { val result = (indices(i), values(i)) i += 1 result } - override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) { + final override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) { var i = 0 + val localValuesSize = values.size + val localIndices = indices + val localValues = values + if (skippingZeros) { - while (i < valuesSize) { - if (values(i) != 0.0) { - f(indices(i), values(i)) + while (i < localValuesSize) { + if (localValues(i) != 0.0) { + f(localIndices(i), localValues(i)) } i += 1 } } else { - while (i < valuesSize) { - f(indices(i), values(i)) + while (i < localValuesSize) { + f(localIndices(i), localValues(i)) i += 1 } } From 1907ae122ac0f385e5c408b827bd438e209cd71e Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 20 Nov 2014 14:49:27 -0800 Subject: [PATCH 4/6] address feedback --- .../apache/spark/mllib/linalg/Vectors.scala | 119 +++++------------- .../stat/MultivariateOnlineSummarizer.scala | 4 +- .../spark/mllib/linalg/VectorsSuite.scala | 67 ++++------ 3 files changed, 60 insertions(+), 130 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index a7d2624a817f2..f340f6feae30e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -78,20 +78,15 @@ sealed trait Vector extends Serializable { } /** - * It will return the iterator for the active elements of dense and sparse vector as - * (index, value) pair. Note that foreach method can be overridden for better performance - * in different vector implementation. + * Applies a function `f` to all the active elements of dense and sparse vector. * - * @param skippingZeros Skipping zero elements explicitly if true. It will be useful when we - * iterator through dense vector having lots of zero elements which - * we want to skip. Default is false. - * @return Iterator[(Int, Double)] where the first element in the tuple is the index, - * and the second element is the corresponding value. + * @param f the function takes (Int, Double) as input where the first element + * in the tuple is the index, and the second element is the corresponding value. + * @param skippingZeros if true, skipping zero elements explicitly. It will be useful when + * iterating through dense vector which has lots of zero elements to be + * skipped. Default is false. 
*/ - private[spark] def activeIterator(skippingZeros: Boolean): Iterator[(Int, Double)] - - private[spark] def activeIterator: Iterator[(Int, Double)] = activeIterator(false) - + private[spark] def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) } /** @@ -290,48 +285,25 @@ class DenseVector(val values: Array[Double]) extends Vector { new DenseVector(values.clone()) } - private[spark] override def activeIterator(skippingZeros: Boolean) = new Iterator[(Int, Double)] { - private var i = 0 - - // If zeros are asked to be explicitly skipped, the parent `size` method is called to count - // the number of nonzero elements using `hasNext` and `next` methods. - final override lazy val size: Int = if (skippingZeros) super.size else values.size - - final override def hasNext = { - if (skippingZeros) { - var found = false - while (!found && i < values.size) if (values(i) != 0.0) found = true else i += 1 - } - i < values.size - } - - final override def next = { - val result = (i, values(i)) - i += 1 - result - } + private[spark] override def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) { + var i = 0 + val localValuesSize = values.size + val localValues = values - final override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) { - var i = 0 - val localValuesSize = values.size - val localValues = values - - if (skippingZeros) { - while (i < localValuesSize) { - if (localValues(i) != 0.0) { - f(i, localValues(i)) - } - i += 1 - } - } else { - while (i < localValuesSize) { + if (skippingZeros) { + while (i < localValuesSize) { + if (localValues(i) != 0.0) { f(i, localValues(i)) - i += 1 } + i += 1 + } + } else { + while (i < localValuesSize) { + f(i, localValues(i)) + i += 1 } } } - } /** @@ -369,47 +341,24 @@ class SparseVector( private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size) - private[spark] override def activeIterator(skippingZeros: Boolean) = new Iterator[(Int, Double)] { - private 
var i = 0 - - // If zeros are asked to be explicitly skipped, the parent `size` method is called to count - // the number of nonzero elements using `hasNext` and `next` methods. - final override lazy val size: Int = if (skippingZeros) super.size else values.size - - final override def hasNext = { - if (skippingZeros) { - var found = false - while (!found && i < values.size) if (values(i) != 0.0) found = true else i += 1 - } - i < values.size - } - - final override def next = { - val result = (indices(i), values(i)) - i += 1 - result - } + private[spark] override def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) { + var i = 0 + val localValuesSize = values.size + val localIndices = indices + val localValues = values - final override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) { - var i = 0 - val localValuesSize = values.size - val localIndices = indices - val localValues = values - - if (skippingZeros) { - while (i < localValuesSize) { - if (localValues(i) != 0.0) { - f(localIndices(i), localValues(i)) - } - i += 1 - } - } else { - while (i < localValuesSize) { + if (skippingZeros) { + while (i < localValuesSize) { + if (localValues(i) != 0.0) { f(localIndices(i), localValues(i)) - i += 1 } + i += 1 + } + } else { + while (i < localValuesSize) { + f(localIndices(i), localValues(i)) + i += 1 } } } - } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 55f93bc1b52f4..417564b597914 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -93,9 +93,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S require(n == sample.size, s"Dimensions mismatch when adding new sample." 
+ s" Expecting $n but got ${sample.size}.") - sample.activeIterator(true).foreach { - case (index, value) => add(index, value) - } + sample.foreach(true)(x => add(x._1, x._2)) totalCnt += 1 this diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 2d7c13b7acbb4..81e886b4bd4ee 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -174,47 +174,22 @@ class VectorsSuite extends FunSuite { assert(v.size === x.rows) } - test("activeIterator") { + test("foreach") { val dv = Vectors.dense(0.0, 1.2, 3.1, 0.0) val sv = Vectors.sparse(4, Seq((1, 1.2), (2, 3.1), (3, 0.0))) - // Testing if the size of iterator is correct when the zeros are explicitly skipped. - // The default setting will not skip any zero explicitly. - assert(dv.activeIterator.size === 4) - assert(dv.activeIterator(false).size === 4) - assert(dv.activeIterator(true).size === 2) - - assert(sv.activeIterator.size === 3) - assert(sv.activeIterator(false).size === 3) - assert(sv.activeIterator(true).size === 2) - - // Testing `hasNext` and `next` - val dvIter1 = dv.activeIterator(false) - assert(dvIter1.hasNext === true && dvIter1.next === (0, 0.0)) - assert(dvIter1.hasNext === true && dvIter1.next === (1, 1.2)) - assert(dvIter1.hasNext === true && dvIter1.next === (2, 3.1)) - assert(dvIter1.hasNext === true && dvIter1.next === (3, 0.0)) - assert(dvIter1.hasNext === false) - - val dvIter2 = dv.activeIterator(true) - assert(dvIter2.hasNext === true && dvIter2.next === (1, 1.2)) - assert(dvIter2.hasNext === true && dvIter2.next === (2, 3.1)) - assert(dvIter2.hasNext === false) - - val svIter1 = sv.activeIterator(false) - assert(svIter1.hasNext === true && svIter1.next === (1, 1.2)) - assert(svIter1.hasNext === true && svIter1.next === (2, 3.1)) - assert(svIter1.hasNext === true && svIter1.next === (3, 
0.0)) - assert(svIter1.hasNext === false) - - val svIter2 = sv.activeIterator(true) - assert(svIter2.hasNext === true && svIter2.next === (1, 1.2)) - assert(svIter2.hasNext === true && svIter2.next === (2, 3.1)) - assert(svIter2.hasNext === false) - - // Testing `foreach` + val dvMap0 = scala.collection.mutable.Map[Int, Double]() + dv.foreach() { + case (index: Int, value: Double) => dvMap0.put(index, value) + } + assert(dvMap0.size === 4) + assert(dvMap0.get(0) === Some(0.0)) + assert(dvMap0.get(1) === Some(1.2)) + assert(dvMap0.get(2) === Some(3.1)) + assert(dvMap0.get(3) === Some(0.0)) + val dvMap1 = scala.collection.mutable.Map[Int, Double]() - dvIter1.foreach{ + dv.foreach(false) { case (index, value) => dvMap1.put(index, value) } assert(dvMap1.size === 4) @@ -223,16 +198,25 @@ class VectorsSuite extends FunSuite { assert(dvMap1.get(2) === Some(3.1)) assert(dvMap1.get(3) === Some(0.0)) - val dvMap2 = scala.collection.mutable.Map[Int, Double]() - dvIter2.foreach{ + val dvMap2 = scala.collection .mutable.Map[Int, Double]() + dv.foreach(true) { case (index, value) => dvMap2.put(index, value) } assert(dvMap2.size === 2) assert(dvMap2.get(1) === Some(1.2)) assert(dvMap2.get(2) === Some(3.1)) + val svMap0 = scala.collection.mutable.Map[Int, Double]() + sv.foreach() { + case (index, value) => svMap0.put(index, value) + } + assert(svMap0.size === 3) + assert(svMap0.get(1) === Some(1.2)) + assert(svMap0.get(2) === Some(3.1)) + assert(svMap0.get(3) === Some(0.0)) + val svMap1 = scala.collection.mutable.Map[Int, Double]() - svIter1.foreach{ + sv.foreach(false) { case (index, value) => svMap1.put(index, value) } assert(svMap1.size === 3) @@ -241,12 +225,11 @@ class VectorsSuite extends FunSuite { assert(svMap1.get(3) === Some(0.0)) val svMap2 = scala.collection.mutable.Map[Int, Double]() - svIter2.foreach{ + sv.foreach(true) { case (index, value) => svMap2.put(index, value) } assert(svMap2.size === 2) assert(svMap2.get(1) === Some(1.2)) assert(svMap2.get(2) === Some(3.1)) 
- } } From 03dd6938e853b1fcc5c4d26fb1decc375ce5ab8c Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 21 Nov 2014 15:04:00 -0800 Subject: [PATCH 5/6] further performance tuning. --- .../apache/spark/mllib/linalg/Vectors.scala | 44 ++----- .../stat/MultivariateOnlineSummarizer.scala | 110 +++++++++--------- .../spark/mllib/linalg/VectorsSuite.scala | 67 +++-------- 3 files changed, 78 insertions(+), 143 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index f340f6feae30e..c6d5fe5bc678c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -80,13 +80,11 @@ sealed trait Vector extends Serializable { /** * Applies a function `f` to all the active elements of dense and sparse vector. * - * @param f the function takes (Int, Double) as input where the first element - * in the tuple is the index, and the second element is the corresponding value. - * @param skippingZeros if true, skipping zero elements explicitly. It will be useful when - * iterating through dense vector which has lots of zero elements to be - * skipped. Default is false. + * @param f the function takes two parameters where the first parameter is the index of + * the vector with type `Int`, and the second parameter is the corresponding value + * with type `Double`. 
*/ - private[spark] def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) + private[spark] def foreachActive(f: (Int, Double) => Unit) } /** @@ -285,23 +283,14 @@ class DenseVector(val values: Array[Double]) extends Vector { new DenseVector(values.clone()) } - private[spark] override def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) { + private[spark] override def foreachActive(f: (Int, Double) => Unit) = { var i = 0 val localValuesSize = values.size val localValues = values - if (skippingZeros) { - while (i < localValuesSize) { - if (localValues(i) != 0.0) { - f(i, localValues(i)) - } - i += 1 - } - } else { - while (i < localValuesSize) { - f(i, localValues(i)) - i += 1 - } + while (i < localValuesSize) { + f(i, localValues(i)) + i += 1 } } } @@ -341,24 +330,15 @@ class SparseVector( private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size) - private[spark] override def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) { + private[spark] override def foreachActive(f: (Int, Double) => Unit) = { var i = 0 val localValuesSize = values.size val localIndices = indices val localValues = values - if (skippingZeros) { - while (i < localValuesSize) { - if (localValues(i) != 0.0) { - f(localIndices(i), localValues(i)) - } - i += 1 - } - } else { - while (i < localValuesSize) { - f(localIndices(i), localValues(i)) - i += 1 - } + while (i < localValuesSize) { + f(localIndices(i), localValues(i)) + i += 1 } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 417564b597914..198b66a005962 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -17,10 +17,8 @@ package org.apache.spark.mllib.stat -import 
breeze.linalg.{DenseVector => BDV} - import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors, Vector} +import org.apache.spark.mllib.linalg.{Vectors, Vector} /** * :: DeveloperApi :: @@ -40,35 +38,14 @@ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors, Vector class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable { private var n = 0 - private var currMean: BDV[Double] = _ - private var currM2n: BDV[Double] = _ - private var currM2: BDV[Double] = _ - private var currL1: BDV[Double] = _ + private var currMean: Array[Double] = _ + private var currM2n: Array[Double] = _ + private var currM2: Array[Double] = _ + private var currL1: Array[Double] = _ private var totalCnt: Long = 0 - private var nnz: BDV[Double] = _ - private var currMax: BDV[Double] = _ - private var currMin: BDV[Double] = _ - - /** - * Adds input value to position i. - */ - private[this] def add(i: Int, value: Double) = { - if (currMax(i) < value) { - currMax(i) = value - } - if (currMin(i) > value) { - currMin(i) = value - } - - val prevMean = currMean(i) - val diff = value - prevMean - currMean(i) = prevMean + diff / (nnz(i) + 1.0) - currM2n(i) += (value - currMean(i)) * diff - currM2(i) += value * value - currL1(i) += math.abs(value) - - nnz(i) += 1.0 - } + private var nnz: Array[Double] = _ + private var currMax: Array[Double] = _ + private var currMin: Array[Double] = _ /** * Add a new sample to this summarizer, and update the statistical summary. 
@@ -81,19 +58,37 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S require(sample.size > 0, s"Vector should have dimension larger than zero.") n = sample.size - currMean = BDV.zeros[Double](n) - currM2n = BDV.zeros[Double](n) - currM2 = BDV.zeros[Double](n) - currL1 = BDV.zeros[Double](n) - nnz = BDV.zeros[Double](n) - currMax = BDV.fill(n)(Double.MinValue) - currMin = BDV.fill(n)(Double.MaxValue) + currMean = Array.ofDim[Double](n) + currM2n = Array.ofDim[Double](n) + currM2 = Array.ofDim[Double](n) + currL1 = Array.ofDim[Double](n) + nnz = Array.ofDim[Double](n) + currMax = Array.fill[Double](n)(Double.MinValue) + currMin = Array.fill[Double](n)(Double.MaxValue) } require(n == sample.size, s"Dimensions mismatch when adding new sample." + s" Expecting $n but got ${sample.size}.") - sample.foreach(true)(x => add(x._1, x._2)) + sample.foreachActive((index, value) => { + if(value != 0.0){ + if (currMax(index) < value) { + currMax(index) = value + } + if (currMin(index) > value) { + currMin(index) = value + } + + val prevMean = currMean(index) + val diff = value - prevMean + currMean(index) = prevMean + diff / (nnz(index) + 1.0) + currM2n(index) += (value - currMean(index)) * diff + currM2(index) += value * value + currL1(index) += math.abs(value) + + nnz(index) += 1.0 + } + }) totalCnt += 1 this @@ -135,14 +130,14 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } } else if (totalCnt == 0 && other.totalCnt != 0) { this.n = other.n - this.currMean = other.currMean.copy - this.currM2n = other.currM2n.copy - this.currM2 = other.currM2.copy - this.currL1 = other.currL1.copy + this.currMean = other.currMean.clone + this.currM2n = other.currM2n.clone + this.currM2 = other.currM2.clone + this.currL1 = other.currL1.clone this.totalCnt = other.totalCnt - this.nnz = other.nnz.copy - this.currMax = other.currMax.copy - this.currMin = other.currMin.copy + this.nnz = other.nnz.clone + this.currMax = 
other.currMax.clone + this.currMin = other.currMin.clone } this } @@ -150,19 +145,19 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S override def mean: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - val realMean = BDV.zeros[Double](n) + val realMean = Array.ofDim[Double](n) var i = 0 while (i < n) { realMean(i) = currMean(i) * (nnz(i) / totalCnt) i += 1 } - Vectors.fromBreeze(realMean) + Vectors.dense(realMean) } override def variance: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - val realVariance = BDV.zeros[Double](n) + val realVariance = Array.ofDim[Double](n) val denominator = totalCnt - 1.0 @@ -177,8 +172,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S i += 1 } } - - Vectors.fromBreeze(realVariance) + Vectors.dense(realVariance) } override def count: Long = totalCnt @@ -186,7 +180,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S override def numNonzeros: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - Vectors.fromBreeze(nnz) + Vectors.dense(nnz) } override def max: Vector = { @@ -197,7 +191,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0 i += 1 } - Vectors.fromBreeze(currMax) + Vectors.dense(currMax) } override def min: Vector = { @@ -208,25 +202,25 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0 i += 1 } - Vectors.fromBreeze(currMin) + Vectors.dense(currMin) } override def normL2: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - val realMagnitude = BDV.zeros[Double](n) + val realMagnitude = Array.ofDim[Double](n) var i = 0 while (i < currM2.size) { realMagnitude(i) = math.sqrt(currM2(i)) i += 1 } - - 
Vectors.fromBreeze(realMagnitude) + Vectors.dense(realMagnitude) } override def normL1: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - Vectors.fromBreeze(currL1) + + Vectors.dense(currL1) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 81e886b4bd4ee..41b6d7f518e2d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -178,58 +178,19 @@ class VectorsSuite extends FunSuite { val dv = Vectors.dense(0.0, 1.2, 3.1, 0.0) val sv = Vectors.sparse(4, Seq((1, 1.2), (2, 3.1), (3, 0.0))) - val dvMap0 = scala.collection.mutable.Map[Int, Double]() - dv.foreach() { - case (index: Int, value: Double) => dvMap0.put(index, value) - } - assert(dvMap0.size === 4) - assert(dvMap0.get(0) === Some(0.0)) - assert(dvMap0.get(1) === Some(1.2)) - assert(dvMap0.get(2) === Some(3.1)) - assert(dvMap0.get(3) === Some(0.0)) - - val dvMap1 = scala.collection.mutable.Map[Int, Double]() - dv.foreach(false) { - case (index, value) => dvMap1.put(index, value) - } - assert(dvMap1.size === 4) - assert(dvMap1.get(0) === Some(0.0)) - assert(dvMap1.get(1) === Some(1.2)) - assert(dvMap1.get(2) === Some(3.1)) - assert(dvMap1.get(3) === Some(0.0)) - - val dvMap2 = scala.collection .mutable.Map[Int, Double]() - dv.foreach(true) { - case (index, value) => dvMap2.put(index, value) - } - assert(dvMap2.size === 2) - assert(dvMap2.get(1) === Some(1.2)) - assert(dvMap2.get(2) === Some(3.1)) - - val svMap0 = scala.collection.mutable.Map[Int, Double]() - sv.foreach() { - case (index, value) => svMap0.put(index, value) - } - assert(svMap0.size === 3) - assert(svMap0.get(1) === Some(1.2)) - assert(svMap0.get(2) === Some(3.1)) - assert(svMap0.get(3) === Some(0.0)) - - val svMap1 = scala.collection.mutable.Map[Int, Double]() - sv.foreach(false) { - case (index, 
value) => svMap1.put(index, value) - } - assert(svMap1.size === 3) - assert(svMap1.get(1) === Some(1.2)) - assert(svMap1.get(2) === Some(3.1)) - assert(svMap1.get(3) === Some(0.0)) - - val svMap2 = scala.collection.mutable.Map[Int, Double]() - sv.foreach(true) { - case (index, value) => svMap2.put(index, value) - } - assert(svMap2.size === 2) - assert(svMap2.get(1) === Some(1.2)) - assert(svMap2.get(2) === Some(3.1)) + val dvMap = scala.collection.mutable.Map[Int, Double]() + dv.foreachActive((index, value) => dvMap.put(index, value)) + assert(dvMap.size === 4) + assert(dvMap.get(0) === Some(0.0)) + assert(dvMap.get(1) === Some(1.2)) + assert(dvMap.get(2) === Some(3.1)) + assert(dvMap.get(3) === Some(0.0)) + + val svMap = scala.collection.mutable.Map[Int, Double]() + sv.foreachActive((index, value) => svMap.put(index, value)) + assert(svMap.size === 3) + assert(svMap.get(1) === Some(1.2)) + assert(svMap.get(2) === Some(3.1)) + assert(svMap.get(3) === Some(0.0)) } } From 844b0e6e8e022c90fa93ff2781bf77c907cf5bb7 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 21 Nov 2014 15:21:21 -0800 Subject: [PATCH 6/6] formating --- .../mllib/stat/MultivariateOnlineSummarizer.scala | 6 +++--- .../org/apache/spark/mllib/linalg/VectorsSuite.scala | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 198b66a005962..fcc2a148791bd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -70,8 +70,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S require(n == sample.size, s"Dimensions mismatch when adding new sample." 
+ s" Expecting $n but got ${sample.size}.") - sample.foreachActive((index, value) => { - if(value != 0.0){ + sample.foreachActive { (index, value) => + if (value != 0.0) { if (currMax(index) < value) { currMax(index) = value } @@ -88,7 +88,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S nnz(index) += 1.0 } - }) + } totalCnt += 1 this diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 41b6d7f518e2d..9492f604af4d5 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -174,12 +174,14 @@ class VectorsSuite extends FunSuite { assert(v.size === x.rows) } - test("foreach") { + test("foreachActive") { val dv = Vectors.dense(0.0, 1.2, 3.1, 0.0) val sv = Vectors.sparse(4, Seq((1, 1.2), (2, 3.1), (3, 0.0))) val dvMap = scala.collection.mutable.Map[Int, Double]() - dv.foreachActive((index, value) => dvMap.put(index, value)) + dv.foreachActive { (index, value) => + dvMap.put(index, value) + } assert(dvMap.size === 4) assert(dvMap.get(0) === Some(0.0)) assert(dvMap.get(1) === Some(1.2)) @@ -187,7 +189,9 @@ class VectorsSuite extends FunSuite { assert(dvMap.get(3) === Some(0.0)) val svMap = scala.collection.mutable.Map[Int, Double]() - sv.foreachActive((index, value) => svMap.put(index, value)) + sv.foreachActive { (index, value) => + svMap.put(index, value) + } assert(svMap.size === 3) assert(svMap.get(1) === Some(1.2)) assert(svMap.get(2) === Some(3.1))