Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,60 @@ class NaiveBayesModel private[mllib] (
/**
 * Predicts the label of a single feature vector.
 *
 * The per-class scores are computed by the helper matching `modelType`, and the label at the
 * position of the largest score is returned.
 *
 * @param testData feature vector to classify
 * @return the label whose score is the largest
 * @throws SparkException if the model is Bernoulli and an active feature value is not 0 or 1
 * @throws UnknownError if `modelType` is neither Multinomial nor Bernoulli
 */
override def predict(testData: Vector): Double = {
  val prob = modelType match {
    case Multinomial => multinomialCalculation(testData)
    // bernoulliCalculation validates that the active feature values are 0 or 1.
    case Bernoulli => bernoulliCalculation(testData)
    case _ =>
      // This should never happen.
      throw new UnknownError(s"Invalid modelType: $modelType.")
  }
  labels(prob.argmax)
}

/**
 * Computes the class probabilities for every feature vector of an RDD.
 *
 * The model is broadcast once through the RDD's context and read back inside each partition.
 *
 * @param testData RDD of feature vectors
 * @return an RDD holding one probability vector per input vector
 */
def predictProbabilities(testData: RDD[Vector]): RDD[Vector] = {
  val broadcastModel = testData.context.broadcast(this)
  testData.mapPartitions { vectors =>
    val localModel = broadcastModel.value
    for (features <- vectors) yield localModel.predictProbabilities(features)
  }
}

/**
 * Computes the class probabilities for a single feature vector.
 *
 * The per-class scores are computed by the helper matching `modelType` and then normalized
 * by `posteriorProbabilities`.
 *
 * @param testData feature vector to score
 * @return a vector of class probabilities
 * @throws SparkException if the model is Bernoulli and an active feature value is not 0 or 1
 * @throws UnknownError if `modelType` is neither Multinomial nor Bernoulli
 */
def predictProbabilities(testData: Vector): Vector = {
  val prob = modelType match {
    case Multinomial => multinomialCalculation(testData)
    case Bernoulli => bernoulliCalculation(testData)
    case _ =>
      // This should never happen. Fail the same way `predict` does instead of
      // surfacing a bare MatchError.
      throw new UnknownError(s"Invalid modelType: $modelType.")
  }
  posteriorProbabilities(prob)
}

/**
 * Computes the per-class scores of the multinomial model for one feature vector:
 * `thetaMatrix * testData + piVector`.
 *
 * @param testData feature vector to score
 * @return one score per class
 */
private def multinomialCalculation(testData: Vector): DenseVector = {
  val scores = thetaMatrix.multiply(testData)
  // axpy accumulates 1.0 * piVector into `scores`.
  BLAS.axpy(1.0, piVector, scores)
  scores
}

/**
 * Computes the per-class scores of the Bernoulli model for one feature vector:
 * `thetaMinusNegTheta * testData + piVector + negThetaSum`.
 *
 * @param testData feature vector to score; every active value must be 0 or 1
 * @return one score per class
 * @throws SparkException if an active feature value is neither 0 nor 1
 */
private def bernoulliCalculation(testData: Vector): DenseVector = {
  // Reject any active entry that is not a binary indicator.
  testData.foreachActive { (_, featureValue) =>
    val isBinary = featureValue == 0.0 || featureValue == 1.0
    if (!isBinary) {
      throw new SparkException(
        s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
    }
  }
  val scores = thetaMinusNegTheta.get.multiply(testData)
  // Accumulate piVector first, then negThetaSum, into `scores`.
  BLAS.axpy(1.0, piVector, scores)
  BLAS.axpy(1.0, negThetaSum.get, scores)
  scores
}

/**
 * Turns per-class scores into probabilities that sum to one.
 *
 * Each score is exponentiated after subtracting the maximum score, the results are divided
 * by their sum, and the probabilities are returned ordered by ascending label value.
 *
 * @param prob per-class scores, positionally aligned with `labels`
 * @return normalized probabilities ordered by label
 */
private def posteriorProbabilities(prob: DenseVector): Vector = {
  val scores = prob.toArray
  val largest = scores.max
  // Shifting by the maximum makes the largest exponentiated term exp(0) = 1.
  val unnormalized = scores.map(score => math.exp(score - largest))
  val total = unnormalized.sum
  val normalized = unnormalized.map(_ / total)
  val orderedByLabel = labels.zip(normalized).sortBy { case (label, _) => label }
  new DenseVector(orderedByLabel.map { case (_, probability) => probability })
}

override def save(sc: SparkContext, path: String): Unit = {
val data = NaiveBayesModel.SaveLoadV2_0.Data(labels, pi, theta, modelType)
NaiveBayesModel.SaveLoadV2_0.save(sc, path, data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.util.Utils

object NaiveBayesSuite {
Expand Down Expand Up @@ -116,6 +117,92 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
}
}

/**
 * Checks predicted class probabilities against the labeled input.
 *
 * Three checks are made: every probability array sums to one, fewer than a fifth of the
 * predictions pick a class index different from the expected label, and the first ten
 * probability arrays are handed to `comparePosteriorsWithR`.
 */
def validatePredictionsProbabilities(predictionsProbabilities: Seq[Array[Double]],
    input: Seq[LabeledPoint],
    modelType: String = Multinomial) = {
  // Prediction probabilities must sum up to one, with an epsilon of 10^-5.
  for (probabilities <- predictionsProbabilities) {
    val total = probabilities.sum
    assert(total ~== 1.0 relTol 0.00001)
  }

  // A prediction is wrong when the index of its largest probability is not the label.
  val wrongPredictions = predictionsProbabilities.zip(input).count { pair =>
    val (prediction, expected) = pair
    val mostLikelyIndex = prediction.indexOf(prediction.max)
    mostLikelyIndex.toDouble != expected.label
  }
  // At least 80% of the predictions should be on.
  assert(wrongPredictions < input.length / 5)

  comparePosteriorsWithR(predictionsProbabilities.take(10), modelType)
}

/**
 * Compares predicted posterior probabilities with reference posteriors computed in R.
 *
 * Exactly ten probability arrays are expected. For each one whose most likely class index
 * matches the R reference, every probability must lie strictly within `epsilon` of the
 * corresponding R value.
 *
 * The following are the instructions to reproduce the model using R's e1071 package.
 *
 * First, use the following Scala code to save the data into `path`.
 *
 *    testRDD.map { x =>
 *      s"${x.label}, ${x.features.toArray.mkString(", ")}"
 *    }.saveAsTextFile("path")
 *
 * Then use the following R code to load the data and train the model with the e1071 package.
 *
 *    library(e1071)
 *    data <- read.csv("path", header = FALSE)
 *    labels <- factor(data$V1)
 *    features <- data.frame(data$V2, data$V3, data$V4, data$V5)
 *    model <- naiveBayes(features, labels)
 *    predictions <- predict(model, features[1:10, -1], type = "raw")
 *
 * @param predictionsProbabilities ten arrays of predicted class probabilities
 * @param modelType model type selecting which set of R posteriors to compare against
 * @param epsilon absolute tolerance allowed between a predicted and an R probability
 */
def comparePosteriorsWithR(predictionsProbabilities: Seq[Array[Double]],
    modelType: String = Multinomial,
    epsilon: Double = 0.1): Unit = {
  require(predictionsProbabilities.length == 10)

  val posteriorsFromR = modelType match {
    case Multinomial =>
      Array(
        Array(2.942994e-07, 5.467545e-11, 9.999997e-01),
        Array(2.931850e-07, 4.922381e-12, 9.999997e-01),
        Array(9.997708e-01, 3.424392e-06, 2.257879e-04),
        Array(9.991757e-01, 6.132008e-04, 2.110877e-04),
        Array(9.281650e-14, 8.199463e-17, 1.000000e+00),
        Array(8.099445e-01, 3.821142e-05, 1.900173e-01),
        Array(2.667884e-01, 7.331288e-01, 8.276015e-05),
        Array(9.999776e-01, 2.163690e-06, 2.023486e-05),
        Array(9.997814e-01, 2.441990e-06, 2.161960e-04),
        Array(8.850206e-14, 6.692205e-18, 1.000000e+00)
      )
    case Bernoulli =>
      Array(
        Array(1.048099e-09, 1.000000e+00, 1.578642e-09),
        Array(1.831993e-19, 9.999999e-01, 1.190036e-07),
        Array(4.664977e-12, 1.000000e+00, 1.291666e-12),
        Array(3.224249e-11, 2.433594e-02, 9.756641e-01),
        Array(9.610916e-01, 1.256859e-13, 3.890841e-02),
        Array(8.318820e-01, 1.097496e-01, 5.836849e-02),
        Array(8.318820e-01, 1.097496e-01, 5.836849e-02),
        Array(9.610916e-01, 1.256859e-13, 3.890841e-02),
        Array(8.318820e-01, 1.097496e-01, 5.836849e-02),
        Array(8.318820e-01, 1.097496e-01, 5.836849e-02)
      )
  }

  predictionsProbabilities.zip(posteriorsFromR).foreach {
    case (probs, fromR) =>
      val p = probs.indexOf(probs.max)
      val r = fromR.indexOf(fromR.max)
      // Posteriors are compared only when both sides agree on the most likely class; a
      // disagreement is not asserted here. Overall prediction accuracy is checked
      // separately in validatePredictionsProbabilities.
      // NOTE(review): a reviewer suggested asserting p == r instead of skipping.
      if (p == r) {
        probs.zip(fromR).foreach {
          case (prob, probFromR) =>
            assert(prob > (probFromR - epsilon) && prob < (probFromR + epsilon))
        }
      }
  }
}

test("model types") {
assert(Multinomial === "multinomial")
assert(Bernoulli === "bernoulli")
Expand Down Expand Up @@ -154,6 +241,12 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {

// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)

// Test prediction probabilities on RDD.
validatePredictionsProbabilities(model.predictProbabilities(validationRDD.map(_.features)).map(_.toArray).collect(), validationData)

// Test prediction probabilities on Array.
validatePredictionsProbabilities(validationData.map(row => model.predictProbabilities(row.features)).map(_.toArray), validationData)
}

test("Naive Bayes Bernoulli") {
Expand Down Expand Up @@ -182,6 +275,14 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {

// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)

// Test prediction probabilities on RDD.
validatePredictionsProbabilities(model.predictProbabilities(validationRDD.map(_.features)).map(_.toArray).collect(),
validationData, Bernoulli)

// Test prediction probabilities on Array.
validatePredictionsProbabilities(validationData.map(row => model.predictProbabilities(row.features)).map(_.toArray),
validationData, Bernoulli)
}

test("detect negative values") {
Expand Down