diff --git a/mllib/src/main/scala/org/apache/spark/mllib/kernels/DensityKernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/kernels/DensityKernel.scala
new file mode 100644
index 0000000000000..8ee4a45556cf0
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/kernels/DensityKernel.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.kernels
+
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.regression.LabeledPoint
+
+/**
+ * Base trait which can be extended to
+ * implement various multivariate density
+ * kernels.
+ */
+trait DensityKernel extends Kernel with Serializable {
+  protected val mu: Double
+  protected val r: Double
+
+  def eval(x: Vector): Double
+
+  override def evaluate(x: LabeledPoint, y: LabeledPoint): Double =
+    this.eval(Vectors.fromBreeze(x.features.toBreeze - y.features.toBreeze))
+
+  protected def derivative(n: Int, x: Double): Double
+}
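For reference, a minimal sketch of a concrete implementation of this trait (hypothetical, not part of this patch; the `mu` and `r` values below are illustrative placeholders whose exact definitions follow the conventions of the bandwidth estimator that consumes them):

```scala
import org.apache.spark.mllib.linalg.Vector

// Hypothetical product-uniform density kernel on [-1, 1]^d, shown only to
// illustrate the DensityKernel contract.
class UniformDensityKernel extends DensityKernel {
  // Placeholder kernel constants consumed by the plug-in bandwidth estimator.
  override protected val mu: Double = 1.0 / 6.0
  override protected val r: Double = 1.0 / 2.0

  // Product of one-dimensional uniform densities, 1/2 on [-1, 1].
  override def eval(x: Vector): Double =
    if (x.toArray.forall(v => math.abs(v) <= 1.0)) math.pow(0.5, x.size) else 0.0

  // The uniform kernel is flat inside its support, so its derivatives vanish.
  override protected def derivative(n: Int, x: Double): Double = 0.0
}
```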
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/kernels/GaussianDensityKernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/kernels/GaussianDensityKernel.scala
new file mode 100644
index 0000000000000..23260b74f6516
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/kernels/GaussianDensityKernel.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.kernels
+
+import breeze.linalg.{norm, DenseVector}
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.rdd.RDD
+
+class GaussianDensityKernel
+  extends DensityKernel
+  with KernelEstimator
+  with Logging
+  with Serializable {
+  private val exp = scala.math.exp _
+  private val pow = scala.math.pow _
+  private val sqrt = scala.math.sqrt _
+  private val Pi = scala.math.Pi
+  protected var bandwidth: Vector = Vectors.zeros(10)
+  override protected val mu = 0.25/sqrt(Pi)
+  override protected val r = 0.5/sqrt(Pi)
+
+  private def evalForDimension(x: Double, pilot: Double): Double =
+    exp(-pow(x/pilot, 2)/2)/sqrt(2 * Pi)
+
+  private def evalWithBandwidth(x: Vector, b: Vector): Double = {
+    assert(x.size == b.size,
+      "Dimensions of vector x and the bandwidth of the kernel must match")
+    val buff = x.toBreeze
+    val bw = b.toBreeze
+    val normalizedbuff: breeze.linalg.DenseVector[Double] = DenseVector.tabulate(
+      bw.size)(
+        (i) => buff(i)/bw(i)
+      )
+    exp(-pow(norm(normalizedbuff), 2)/2)/pow(sqrt(2 * Pi), b.size)
+  }
+
+  /*
+   * Calculate the value of the Hermite polynomials
+   * tail recursively. This is needed to calculate
+   * the Gaussian derivatives at a point x.
+   * */
+  private def hermite(n: Int, x: Double): Double = {
+    @annotation.tailrec
+    def hermiteHelper(k: Int, x: Double, a: Double, b: Double): Double =
+      k match {
+        case 0 => a
+        case 1 => b
+        case _ => hermiteHelper(k-1, x, b, x*b - (k-1)*a)
+      }
+    hermiteHelper(n, x, 1, x)
+  }
+
+  def setBandwidth(b: Vector): Unit = {
+    this.bandwidth = b
+  }
+
+  override def eval(x: Vector): Double = evalWithBandwidth(x, this.bandwidth)
+
+  /**
+   * Calculates the derivative at point x of the Gaussian
+   * density kernel, for a single dimension.
+   *
+   * @param n The number of times the Gaussian has to be differentiated
+   * @param x The point x at which the derivative has to be evaluated
+   * @return The value of the nth derivative at the point x
+   * */
+  override def derivative(n: Int, x: Double): Double = {
+    (1/sqrt(2*Pi))*pow(-1.0, n)*exp(-pow(x, 2)/2)*hermite(n, x)
+  }
+
+  /**
+   * Implementation of the estimator for the R integral
+   * for a multivariate Gaussian density kernel.
+   * Evaluates R(D_r(f(x))).
+   *
+   * @param r the degree of the derivative of the kernel
+   *
+   * @param N The size of the original data set from which
+   *          the kernel matrix [[RDD]] was constructed.
+   *
+   * @param pilot The pilot bandwidth to be used to calculate
+   *              the kernel values. (Note that we have not calculated
+   *              the AMISE bandwidth yet and we use this estimator
+   *              as a means to get the AMISE bandwidth)
+   *
+   * @param kernel The RDD containing the matrix
+   *               consisting of pairs Xi - Xj, where Xi and Xj
+   *               are drawn from the original data set.
+   *
+   * @return The estimated value of the integral of the square
+   *         of the rth derivative of the kernel over the real domain.
+   * */
+  override protected def R(
+      r: Int, N: Long, pilot: breeze.linalg.Vector[Double],
+      kernel: RDD[((Long, Long), Vector)]): breeze.linalg.Vector[Double] = {
+
+    /*
+     * Apply map to get values of the derivative of the kernel
+     * at various point pairs.
+     * */
+    val kernelNormalized = kernel.map((couple) =>
+      (couple._1, Vectors.fromBreeze(
+        DenseVector.tabulate(pilot.size)((i) =>
+          (1/(pow(N, 2)*pow(pilot(i), r + 1)))*
+            this.derivative(r, couple._2.toBreeze(i)/pilot(i))))))
+
+    /*
+     * Sum up all the individual values to get the estimated
+     * value of the integral
+     * */
+    val integralvalue = kernelNormalized.reduce((a, b) =>
+      ((0L, 0L), Vectors.fromBreeze(a._2.toBreeze + b._2.toBreeze)))
+
+    integralvalue._2.toBreeze
+  }
+
+  /**
+   * Use the Sheather and Jones plug-in
+   * method to calculate the optimal bandwidth:
+   * http://bit.ly/1EoBY7q
+   * */
+  override def optimalBandwidth(data: RDD[Vector]): Unit = {
+    val dataSize: Long = data.count()
+
+    // First calculate the variance of all dimensions
+    val columnStats = Statistics.colStats(data)
+    // And then the standard deviation
+    val colvar = columnStats.variance.toBreeze
+    val colstd = colvar.map((v) => sqrt(v))
+
+    // Now calculate the normal reference estimate of R(f^8)
+    val Rf8: DenseVector[Double] = DenseVector.tabulate(colstd.size)(
+      (i) => 105*pow(colstd(i), -9.0)/(32*sqrt(Pi)))
+
+    /*
+     * Use the earlier result to calculate
+     * h2, the pilot bandwidth for each dimension
+     * */
+    val h2: DenseVector[Double] = DenseVector.tabulate(colstd.size)((i) =>
+      pow(-2*this.derivative(6, 0.0)/(dataSize*this.mu*Rf8(i)), 1.0/9))
+
+    /*
+     * Use h2 to calculate more
+     * refined estimates of R(f^6)
+     * */
+
+    // Get a 0-indexed version of the original data set
+    val mappedData = SVMKernel.indexedRDD(data)
+
+    /*
+     * Apply cartesian product on the indexed data set
+     * and then map it to an RDD of type [(i,j), Xi - Xj]
+     * */
+    val kernel = mappedData.cartesian(mappedData)
+      .map((prod) => ((prod._1._1, prod._2._1),
+        Vectors.fromBreeze(prod._1._2.toBreeze -
+          prod._2._2.toBreeze))
+      )
+    kernel.cache()
+
+    val newRf6: breeze.linalg.Vector[Double] = this.R(6, dataSize, h2, kernel)
+
+    val hAMSE: breeze.linalg.Vector[Double] = DenseVector.tabulate(colstd.size)((i) =>
+      pow((-2*this.derivative(4, 0.0))/(dataSize*this.mu*newRf6(i)), 1.0/7))
+
+    val newRf4: breeze.linalg.Vector[Double] = this.R(4, dataSize, hAMSE, kernel)
+
+    val hAMISE: breeze.linalg.Vector[Double] = DenseVector.tabulate(colstd.size)((i) =>
+      pow(this.r/(dataSize*this.mu*this.mu*newRf4(i)), 1.0/5))
+
+    this.bandwidth = Vectors.fromBreeze(hAMISE)
+  }
+}
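A rough usage sketch of the estimator above (the `sc` SparkContext and the toy data are assumptions for illustration, not part of this patch):

```scala
// Illustrative sketch only: fit the plug-in bandwidth on a small RDD of
// vectors and evaluate the density kernel at one displacement vector.
import org.apache.spark.mllib.linalg.Vectors

val points = sc.parallelize(Seq(
  Vectors.dense(0.1, 0.5),
  Vectors.dense(-0.3, 1.2),
  Vectors.dense(0.7, -0.4)))

val gdk = new GaussianDensityKernel()
gdk.optimalBandwidth(points)               // sets the per-dimension AMISE bandwidth
val k = gdk.eval(Vectors.dense(0.2, 0.1))  // kernel value at x - y = (0.2, 0.1)
```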
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/kernels/Kernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/kernels/Kernel.scala
new file mode 100644
index 0000000000000..13f50077744e3
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/kernels/Kernel.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.kernels
+
+import org.apache.spark.mllib.regression.LabeledPoint
+
+/**
+ * Declares a trait Kernel which serves
+ * as a base trait for all classes implementing
+ * machine learning kernels.
+ * */
+trait Kernel {
+
+  /**
+   * Evaluates the value of the kernel given two
+   * data points.
+   *
+   * @param x a local [[LabeledPoint]].
+   * @param y a local [[LabeledPoint]].
+   *
+   * @return the value of the kernel function.
+   * */
+  def evaluate(x: LabeledPoint, y: LabeledPoint): Double
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/kernels/KernelEstimator.scala b/mllib/src/main/scala/org/apache/spark/mllib/kernels/KernelEstimator.scala
new file mode 100644
index 0000000000000..1af34acd8668c
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/kernels/KernelEstimator.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.kernels
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.rdd.RDD
+
+/**
+ * Trait defining the basic behavior
+ * of a kernel density estimator
+ */
+trait KernelEstimator extends Logging {
+
+  protected def R(
+      r: Int, N: Long, pilot: breeze.linalg.Vector[Double],
+      kernel: RDD[((Long, Long), Vector)]): breeze.linalg.Vector[Double]
+
+  /**
+   * Calculate the AMISE (Asymptotic Mean Integrated Square Error)
+   * optimal bandwidth assignment by the 'solve-the-equation'
+   * plug-in method.
+   * */
+  def optimalBandwidth(data: RDD[Vector]): Unit
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/kernels/PolynomialKernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/kernels/PolynomialKernel.scala
new file mode 100644
index 0000000000000..828aca0b48570
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/kernels/PolynomialKernel.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.kernels
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Standard polynomial SVM kernel
+ * of the form K(Xi,Xj) = (Xi^T * Xj + offset)^degree
+ */
+class PolynomialKernel(
+    private var degree: Int,
+    private var offset: Double)
+  extends SVMKernel[RDD[((Long, Long), Double)]]
+  with Logging
+  with Serializable {
+
+  def setDegree(d: Int): Unit = {
+    this.degree = d
+  }
+
+  def setOffset(o: Double): Unit = {
+    this.offset = o
+  }
+
+  override def evaluate(x: LabeledPoint, y: LabeledPoint): Double =
+    Math.pow((x.features.toBreeze dot y.features.toBreeze) + this.offset, this.degree)
+
+  override def buildKernelMatrixasRDD(
+      mappedData: RDD[(Long, LabeledPoint)],
+      length: Long): KernelMatrix[RDD[((Long, Long), Double)]] =
+    SVMKernel.buildSVMKernelMatrix(mappedData, length, this.evaluate)
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/kernels/RBFKernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/kernels/RBFKernel.scala
new file mode 100644
index 0000000000000..3b78b159d43b1
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/kernels/RBFKernel.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.kernels
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Standard RBF kernel of the form
+ * K(Xi,Xj) = exp(-||Xi - Xj||^2 / (2 * bandwidth^2))
+ */
+
+class RBFKernel(private var bandwidth: Double)
+  extends SVMKernel[RDD[((Long, Long), Double)]]
+  with Logging
+  with Serializable {
+
+  def setBandwidth(d: Double): Unit = {
+    this.bandwidth = d
+  }
+
+  override def evaluate(x: LabeledPoint, y: LabeledPoint): Double = {
+    val diff: Vector = Vectors.fromBreeze(x.features.toBreeze - y.features.toBreeze)
+    Math.exp(-Math.pow(Vectors.norm(diff, 2.0), 2)/(2*Math.pow(bandwidth, 2)))
+  }
+
+  override def buildKernelMatrixasRDD(
+      mappedData: RDD[(Long, LabeledPoint)],
+      length: Long): KernelMatrix[RDD[((Long, Long), Double)]] =
+    SVMKernel.buildSVMKernelMatrix(mappedData, length, this.evaluate)
+}
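A quick sketch of how these two kernels might be exercised (the labeled points are made up for illustration, not part of this patch):

```scala
// Illustrative sketch only: evaluate both kernels on a pair of points.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val p1 = LabeledPoint(1.0, Vectors.dense(1.0, 2.0))
val p2 = LabeledPoint(0.0, Vectors.dense(0.5, -1.0))

val poly = new PolynomialKernel(2, 1.5) // K(x, y) = (x.y + 1.5)^2
val rbf = new RBFKernel(1.0)            // K(x, y) = exp(-||x - y||^2 / 2)

val kPoly = poly.evaluate(p1, p2)
val kRbf = rbf.evaluate(p1, p2)
```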
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/kernels/SVMKernel.scala b/mllib/src/main/scala/org/apache/spark/mllib/kernels/SVMKernel.scala
new file mode 100644
index 0000000000000..4c26ca07c560e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/kernels/SVMKernel.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.kernels
+
+import breeze.linalg.{DenseVector, DenseMatrix}
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Defines an abstract class that outlines the basic
+ * functionality requirements of an SVM kernel
+ */
+abstract class SVMKernel[T] extends Kernel with Logging with Serializable {
+
+  /**
+   * Build the kernel matrix of the prototype vectors
+   *
+   * @param mappedData The prototype vectors/points
+   *
+   * @param length The number of points
+   *
+   * @return A [[KernelMatrix]] object
+   * */
+  def buildKernelMatrixasRDD(
+      mappedData: RDD[(Long, LabeledPoint)],
+      length: Long): KernelMatrix[T]
+
+  /**
+   * Builds an approximate nonlinear feature map
+   * which corresponds to an SVM kernel. This is
+   * done using the Nystrom method, i.e. approximating
+   * the eigenvalues and eigenvectors of the kernel
+   * matrix of a given RDD.
+   *
+   * For each data point,
+   * calculate m dimensions of the
+   * feature map where m is the number
+   * of eigenvalues/vectors obtained from
+   * the eigen decomposition.
+   *
+   * phi_i(x) = (1/sqrt(eigenvalue(i)))*Sum(k, 1, m, K(k, x)*eigenvector(i)(k))
+   *
+   * @param decomposition The eigenvalue decomposition calculated
+   *                      from the kernel matrix of the prototype
+   *                      subset.
+   * @param prototypes The prototype subset.
+   *
+   * @param data The dataset [[RDD]] on which the feature map
+   *             is to be applied.
+   *
+   * @return The dataset transformed into the approximate feature space.
+   * */
+  def featureMapping(decomposition: (DenseVector[Double], DenseMatrix[Double]))
+      (prototypes: RDD[(Long, LabeledPoint)])
+      (data: RDD[(Long, LabeledPoint)])
+      : RDD[(Long, LabeledPoint)] = {
+
+    logInfo("Calculating the non-linear feature map of the data set")
+
+    data.cartesian(prototypes)
+      .map((couple) => {
+        val y: DenseVector[Double] = DenseVector.tabulate(decomposition._1.length){i =>
+          var eigenvector = 0.0
+          if (couple._2._1.toInt < decomposition._2.rows) {
+            eigenvector = decomposition._2(couple._2._1.toInt, i)
+          }
+
+          val eigenvalue = decomposition._1(i)
+          this.evaluate(couple._1._2, couple._2._2) * eigenvector/Math.sqrt(eigenvalue)
+        }
+        (couple._1._1, (couple._1._2.label, y))
+      }).reduceByKey((veca, vecb) => (veca._1, veca._2 + vecb._2))
+      .map((p) => (p._1, new LabeledPoint(p._2._1, Vectors.fromBreeze(p._2._2))))
+  }
+}
+
+/**
+ * Defines a global singleton object
+ * [[SVMKernel]] which has useful functions
+ * for working with an [[RDD]] of [[LabeledPoint]]
+ * */
+object SVMKernel extends Logging with Serializable {
+
+  /**
+   * Multiplies the kernel matrix with a Breeze
+   * vector and returns the result as a
+   * Breeze DenseVector. This serves as the
+   * matrix-free multiply handed to ARPACK.
+   * */
+  def multiplyKernelMatrixBy(kernel: RDD[((Long, Long), Double)])
+      (v: breeze.linalg.DenseVector[Double]):
+      DenseVector[Double] = {
+    val vbr = kernel.context.broadcast(v)
+    val result: DenseVector[Double] =
+      DenseVector.tabulate(v.length)(
+        (i) => {
+          // Get row number i of the kernel, ordered by column index
+          val row = DenseVector(kernel
+            .filter((point) => i == point._1._1)
+            .sortBy((point) => point._1._2)
+            .map((p) => p._2)
+            .collect())
+          // dot product with v
+          vbr.value.t * row
+        }
+      )
+    result
+  }
+
+  /**
+   * Returns an indexed [[RDD]] from a non-indexed [[RDD]]
+   *
+   * @param data : An [[RDD]]
+   *
+   * @return A (Long, T) key-value RDD indexed
+   *         from 0 to data.count() - 1
+   * */
+  def indexedRDD[T](data: RDD[T]): RDD[(Long, T)] =
+    data.zipWithIndex().map((p) => (p._2, p._1))
+
+  /**
+   * This function constructs an [[SVMKernelMatrix]]
+   *
+   * @param mappedData The indexed [[RDD]] of [[LabeledPoint]]
+   * @param length Length of the indexed [[RDD]]
+   * @param eval A function which calculates the value of the kernel
+   *             given two [[LabeledPoint]]s.
+   *
+   * @return An [[SVMKernelMatrix]] object.
+   * */
+  def buildSVMKernelMatrix(
+      mappedData: RDD[(Long, LabeledPoint)],
+      length: Long,
+      eval: (LabeledPoint, LabeledPoint) => Double):
+      KernelMatrix[RDD[((Long, Long), Double)]] = {
+
+    logInfo("Constructing key-value representation of kernel matrix.")
+    logInfo("Dimension: " + length + " x " + length)
+
+    val labels = mappedData.map((p) => (p._1, p._2.label))
+    val kernel = mappedData.cartesian(mappedData)
+      .map((prod) => ((prod._1._1, prod._2._1),
+        eval(prod._1._2, prod._2._2)))
+    kernel.cache()
+    new SVMKernelMatrix(kernel, length, labels)
+  }
+
+  def zipVectorsWithLabels(
+      mappedData: RDD[(Long, Vector)],
+      labels: RDD[(Long, Double)]): RDD[LabeledPoint] =
+    mappedData.join(labels).map((point) =>
+      new LabeledPoint(point._2._2, point._2._1))
+
+  def unzipIndexedData(mappedData: RDD[(Long, LabeledPoint)]):
+      RDD[LabeledPoint] = mappedData.map((p) => p._2)
+}
+
+/**
+ * Defines a trait which outlines the basic
+ * functionality of kernel matrices.
+ * */
+trait KernelMatrix[T] extends Serializable {
+  protected val kernel: T
+
+  def eigenDecomposition(dimensions: Int): (DenseVector[Double], DenseMatrix[Double])
+
+  def getKernelMatrix(): T = this.kernel
+}
+
+class SVMKernelMatrix(
+    override protected val kernel: RDD[((Long, Long), Double)],
+    private val dimension: Long,
+    private val labels: RDD[(Long, Double)])
+  extends KernelMatrix[RDD[((Long, Long), Double)]]
+  with Logging
+  with Serializable {
+
+  /**
+   * Computes a partial eigenvalue decomposition
+   * of the kernel matrix using ARPACK, through
+   * the matrix-free multiply defined in the
+   * [[SVMKernel] ] companion object.
+   *
+   * @param dimensions The number of leading
+   *                   eigenvalue/eigenvector pairs
+   *                   to compute
+   *
+   * @return The eigenvalues as a Breeze DenseVector
+   *         and the corresponding eigenvectors as the
+   *         columns of a Breeze DenseMatrix.
+   * */
+  override def eigenDecomposition(dimensions: Int = this.dimension.toInt):
+      (DenseVector[Double], DenseMatrix[Double]) = {
+    logInfo("Eigenvalue decomposition of the kernel matrix using ARPACK.")
+    EigenValueDecomposition
+      .symmetricEigs(
+        SVMKernel.multiplyKernelMatrixBy(kernel),
+        dimension.toInt, dimensions,
+        0.0001, 300)
+  }
+}
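To make the intended flow concrete, a hedged sketch of the Nystrom pipeline these pieces form (`testRDD` is an assumed `RDD[LabeledPoint]`, not defined in this patch):

```scala
// Illustrative sketch only: build a kernel matrix over a prototype set,
// take its partial eigen decomposition, and project the data onto the
// approximate feature space (Nystrom method).
val indexed = SVMKernel.indexedRDD(testRDD) // (Long, LabeledPoint) pairs
val rbf = new RBFKernel(1.0)

val kMat = rbf.buildKernelMatrixasRDD(indexed, indexed.count())
val eig = kMat.eigenDecomposition(10) // top 10 eigenvalue/eigenvector pairs

// featureMapping is curried: decomposition, then prototypes, then data.
val features = rbf.featureMapping(eig)(indexed)(indexed)
```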
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/prototype/EntropyMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/prototype/EntropyMeasure.scala
new file mode 100644
index 0000000000000..78ffbda08b3d8
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/prototype/EntropyMeasure.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.prototype
+
+import org.apache.spark.mllib.kernels.DensityKernel
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Models a general entropy measure.
+ * Any entropy measure requires a
+ * probability distribution over the data.
+ */
+abstract class EntropyMeasure extends Measure[LabeledPoint] with Serializable {
+
+  protected val density: DensityKernel
+
+  /**
+   * Given a probability distribution for
+   * the data set, calculate the entropy of
+   * the data set with respect to the given
+   * distribution.
+   *
+   * @param data The data set whose entropy is
+   *             required.
+   *
+   * @return The entropy of the data set.
+   * */
+
+  def entropy[K](data: RDD[(K, LabeledPoint)]): Double
+
+  override def evaluate[K](data: RDD[(K, LabeledPoint)]): Double = this.entropy(data)
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/prototype/EntropySelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/prototype/EntropySelector.scala
new file mode 100644
index 0000000000000..3a0245a3c2853
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/prototype/EntropySelector.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.prototype
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Basic skeleton of an entropy based
+ * subset selector
+ */
+abstract class EntropySelector
+  extends SubsetSelector[(Long, LabeledPoint)]
+  with Serializable
+  with Logging {
+  protected val measure: EntropyMeasure
+  protected val delta: Double
+  protected val MAX_ITERATIONS: Int
+}
+
+class GreedyEntropySelector(
+    m: EntropyMeasure,
+    del: Double = 0.0001,
+    max: Int = 5000)
+  extends EntropySelector
+  with Serializable
+  with Logging {
+
+  override protected val measure: EntropyMeasure = m
+  override protected val delta: Double = del
+  override protected val MAX_ITERATIONS: Int = max
+
+  override def selectPrototypes(
+      data: RDD[(Long, LabeledPoint)],
+      M: Int): RDD[(Long, LabeledPoint)] = {
+
+    /*
+     * Draw an initial sample of M points
+     * from data without replacement.
+     *
+     * Define a working set which we
+     * will use as the prototype set in
+     * each iteration
+     * */
+    logInfo("Initializing the working set, by drawing randomly from the training set")
+    val workingset = data.keys.takeSample(false, M)
+
+    val r = scala.util.Random
+    var it: Int = 0
+
+    // All the elements not in the working set
+    var newDataset: RDD[Long] = data.keys.filter((p) => !workingset.contains(p))
+    // Existing best value of the entropy
+    var oldEntropy: Double = this.measure.evaluate(data.filter((point) =>
+      workingset.contains(point._1)))
+    // Store the value of entropy after an element swap
+    var newEntropy: Double = 0.0
+    var d: Double = Double.NegativeInfinity
+    var rand: Int = 0
+    logInfo("Starting iterative, entropy based greedy subset selection")
+    do {
+      /*
+       * Randomly select a point from
+       * the working set as well as data
+       * and then swap them.
+       * */
+      rand = r.nextInt(workingset.length)
+      val point1 = workingset.apply(rand)
+
+      val point2 = newDataset.takeSample(false, 1).apply(0)
+
+      // Update the working set
+      workingset(rand) = point2
+      // Calculate the new entropy
+      newEntropy = this.measure.evaluate(data.filter((p) =>
+        workingset.contains(p._1)))
+
+      /*
+       * Calculate the change in entropy,
+       * if it has improved then keep the
+       * swap, otherwise revert to the existing
+       * working set.
+       * */
+      d = newEntropy - oldEntropy
+
+      if (d > 0) {
+        /*
+         * Improvement in entropy so
+         * keep the updated working set
+         * as it is and update the
+         * variable 'newDataset'
+         * */
+        oldEntropy = newEntropy
+        newDataset = data.keys.filter((p) => !workingset.contains(p))
+      } else {
+        /*
+         * No improvement in entropy
+         * so revert the working set
+         * to its initial state. Leave
+         * the variable newDataset as
+         * it is.
+         * */
+        workingset(rand) = point1
+      }
+
+      it += 1
+    } while(math.abs(d) >= this.delta &&
+      it <= this.MAX_ITERATIONS)
+    logInfo("Working set obtained, now starting process of packaging it as an RDD")
+    // Time to return the final working set
+    data.filter((p) => workingset.contains(p._1))
+  }
+
+}
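A short sketch of how the selector is meant to be driven, mirroring the test suite later in this patch (`indexedData` is an assumed `RDD[(Long, LabeledPoint)]`; `QuadraticRenyiEntropy` is defined below):

```scala
// Illustrative sketch only: select a 100-point maximum-entropy prototype
// subset under a Gaussian density kernel fit by the plug-in estimator.
val kern = new GaussianDensityKernel()
kern.optimalBandwidth(indexedData.map(_._2.features))

val selector = new GreedyEntropySelector(new QuadraticRenyiEntropy(kern))
val prototypes = selector.selectPrototypes(indexedData, 100)
```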
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/prototype/Measure.scala b/mllib/src/main/scala/org/apache/spark/mllib/prototype/Measure.scala
new file mode 100644
index 0000000000000..80d466fb18ee3
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/prototype/Measure.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.prototype
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Trait which outlines the basic behavior
+ * of a subset utility measure.
+ */
+trait Measure[T] {
+  def evaluate[K](data: RDD[(K, T)]): Double
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/prototype/QuadraticRenyiEntropy.scala b/mllib/src/main/scala/org/apache/spark/mllib/prototype/QuadraticRenyiEntropy.scala
new file mode 100644
index 0000000000000..3613dba8a723e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/prototype/QuadraticRenyiEntropy.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.prototype
+
+import breeze.linalg.DenseVector
+import org.apache.spark.Logging
+import org.apache.spark.mllib.kernels.DensityKernel
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Implements the quadratic Renyi entropy
+ */
+class QuadraticRenyiEntropy(dist: DensityKernel)
+  extends EntropyMeasure
+  with Serializable
+  with Logging {
+
+  val log_e = scala.math.log _
+  val sqrt = scala.math.sqrt _
+  override protected val density: DensityKernel = dist
+
+  /**
+   * Calculate the quadratic Renyi entropy,
+   * up to a distribution-specific
+   * proportionality constant. This can
+   * be used to compare the entropy values of
+   * different sets of data under the same
+   * distribution.
+   *
+   * @param data The data set whose entropy is
+   *             required.
+   * @return The entropy of the data set assuming
+   *         it is distributed as given by the value
+   *         of the parameter 'density'.
+   * */
+  override def entropy[K](data: RDD[(K, LabeledPoint)]): Double = {
+    val dim = data.first()._2.features.size
+    val root_two: breeze.linalg.Vector[Double] = DenseVector.fill(dim, sqrt(2))
+    -1*log_e(data.cartesian(data).map((couple) =>
+      density.eval(Vectors.fromBreeze(
+        (couple._1._2.features.toBreeze - couple._2._2.features.toBreeze) :/ root_two
+      ))).reduce((a, b) => a + b))
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/prototype/SubsetSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/prototype/SubsetSelector.scala
new file mode 100644
index 0000000000000..c96bcb0dd3a3e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/prototype/SubsetSelector.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.prototype
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Defines the characteristics of
+ * a subset selector
+ */
+trait SubsetSelector[T] extends Serializable {
+  def selectPrototypes(data: RDD[T], M: Int): RDD[T]
+}
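For contrast with the greedy entropy-based selector above, a trivial baseline implementation of this trait might look like the following (hypothetical, not part of this patch):

```scala
// Illustrative baseline only: a selector that draws M prototypes uniformly
// at random, useful as a control when evaluating the greedy entropy selector.
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

class RandomSelector[T: ClassTag] extends SubsetSelector[T] {
  override def selectPrototypes(data: RDD[T], M: Int): RDD[T] =
    data.context.parallelize(data.takeSample(withReplacement = false, M))
}
```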
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/kernels/KernelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/kernels/KernelSuite.scala
new file mode 100644
index 0000000000000..b45980f7bd972
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/kernels/KernelSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.kernels
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.classification.SVMSuite
+import org.apache.spark.mllib.prototype.{QuadraticRenyiEntropy, GreedyEntropySelector}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class KernelSuite extends FunSuite with MLlibTestSparkContext {
+
+  test("Testing evaluate function of Polynomial and RBF kernels") {
+
+    val nPoints = 100
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData)
+
+    val rbf = new RBFKernel(1.00)
+    val poly = new PolynomialKernel(2, 1.5)
+
+    val mappedData = SVMKernel.indexedRDD(testRDD)
+
+    val kernelMatrix1 = poly.buildKernelMatrixasRDD(mappedData, nPoints)
+    val kernelMatrix2 = rbf.buildKernelMatrixasRDD(mappedData, nPoints)
+
+    assert(mappedData.count() == nPoints)
+    assert(kernelMatrix1.getKernelMatrix().filter((point) =>
+      point._2.isNaN || point._2.isInfinite)
+      .count() == 0)
+    assert(kernelMatrix2.getKernelMatrix().filter((point) =>
+      point._2.isNaN || point._2.isInfinite)
+      .count() == 0)
+  }
+
+  test("Testing building of feature map from the kernel matrix") {
+    val nPoints = 100
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val rbf = new RBFKernel(1.00)
+    val poly = new PolynomialKernel(5, 1.5)
+    val mappedData = SVMKernel.indexedRDD(testRDD)
+
+    mappedData.cache()
+    val kernelMatrixpoly = poly.buildKernelMatrixasRDD(mappedData, nPoints)
+    val kernelMatrixRBF = rbf.buildKernelMatrixasRDD(mappedData, nPoints)
+
+    assert(mappedData.count() == nPoints)
+    val mappedFeaturespoly = poly.featureMapping(
+      kernelMatrixpoly.eigenDecomposition(99)
+    )(mappedData)(mappedData)
+    val mappedFeaturesrbf = rbf.featureMapping(
+      kernelMatrixRBF.eigenDecomposition(99)
+    )(mappedData)(mappedData)
+
+    assert(mappedFeaturespoly.filter((point) => point._2.features.size == 99).count() == 100)
+    assert(mappedFeaturesrbf.filter((point) => point._2.features.size == 99).count() == 100)
+  }
+
+  test("Testing optimal bandwidth calculation on Gaussian Kernel" +
+    " and maximum entropy subset selection") {
+    val nPoints = 1000
+    val subsetSize = 100
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData, 2)
+    val newtestRDD = testRDD.map((p) => p.features)
+    newtestRDD.cache()
+    val kern = new GaussianDensityKernel()
+    kern.optimalBandwidth(newtestRDD)
+    assert(!kern.eval(newtestRDD.first()).isNaN)
+
+    val newIndexedRDD = SVMKernel.indexedRDD(newtestRDD)
+    newIndexedRDD.cache()
+    newtestRDD.unpersist()
+
+    val entropy = new QuadraticRenyiEntropy(kern)
+    val subsetsel = new GreedyEntropySelector(entropy)
+
+    val subsetRDD = subsetsel.selectPrototypes(
+      SVMKernel.indexedRDD(testRDD),
+      subsetSize)
+
+    assert(subsetRDD.count() == subsetSize)
+  }
+
+  test("Testing rbf kernel with subset selection and feature map extraction") {
+    val nPoints = 1000
+    val nDimensions = 5
+    val subsetSize = 100
+    val unZip = SVMKernel.unzipIndexedData _
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(
+      A,
+      Array[Double](B, C),
+      nPoints,
+      42)
+
+    val testRDD = sc.parallelize(testData, 2)
+
+    val newtestRDD = testRDD.map(_.features)
+    newtestRDD.cache()
+    val kern = new GaussianDensityKernel()
+    kern.optimalBandwidth(newtestRDD)
+    newtestRDD.unpersist()
+    val mappedData = SVMKernel.indexedRDD(testRDD)
+    mappedData.cache()
+
+    val entropy = new QuadraticRenyiEntropy(kern)
+    val subsetsel = new GreedyEntropySelector(entropy)
+    val subsetRDD = subsetsel.selectPrototypes(
+      mappedData,
+      subsetSize)
+
+    val rbf = new RBFKernel(0.8)
+    subsetRDD.cache()
+
+    val kernelMatrixRBF = rbf.buildKernelMatrixasRDD(
+      SVMKernel.indexedRDD(unZip(subsetRDD)),
+      subsetSize)
+
+    val featureMap = rbf.featureMapping(
+      kernelMatrixRBF.eigenDecomposition(nDimensions)
+    )(subsetRDD) _
+
+    val mappedFeaturesrbf = featureMap(mappedData)
+
+    mappedFeaturesrbf.cache()
+    mappedData.unpersist()
+
+    assert(mappedFeaturesrbf.count() == nPoints)
+    assert(mappedFeaturesrbf.first()._2.features.size == nDimensions)
+  }
+}