From 77cd36b7cdfd1b0eaaff63a19dcafd81525e27ad Mon Sep 17 00:00:00 2001 From: "Kirill A. Korinskiy" Date: Sat, 13 Jun 2015 16:01:09 +0700 Subject: [PATCH 1/3] [SPARK-8341] Significant selector feature transformation Idea of this transformation it safe reduce big vector that was produced by Hashing TF for example for reduce requirement of memory for manipulation on them. This transformation create a model that keep only indices that has different values on fit stage. Example of usage: ``` import org.apache.spark.SparkContext import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD val hashingTF = new HashingTF val localDocs: Seq[(Double, Array[String])] = Seq( (1d, "a a b b b c d".split(" ")), (0d, "a b c d a b c".split(" ")), (1d, "c b a c b a a".split(" "))) val docs = sc.parallelize(localDocs, 2) val tf: RDD[LabeledPoint] = docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words))} // scala> tf.first().features.size // res4: Int = 1048576 val transformer = new SignificantSelector().fit(tf.map(_.features)) val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features))) // scala> transformed_tf.first().features.size // res5: Int = 4 // now you have smallest vector that has same features, // but request less memory for manipulation on DecisionTree for example ``` --- docs/mllib-feature-extraction.md | 40 ++++++ .../mllib/feature/SignificantSelector.scala | 127 ++++++++++++++++++ .../feature/SignificantSelectorTest.scala | 69 ++++++++++ 3 files changed, 236 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/feature/SignificantSelector.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md index 4fe470a8de810..1d5286f706325 100644 --- a/docs/mllib-feature-extraction.md +++ b/docs/mllib-feature-extraction.md @@ -617,3 +617,43 @@ println("PCA Mean Squared Error = " + MSE_pca) {% endhighlight %} + +## Significant Selector +Idea of this transformation it safe reduce big vector that was produced by Hashing TF for example +for reduce requirement of memory for manipulation on them. + +This transformation create a model that keep only indices that has different values on fit stage. + +### Example +
+
+{% highlight scala %} +import org.apache.spark.SparkContext +import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector} +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD + +val hashingTF = new HashingTF +val localDocs: Seq[(Double, Array[String])] = Seq( + (1d, "a a b b b c d".split(" ")), + (0d, "a b c d a b c".split(" ")), + (1d, "c b a c b a a".split(" "))) + +val docs = sc.parallelize(localDocs, 2) + +val tf: RDD[LabeledPoint] = docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words))} +// scala> tf.first().features.size +// res4: Int = 1048576 + +val transformer = new SignificantSelector().fit(tf.map(_.features)) + +val transformed_tf = tf.map(p => p.copy(features = transformer.transform(p.features))) +// scala> transformed_tf.first().features.size +// res5: Int = 4 + +// now you have smallest vector that has same features, +// but request less memory for manipulation on DecisionTree for example +{% endhighlight %} +
+
+ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/SignificantSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/SignificantSelector.scala new file mode 100644 index 0000000000000..6758b41acccdb --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/SignificantSelector.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import scala.collection.mutable + +import org.apache.spark.annotation.Experimental +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.rdd.RDD + +/** + * :: Experimental :: + * Model to extract significant indices from vector. + * + * Significant indices is vector's index that has different value for different vectors. + * + * For example, when you use HashingTF they create big sparse vector, + * and this code convert to smallest vector that don't include same values indices for all vectors. + * + * @param indices array of significant indices. + */ +@Experimental +class SignificantSelectorModel(val indices: Array[Int]) extends VectorTransformer { + + /** + * Applies transformation on a vector. + * + * @param vector vector to be transformed. + * @return transformed vector. + */ + override def transform(vector: Vector): Vector = vector match { + case DenseVector(vs) => + Vectors.dense(indices.map(vs)) + + case SparseVector(s, ids, vs) => + var sv_idx = 0 + var new_idx = 0 + val elements = new mutable.ListBuffer[(Int, Double)]() + + for (idx <- indices) { + while (sv_idx < ids.length && ids(sv_idx) < idx) { + sv_idx += 1 + } + if (sv_idx < ids.length && ids(sv_idx) == idx) { + elements += ((new_idx, vs(sv_idx))) + sv_idx += 1 + } + new_idx += 1 + } + + Vectors.sparse(indices.length, elements) + + case v => + throw new IllegalArgumentException("Don't support vector type " + v.getClass) + } +} + +/** + * :: Experimental :: + * Specialized model for equivalent vectors + */ +@Experimental +class SignificantSelectorEmptyModel extends SignificantSelectorModel(Array[Int]()) { + + val empty_vector = Vectors.dense(Array[Double]()) + + override def transform(vector: Vector): Vector = empty_vector +} + +/** + * :: Experimental :: + * Create Significant selector. + */ +@Experimental +class SignificantSelector() { + + /** + * Returns a significant vector indices selector. + * + * @param sources an `RDD[Vector]` containing the vectors. + */ + def fit(sources: RDD[Vector]): SignificantSelectorModel = { + val sources_count = sources.count() + val significant_indices = sources.flatMap { + case DenseVector(vs) => + vs.zipWithIndex + case SparseVector(_, ids, vs) => + vs.zip(ids) + case v => + throw new IllegalArgumentException("Don't support vector type " + v.getClass) + } + .map(e => (e.swap, 1)) + .reduceByKey(_ + _) + .map { case ((idx, value), count) => (idx, (value, count))} + .groupByKey() + .mapValues { e => + val values = e.groupBy(_._1) + val sum = e.map(_._2).sum + + values.size + (if (sum == sources_count || values.contains(0.0)) 0 else 1) + } + .filter(_._2 > 1) + .keys + .collect() + .sorted + + if (significant_indices.nonEmpty) + new SignificantSelectorModel(significant_indices) + else + new SignificantSelectorEmptyModel() + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala new file mode 100644 index 0000000000000..ea2c3453eac6a --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.scalatest.FunSuite + +class SignificantSelectorTest extends FunSuite with MLlibTestSparkContext { + val dv = Vectors.dense(1, 2, 3, 4, 5) + val sv1 = Vectors.sparse(5, Seq((0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0), (4, 5.0))) + val sv2 = Vectors.sparse(5, Seq((2, 3.0))) + + test("same result vector") { + val vectors = sc.parallelize(List( + Vectors.dense(0.0, 1.0, 2.0, 3.0, 4.0), + Vectors.dense(4.0, 5.0, 6.0, 7.0, 8.0) + )) + + val significant = new SignificantSelector().fit(vectors) + assert(significant.transform(dv).toString == dv.toString) + assert(significant.transform(sv1).toString == sv1.toString) + assert(significant.transform(sv2).toString == sv2.toString) + } + + + test("shortest result vector") { + val vectors = sc.parallelize(List( + Vectors.dense(0.0, 2.0, 3.0, 4.0), + Vectors.dense(0.0, 2.0, 3.0, 4.0), + Vectors.dense(0.0, 2.0, 3.0, 4.0), + Vectors.sparse(4, Seq((1, 3.0), (2, 4.0))), + Vectors.dense(0.0, 3.0, 5.0, 4.0), + Vectors.dense(0.0, 3.0, 7.0, 4.0) + )) + + val significant = new SignificantSelector().fit(vectors) + assert(significant.transform(dv).toString == "[2.0,3.0,4.0]") + assert(significant.transform(sv1).toString == "(3,[0,1,2],[2.0,3.0,4.0])") + assert(significant.transform(sv2).toString == "(3,[1],[3.0])") + } + + test("empty result vector") { + val vectors = sc.parallelize(List( + Vectors.dense(0.0, 2.0, 3.0, 4.0), + Vectors.dense(0.0, 2.0, 3.0, 4.0) + )) + + val significant = new SignificantSelector().fit(vectors) + assert(significant.transform(dv).toString == "[]") + assert(significant.transform(sv1).toString == "[]") + assert(significant.transform(sv2).toString == "[]") + } +} \ No newline at end of file From 62954beb724a8a1e2ed033de25fe880954e8ecac Mon Sep 17 00:00:00 2001 From: "Kirill A. Korinskiy" Date: Fri, 3 Jul 2015 12:31:18 +0700 Subject: [PATCH 2/3] [SPARK-8341] removed `toString` on compare vectors --- .../spark/mllib/feature/SignificantSelectorTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala index ea2c3453eac6a..9a9a77c93cb32 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala @@ -33,9 +33,9 @@ class SignificantSelectorTest extends FunSuite with MLlibTestSparkContext { )) val significant = new SignificantSelector().fit(vectors) - assert(significant.transform(dv).toString == dv.toString) - assert(significant.transform(sv1).toString == sv1.toString) - assert(significant.transform(sv2).toString == sv2.toString) + assert(significant.transform(dv) == dv) + assert(significant.transform(sv1) == sv1) + assert(significant.transform(sv2) == sv2) } From 3a34b56392a549708b9655b2f6ec68d49d7ecc9d Mon Sep 17 00:00:00 2001 From: "Kirill A. Korinskiy" Date: Fri, 3 Jul 2015 14:41:36 +0700 Subject: [PATCH 3/3] [SPARK-8341] remove all `.toString` for compare vectors. --- .../feature/SignificantSelectorTest.scala | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala index 9a9a77c93cb32..69ae7d9c33eb8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorTest.scala @@ -50,9 +50,14 @@ class SignificantSelectorTest extends FunSuite with MLlibTestSparkContext { )) val significant = new SignificantSelector().fit(vectors) - assert(significant.transform(dv).toString == "[2.0,3.0,4.0]") - assert(significant.transform(sv1).toString == "(3,[0,1,2],[2.0,3.0,4.0])") - assert(significant.transform(sv2).toString == "(3,[1],[3.0])") + + val significanted_dv = Vectors.dense(2.0, 3.0, 4.0) + val significanted_sv1 = Vectors.sparse(3, Seq((0, 2.0), (1, 3.0), (2, 4.0))) + val significanted_sv2 = Vectors.sparse(3, Seq((1, 3.0))) + + assert(significant.transform(dv) == significanted_dv) + assert(significant.transform(sv1) == significanted_sv1) + assert(significant.transform(sv2) == significanted_sv2) } test("empty result vector") { @@ -62,8 +67,11 @@ class SignificantSelectorTest extends FunSuite with MLlibTestSparkContext { )) val significant = new SignificantSelector().fit(vectors) - assert(significant.transform(dv).toString == "[]") - assert(significant.transform(sv1).toString == "[]") - assert(significant.transform(sv2).toString == "[]") + + val empty_vector = Vectors.dense(Array[Double]()) + + assert(significant.transform(dv) == empty_vector) + assert(significant.transform(sv1) == empty_vector) + assert(significant.transform(sv2) == empty_vector) } } \ No newline at end of file