diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index be04d0b4b53a..6d96b450d2ce 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -431,11 +431,16 @@ user tests against the normal distribution (`distName="norm"`), but does not pro
parameters, the test initializes to the standard normal distribution and logs an appropriate
message.
+There is also a 2-sample, 2-sided implementation available, which tests the null hypothesis that the
+2 samples are drawn from the same distribution. It is worth noting that the test assumes that all
+elements are unique, both within and across the 2 samples, and thus no ranking ties should occur.
+Given that the test is for continuous distributions this should not be an onerous requirement.
+
[`Statistics`](api/scala/index.html#org.apache.spark.mllib.stat.Statistics$) provides methods to
-run a 1-sample, 2-sided Kolmogorov-Smirnov test. The following example demonstrates how to run
-and interpret the hypothesis tests.
+run a 1-sample, 2-sided Kolmogorov-Smirnov test and a 2-sample, 2-sided Kolmogorv-Smirnov test.
+The following example demonstrates how to run and interpret the hypothesis tests.
{% highlight scala %}
import org.apache.spark.SparkContext
@@ -452,6 +457,13 @@ println(testResult) // summary of the test including the p-value, test statistic
// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
+
+val data2: RDD[Double] = ... // another RDD of sample data
+// run a KS test for data vs data 2
+// this corresponds to a 2-sample test
+// the statistic provides a test for the null hypothesis that both samples are drawn from the
+// same distribution
+val ksTestResult2 = Statistics.kolmogorovSmirnovTest2Sample(data, data2)
{% endhighlight %}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index ef8d78607048..5b61d7035fe9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -224,4 +224,17 @@ object Statistics {
params: Double*): KolmogorovSmirnovTestResult = {
kolmogorovSmirnovTest(data.rdd.asInstanceOf[RDD[Double]], distName, params: _*)
}
+
+ /**
+ * Perform a two-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality
+ * The null hypothesis is that both samples come from the same distribution
+ * @param data1 `RDD[Double]` first sample of data
+ * @param data2 `RDD[Double]` second sample of data
+ * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test
+ * statistic, p-value, and null hypothesis
+ */
+ def kolmogorovSmirnovTest2Sample(data1: RDD[Double], data2: RDD[Double])
+ : KolmogorovSmirnovTestResult = {
+ KolmogorovSmirnovTest.testTwoSamples(data1, data2)
+ }
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index 2b3ed6df486c..8e9652eb4466 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -46,6 +46,18 @@ import org.apache.spark.rdd.RDD
* partition, we can collect and operate locally. Locally, we can now adjust each distance by the
* appropriate constant (the cumulative sum of number of elements in the prior partitions divided by
* thedata set size). Finally, we take the maximum absolute value, and this is the statistic.
+ *
+ * In the case of the 2-sample variant, the approach is slightly different. We calculate 2
+ * empirical CDFs corresponding to the distribution under sample 1 and under sample 2. Within each
+ * partition, we can calculate the maximum difference of the local empirical CDFs, which is off from
+ * the global value by some constant. Similarly to the 1-sample variant, we can simply adjust this
+ * difference once we have collected the possible candidate extrema. However, in this case we don't
+ * collect the number of elements in a partition, but rather an adjustment constant, that we can
+ * cumulatively sum once we've collected results on the driver, and that when divided by
+ * |sample 1| * |sample 2| provides the adjustment necessary to the difference between the 2
+ * empirical CDFs in a given partition and thus the adjustment necessary to the potential extrema
+ * candidates. The constant that we collect per partition thus corresponds to
+ * |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|.
*/
private[stat] object KolmogorovSmirnovTest extends Logging {
@@ -53,6 +65,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
object NullHypothesis extends Enumeration {
type NullHypothesis = Value
val OneSampleTwoSided = Value("Sample follows theoretical distribution")
+ val TwoSampleTwoSided = Value("Both samples follow same distribution")
}
/**
@@ -190,5 +203,109 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
val pval = 1 - new CommonMathKolmogorovSmirnovTest().cdf(ksStat, n.toInt)
new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
}
+
+ /**
+ * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
+ * come from the same distribution
+ * @param data1 `RDD[Double]` first sample of data
+ * @param data2 `RDD[Double]` second sample of data
+ * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+ * statistic, p-value, and appropriate null hypothesis
+ */
+ def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
+ val n1 = data1.count().toDouble
+ val n2 = data2.count().toDouble
+ // identifier for sample 1, needed after co-sort
+ val isSample1 = true
+ // combine identified samples
+ val unionedData = data1.map((_, isSample1)).union(data2.map((_, !isSample1)))
+ // co-sort and operate on each partition, returning local extrema to the driver
+ val localData = unionedData.sortByKey().mapPartitions(
+ searchTwoSampleCandidates(_, n1, n2)
+ ).collect()
+ // result: global extreme
+ val ksStat = searchTwoSampleStatistic(localData, n1 * n2)
+ evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+ }
+
+ /**
+ * Calculates maximum distance candidates and counts of elements from each sample within one
+ * partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation. Function
+ * is package private for testing convenience.
+ * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
+ * each element is additionally tagged with a boolean flag for sample 1 membership
+ * @param n1 `Double` sample 1 size
+ * @param n2 `Double` sample 2 size
+ * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
+ * distance, the second is an unadjusted maximum distance (both of which will later
+ * be adjusted by a constant to account for elements in prior partitions), and the third is
+ * a count corresponding to the numerator of the adjustment constant coming from this
+ * partition. This last value, the numerator of the adjustment constant, is calculated as
+ * |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|. This comes
+ * from the fact that when we adjust for prior partitions, what we are doing is
+ * adding the difference of the fractions (|prior elements in sample 1| / |sample 1| -
+ * |prior elements in sample 2| / |sample 2|). We simply keep track of the numerator
+ * portion that is attributable to each partition so that following partitions can
+ * use it to cumulatively adjust their values.
+ */
+ private[stat] def searchTwoSampleCandidates(
+ partData: Iterator[(Double, Boolean)],
+ n1: Double,
+ n2: Double): Iterator[(Double, Double, Double)] = {
+ // fold accumulator: local minimum, local maximum, index for sample 1, index for sample2
+ case class ExtremaAndRunningIndices(min: Double, max: Double, ix1: Int, ix2: Int)
+ val initAcc = ExtremaAndRunningIndices(Double.MaxValue, Double.MinValue, 0, 0)
+ // traverse the data in the partition and calculate distances and counts
+ val pResults = partData.foldLeft(initAcc) { case (acc, (v, isSample1)) =>
+ val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
+ val cdf1 = (acc.ix1 + add1) / n1
+ val cdf2 = (acc.ix2 + add2) / n2
+ val dist = cdf1 - cdf2
+ ExtremaAndRunningIndices(
+ math.min(acc.min, dist),
+ math.max(acc.max, dist),
+ acc.ix1 + add1, acc.ix2 + add2
+ )
+ }
+ // If partition has no data, then pResults will match the fold accumulator
+ // we must filter this out to avoid having the statistic spoiled by the accumulation values
+ val results = if (pResults == initAcc) {
+ Array[(Double, Double, Double)]()
+ } else {
+ Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1))
+ }
+ results.iterator
+ }
+
+ /**
+ * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to
+ * the two-sample, two-sided Kolmogorov-Smirnov test. Function is package private for testing
+ * convenience.
+ * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each
+ * partition, along with the numerator for the necessary constant adjustments
+ * @param n `Double` The denominator in the constant adjustment (i.e. (size of sample 1 ) * (size
+ * of sample 2))
+ * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
+ */
+ private[stat] def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
+ : Double = {
+ // maximum distance and numerator for constant adjustment
+ val initAcc = (Double.MinValue, 0.0)
+ // adjust differences based on the number of elements preceding it, which should provide
+ // the correct distance between the 2 empirical CDFs
+ val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
+ val adjConst = prevCt / n
+ val dist1 = math.abs(minCand + adjConst)
+ val dist2 = math.abs(maxCand + adjConst)
+ val maxVal = Array(prevMax, dist1, dist2).max
+ (maxVal, prevCt + ct)
+ }
+ results._1
+ }
+
+ private def evalTwoSampleP(ksStat: Double, n: Int, m: Int): KolmogorovSmirnovTestResult = {
+ val pval = new CommonMathKolmogorovSmirnovTest().approximateP(ksStat, n, m)
+ new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.TwoSampleTwoSided.toString)
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
index 142b90e764a7..f7ef843d600f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
@@ -21,12 +21,12 @@ import java.util.Random
import org.apache.commons.math3.distribution.{ExponentialDistribution,
NormalDistribution, UniformRealDistribution}
-import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonMathKolmogorovSmirnovTest}
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.stat.test.ChiSqTest
+import org.apache.spark.mllib.stat.test.{ChiSqTest, KolmogorovSmirnovTest}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
@@ -177,7 +177,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
// Use a apache math commons local KS test to verify calculations
- val ksTest = new KolmogorovSmirnovTest()
+ val ksTest = new CommonMathKolmogorovSmirnovTest()
val pThreshold = 0.05
// Comparing a standard normal sample to a standard normal distribution
@@ -254,4 +254,165 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
}
+
+ test("2 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") {
+ // Create theoretical distributions
+ val stdNormalDist = new NormalDistribution(0, 1)
+ val normalDist = new NormalDistribution(2, 3)
+ val expDist = new ExponentialDistribution(0.6)
+
+ // create data samples and parallelize
+ val n = 10000
+ // local copies
+ val sampledStdNorm1L = stdNormalDist.sample(n)
+ val sampledStdNorm2L = stdNormalDist.sample(n)
+ val sampledNormL = normalDist.sample(n)
+ val sampledExpL = expDist.sample(n)
+ // distributed
+ val sampledStdNorm1P = sc.parallelize(sampledStdNorm1L, 10)
+ val sampledStdNorm2P = sc.parallelize(sampledStdNorm2L, 10)
+ val sampledNormP = sc.parallelize(sampledNormL, 10)
+ val sampledExpP = sc.parallelize(sampledExpL, 10)
+
+ // Use apache math commons local KS test to verify calculations
+ val ksTest = new CommonMathKolmogorovSmirnovTest()
+ val pThreshold = 0.05
+
+ // Comparing 2 samples from same standard normal distribution
+ val result1 = Statistics.kolmogorovSmirnovTest2Sample(sampledStdNorm1P, sampledStdNorm2P)
+ val refStat1 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledStdNorm2L)
+ val refP1 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledStdNorm2L)
+ assert(result1.statistic ~== refStat1 relTol 1e-4)
+ assert(result1.pValue ~== refP1 relTol 1e-4)
+ assert(result1.pValue > pThreshold) // accept H0
+
+ // Comparing 2 samples from different normal distributions
+ val result2 = Statistics.kolmogorovSmirnovTest2Sample(sampledStdNorm1P, sampledNormP)
+ val refStat2 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledNormL)
+ val refP2 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledNormL)
+ assert(result2.statistic ~== refStat2 relTol 1e-4)
+ assert(result2.pValue ~== refP2 relTol 1e-4)
+ assert(result2.pValue < pThreshold) // reject H0
+
+ // Comparing 1 sample from normal distribution to 1 sample from exponential distribution
+ val result3 = Statistics.kolmogorovSmirnovTest2Sample(sampledNormP, sampledExpP)
+ val refStat3 = ksTest.kolmogorovSmirnovStatistic(sampledNormL, sampledExpL)
+ val refP3 = ksTest.kolmogorovSmirnovTest(sampledNormL, sampledExpL)
+ assert(result3.statistic ~== refStat3 relTol 1e-4)
+ assert(result3.pValue ~== refP3 relTol 1e-4)
+ assert(result3.pValue < pThreshold) // reject H0
+ }
+
+ test("2 sample Kolmogorov-Smirnov test: R implementation equivalence") {
+ /*
+ Comparing results with the R implementation of KS
+ > sessionInfo()
+ R version 3.2.0 (2015-04-16)
+ Platform: x86_64-apple-darwin13.4.0 (64-bit)
+ > set.seed(20)
+ > v <- rnorm(20)
+ > v2 <- rnorm(20)
+ > v
+ [1] 1.16268529 -0.58592447 1.78546500 -1.33259371 -0.44656677 0.56960612
+ [7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222
+ [13] -0.62812676 1.32322085 -1.52135057 -0.43742787 0.97057758 0.02822264
+ [19] -0.08578219 0.38921440
+ > v2
+ [1] 0.23668737 -0.14444023 0.72222970 0.36990686 -0.24206631 -1.47206332
+ [7] -0.59615955 -1.14670013 -2.47463643 -0.61350858 -0.21631151 1.59014577
+ [13] 1.55614328 1.10845089 -1.09734184 -1.86060572 -0.91357885 1.24556891
+ [19] 0.08785472 0.42348190
+ */
+ val rData1 = sc.parallelize(
+ Array(
+ 1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
+ -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
+ -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
+ -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
+ 0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
+ )
+ )
+
+ val rData2 = sc.parallelize(
+ Array(
+ 0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192,
+ -0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186,
+ -2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867,
+ 1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755,
+ -0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245
+ )
+ )
+
+ val rKSStat = 0.15
+ val rKSPval = 0.9831
+ val kSCompResult = Statistics.kolmogorovSmirnovTest2Sample(rData1, rData2)
+ assert(kSCompResult.statistic ~== rKSStat relTol 1e-4)
+ // we're more lenient with the p-value here since the approximate p-value calculated
+ // by apache math commons is likely to be slightly off given the small sample size
+ assert(kSCompResult.pValue ~== rKSPval relTol 1e-2)
+ }
+
+ test("2 sample Kolmogorov-Smirnov test: helper functions in case partitions have no data") {
+ // we use the R data provided in the prior test
+ // Once we have combined and sorted we partitino with a larger number than
+ // the number of elements to guarantee we have empty partitions.
+ // We test various critical package private functions in this circumstance.
+ val rData1 = Array(
+ 1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
+ -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
+ -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
+ -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
+ 0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
+ )
+
+ val rData2 = Array(
+ 0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192,
+ -0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186,
+ -2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867,
+ 1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755,
+ -0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245
+ )
+
+
+ val n1 = rData1.length
+ val n2 = rData2.length
+ val unioned = (rData1.map((_, true)) ++ rData2.map((_, false))).sortBy(_._1)
+ val parallel = sc.parallelize(unioned, 100)
+ // verify that there are empty partitions
+ assert(parallel.mapPartitions(x => Array(x.size).iterator).collect().contains(0))
+ val localExtrema = parallel.mapPartitions(
+ KolmogorovSmirnovTest.searchTwoSampleCandidates(_, n1, n2)
+ ).collect()
+ val ksCompStat = KolmogorovSmirnovTest.searchTwoSampleStatistic(localExtrema, n1 * n2)
+
+ val rKSStat = 0.15
+ assert(ksCompStat ~== rKSStat relTol 1e-4)
+ }
+
+ test("2 sample Kolmogorov-Smirnov test: helper functions in case partitions have only 1 sample") {
+ // Creating 2 samples that don't overlap and request a large number of partitions to guarantee
+ // that there will be partitions with only data from 1 sample. We test critical helper
+ // functions in these circumstances.
+ val n = 100
+ val lower = (1 to n).toArray.map(_.toDouble)
+ val upper = (1 to n).toArray.map(n + _.toDouble * 100)
+
+ val unioned = (lower.map((_, true)) ++ upper.map((_, false))).sortBy(_._1)
+ val parallel = sc.parallelize(unioned, 200)
+ // verify that there is at least 1 partition with only 1 sample
+ assert(parallel.mapPartitions(x =>
+ Array(x.toArray.map(_._1).distinct.length).iterator
+ ).collect().contains(1)
+ )
+ val localExtrema = parallel.mapPartitions(
+ KolmogorovSmirnovTest.searchTwoSampleCandidates(_, n, n)
+ ).collect()
+ val ksCompStat = KolmogorovSmirnovTest.searchTwoSampleStatistic(localExtrema, n * n)
+
+ // Use apache math commons local KS test to verify calculations
+ val ksTest = new CommonMathKolmogorovSmirnovTest()
+
+ val refStat4 = ksTest.kolmogorovSmirnovStatistic(lower, upper)
+ assert(ksCompStat ~== refStat4 relTol 1e-3)
+ }
}