16 changes: 14 additions & 2 deletions docs/mllib-statistics.md
@@ -431,11 +431,16 @@ user tests against the normal distribution (`distName="norm"`), but does not provide distribution
parameters, the test initializes to the standard normal distribution and logs an appropriate
message.

There is also a 2-sample, 2-sided implementation available, which tests the null hypothesis that the
2 samples are drawn from the same distribution. It is worth noting that the test assumes that all
elements are unique, both within and across the 2 samples, and thus no ranking ties should occur.
Given that the test is for continuous distributions, this should not be an onerous requirement.

<div class="codetabs">
<div data-lang="scala" markdown="1">
[`Statistics`](api/scala/index.html#org.apache.spark.mllib.stat.Statistics$) provides methods to
run a 1-sample, 2-sided Kolmogorov-Smirnov test. The following example demonstrates how to run
and interpret the hypothesis tests.
run a 1-sample, 2-sided Kolmogorov-Smirnov test and a 2-sample, 2-sided Kolmogorov-Smirnov test.
The following example demonstrates how to run and interpret the hypothesis tests.

{% highlight scala %}
import org.apache.spark.SparkContext
@@ -452,6 +457,13 @@ println(testResult) // summary of the test including the p-value, test statistic
// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)

val data2: RDD[Double] = ... // another RDD of sample data
// run a KS test for data vs data2
// this corresponds to a 2-sample test
// the statistic provides a test for the null hypothesis that both samples are drawn from the
// same distribution
val ksTestResult2 = Statistics.kolmogorovSmirnovTest2Sample(data, data2)
{% endhighlight %}
</div>
</div>
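
For a more self-contained sketch of the new 2-sample API (an editor's illustration, not part of
the patch; it assumes a live `SparkContext` available as `sc`):

{% highlight scala %}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// two small continuous-valued samples; all values are unique within and across samples,
// matching the no-ties assumption of the test
val sample1: RDD[Double] = sc.parallelize(Seq(0.12, 0.27, 0.35, 0.48, 0.61))
val sample2: RDD[Double] = sc.parallelize(Seq(0.55, 0.68, 0.72, 0.81, 0.94))

// null hypothesis: both samples are drawn from the same distribution
val result = Statistics.kolmogorovSmirnovTest2Sample(sample1, sample2)
println(result) // prints the test statistic, p-value, and null hypothesis
{% endhighlight %}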
13 changes: 13 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -224,4 +224,17 @@ object Statistics {
params: Double*): KolmogorovSmirnovTestResult = {
kolmogorovSmirnovTest(data.rdd.asInstanceOf[RDD[Double]], distName, params: _*)
}

/**
* Perform a two-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality.
* The null hypothesis is that both samples come from the same distribution.
* @param data1 `RDD[Double]` first sample of data
* @param data2 `RDD[Double]` second sample of data
* @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test
* statistic, p-value, and null hypothesis
*/
def kolmogorovSmirnovTest2Sample(data1: RDD[Double], data2: RDD[Double])
: KolmogorovSmirnovTestResult = {
KolmogorovSmirnovTest.testTwoSamples(data1, data2)
}
}
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -46,13 +46,26 @@ import org.apache.spark.rdd.RDD
* partition, we can collect and operate locally. Locally, we can now adjust each distance by the
* appropriate constant (the cumulative sum of number of elements in the prior partitions divided by
* the data set size). Finally, we take the maximum absolute value, and this is the statistic.
*
* In the case of the 2-sample variant, the approach is slightly different. We calculate 2
* empirical CDFs, one for each sample. Within each partition, we can calculate the maximum
* difference between the local empirical CDFs, which is off from the global value by some
* constant. Similarly to the 1-sample variant, we can simply adjust each difference once we have
* collected the candidate extrema on the driver. However, in this case we don't collect the
* number of elements in a partition, but rather an adjustment numerator that we cumulatively sum
* once we've collected results on the driver. Dividing that running sum by
* |sample 1| * |sample 2| provides the adjustment necessary to the difference between the 2
* empirical CDFs in a given partition, and thus to that partition's candidate extrema. The
* numerator that we collect per partition corresponds to
* |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|.
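*
* For example (an editor's illustration with made-up numbers): if |sample 1| = |sample 2| = 5,
* then a partition holding 2 elements of sample 1 and 3 elements of sample 2 contributes a
* numerator of 5 * 2 - 5 * 3 = -5. If that is the first partition, the partition that follows
* adjusts its local CDF differences by -5 / (5 * 5) = -0.2 before they are considered as
* candidate extrema.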
*/
private[stat] object KolmogorovSmirnovTest extends Logging {

// Null hypothesis for the type of KS test to be included in the result.
object NullHypothesis extends Enumeration {
type NullHypothesis = Value
val OneSampleTwoSided = Value("Sample follows theoretical distribution")
val TwoSampleTwoSided = Value("Both samples follow same distribution")
}

/**
@@ -190,5 +203,109 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
val pval = 1 - new CommonMathKolmogorovSmirnovTest().cdf(ksStat, n.toInt)
new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
}

/**
* Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests whether the 2 samples
* are drawn from the same distribution.
* @param data1 `RDD[Double]` first sample of data
* @param data2 `RDD[Double]` second sample of data
* @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
* statistic, p-value, and appropriate null hypothesis
*/
def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
val n1 = data1.count().toDouble
val n2 = data2.count().toDouble
// identifier for sample 1, needed after co-sort
val isSample1 = true
// combine identified samples
val unionedData = data1.map((_, isSample1)).union(data2.map((_, !isSample1)))
// co-sort and operate on each partition, returning local extrema to the driver
val localData = unionedData.sortByKey().mapPartitions(
searchTwoSampleCandidates(_, n1, n2)
).collect()
// result: global extreme
val ksStat = searchTwoSampleStatistic(localData, n1 * n2)
evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
}
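
// Editor's note (not part of the patch): sortByKey sorts the (value, isSample1) pairs by value,
// so each partition holds a contiguous slice of the globally sorted, pooled samples; this is
// what allows searchTwoSampleCandidates to compute local empirical CDF differences per partition.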

/**
* Calculates maximum distance candidates and counts of elements from each sample within one
* partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation. Function
* is package private for testing convenience.
* @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
* each element is additionally tagged with a boolean flag for sample 1 membership
* @param n1 `Double` sample 1 size
* @param n2 `Double` sample 2 size
* @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
* distance, the second is an unadjusted maximum distance (both of which will later
* be adjusted by a constant to account for elements in prior partitions), and the third is
* a count corresponding to the numerator of the adjustment constant coming from this

Review comment (Contributor): Can you add a little clarification in the top-level documentation
for the class on the approach, i.e. so users can understand what the adjustment constant is, as
well as what the denominator is that will be put under the numerator calculated here.

Reply (Author): Done, thanks for the feedback.

* partition. This last value, the numerator of the adjustment constant, is calculated as
* |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|. This comes
* from the fact that when we adjust for prior partitions, what we are doing is
* adding the difference of the fractions (|prior elements in sample 1| / |sample 1| -
* |prior elements in sample 2| / |sample 2|). We simply keep track of the numerator
* portion that is attributable to each partition so that following partitions can
* use it to cumulatively adjust their values.
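*
* As an editor's illustration (made-up numbers, not from the patch): with n1 = n2 = 5 and a
* partition holding the co-sorted elements (1.0, true), (2.0, false), (3.0, true), the local
* CDF differences after each element are 0.2, 0.0, and 0.2 respectively, so this function
* yields the single triple (0.0, 0.2, 5.0), the last entry being 5 * 2 - 5 * 1 = 5.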
*/
private[stat] def searchTwoSampleCandidates(
partData: Iterator[(Double, Boolean)],
n1: Double,
n2: Double): Iterator[(Double, Double, Double)] = {
// fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
case class ExtremaAndRunningIndices(min: Double, max: Double, ix1: Int, ix2: Int)
val initAcc = ExtremaAndRunningIndices(Double.MaxValue, Double.MinValue, 0, 0)
// traverse the data in the partition and calculate distances and counts
val pResults = partData.foldLeft(initAcc) { case (acc, (v, isSample1)) =>
val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
val cdf1 = (acc.ix1 + add1) / n1
val cdf2 = (acc.ix2 + add2) / n2
val dist = cdf1 - cdf2
ExtremaAndRunningIndices(
math.min(acc.min, dist),
math.max(acc.max, dist),
acc.ix1 + add1, acc.ix2 + add2
)
}
// If partition has no data, then pResults will match the fold accumulator
// we must filter this out to avoid having the statistic spoiled by the accumulation values
val results = if (pResults == initAcc) {

Review comment (Contributor): Do things go bad if we don't special-case here? Or is it just less
efficient?

Reply (Author): Things go bad. The accumulator for the fold has Double.MaxValue and
Double.MinValue, to allow proper accumulation of minimums and maximums, respectively. If the
partition is empty, the foldLeft still returns the accumulator. So if we don't check and replace
with an empty array (on line 247), then the statistic is ruined by the max double.

Review comment (Contributor): Can you provide a comment on what situations we would expect to hit
this case in?

Array[(Double, Double, Double)]()
} else {
Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1))
}
results.iterator
}

/**
* Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to
* the two-sample, two-sided Kolmogorov-Smirnov test. Function is package private for testing
* convenience.
* @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each
* partition, along with the numerator for the necessary constant adjustments
* @param n `Double` The denominator in the constant adjustment (i.e. (size of sample 1) * (size
* of sample 2))
* @return The two-sample, two-sided Kolmogorov-Smirnov statistic
*/
private[stat] def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
: Double = {
// maximum distance and numerator for constant adjustment
val initAcc = (Double.MinValue, 0.0)
// adjust differences based on the number of elements preceding it, which should provide
// the correct distance between the 2 empirical CDFs
val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
val adjConst = prevCt / n
val dist1 = math.abs(minCand + adjConst)
val dist2 = math.abs(maxCand + adjConst)
val maxVal = Array(prevMax, dist1, dist2).max
(maxVal, prevCt + ct)
}
results._1
}
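
// An editor's trace of searchTwoSampleStatistic (made-up numbers, not from the patch):
//   searchTwoSampleStatistic(Array((-0.2, 0.4, -5.0), (-0.3, 0.1, 5.0)), 25.0)
// partition 1: adjConst = 0 / 25 = 0, candidate distances |-0.2| = 0.2 and |0.4| = 0.4
// partition 2: adjConst = -5 / 25 = -0.2, candidates |-0.3 - 0.2| = 0.5 and |0.1 - 0.2| = 0.1
// the running maximum, 0.5, is the returned statistic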

private def evalTwoSampleP(ksStat: Double, n: Int, m: Int): KolmogorovSmirnovTestResult = {
val pval = new CommonMathKolmogorovSmirnovTest().approximateP(ksStat, n, m)
new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.TwoSampleTwoSided.toString)
}
}

mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
@@ -21,12 +21,12 @@ import java.util.Random

import org.apache.commons.math3.distribution.{ExponentialDistribution,
NormalDistribution, UniformRealDistribution}
import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonMathKolmogorovSmirnovTest}

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.test.ChiSqTest
import org.apache.spark.mllib.stat.test.{ChiSqTest, KolmogorovSmirnovTest}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._

@@ -177,7 +177,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
val sampledUnif = sc.parallelize(unifDist.sample(n), 10)

// Use an Apache Commons Math local KS test to verify calculations
val ksTest = new KolmogorovSmirnovTest()
val ksTest = new CommonMathKolmogorovSmirnovTest()
val pThreshold = 0.05

// Comparing a standard normal sample to a standard normal distribution
@@ -254,4 +254,165 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
}


Review comment (Contributor): Are there weird edge cases here with the partitioning? E.g. where a
partition has no elements? Or only elements from one sample? Can we provide tests for them?

Reply (Author): The edge case of partitions with no elements does indeed exist, which is why we
have that odd check for whether a result after folding in a partition matches the initial
accumulator. The test involving nonOverlap1P and nonOverlap2P should test for situations where a
partition has only 1 sample. I'll break these out into separate tests, since they probably
deserve to be singled out.

Reply (Author): Is there a better way to test for this than using this round-about method?
Particularly since the functions that would likely run into an issue (e.g.
KolmogorovSmirnovTest.searchTwoSampleCandidates) are currently private. Or should I expose them
(just make them package private) instead?

Review comment (Contributor): If making the methods package private would simplify the test, I
think it's reasonable to do so.


test("2 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") {
// Create theoretical distributions
val stdNormalDist = new NormalDistribution(0, 1)
val normalDist = new NormalDistribution(2, 3)
val expDist = new ExponentialDistribution(0.6)

// create data samples and parallelize
val n = 10000
// local copies
val sampledStdNorm1L = stdNormalDist.sample(n)
val sampledStdNorm2L = stdNormalDist.sample(n)
val sampledNormL = normalDist.sample(n)
val sampledExpL = expDist.sample(n)
// distributed
val sampledStdNorm1P = sc.parallelize(sampledStdNorm1L, 10)
val sampledStdNorm2P = sc.parallelize(sampledStdNorm2L, 10)
val sampledNormP = sc.parallelize(sampledNormL, 10)
val sampledExpP = sc.parallelize(sampledExpL, 10)

// Use an Apache Commons Math local KS test to verify calculations
val ksTest = new CommonMathKolmogorovSmirnovTest()
val pThreshold = 0.05

// Comparing 2 samples from same standard normal distribution
val result1 = Statistics.kolmogorovSmirnovTest2Sample(sampledStdNorm1P, sampledStdNorm2P)
val refStat1 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledStdNorm2L)
val refP1 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledStdNorm2L)
assert(result1.statistic ~== refStat1 relTol 1e-4)
assert(result1.pValue ~== refP1 relTol 1e-4)
assert(result1.pValue > pThreshold) // accept H0

// Comparing 2 samples from different normal distributions
val result2 = Statistics.kolmogorovSmirnovTest2Sample(sampledStdNorm1P, sampledNormP)
val refStat2 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledNormL)
val refP2 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledNormL)
assert(result2.statistic ~== refStat2 relTol 1e-4)
assert(result2.pValue ~== refP2 relTol 1e-4)
assert(result2.pValue < pThreshold) // reject H0

// Comparing 1 sample from normal distribution to 1 sample from exponential distribution
val result3 = Statistics.kolmogorovSmirnovTest2Sample(sampledNormP, sampledExpP)
val refStat3 = ksTest.kolmogorovSmirnovStatistic(sampledNormL, sampledExpL)
val refP3 = ksTest.kolmogorovSmirnovTest(sampledNormL, sampledExpL)
assert(result3.statistic ~== refStat3 relTol 1e-4)
assert(result3.pValue ~== refP3 relTol 1e-4)
assert(result3.pValue < pThreshold) // reject H0
}

test("2 sample Kolmogorov-Smirnov test: R implementation equivalence") {
/*
Comparing results with the R implementation of KS
> sessionInfo()
R version 3.2.0 (2015-04-16)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
> set.seed(20)
> v <- rnorm(20)
> v2 <- rnorm(20)
> v
[1] 1.16268529 -0.58592447 1.78546500 -1.33259371 -0.44656677 0.56960612
[7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222
[13] -0.62812676 1.32322085 -1.52135057 -0.43742787 0.97057758 0.02822264
[19] -0.08578219 0.38921440
> v2
[1] 0.23668737 -0.14444023 0.72222970 0.36990686 -0.24206631 -1.47206332
[7] -0.59615955 -1.14670013 -2.47463643 -0.61350858 -0.21631151 1.59014577
[13] 1.55614328 1.10845089 -1.09734184 -1.86060572 -0.91357885 1.24556891
[19] 0.08785472 0.42348190
*/
val rData1 = sc.parallelize(
Array(
1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
-0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
-0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
-0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
)
)

val rData2 = sc.parallelize(
Array(
0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192,
-0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186,
-2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867,
1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755,
-0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245
)
)

val rKSStat = 0.15
val rKSPval = 0.9831
val kSCompResult = Statistics.kolmogorovSmirnovTest2Sample(rData1, rData2)
assert(kSCompResult.statistic ~== rKSStat relTol 1e-4)
// we're more lenient with the p-value here since the approximate p-value calculated
// by apache math commons is likely to be slightly off given the small sample size
assert(kSCompResult.pValue ~== rKSPval relTol 1e-2)
}

test("2 sample Kolmogorov-Smirnov test: helper functions in case partitions have no data") {
// we use the R data provided in the prior test
// Once we have combined and sorted, we partition with a larger number of partitions than
// the number of elements to guarantee we have empty partitions.
// We test various critical package private functions in this circumstance.
val rData1 = Array(
1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
-0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
-0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
-0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
)

val rData2 = Array(
0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192,
-0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186,
-2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867,
1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755,
-0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245
)


val n1 = rData1.length
val n2 = rData2.length
val unioned = (rData1.map((_, true)) ++ rData2.map((_, false))).sortBy(_._1)
val parallel = sc.parallelize(unioned, 100)
// verify that there are empty partitions
assert(parallel.mapPartitions(x => Array(x.size).iterator).collect().contains(0))
val localExtrema = parallel.mapPartitions(
KolmogorovSmirnovTest.searchTwoSampleCandidates(_, n1, n2)
).collect()
val ksCompStat = KolmogorovSmirnovTest.searchTwoSampleStatistic(localExtrema, n1 * n2)

val rKSStat = 0.15
assert(ksCompStat ~== rKSStat relTol 1e-4)
}

test("2 sample Kolmogorov-Smirnov test: helper functions in case partitions have only 1 sample") {
// Creating 2 samples that don't overlap and request a large number of partitions to guarantee
// that there will be partitions with only data from 1 sample. We test critical helper
// functions in these circumstances.
val n = 100
val lower = (1 to n).toArray.map(_.toDouble)
val upper = (1 to n).toArray.map(n + _.toDouble * 100)

val unioned = (lower.map((_, true)) ++ upper.map((_, false))).sortBy(_._1)
val parallel = sc.parallelize(unioned, 200)
// verify that there is at least 1 partition with only 1 sample
assert(parallel.mapPartitions(x =>
Array(x.toArray.map(_._2).distinct.length).iterator
).collect().contains(1)
)
val localExtrema = parallel.mapPartitions(
KolmogorovSmirnovTest.searchTwoSampleCandidates(_, n, n)
).collect()
val ksCompStat = KolmogorovSmirnovTest.searchTwoSampleStatistic(localExtrema, n * n)

// Use an Apache Commons Math local KS test to verify calculations
val ksTest = new CommonMathKolmogorovSmirnovTest()

val refStat4 = ksTest.kolmogorovSmirnovStatistic(lower, upper)
assert(ksCompStat ~== refStat4 relTol 1e-3)
}
}