diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 9164c294ac7b..6b88738a894e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -232,11 +232,11 @@ class PythonMLLibAPI extends Serializable { def trainNaiveBayes( data: JavaRDD[LabeledPoint], lambda: Double): java.util.List[java.lang.Object] = { - val model = NaiveBayes.train(data.rdd, lambda) + // val model = NaiveBayes.train(data.rdd, lambda, "local") val ret = new java.util.LinkedList[java.lang.Object]() - ret.add(Vectors.dense(model.labels)) - ret.add(Vectors.dense(model.pi)) - ret.add(model.theta) + // ret.add(Vectors.dense(model.labels)) + // ret.add(Vectors.dense(model.pi)) + // ret.add(model.theta) ret } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 8c8e4a161aa5..bf1ad47db732 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,26 +17,44 @@ package org.apache.spark.mllib.classification +import scala.reflect.ClassTag + import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum} +import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.{SparkException, Logging} import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** - * Model for Naive Bayes Classifiers. + * Abstract model for a naive bayes classifier. + */ +abstract class NaiveBayesModel extends ClassificationModel with Serializable { + /** + * Predict values for the given data set using the trained model. + * + * @param testData PairRDD with values representing data points to be predicted + * @return an RDD[(K, Double)] where each entry contains the corresponding prediction, + * partitioned consistently with testData. + */ + def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)] +} + +/** + * Local model for a naive bayes classifier. * * @param labels list of labels * @param pi log of class priors, whose dimension is C, number of labels * @param theta log of class conditional probabilities, whose dimension is C-by-D, * where D is number of features */ -class NaiveBayesModel private[mllib] ( +private class LocalNaiveBayesModel( val labels: Array[Double], val pi: Array[Double], - val theta: Array[Array[Double]]) extends ClassificationModel with Serializable { + val theta: Array[Array[Double]]) extends NaiveBayesModel { private val brzPi = new BDV[Double](pi) private val brzTheta = new BDM[Double](theta.length, theta(0).length) @@ -54,7 +72,7 @@ class NaiveBayesModel private[mllib] ( } } - override def predict(testData: RDD[Vector]): RDD[Double] = { + def predict(testData: RDD[Vector]): RDD[Double] = { val bcModel = testData.context.broadcast(this) testData.mapPartitions { iter => val model = bcModel.value @@ -62,9 +80,52 @@ class NaiveBayesModel private[mllib] ( } } - override def predict(testData: Vector): Double = { + def predict(testData: Vector): Double = { labels(brzArgmax(brzPi + brzTheta * testData.toBreeze)) } + + def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)] = { + val bcModel = testData.context.broadcast(this) + testData.mapValues { test => + bcModel.value.predict(test) + } + } +} + +/** + * Distributed model for a naive bayes classifier. + * + * @param model RDD of (label, pi, theta) rows comprising the model. + */ +private class DistNaiveBayesModel(val model: RDD[(Double, Double, BDV[Double])]) + extends NaiveBayesModel { + + def predict(testData: RDD[Vector]): RDD[Double] = { + val indexed = testData.zipWithIndex().map(_.swap) + // Predict, reorder the results to match the input order, then project the labels. + predictValues(indexed).sortByKey().map(_._2) + } + + def predict(testData: Vector): Double = { + val testBreeze = testData.toBreeze + model.map { case (label, pi, theta) => + (pi + theta.dot(testBreeze), label) + }.max._2 + } + + def predictValues[K: ClassTag](testData: RDD[(K, Vector)]): RDD[(K, Double)] = { + // Pair each test data point with all model rows. + val testXModel = testData.mapValues(_.toBreeze).cartesian(model) + + // Compute the posterior distribution for every test point. + val posterior = testXModel.map { case ((key, test), (label, pi, theta)) => + (key, (pi + theta.dot(test), label)) + } + + // Find the maximum a posteriori value for each test data point, then project labels. + val partitioner = testData.partitioner.getOrElse(defaultPartitioner(posterior)) + posterior.reduceByKey(partitioner, Ordering[(Double, Double)].max _).mapValues(_._2) + } } /** @@ -77,6 +138,8 @@ class NaiveBayesModel private[mllib] ( */ class NaiveBayes private (private var lambda: Double) extends Serializable with Logging { + private var distMode = "local" + def this() = this(1.0) /** Set the smoothing parameter. Default: 1.0. */ @@ -85,6 +148,12 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with this } + /** Set the model distribution mode, either "local" or "dist" (for distributed). */ + def setDistMode(distMode: String): NaiveBayes = { + this.distMode = distMode + this + } + /** * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. * @@ -103,10 +172,8 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with } } - // Aggregates term frequencies per label. - // TODO: Calling combineByKey and collect creates two stages, we can implement something - // TODO: similar to reduceByKeyLocally to save one stage. - val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])]( + // Sum the document counts and feature frequencies for each label. + val labelAggregates = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])]( createCombiner = (v: Vector) => { requireNonnegativeValues(v) (1L, v.toBreeze.toDenseVector) @@ -117,7 +184,20 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with }, mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) => (c1._1 + c2._1, c1._2 += c2._2) - ).collect() + ) + + distMode match { + case "local" => trainLocalModel(labelAggregates) + case "dist" => trainDistModel(labelAggregates) + case _ => + throw new SparkException(s"Naive Bayes requires a valid distMode but found $distMode.") + } + } + + private def trainLocalModel(labelAggregates: RDD[(Double, (Long, BDV[Double]))]) = { + // TODO: Calling combineByKey and collect creates two stages, we can implement something + // TODO: similar to reduceByKeyLocally to save one stage. + val aggregated = labelAggregates.collect() val numLabels = aggregated.length var numDocuments = 0L aggregated.foreach { case (_, (n, _)) => @@ -141,7 +221,28 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with i += 1 } - new NaiveBayesModel(labels, pi, theta) + new LocalNaiveBayesModel(labels, pi, theta) + } + + private def trainDistModel(labelAggregates: RDD[(Double, (Long, BDV[Double]))]) = { + // Compute the model's prior (pi) value and conditional (theta) vector for each label. + // NOTE In contrast to the local trainer, the piLogDenom normalization term is omitted here. + // Computing this term requires an additional aggregation on 'aggregated', and because the + // term is an additive constant it does not affect maximum a posteriori model prediction. + val model = labelAggregates.map { case (label, (numDocuments, sumFeatures)) => + val pi = math.log(numDocuments + lambda) + val thetaLogDenom = math.log(brzSum(sumFeatures) + sumFeatures.length * lambda) + val theta = new Array[Double](sumFeatures.length) + sumFeatures.iterator.map(f => math.log(f._2 + lambda) - thetaLogDenom).copyToArray(theta) + (label, pi, new BDV[Double](theta)) + } + + // Materialize and persist the model, check that it is nonempty. + if (model.persist(StorageLevel.MEMORY_AND_DISK).count() == 0) { + throw new SparkException("Naive Bayes requires a nonempty training RDD.") + } + + new DistNaiveBayesModel(model) } } @@ -177,8 +278,9 @@ object NaiveBayes { * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency * vector or a count vector. * @param lambda The smoothing parameter + * @param distMode The model distribution mode, either "local" or "dist" (for distributed) */ - def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { - new NaiveBayes(lambda).run(input) + def train(input: RDD[LabeledPoint], lambda: Double, distMode: String): NaiveBayesModel = { + new NaiveBayes(lambda).setDistMode(distMode).run(input) } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java index 1c90522a0714..7c47fc0942c5 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -84,7 +84,7 @@ public void runUsingStaticMethods() { int numAccurate1 = validatePrediction(POINTS, model1); Assert.assertEquals(POINTS.size(), numAccurate1); - NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5); + NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5, "local"); int numAccurate2 = validatePrediction(POINTS, model2); Assert.assertEquals(POINTS.size(), numAccurate2); } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index 80989bc074e8..5d205cfa9497 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -21,10 +21,13 @@ import scala.util.Random import org.scalatest.FunSuite +import org.apache.spark.HashPartitioner +import org.apache.spark.SparkContext._ import org.apache.spark.SparkException -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext} +import org.apache.spark.rdd.RDD object NaiveBayesSuite { @@ -63,12 +66,19 @@ object NaiveBayesSuite { class NaiveBayesSuite extends FunSuite with LocalSparkContext { def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { - val numOfPredictions = predictions.zip(input).count { + val numOfWrongPredictions = predictions.zip(input).count { case (prediction, expected) => prediction != expected.label } - // At least 80% of the predictions should be on. - assert(numOfPredictions < input.length / 5) + // At least 80% of the predictions should be correct. + assert(numOfWrongPredictions < input.length / 5) + } + + def validatePairPrediction(predictions: RDD[(Long, Double)], input: RDD[(Long, LabeledPoint)]) { + assert(predictions.partitioner == input.partitioner) + assert(predictions.sortByKey().keys.collect().deep == input.sortByKey().keys.collect().deep) + val joined = predictions.join(input).values + validatePrediction(joined.map(_._1).collect(), joined.map(_._2).collect()) } test("Naive Bayes") { @@ -95,6 +105,63 @@ class NaiveBayesSuite extends FunSuite with LocalSparkContext { // Test prediction on Array. validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + + // Test prediction on PairRDD. + val validationPairRDD = validationRDD.zipWithUniqueId().map(_.swap).partitionBy( + new HashPartitioner(2)) + val predicted = model.predictValues(validationPairRDD.mapValues(_.features)) + validatePairPrediction(predicted, validationPairRDD) + } + + test("distributed naive bayes") { + val nPoints = 10000 + val nLabels = 10 + val nFeatures = 30 + + def logNormalize(s: Seq[Int]) = { + s.map(_.toDouble / s.sum).map(math.log) + } + + val pi = logNormalize(1 to nLabels).toArray + val theta = (for(l <- 1 to nLabels; f <- 1 to nFeatures) + yield if (f == l) 1000 else 1 // Each label is dominated by a different feature. + ).grouped(nFeatures).map(logNormalize).map(_.toArray).toArray + + val trainData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42) + val trainRDD = sc.parallelize(trainData, 1) + trainRDD.cache() + + val model = NaiveBayes.train(trainRDD, 1.0, "dist") + + val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + val shortValData = validationData.take(nPoints / 100) + validatePrediction(shortValData.map(row => model.predict(row.features)), shortValData) + + // Test prediction on PairRDD. + val validationPairRDD = validationRDD.zipWithUniqueId().map(_.swap).partitionBy( + new HashPartitioner(2)) + val predicted = model.predictValues(validationPairRDD.mapValues(_.features)) + validatePairPrediction(predicted, validationPairRDD) + } + + test("distributed naive bayes with empty train RDD") { + val emptyTrainRDD = sc.parallelize(new Array[LabeledPoint](0), 2) + intercept[SparkException] { + NaiveBayes.train(emptyTrainRDD, 1.0, "dist") + } + } + + test("distributed naive bayes with empty test RDD") { + val trainRDD = sc.parallelize(LabeledPoint(1.0, Vectors.dense(2.0)) :: Nil, 2) + val model = NaiveBayes.train(trainRDD, 1.0, "dist") + val emptyTestRDD = sc.parallelize(new Array[Vector](0), 2) + assert(model.predict(emptyTestRDD).count == 0) } test("detect negative values") {