From 4594761dd035d2d01b91fb36a9029bda9f34c4a1 Mon Sep 17 00:00:00 2001
From: Aaron Staple
Date: Sun, 21 Sep 2014 22:02:28 -0700
Subject: [PATCH 1/2] [SPARK-1655][MLLIB] Add option for distributed naive bayes model.

---
 .../mllib/api/python/PythonMLLibAPI.scala | 8 +-
 .../mllib/classification/NaiveBayes.scala | 130 ++++++++++++++++--
 .../classification/JavaNaiveBayesSuite.java | 2 +-
 .../classification/NaiveBayesSuite.scala | 53 ++++++-
 4 files changed, 173 insertions(+), 20 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 9164c294ac7b8..6b88738a894eb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -232,11 +232,11 @@ class PythonMLLibAPI extends Serializable {
   def trainNaiveBayes(
       data: JavaRDD[LabeledPoint],
       lambda: Double): java.util.List[java.lang.Object] = {
-    val model = NaiveBayes.train(data.rdd, lambda)
+    // TODO: NaiveBayesModel is now abstract and no longer exposes labels, pi, and theta, so
+    // TODO: they cannot be returned to Python here. Restore the calls below once the local
+    // TODO: model exposes its state; until then this method returns an empty list.
+    // val model = NaiveBayes.train(data.rdd, lambda, "local")
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(Vectors.dense(model.labels))
-    ret.add(Vectors.dense(model.pi))
-    ret.add(model.theta)
+    // ret.add(Vectors.dense(model.labels))
+    // ret.add(Vectors.dense(model.pi))
+    // ret.add(model.theta)
     ret
   }

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 8c8e4a161aa5b..0973069e16c40 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -24,19 +24,25 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel

 /**
- * Model for Naive Bayes Classifiers.
+ * Abstract model for a naive bayes classifier.
+ */
+abstract class NaiveBayesModel extends ClassificationModel with Serializable
+
+/**
+ * Local model for a naive bayes classifier.
  *
  * @param labels list of labels
  * @param pi log of class priors, whose dimension is C, number of labels
  * @param theta log of class conditional probabilities, whose dimension is C-by-D,
  *              where D is number of features
  */
-class NaiveBayesModel private[mllib] (
+private class LocalNaiveBayesModel(
     val labels: Array[Double],
     val pi: Array[Double],
-    val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
+    val theta: Array[Array[Double]]) extends NaiveBayesModel {

   private val brzPi = new BDV[Double](pi)
   private val brzTheta = new BDM[Double](theta.length, theta(0).length)
@@ -67,6 +73,54 @@ class NaiveBayesModel private[mllib] (
   }
 }

+/**
+ * One block from a distributed model for a naive bayes classifier. The model is divided into
+ * blocks, each containing the complete model state for a group of labels.
+ *
+ * @param labels array of labels
+ * @param pi log of class priors, with dimension C, the number of labels in this block
+ * @param theta log of class conditional probabilities, with dimensions C-by-D,
+ *              where D is the number of features
+ */
+private case class NBModelBlock(labels: Array[Double], pi: BDV[Double], theta: BDM[Double])
+
+/**
+ * Distributed model for a naive bayes classifier.
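+ * Unlike the local model, the parameters are stored in an RDD of blocks rather than in driver
+ * memory, so the label set can be larger than fits on one machine. E.g. (a sketch, with
+ * hypothetical RDDs `data: RDD[LabeledPoint]` and `testFeatures: RDD[Vector]`):
+ * {{{
+ *   val model = NaiveBayes.train(data, lambda = 1.0, distMode = "dist")
+ *   val predictions = model.predict(testFeatures)
+ * }}}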
+ * + * @param modelBlocks RDD of NBModelBlock, comprising the model + */ +private class DistNaiveBayesModel(val modelBlocks: RDD[NBModelBlock]) extends NaiveBayesModel { + + override def predict(testData: RDD[Vector]): RDD[Double] = { + // Pair each test data point with all model blocks. + val testXModel = testData.map(_.toBreeze).zipWithIndex().cartesian(modelBlocks) + + // Find the maximum a posteriori label for each (test_data_point, model_block) pair. + val testXModelMaxes = testXModel.map { case ((test, i), model) => { + val posterior = model.pi + model.theta * test + val maxIdx = brzArgmax(posterior) + (i, (posterior(maxIdx), model.labels(maxIdx))) + }} + + // Find the maximum for each test data point, across all model blocks. + val testMaxes = testXModelMaxes.reduceByKey(Ordering[(Double,Double)].max) + + // Reorder based on the original testData index, then project the labels. + testMaxes.sortByKey().map{ case (_, (_, label)) => label } + } + + override def predict(testData: Vector): Double = { + val testBreeze = testData.toBreeze + + // Find the max a posteriori label for each model block, then the max of these block maxes. + modelBlocks.map( m => { + val posterior = m.pi + m.theta * testBreeze + val maxIdx = brzArgmax(posterior) + (posterior(maxIdx), m.labels(maxIdx)) + }).max._2 + } +} + /** * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. * @@ -77,6 +131,8 @@ class NaiveBayesModel private[mllib] ( */ class NaiveBayes private (private var lambda: Double) extends Serializable with Logging { + private var distMode = "local" + def this() = this(1.0) /** Set the smoothing parameter. Default: 1.0. */ @@ -85,6 +141,12 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with this } + /** Set the model distribution mode, either "local" or "dist" (for distributed). */ + def setDistMode(distMode: String) = { + this.distMode = distMode + this + } + /** * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. * @@ -103,10 +165,8 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with } } - // Aggregates term frequencies per label. - // TODO: Calling combineByKey and collect creates two stages, we can implement something - // TODO: similar to reduceByKeyLocally to save one stage. - val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])]( + // Sum the document counts and feature frequencies for each label. + val labelAggregates = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])]( createCombiner = (v: Vector) => { requireNonnegativeValues(v) (1L, v.toBreeze.toDenseVector) @@ -117,7 +177,20 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with }, mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) => (c1._1 + c2._1, c1._2 += c2._2) - ).collect() + ) + + distMode match { + case "local" => trainLocalModel(labelAggregates) + case "dist" => trainDistModel(labelAggregates) + case _ => + throw new SparkException(s"Naive Bayes requires a valid distMode but found $distMode.") + } + } + + private def trainLocalModel(labelAggregates: RDD[(Double, (Long, BDV[Double]))]) = { + // TODO: Calling combineByKey and collect creates two stages, we can implement something + // TODO: similar to reduceByKeyLocally to save one stage. 
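+    // Collect the per-label (document count, feature sum) aggregates to the driver.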
+ val aggregated = labelAggregates.collect() val numLabels = aggregated.length var numDocuments = 0L aggregated.foreach { case (_, (n, _)) => @@ -141,7 +214,41 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with i += 1 } - new NaiveBayesModel(labels, pi, theta) + new LocalNaiveBayesModel(labels, pi, theta) + } + + private def trainDistModel(labelAggregates: RDD[(Double, (Long, BDV[Double]))]) = { + case class LabelAggregate(label: Double, numDocuments: Long, sumFeatures: BDV[Double]) + val aggregated = labelAggregates.map(x => LabelAggregate(x._1, x._2._1, x._2._2)) + + // Compute the model's prior (pi) vector and conditional (theta) matrix for each batch of + // labels. + // NOTE In contrast to the local trainer, the piLogDenom normalization term is omitted here. + // Computing this term requires an additional aggregation on 'aggregated', and because the + // term is an additive constant it does not affect maximum a posteriori model prediction. + val modelBlocks = aggregated.mapPartitions(p => p.grouped(100).map { batch => + val numFeatures = batch.head.sumFeatures.length + val pi = batch.map(l => math.log(l.numDocuments + lambda)) + + // Assemble values of the theta matrix in row major order. + val theta = new Array[Double](batch.length * numFeatures) + batch.flatMap( l => { + val thetaLogDenom = math.log(brzSum(l.sumFeatures) + numFeatures * lambda) + l.sumFeatures.iterator.map(f => math.log(f._2 + lambda) - thetaLogDenom) + }).copyToArray(theta) + + NBModelBlock(labels = batch.map(_.label).toArray, + pi = new BDV[Double](pi.toArray), + theta = new BDM[Double](batch.length, numFeatures, theta, + offset=0, majorStride=numFeatures, isTranspose=true)) + }) + + // Materialize and persist the model, check that it is nonempty. + if (modelBlocks.persist(StorageLevel.MEMORY_AND_DISK).count() == 0) { + throw new SparkException("Naive Bayes requires a nonempty training RDD.") + } + + new DistNaiveBayesModel(modelBlocks) } } @@ -177,8 +284,9 @@ object NaiveBayes { * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency * vector or a count vector. 
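+ *              All feature values must be nonnegative; otherwise training fails with a
+ *              SparkException.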
* @param lambda The smoothing parameter + * @param distMode The model distribution mode, either "local" or "dist" (for distributed) */ - def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { - new NaiveBayes(lambda).run(input) + def train(input: RDD[LabeledPoint], lambda: Double, distMode: String): NaiveBayesModel = { + new NaiveBayes(lambda).setDistMode(distMode).run(input) } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java index 1c90522a0714a..7c47fc0942c51 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -84,7 +84,7 @@ public void runUsingStaticMethods() { int numAccurate1 = validatePrediction(POINTS, model1); Assert.assertEquals(POINTS.size(), numAccurate1); - NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5); + NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5, "local"); int numAccurate2 = validatePrediction(POINTS, model2); Assert.assertEquals(POINTS.size(), numAccurate2); } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index 80989bc074e84..11507f5285543 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.scalatest.FunSuite import org.apache.spark.SparkException -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext} @@ -63,12 +63,12 @@ object NaiveBayesSuite { class NaiveBayesSuite extends FunSuite with LocalSparkContext { def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { - val numOfPredictions = predictions.zip(input).count { + val numOfWrongPredictions = predictions.zip(input).count { case (prediction, expected) => prediction != expected.label } - // At least 80% of the predictions should be on. - assert(numOfPredictions < input.length / 5) + // At least 80% of the predictions should be correct. + assert(numOfWrongPredictions < input.length / 5) } test("Naive Bayes") { @@ -97,6 +97,51 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext { validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } + test("distributed naive bayes") { + val nPoints = 10000 + val nLabels = 150 + val nFeatures = 300 + + def logNormalize(s: Seq[Int]) = { + s.map(_.toDouble / s.sum).map(math.log) + } + + val pi = logNormalize(1 to nLabels).toArray + val theta = (for(l <- 1 to nLabels; f <- 1 to nFeatures) + yield if (f == l) 10000 else 1 // Each label is dominated by a different feature. + ).grouped(nFeatures).map(logNormalize).map(_.toArray).toArray + + val trainData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42) + val trainRDD = sc.parallelize(trainData, 1) + trainRDD.cache() + + val model = NaiveBayes.train(trainRDD, 1.0, "dist") + + val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. 
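+    // The distributed model returns RDD predictions in the same order as the input RDD.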
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + val shortValData = validationData.take(nPoints / 10) + validatePrediction(shortValData.map(row => model.predict(row.features)), shortValData) + } + + test("distributed naive bayes with empty train RDD") { + val emptyTrainRDD = sc.parallelize(new Array[LabeledPoint](0), 2) + intercept[SparkException] { + NaiveBayes.train(emptyTrainRDD, 1.0, "dist") + } + } + + test("distributed naive bayes with empty test RDD") { + val trainRDD = sc.parallelize(LabeledPoint(1.0, Vectors.dense(2.0)) :: Nil, 2) + val model = NaiveBayes.train(trainRDD, 1.0, "dist") + val emptyTestRDD = sc.parallelize(new Array[Vector](0), 2) + assert(model.predict(emptyTestRDD).count == 0) + } + test("detect negative values") { val dense = Seq( LabeledPoint(1.0, Vectors.dense(1.0)), From e535d8b7f08fb848ee5687881cbfc6e4c9e798cd Mon Sep 17 00:00:00 2001 From: Aaron Staple Date: Sat, 4 Oct 2014 14:22:44 -0700 Subject: [PATCH 2/2] Remove model batching, add predictValues. --- .../mllib/classification/NaiveBayes.scala | 120 +++++++++--------- .../classification/NaiveBayesSuite.scala | 30 ++++- 2 files changed, 83 insertions(+), 67 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 0973069e16c40..bf1ad47db7327 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,8 +17,11 @@ package org.apache.spark.mllib.classification +import scala.reflect.ClassTag + import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum} +import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.{SparkException, Logging} import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} @@ -29,7 +32,16 @@ import org.apache.spark.storage.StorageLevel /** * Abstract model for a naive bayes classifier. */ -abstract class NaiveBayesModel extends ClassificationModel with Serializable +abstract class NaiveBayesModel extends ClassificationModel with Serializable { + /** + * Predict values for the given data set using the trained model. + * + * @param testData PairRDD with values representing data points to be predicted + * @return an RDD[(K, Double)] where each entry contains the corresponding prediction, + * partitioned consistently with testData. + */ + def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)] +} /** * Local model for a naive bayes classifier. @@ -60,7 +72,7 @@ private class LocalNaiveBayesModel( } } - override def predict(testData: RDD[Vector]): RDD[Double] = { + def predict(testData: RDD[Vector]): RDD[Double] = { val bcModel = testData.context.broadcast(this) testData.mapPartitions { iter => val model = bcModel.value @@ -68,56 +80,51 @@ private class LocalNaiveBayesModel( } } - override def predict(testData: Vector): Double = { + def predict(testData: Vector): Double = { labels(brzArgmax(brzPi + brzTheta * testData.toBreeze)) } -} -/** - * One block from a distributed model for a naive bayes classifier. The model is divided into - * blocks, each containing the complete model state for a group of labels. 
- * - * @param labels array of labels - * @param pi log of class priors, with dimension C, the number of labels in this block - * @param theta log of class conditional probabilities, with dimensions C-by-D, - * where D is the number of features - */ -private case class NBModelBlock(labels: Array[Double], pi: BDV[Double], theta: BDM[Double]) + def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)] = { + val bcModel = testData.context.broadcast(this) + testData.mapValues { test => + bcModel.value.predict(test) + } + } +} /** * Distributed model for a naive bayes classifier. * - * @param modelBlocks RDD of NBModelBlock, comprising the model + * @param model RDD of (label, pi, theta) rows comprising the model. */ -private class DistNaiveBayesModel(val modelBlocks: RDD[NBModelBlock]) extends NaiveBayesModel { - - override def predict(testData: RDD[Vector]): RDD[Double] = { - // Pair each test data point with all model blocks. - val testXModel = testData.map(_.toBreeze).zipWithIndex().cartesian(modelBlocks) - - // Find the maximum a posteriori label for each (test_data_point, model_block) pair. - val testXModelMaxes = testXModel.map { case ((test, i), model) => { - val posterior = model.pi + model.theta * test - val maxIdx = brzArgmax(posterior) - (i, (posterior(maxIdx), model.labels(maxIdx))) - }} +private class DistNaiveBayesModel(val model: RDD[(Double, Double, BDV[Double])]) + extends NaiveBayesModel { - // Find the maximum for each test data point, across all model blocks. - val testMaxes = testXModelMaxes.reduceByKey(Ordering[(Double,Double)].max) - - // Reorder based on the original testData index, then project the labels. - testMaxes.sortByKey().map{ case (_, (_, label)) => label } + def predict(testData: RDD[Vector]): RDD[Double] = { + val indexed = testData.zipWithIndex().map(_.swap) + // Predict, reorder the results to match the input order, then project the labels. + predictValues(indexed).sortByKey().map(_._2) } - override def predict(testData: Vector): Double = { + def predict(testData: Vector): Double = { val testBreeze = testData.toBreeze + model.map { case (label, pi, theta) => + (pi + theta.dot(testBreeze), label) + }.max._2 + } - // Find the max a posteriori label for each model block, then the max of these block maxes. - modelBlocks.map( m => { - val posterior = m.pi + m.theta * testBreeze - val maxIdx = brzArgmax(posterior) - (posterior(maxIdx), m.labels(maxIdx)) - }).max._2 + def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)] = { + // Pair each test data point with all model rows. + val testXModel = testData.mapValues(_.toBreeze).cartesian(model) + + // Compute the posterior distribution for every test point. + val posterior = testXModel.map { case ((key, test), (label, pi, theta)) => + (key, (pi + theta.dot(test), label)) + } + + // Find the maximum a posteriori value for each test data point, then project labels. + val partitioner = testData.partitioner.getOrElse(defaultPartitioner(posterior)) + posterior.reduceByKey(partitioner, Ordering[(Double, Double)].max _).mapValues(_._2) } } @@ -142,7 +149,7 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with } /** Set the model distribution mode, either "local" or "dist" (for distributed). 
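+   * E.g. {{{ new NaiveBayes().setDistMode("dist").run(trainingData) }}} for an RDD
+   * `trainingData` of LabeledPoint.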
 */
-  def setDistMode(distMode: String) = {
+  def setDistMode(distMode: String): NaiveBayes = {
     this.distMode = distMode
     this
   }
@@ -218,37 +225,24 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
   }

   private def trainDistModel(labelAggregates: RDD[(Double, (Long, BDV[Double]))]) = {
-    case class LabelAggregate(label: Double, numDocuments: Long, sumFeatures: BDV[Double])
-    val aggregated = labelAggregates.map(x => LabelAggregate(x._1, x._2._1, x._2._2))
-
-    // Compute the model's prior (pi) vector and conditional (theta) matrix for each batch of
-    // labels.
+    // Compute the model's prior (pi) value and conditional (theta) vector for each label.
     // NOTE In contrast to the local trainer, the piLogDenom normalization term is omitted here.
-    // Computing this term requires an additional aggregation on 'aggregated', and because the
-    // term is an additive constant it does not affect maximum a posteriori model prediction.
-    val modelBlocks = aggregated.mapPartitions(p => p.grouped(100).map { batch =>
-      val numFeatures = batch.head.sumFeatures.length
-      val pi = batch.map(l => math.log(l.numDocuments + lambda))
-
-      // Assemble values of the theta matrix in row major order.
-      val theta = new Array[Double](batch.length * numFeatures)
-      batch.flatMap( l => {
-        val thetaLogDenom = math.log(brzSum(l.sumFeatures) + numFeatures * lambda)
-        l.sumFeatures.iterator.map(f => math.log(f._2 + lambda) - thetaLogDenom)
-      }).copyToArray(theta)
-
-      NBModelBlock(labels = batch.map(_.label).toArray,
-                   pi = new BDV[Double](pi.toArray),
-                   theta = new BDM[Double](batch.length, numFeatures, theta,
-                                           offset=0, majorStride=numFeatures, isTranspose=true))
-    })
+    // Computing this term requires an additional aggregation on 'labelAggregates', and because
+    // the term is an additive constant it does not affect maximum a posteriori model prediction.
+    val model = labelAggregates.map { case (label, (numDocuments, sumFeatures)) =>
+      val pi = math.log(numDocuments + lambda)
+      val thetaLogDenom = math.log(brzSum(sumFeatures) + sumFeatures.length * lambda)
+      val theta = new Array[Double](sumFeatures.length)
+      sumFeatures.iterator.map(f => math.log(f._2 + lambda) - thetaLogDenom).copyToArray(theta)
+      (label, pi, new BDV[Double](theta))
+    }

     // Materialize and persist the model, check that it is nonempty.
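+    // Persisting avoids recomputing the model lineage on each subsequent predict call.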
- if (modelBlocks.persist(StorageLevel.MEMORY_AND_DISK).count() == 0) { + if (model.persist(StorageLevel.MEMORY_AND_DISK).count() == 0) { throw new SparkException("Naive Bayes requires a nonempty training RDD.") } - new DistNaiveBayesModel(modelBlocks) + new DistNaiveBayesModel(model) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index 11507f5285543..5d205cfa94970 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -21,10 +21,13 @@ import scala.util.Random import org.scalatest.FunSuite +import org.apache.spark.HashPartitioner +import org.apache.spark.SparkContext._ import org.apache.spark.SparkException import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext} +import org.apache.spark.rdd.RDD object NaiveBayesSuite { @@ -71,6 +74,13 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext { assert(numOfWrongPredictions < input.length / 5) } + def validatePairPrediction(predictions: RDD[(Long, Double)], input: RDD[(Long, LabeledPoint)]) { + assert(predictions.partitioner == input.partitioner) + assert(predictions.sortByKey().keys.collect().deep == input.sortByKey().keys.collect().deep) + val joined = predictions.join(input).values + validatePrediction(joined.map(_._1).collect(), joined.map(_._2).collect()) + } + test("Naive Bayes") { val nPoints = 10000 @@ -95,12 +105,18 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext { // Test prediction on Array. validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + + // Test prediction on PairRDD. + val validationPairRDD = validationRDD.zipWithUniqueId().map(_.swap).partitionBy( + new HashPartitioner(2)) + val predicted = model.predictValues(validationPairRDD.mapValues(_.features)) + validatePairPrediction(predicted, validationPairRDD) } test("distributed naive bayes") { val nPoints = 10000 - val nLabels = 150 - val nFeatures = 300 + val nLabels = 10 + val nFeatures = 30 def logNormalize(s: Seq[Int]) = { s.map(_.toDouble / s.sum).map(math.log) @@ -108,7 +124,7 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext { val pi = logNormalize(1 to nLabels).toArray val theta = (for(l <- 1 to nLabels; f <- 1 to nFeatures) - yield if (f == l) 10000 else 1 // Each label is dominated by a different feature. + yield if (f == l) 1000 else 1 // Each label is dominated by a different feature. ).grouped(nFeatures).map(logNormalize).map(_.toArray).toArray val trainData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42) @@ -124,8 +140,14 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext { validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) // Test prediction on Array. - val shortValData = validationData.take(nPoints / 10) + val shortValData = validationData.take(nPoints / 100) validatePrediction(shortValData.map(row => model.predict(row.features)), shortValData) + + // Test prediction on PairRDD. 
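+    // predictValues should preserve both the keys and the partitioning of its input.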
+ val validationPairRDD = validationRDD.zipWithUniqueId().map(_.swap).partitionBy( + new HashPartitioner(2)) + val predicted = model.predictValues(validationPairRDD.mapValues(_.features)) + validatePairPrediction(predicted, validationPairRDD) } test("distributed naive bayes with empty train RDD") {