diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialMapper.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialMapper.scala new file mode 100644 index 0000000000000..8e75267f1db77 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialMapper.scala @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.UnaryTransformer +import org.apache.spark.ml.param.{IntParam, ParamMap} +import org.apache.spark.mllib.linalg._ +import org.apache.spark.sql.types.DataType + +/** + * :: AlphaComponent :: + * Perform feature expansion in a polynomial space. As said in wikipedia of Polynomial Expansion, + * which is available at [[http://en.wikipedia.org/wiki/Polynomial_expansion]], "In mathematics, an + * expansion of a product of sums expresses it as a sum of products by using the fact that + * multiplication distributes over addition". Take a 2-variable feature vector as an example: + * `(x, y)`, if we want to expand it with degree 2, then we get `(x, y, x * x, x * y, y * y)`. + */ +@AlphaComponent +class PolynomialMapper extends UnaryTransformer[Vector, Vector, PolynomialMapper] { + + /** + * The polynomial degree to expand, which should be larger than 1. + * @group param + */ + val degree = new IntParam(this, "degree", "the polynomial degree to expand") + setDefault(degree -> 2) + + /** @group getParam */ + def getDegree: Int = getOrDefault(degree) + + /** @group setParam */ + def setDegree(value: Int): this.type = set(degree, value) + + override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = { v => + val d = paramMap(degree) + // PolynomialMapper.transform(getDegree) + PolynomialMapperV2.expand(v, d) + } + + override protected def outputDataType: DataType = new VectorUDT() +} + +object PolynomialMapper { + /** + * The number that combines k items from N items without repeat, i.e. the binomial coefficient. + */ + private def binomialCoefficient(N: Int, k: Int): Int = { + (N - k + 1 to N).product / (1 to k).product + } + + /** + * The number of monomials of a `numVariables` vector after expanding at a specific polynomial + * degree `degree`. + */ + private def numMonomials(degree: Int, numVariables: Int): Int = { + binomialCoefficient(numVariables + degree - 1, degree) + } + + /** + * The number of monomials of a `numVariables` vector after expanding from polynomial degree 1 to + * polynomial degree `degree`. + */ + private def numExpandedDims(degree: Int, numVariables: Int): Int = { + binomialCoefficient(numVariables + degree, numVariables) - 1 + } + + /** + * Given a pre-built array of Double, fill it with expanded monomials to a given polynomial + * degree. The function works for dense vector. + * @param values the array of Double, which represents a dense vector. + * @param prevStart the start offset of elements that filled in the last function call. + * @param prevLen the length of elements that filled in the last function. + * @param currDegree the current degree that we want to expand. + * @param finalDegree the final expected degree that we want to expand. + * @param nVariables number of variables in the original feature vector. + */ + @tailrec + private def fillDenseVector(values: Array[Double], prevStart: Int, prevLen: Int, currDegree: Int, + finalDegree: Int, nVariables: Int): Unit = { + + if (currDegree > finalDegree) { + return + } + + val currExpandedVecFrom = prevStart + prevLen + val currExpandedVecLen = numMonomials(currDegree, nVariables) + + var leftIndex = 0 + var currIndex = currExpandedVecFrom + + while (leftIndex < nVariables) { + val numToKeep = numMonomials(currDegree - 1, nVariables - leftIndex) + val prevVecStartIndex = prevStart + prevLen - numToKeep + + var rightIndex = 0 + while (rightIndex < numToKeep) { + values(currIndex) = + values(leftIndex) * values(prevVecStartIndex + rightIndex) + currIndex += 1 + rightIndex += 1 + } + + leftIndex += 1 + } + + fillDenseVector(values, currExpandedVecFrom, currExpandedVecLen, currDegree + 1, finalDegree, + nVariables) + } + + /** + * Given an array buffer, fill it with expanded monomials to a given polynomial degree. The + * function works for sparse vector. + * @param indices the indices buffer, which represents the indices of the expanded sparse vector. + * @param values the values buffer, which contains the monomials. + * @param prevStart the start offset that filled in the last function call. + * @param prevLen the length of elements that filled in the last function call. + * @param currDegree the current degree that we want to expand. + * @param finalDegree the final expected degree that we want to expand. + * @param nVariables number of variables in the original feature vector. + * @param originalLen the length of the original sparse vector. + */ + @tailrec + private def fillSparseVector ( + indices: ArrayBuffer[Int], + values: ArrayBuffer[Double], + prevStart: Int, + prevLen: Int, + currDegree: Int, + finalDegree: Int, + nVariables: Int, + originalLen: Int): Unit = { + + if (currDegree > finalDegree) { + return + } + + val currStart = prevStart + prevLen + var currLen = 0 + var leftIndex = 0 + while (leftIndex < originalLen) { + var rightIndex = 0 + while (rightIndex < prevLen) { + val targetIdx = indexSparseVector(nVariables, currDegree, indices(leftIndex), + indices(prevStart + rightIndex)) + if (targetIdx == -1) { + // ignore the invalid indices composition. + } else { + indices += targetIdx + values += values(leftIndex) * values(prevStart + rightIndex) + currLen += 1 + } + rightIndex += 1 + } + leftIndex += 1 + } + + fillSparseVector(indices, values, currStart, currLen, currDegree + 1, finalDegree, nVariables, + originalLen) + } + + /** + * Compute the target index of monomial for sparse vector. The function simulates the process of + * `x * x's degree-1 polynomial expansion = x's degree polynomial expansion`. + * @param nVariables number of variables in the original feature vector. + * @param degree current degree that we want to expand. + * @param leftIdx the index of the LHS vector, i.e. `x`. + * @param rightIdx the index of the RHS vector, i.e. `x's degree-1 polynomial expansion`. + * @return target monomial's index. -1 represents invalid composition of elements. + */ + private def indexSparseVector(nVariables: Int, degree: Int, leftIdx: Int, rightIdx: Int): Int = { + val startIdxOfRightVector = numExpandedDims(degree - 2, nVariables) + val lenOfRightVector = numMonomials(degree - 1, nVariables) + val startIdxOfTargetVector = numExpandedDims(degree - 1, nVariables) + val rightResetIdx = rightIdx - startIdxOfRightVector + val rightIgnoredLen = lenOfRightVector - numMonomials(degree - 1, nVariables - leftIdx) + val targetResetIdx = rightResetIdx - rightIgnoredLen + val targetComplementaryLen = (0 until leftIdx) + .map(x => numMonomials(degree - 1, nVariables - x)).sum + val targetIdx = if (targetResetIdx >= 0) { + startIdxOfTargetVector + targetComplementaryLen + targetResetIdx + } else { + -1 + } + targetIdx + } + + /** + * Transform a vector of variables into a larger vector which stores the polynomial expansion from + * degree 1 to degree `degree`. + */ + private def transform(degree: Int)(feature: Vector): Vector = { + val expectedDims = numExpandedDims(degree, feature.size) + + feature match { + case f: DenseVector => + val originalLen = f.size + + val values = Array.fill[Double](expectedDims)(0.0) + + for (i <- 0 until f.size) { + values(i) = f(i) + } + + fillDenseVector(values, 0, originalLen, 2, degree, originalLen) + + Vectors.dense(values) + + case f: SparseVector => + val originalLen = f.indices.size + + val indices = ArrayBuffer.empty[Int] + val values = ArrayBuffer.empty[Double] + + for (i <- 0 until f.indices.size) { + indices += f.indices(i) + values += f.values(i) + } + + fillSparseVector(indices, values, 0, originalLen, 2, degree, feature.size, originalLen) + + Vectors.sparse(expectedDims, indices.toArray, values.toArray) + } + } +} + +/** + * The expansion is done via recursion. Given n features and degree d, the size after expansion is + * (n + d choose d) (including 1 and first-order values). For example, let f([a, b, c], 3) be the + * function that expands [a, b, c] to their monomials of degree 3. We have the following recursion: + * + * {{{ + * f([a, b, c], 3) = f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * c^2 ++ [c^3] + * }}} + * + * To handle sparsity, if c is zero, we can skip all monomials that contain it. We remember the + * current index and increment it properly for sparse input. + */ +object PolynomialMapperV2 { + + private def choose(n: Int, k: Int): Int = { + Range(n, n - k, -1).product / Range(k, 1, -1).product + } + + private def getPolySize(numFeatures: Int, degree: Int): Int = choose(numFeatures + degree, degree) + + private def expandDense( + values: Array[Double], + lastIdx: Int, + degree: Int, + multiplier: Double, + polyValues: Array[Double], + curPolyIdx: Int): Int = { + if (multiplier == 0.0) { + // do nothing + } else if (degree == 0 || lastIdx < 0) { + polyValues(curPolyIdx) = multiplier + } else { + val v = values(lastIdx) + val lastIdx1 = lastIdx - 1 + var alpha = multiplier + var i = 0 + var curStart = curPolyIdx + while (i <= degree && alpha != 0.0) { + curStart = expandDense(values, lastIdx1, degree - i, alpha, polyValues, curStart) + i += 1 + alpha *= v + } + } + curPolyIdx + getPolySize(lastIdx + 1, degree) + } + + private def expandSparse( + indices: Array[Int], + values: Array[Double], + lastIdx: Int, + lastFeatureIdx: Int, + degree: Int, + multiplier: Double, + polyIndices: mutable.ArrayBuilder[Int], + polyValues: mutable.ArrayBuilder[Double], + curPolyIdx: Int): Int = { + if (multiplier == 0.0) { + // do nothing + } else if (degree == 0 || lastIdx < 0) { + polyIndices += curPolyIdx + polyValues += multiplier + } else { + // Skip all zeros at the tail. + val v = values(lastIdx) + val lastIdx1 = lastIdx - 1 + val lastFeatureIdx1 = indices(lastIdx) - 1 + var alpha = multiplier + var curStart = curPolyIdx + var i = 0 + while (i <= degree && alpha != 0.0) { + curStart = expandSparse(indices, values, lastIdx1, lastFeatureIdx1, degree - i, alpha, + polyIndices, polyValues, curStart) + i += 1 + alpha *= v + } + } + curPolyIdx + getPolySize(lastFeatureIdx + 1, degree) + } + + private def expand(dv: DenseVector, degree: Int): DenseVector = { + val n = dv.size + val polySize = getPolySize(n, degree) + val polyValues = new Array[Double](polySize) + expandDense(dv.values, n - 1, degree, 1.0, polyValues, 0) + new DenseVector(polyValues) + } + + private def expand(sv: SparseVector, degree: Int): SparseVector = { + val polySize = getPolySize(sv.size, degree) + val nnz = sv.values.length + val nnzPolySize = getPolySize(nnz, degree) + val polyIndices = mutable.ArrayBuilder.make[Int] + polyIndices.sizeHint(nnzPolySize) + val polyValues = mutable.ArrayBuilder.make[Double] + polyValues.sizeHint(nnzPolySize) + expandSparse( + sv.indices, sv.values, nnz - 1, sv.size - 1, degree, 1.0, polyIndices, polyValues, 0) + new SparseVector(polySize, polyIndices.result(), polyValues.result()) + } + + def expand(v: Vector, degree: Int): Vector = { + v match { + case dv: DenseVector => expand(dv, degree) + case sv: SparseVector => expand(sv, degree) + case _ => throw new IllegalArgumentException + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialMapperSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialMapperSuite.scala new file mode 100644 index 0000000000000..5efbbf0791a29 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialMapperSuite.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.sql.{DataFrame, Row, SQLContext} + +class PolynomialMapperSuite extends FunSuite with MLlibTestSparkContext { + + def collectResult(result: DataFrame): Array[Vector] = { + result.select("poly_features").collect().map { + case Row(features: Vector) => features + } + } + + def assertTypeOfVector(lhs: Array[Vector], rhs: Array[Vector]): Unit = { + assert((lhs, rhs).zipped.forall { + case (v1: DenseVector, v2: DenseVector) => true + case (v1: SparseVector, v2: SparseVector) => true + case _ => false + }, "The vector type should be preserved after normalization.") + } + + def assertValues(lhs: Array[Vector], rhs: Array[Vector]): Unit = { + assert((lhs, rhs).zipped.forall { (vector1, vector2) => + vector1 ~== vector2 absTol 1E-1 + }, "The vector value is not correct after normalization.") + } + + test("Polynomial expansion with default parameter") { + val data = Array( + Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), + Vectors.dense(-2.0, 2.3), + Vectors.dense(0.0, 0.0, 0.0), + Vectors.dense(0.6, -1.1, -3.0), + Vectors.sparse(3, Seq()) + ) + + val sqlContext = new SQLContext(sc) + val dataFrame = sqlContext + .createDataFrame(sc.parallelize(data, 2).map(Tuple1.apply)).toDF("features") + + val polynomialMapper = new PolynomialMapper() + .setInputCol("features") + .setOutputCol("poly_features") + + val twoDegreeExpansion: Array[Vector] = Array( + Vectors.sparse(9, Array(0, 1, 3, 4, 6), Array(-2.0, 2.3, 4.0, -4.6, 5.29)), + Vectors.dense(-2.0, 2.3, 4.0, -4.6, 5.29), + Vectors.dense(Array.fill[Double](9)(0.0)), + Vectors.dense(0.6, -1.1, -3.0, 0.36, -0.66, -1.8, 1.21, 3.3, 9.0), + Vectors.sparse(9, Array.empty[Int], Array.empty[Double])) + + val result = collectResult(polynomialMapper.transform(dataFrame)) + + println(result.mkString("\n")) + + assertTypeOfVector(data, result) + + assertValues(result, twoDegreeExpansion) + } + + test("Polynomial expansion with setter") { + val data = Array( + Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), + Vectors.dense(-2.0, 2.3), + Vectors.dense(0.0, 0.0, 0.0), + Vectors.dense(0.6, -1.1, -3.0), + Vectors.sparse(3, Seq()) + ) + + val sqlContext = new SQLContext(sc) + val dataFrame = sqlContext + .createDataFrame(sc.parallelize(data, 2).map(Tuple1.apply)).toDF("features") + + val polynomialMapper = new PolynomialMapper() + .setInputCol("features") + .setOutputCol("poly_features") + .setDegree(3) + + val threeDegreeExpansion: Array[Vector] = Array( + Vectors.sparse(19, Array(0, 1, 3, 4, 6, 9, 10, 12, 15), + Array(-2.0, 2.3, 4.0, -4.6, 5.29, -8.0, 9.2, -10.58, 12.167)), + Vectors.dense(-2.0, 2.3, 4.0, -4.6, 5.29, -8.0, 9.2, -10.58, 12.167), + Vectors.dense(Array.fill[Double](19)(0.0)), + Vectors.dense(0.6, -1.1, -3.0, 0.36, -0.66, -1.8, 1.21, 3.3, 9.0, 0.216, -0.396, -1.08, 0.73, + 1.98, 5.4, -1.33, -3.63, -9.9, -27.0), + Vectors.sparse(19, Array.empty[Int], Array.empty[Double])) + + val result = collectResult(polynomialMapper.transform(dataFrame)) + + assertTypeOfVector(data, result) + + assertValues(result, threeDegreeExpansion) + } +} +