diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index f51ee36d0dfc..ce3cf9f0faa3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -93,26 +93,60 @@ class NaiveBayesModel private[mllib] (
   override def predict(testData: Vector): Double = {
     modelType match {
       case Multinomial =>
-        val prob = thetaMatrix.multiply(testData)
-        BLAS.axpy(1.0, piVector, prob)
+        val prob = multinomialCalculation(testData)
         labels(prob.argmax)
       case Bernoulli =>
-        testData.foreachActive { (index, value) =>
-          if (value != 0.0 && value != 1.0) {
-            throw new SparkException(
-              s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
-          }
-        }
-        val prob = thetaMinusNegTheta.get.multiply(testData)
-        BLAS.axpy(1.0, piVector, prob)
-        BLAS.axpy(1.0, negThetaSum.get, prob)
+        val prob = bernoulliCalculation(testData)
         labels(prob.argmax)
-      case _ =>
-        // This should never happen.
-        throw new UnknownError(s"Invalid modelType: $modelType.")
     }
   }
 
+  def predictProbabilities(testData: RDD[Vector]): RDD[Vector] = {
+    val bcModel = testData.context.broadcast(this)
+    testData.mapPartitions { iter =>
+      val model = bcModel.value
+      iter.map(model.predictProbabilities)
+    }
+  }
+
+  def predictProbabilities(testData: Vector): Vector = {
+    modelType match {
+      case Multinomial =>
+        val prob = multinomialCalculation(testData)
+        posteriorProbabilities(prob)
+      case Bernoulli =>
+        val prob = bernoulliCalculation(testData)
+        posteriorProbabilities(prob)
+    }
+  }
+
+  private def multinomialCalculation(testData: Vector): DenseVector = {
+    val prob = thetaMatrix.multiply(testData)
+    BLAS.axpy(1.0, piVector, prob)
+    prob
+  }
+
+  private def bernoulliCalculation(testData: Vector): DenseVector = {
+    testData.foreachActive { (index, value) =>
+      if (value != 0.0 && value != 1.0) {
+        throw new SparkException(
+          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
+      }
+    }
+    val prob = thetaMinusNegTheta.get.multiply(testData)
+    BLAS.axpy(1.0, piVector, prob)
+    BLAS.axpy(1.0, negThetaSum.get, prob)
+    prob
+  }
+
+  private def posteriorProbabilities(prob: DenseVector): Vector = {
+    val probArray = prob.toArray
+    val maxLog = probArray.max
+    val probabilities = probArray.map(lp => math.exp(lp - maxLog))
+    val probSum = probabilities.sum
+    new DenseVector(labels.zip(probabilities.map(_ / probSum)).sortBy(_._1).map(_._2))
+  }
+
   override def save(sc: SparkContext, path: String): Unit = {
     val data = NaiveBayesModel.SaveLoadV2_0.Data(labels, pi, theta, modelType)
     NaiveBayesModel.SaveLoadV2_0.save(sc, path, data)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index f7fc8730606a..c8c820d50b15 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
+import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
 
 object NaiveBayesSuite {
@@ -116,6 +117,92 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
+  def validatePredictionsProbabilities(predictionsProbabilities: Seq[Array[Double]],
+      input: Seq[LabeledPoint],
+      modelType: String = Multinomial): Unit = {
+    predictionsProbabilities.foreach { probabilities =>
+      val sum = probabilities.sum
+      // Check that the predicted class probabilities sum to one,
+      // within a relative tolerance of 1e-5.
+      assert(sum ~== 1.0 relTol 0.00001)
+    }
+
+    val wrongPredictions = predictionsProbabilities.zip(input).count {
+      case (prediction, expected) =>
+        prediction.indexOf(prediction.max).toDouble != expected.label
+    }
+    // At least 80% of the predictions should be correct.
+    assert(wrongPredictions < input.length / 5)
+
+    comparePosteriorsWithR(predictionsProbabilities.take(10), modelType)
+  }
+
+  /**
+   * The following instructions reproduce the reference posteriors using R's e1071 package.
+   *
+   * First, use the following Scala code to save the test data to `path`.
+   *
+   * testRDD.map { x =>
+   *   s"${x.label}, ${x.features.toArray.mkString(", ")}"
+   * }.saveAsTextFile("path")
+   *
+   * Then use the following R code to train the model and compute the posteriors.
+   *
+   * library(e1071)
+   * data <- read.csv("path", header = FALSE)
+   * labels <- factor(data$V1)
+   * features <- data.frame(data$V2, data$V3, data$V4, data$V5)
+   * model <- naiveBayes(features, labels)
+   * predictions <- predict(model, features[1:10, ], type = "raw")
+   *
+   */
+  def comparePosteriorsWithR(predictionsProbabilities: Seq[Array[Double]],
+      modelType: String = Multinomial,
+      epsilon: Double = 0.1): Unit = {
+    require(predictionsProbabilities.length == 10)
+
+    val posteriorsFromR = modelType match {
+      case Multinomial =>
+        Array(
+          Array(2.942994e-07, 5.467545e-11, 9.999997e-01),
+          Array(2.931850e-07, 4.922381e-12, 9.999997e-01),
+          Array(9.997708e-01, 3.424392e-06, 2.257879e-04),
+          Array(9.991757e-01, 6.132008e-04, 2.110877e-04),
+          Array(9.281650e-14, 8.199463e-17, 1.000000e+00),
+          Array(8.099445e-01, 3.821142e-05, 1.900173e-01),
+          Array(2.667884e-01, 7.331288e-01, 8.276015e-05),
+          Array(9.999776e-01, 2.163690e-06, 2.023486e-05),
+          Array(9.997814e-01, 2.441990e-06, 2.161960e-04),
+          Array(8.850206e-14, 6.692205e-18, 1.000000e+00)
+        )
+      case Bernoulli =>
+        Array(
+          Array(1.048099e-09, 1.000000e+00, 1.578642e-09),
+          Array(1.831993e-19, 9.999999e-01, 1.190036e-07),
+          Array(4.664977e-12, 1.000000e+00, 1.291666e-12),
+          Array(3.224249e-11, 2.433594e-02, 9.756641e-01),
+          Array(9.610916e-01, 1.256859e-13, 3.890841e-02),
+          Array(8.318820e-01, 1.097496e-01, 5.836849e-02),
+          Array(8.318820e-01, 1.097496e-01, 5.836849e-02),
+          Array(9.610916e-01, 1.256859e-13, 3.890841e-02),
+          Array(8.318820e-01, 1.097496e-01, 5.836849e-02),
+          Array(8.318820e-01, 1.097496e-01, 5.836849e-02)
+        )
+    }
+
+    predictionsProbabilities.zip(posteriorsFromR).foreach {
+      case (probs, fromR) =>
+        val p = probs.indexOf(probs.max)
+        val r = fromR.indexOf(fromR.max)
+        // Compare the probabilities only when the predicted label agrees with R's.
+        if (p == r) {
+          probs.zip(fromR).foreach {
+            case (prob, probFromR) => assert(prob ~== probFromR absTol epsilon)
+          }
+        }
+    }
+  }
+
   test("model types") {
     assert(Multinomial === "multinomial")
     assert(Bernoulli === "bernoulli")
@@ -154,6 +241,16 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+
+    // Test prediction probabilities on RDD.
+    validatePredictionsProbabilities(
+      model.predictProbabilities(validationRDD.map(_.features)).map(_.toArray).collect(),
+      validationData)
+
+    // Test prediction probabilities on Array.
+    validatePredictionsProbabilities(
+      validationData.map(row => model.predictProbabilities(row.features)).map(_.toArray),
+      validationData)
   }
 
   test("Naive Bayes Bernoulli") {
@@ -182,6 +279,16 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+
+    // Test prediction probabilities on RDD.
+    validatePredictionsProbabilities(
+      model.predictProbabilities(validationRDD.map(_.features)).map(_.toArray).collect(),
+      validationData, Bernoulli)
+
+    // Test prediction probabilities on Array.
+    validatePredictionsProbabilities(
+      validationData.map(row => model.predictProbabilities(row.features)).map(_.toArray),
+      validationData, Bernoulli)
   }
 
   test("detect negative values") {
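
Usage sketch (not part of the patch): the snippet below shows how the predictProbabilities methods added above could be called once the patch is applied. The SparkContext setup, training data, and object name are illustrative assumptions; NaiveBayes.train and predict are existing MLlib APIs.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical driver object, for illustration only.
object NaiveBayesPosteriorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "NaiveBayesPosteriorExample")

    // Tiny illustrative training set with count-like features (multinomial model).
    val training = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(2.0, 0.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 3.0, 0.0)),
      LabeledPoint(2.0, Vectors.dense(0.0, 0.0, 4.0))))

    val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")

    // Existing API: hard class prediction.
    val x = Vectors.dense(0.0, 1.0, 0.0)
    val predictedLabel = model.predict(x)

    // New API from this patch: class posteriors, returned in ascending label order.
    val posteriors = model.predictProbabilities(x)

    // The RDD overload broadcasts the model and maps over partitions.
    val posteriorsRDD = model.predictProbabilities(training.map(_.features))

    println(s"label = $predictedLabel, posteriors = $posteriors")
    posteriorsRDD.collect().foreach(println)
    sc.stop()
  }
}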