8 changes: 4 additions & 4 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -875,9 +875,9 @@ test_that("column binary mathfunctions", {
expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4)
expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4)
expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric")
expect_equal(collect(select(df, rand(1)))[1, 1], 0.45, tolerance = 0.01)
expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01)
expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric")
expect_equal(collect(select(df, randn(1)))[1, 1], -0.0111, tolerance = 0.01)
expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01)
})

test_that("string operators", {
@@ -1458,8 +1458,8 @@ test_that("sampleBy() on a DataFrame", {
fractions <- list("0" = 0.1, "1" = 0.2)
sample <- sampleBy(df, "key", fractions, 0)
result <- collect(orderBy(count(groupBy(sample, "key")), "key"))
expect_identical(as.list(result[1, ]), list(key = "0", count = 2))
expect_identical(as.list(result[2, ]), list(key = "1", count = 10))
expect_identical(as.list(result[1, ]), list(key = "0", count = 3))
expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
Contributor Author commented:
this one really confused me ... I thought I could figure out what the right values should be by running the equivalent thing in scala, but I got totally different answers.

case class Data(val key: String)
val data = (0 to 99).map{x => Data((x % 3).toString)}
val dataDF = sqlContext.createDataFrame(data)
scala> dataDF.stat.sampleBy("key", Map("0" -> 0.1, "1" -> 0.2), 0).groupBy("key").count().show()
+---+-----+
|key|count|
+---+-----+
|  0|    5|
|  1|    5|
+---+-----+

but whatever, these vals work :/

Contributor commented:
This is because the default number of partitions is different in Scala/Python/R tests.

Contributor Author commented:
ah, great, glad there is a good explanation for this :)
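
A minimal spark-shell sketch of the partition-count explanation above (editorial aside, not part of the PR). sampleBy seeds its per-partition samplers from the seed plus the partition index, so the same seed over the same data picks different rows when the data is split differently; `sqlContext`/`sc` are assumed from the shell and the partition counts tried are made-up examples.

// Illustrative sketch: the same sampleBy seed selects different rows when the
// partition count changes, which is why the Scala counts above don't match the
// R test's expectations.
case class Data(key: String)
val data = (0 to 99).map { x => Data((x % 3).toString) }

Seq(2, 4, 8).foreach { numPartitions =>   // partition counts are made-up examples
  val df = sqlContext.createDataFrame(sc.parallelize(data, numPartitions))
  println(s"--- $numPartitions partitions ---")
  df.stat.sampleBy("key", Map("0" -> 0.1, "1" -> 0.2), 0L)
    .groupBy("key").count().orderBy("key").show()
}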

})

test_that("SQL error message is returned from JVM", {
@@ -60,9 +60,11 @@ private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) {
private[spark] object XORShiftRandom {

/** Hash seeds to have 0/1 bits throughout. */
private def hashSeed(seed: Long): Long = {
private[random] def hashSeed(seed: Long): Long = {
val bytes = ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array()
MurmurHash3.bytesHash(bytes)
val lowBits = MurmurHash3.bytesHash(bytes)
val highBits = MurmurHash3.bytesHash(bytes, lowBits)
(highBits.toLong << 32) | (lowBits.toLong & 0xFFFFFFFFL)
Member commented:
Although normally it might not be 100% valid to make a 64-bit hash out of two 32-bit hashes this way (I'm not clear that reusing the seed doesn't connect the bits in some subtle way), it's certainly a big improvement and probably entirely fine for this purpose. I'd still like to remove XORShiftRandom, but that can be for another day.
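
For reference, the trick the diff uses (hash the seed bytes once for the low word, then hash the same bytes again with the first result as the Murmur seed for the high word, and pack the two into one Long) can be exercised on its own. This is just a standalone sketch mirroring the new hashSeed, with a small printout to eyeball that small seeds now differ in the high bits too.

// Standalone sketch of the new hashSeed from this diff.
import java.nio.ByteBuffer
import scala.util.hashing.MurmurHash3

def hashSeed(seed: Long): Long = {
  val bytes = ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array()
  val lowBits = MurmurHash3.bytesHash(bytes)               // low 32 bits
  val highBits = MurmurHash3.bytesHash(bytes, lowBits)     // high 32 bits, seeded by the low hash
  (highBits.toLong << 32) | (lowBits.toLong & 0xFFFFFFFFL)
}

// Small seeds now produce values that differ in the high bits as well,
// which the old single 32-bit hash left as all zeros (or sign extension).
(0L to 3L).foreach { s => println(f"$s%d -> ${hashSeed(s)}%016x") }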

}

/**
20 changes: 14 additions & 6 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -146,21 +146,29 @@ public void intersection() {
public void sample() {
List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> rdd = sc.parallelize(ints);
JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 3);
// the seeds here are "magic" to make this work out nicely
JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 8);
Assert.assertEquals(2, sample20.count());
JavaRDD<Integer> sample20WithoutReplacement = rdd.sample(false, 0.2, 5);
JavaRDD<Integer> sample20WithoutReplacement = rdd.sample(false, 0.2, 2);
Assert.assertEquals(2, sample20WithoutReplacement.count());
}

@Test
public void randomSplit() {
List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> ints = new ArrayList<>(1000);
for (int i = 0; i < 1000; i++) {
ints.add(i);
}
JavaRDD<Integer> rdd = sc.parallelize(ints);
JavaRDD<Integer>[] splits = rdd.randomSplit(new double[] { 0.4, 0.6, 1.0 }, 31);
// the splits aren't perfect -- not enough data for them to be -- just check they're about right
Assert.assertEquals(3, splits.length);
Assert.assertEquals(1, splits[0].count());
Assert.assertEquals(2, splits[1].count());
Assert.assertEquals(7, splits[2].count());
long s0 = splits[0].count();
long s1 = splits[1].count();
long s2 = splits[2].count();
Assert.assertTrue(s0 + " not within expected range", s0 > 150 && s0 < 250);
Assert.assertTrue(s1 + " not within expected range", s1 > 250 && s1 < 350);
Assert.assertTrue(s2 + " not within expected range", s2 > 430 && s2 < 570);
}
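
A back-of-the-envelope sketch (in Scala, editorial aside, not part of the Java suite) of why these looser assertions were chosen: sample at fraction 0.2 over 10 elements only expects about 2 hits, so hand-picked seeds are needed to land on exactly 2, and randomSplit normalizes its weights, so {0.4, 0.6, 1.0} over 1000 elements expects roughly 200/300/500 per split, with the asserted windows leaving a few standard deviations of slack.

// Back-of-the-envelope only; the exact bounds in the test were chosen by hand.
val n = 1000
val weights = Array(0.4, 0.6, 1.0)
val total = weights.sum                  // randomSplit normalizes to fractions 0.2 / 0.3 / 0.5
weights.foreach { w =>
  val p = w / total
  val mean = n * p
  val stdev = math.sqrt(n * p * (1 - p)) // each split size is roughly Binomial(n, p)
  println(f"split p=$p%.1f: expect $mean%.0f +- ${3 * stdev}%.0f (3 sigma)")
}

// sample(): 10 elements at fraction 0.2 expects only 2, with stdev above 1,
// hence the "magic" seeds that happen to return exactly 2 elements.
println(math.sqrt(10 * 0.2 * 0.8))       // ~1.26 without replacement (Bernoulli)
println(math.sqrt(10 * 0.2))             // ~1.41 with replacement (Poisson)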

@Test
@@ -17,6 +17,7 @@

package org.apache.spark.rdd

import org.apache.commons.math3.distribution.{PoissonDistribution, BinomialDistribution}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred._
import org.apache.hadoop.util.Progressable
@@ -578,17 +579,36 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
(x: Int) => if (x % 10 < (10 * fractionPositive).toInt) "1" else "0"
}

def checkSize(exact: Boolean,
withReplacement: Boolean,
expected: Long,
actual: Long,
p: Double): Boolean = {
def assertBinomialSample(
exact: Boolean,
actual: Int,
trials: Int,
p: Double): Unit = {
if (exact) {
assert(actual == math.ceil(p * trials).toInt)
} else {
val dist = new BinomialDistribution(trials, p)
val q = dist.cumulativeProbability(actual)
withClue(s"p = $p: trials = $trials") {
assert(q >= 0.001 && q <= 0.999)
}
}
}

def assertPoissonSample(
exact: Boolean,
actual: Int,
trials: Int,
p: Double): Unit = {
if (exact) {
return expected == actual
assert(actual == math.ceil(p * trials).toInt)
} else {
val dist = new PoissonDistribution(p * trials)
val q = dist.cumulativeProbability(actual)
withClue(s"p = $p: trials = $trials") {
assert(q >= 0.001 && q <= 0.999)
}
}
val stdev = if (withReplacement) math.sqrt(expected) else math.sqrt(expected * p * (1 - p))
// Very forgiving margin since we're dealing with very small sample sizes most of the time
math.abs(actual - expected) <= 6 * stdev
}

def testSampleExact(stratifiedData: RDD[(String, Int)],
@@ -613,8 +633,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
samplingRate: Double,
seed: Long,
n: Long): Unit = {
val expectedSampleSize = stratifiedData.countByKey()
.mapValues(count => math.ceil(count * samplingRate).toInt)
val trials = stratifiedData.countByKey()
val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
val sample = if (exact) {
stratifiedData.sampleByKeyExact(false, fractions, seed)
@@ -623,8 +642,10 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
}
val sampleCounts = sample.countByKey()
val takeSample = sample.collect()
sampleCounts.foreach { case(k, v) =>
assert(checkSize(exact, false, expectedSampleSize(k), v, samplingRate)) }
sampleCounts.foreach { case (k, v) =>
assertBinomialSample(exact = exact, actual = v.toInt, trials = trials(k).toInt,
p = samplingRate)
}
assert(takeSample.size === takeSample.toSet.size)
takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") }
}
@@ -635,6 +656,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
samplingRate: Double,
seed: Long,
n: Long): Unit = {
val trials = stratifiedData.countByKey()
val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
math.ceil(count * samplingRate).toInt)
val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
@@ -646,7 +668,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
val sampleCounts = sample.countByKey()
val takeSample = sample.collect()
sampleCounts.foreach { case (k, v) =>
assert(checkSize(exact, true, expectedSampleSize(k), v, samplingRate))
assertPoissonSample(exact, actual = v.toInt, trials = trials(k).toInt, p = samplingRate)
}
val groupedByKey = takeSample.groupBy(_._1)
for ((key, v) <- groupedByKey) {
@@ -657,7 +679,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
if (exact) {
assert(v.toSet.size <= expectedSampleSize(key))
} else {
assert(checkSize(false, true, expectedSampleSize(key), v.toSet.size, samplingRate))
assertPoissonSample(false, actual = v.toSet.size, trials(key).toInt, p = samplingRate)
}
}
}
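
The new helpers above replace the old 6-sigma window with a two-sided tail test: the observed stratum count must not fall in the extreme 0.1% of either tail of the assumed sampling distribution, Binomial without replacement and Poisson with replacement. A standalone sketch of that check follows; the trials, rate, and observed count are made-up illustration values, not numbers from the suite.

// Illustrative values only.
import org.apache.commons.math3.distribution.{BinomialDistribution, PoissonDistribution}

val trials = 100          // elements available in the stratum (illustrative)
val p = 0.1               // sampling rate (illustrative)
val observed = 13         // count actually drawn (illustrative)

// Without replacement: each element is kept independently with probability p,
// so the stratum count is Binomial(trials, p).
val binomQ = new BinomialDistribution(trials, p).cumulativeProbability(observed)
assert(binomQ >= 0.001 && binomQ <= 0.999)

// With replacement: the number of draws per element is Poisson(p), so the
// stratum total is Poisson(p * trials).
val poisQ = new PoissonDistribution(p * trials).cumulativeProbability(observed)
assert(poisQ >= 0.001 && poisQ <= 0.999)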
@@ -65,4 +65,19 @@ class XORShiftRandomSuite extends SparkFunSuite with Matchers {
val random = new XORShiftRandom(0L)
assert(random.nextInt() != 0)
}

test ("hashSeed has random bits throughout") {
val totalBitCount = (0 until 10).map { seed =>
val hashed = XORShiftRandom.hashSeed(seed)
val bitCount = java.lang.Long.bitCount(hashed)
// make sure we have roughly equal numbers of 0s and 1s. Mostly just check that we
// don't have all 0s or 1s in the high bits
bitCount should be > 20
bitCount should be < 44
bitCount
}.sum
// and over all the seeds, very close to equal numbers of 0s & 1s
totalBitCount should be > (32 * 10 - 30)
totalBitCount should be < (32 * 10 + 30)
}
}
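
The bounds in the new hashSeed test are roughly 3-sigma windows: the bit count of a well-mixed 64-bit value behaves like Binomial(64, 0.5), with mean 32 and standard deviation 4. A quick sketch of that arithmetic (editorial aside, not part of the suite); the total bound of plus or minus 30 over ten seeds is a bit tighter, about 2.4 sigma.

// Rough arithmetic behind the bounds in the bit-count test above.
val perSeedMean = 64 * 0.5                      // 32
val perSeedStdev = math.sqrt(64 * 0.5 * 0.5)    // 4
println(s"per seed: $perSeedMean +- ${3 * perSeedStdev} (3 sigma) -> bounds 20..44")

// Summing over 10 seeds: Binomial(640, 0.5).
val totalMean = 640 * 0.5                       // 320
val totalStdev = math.sqrt(640 * 0.5 * 0.5)     // ~12.6
println(s"total: $totalMean +- ${3 * totalStdev} (3 sigma) vs asserted +-30")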
@@ -61,16 +61,17 @@ class MultilayerPerceptronClassifierSuite extends SparkFunSuite with MLlibTestSp

val xMean = Array(5.843, 3.057, 3.758, 1.199)
val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
// the input seed is somewhat magic, to make this test pass
val rdd = sc.parallelize(generateMultinomialLogisticInput(
coefficients, xMean, xVariance, true, nPoints, 42), 2)
coefficients, xMean, xVariance, true, nPoints, 1), 2)
val dataFrame = sqlContext.createDataFrame(rdd).toDF("label", "features")
val numClasses = 3
val numIterations = 100
val layers = Array[Int](4, 5, 4, numClasses)
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(1)
.setSeed(11L)
.setSeed(11L) // currently this seed is ignored
.setMaxIter(numIterations)
val model = trainer.fit(dataFrame)
val numFeatures = dataFrame.select("features").first().getAs[Vector](0).size
@@ -66,9 +66,12 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
// copied model must have the same parent.
MLTestingUtils.checkCopy(model)

// These expectations are just magic values, characterizing the current
// behavior. The test needs to be updated to be more general, see SPARK-11502
val magicExp = Vectors.dense(0.30153007534417237, -0.6833061711354689, 0.5116530778733167)
model.transform(docDF).select("result", "expected").collect().foreach {
case Row(vector1: Vector, vector2: Vector) =>
assert(vector1 ~== vector2 absTol 1E-5, "Transformed vector is different with expected.")
assert(vector1 ~== magicExp absTol 1E-5, "Transformed vector is different with expected.")
}
}

@@ -99,8 +102,15 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
val realVectors = model.getVectors.sort("word").select("vector").map {
case Row(v: Vector) => v
}.collect()
// These expectations are just magic values, characterizing the current
// behavior. The test needs to be updated to be more general, see SPARK-11502
val magicExpected = Seq(
Vectors.dense(0.3326166272163391, -0.5603077411651611, -0.2309209555387497),
Vectors.dense(0.32463887333869934, -0.9306551218032837, 1.393115520477295),
Vectors.dense(-0.27150997519493103, 0.4372006058692932, -0.13465698063373566)
)

realVectors.zip(expectedVectors).foreach {
realVectors.zip(magicExpected).foreach {
case (real, expected) =>
assert(real ~== expected absTol 1E-5, "Actual vector is different from expected.")
}
@@ -122,7 +132,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
.setSeed(42L)
.fit(docDF)

val expectedSimilarity = Array(0.2789285076917586, -0.6336972059851644)
val expectedSimilarity = Array(0.18032623242822343, -0.5717976464798823)
val (synonyms, similarity) = model.findSynonyms("a", 2).map {
case Row(w: String, sim: Double) => (w, sim)
}.collect().unzip
@@ -98,9 +98,16 @@ class StreamingKMeansSuite extends SparkFunSuite with TestSuiteBase {
runStreams(ssc, numBatches, numBatches)

// check that estimated centers are close to true centers
// NOTE exact assignment depends on the initialization!
assert(centers(0) ~== kMeans.latestModel().clusterCenters(0) absTol 1E-1)
assert(centers(1) ~== kMeans.latestModel().clusterCenters(1) absTol 1E-1)
// cluster ordering is arbitrary, so choose closest cluster
val d0 = Vectors.sqdist(kMeans.latestModel().clusterCenters(0), centers(0))
val d1 = Vectors.sqdist(kMeans.latestModel().clusterCenters(0), centers(1))
val (c0, c1) = if (d0 < d1) {
(centers(0), centers(1))
} else {
(centers(1), centers(0))
}
assert(c0 ~== kMeans.latestModel().clusterCenters(0) absTol 1E-1)
assert(c1 ~== kMeans.latestModel().clusterCenters(1) absTol 1E-1)
}

test("detecting dying clusters") {
20 changes: 10 additions & 10 deletions python/pyspark/ml/feature.py
@@ -1788,21 +1788,21 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, HasSeed, HasInputCol, Has
+----+--------------------+
|word| vector|
+----+--------------------+
| a|[-0.3511952459812...|
| b|[0.29077222943305...|
| c|[0.02315592765808...|
| a|[0.09461779892444...|
| b|[1.15474212169647...|
| c|[-0.3794820010662...|
+----+--------------------+
...
>>> model.findSynonyms("a", 2).show()
+----+-------------------+
|word| similarity|
+----+-------------------+
| b|0.29255685145799626|
| c|-0.5414068302988307|
+----+-------------------+
+----+--------------------+
|word| similarity|
+----+--------------------+
| b| 0.16782984556103436|
| c|-0.46761559092107646|
+----+--------------------+
...
>>> model.transform(doc).head().model
DenseVector([-0.0422, -0.5138, -0.2546, 0.6885, 0.276])
DenseVector([0.5524, -0.4995, -0.3599, 0.0241, 0.3461])

.. versionadded:: 1.4.0
"""
6 changes: 3 additions & 3 deletions python/pyspark/ml/recommendation.py
@@ -76,11 +76,11 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha
>>> test = sqlContext.createDataFrame([(0, 2), (1, 0), (2, 0)], ["user", "item"])
>>> predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
>>> predictions[0]
Row(user=0, item=2, prediction=0.39...)
Row(user=0, item=2, prediction=-0.13807615637779236)
>>> predictions[1]
Row(user=1, item=0, prediction=3.19...)
Row(user=1, item=0, prediction=2.6258413791656494)
>>> predictions[2]
Row(user=2, item=0, prediction=-1.15...)
Row(user=2, item=0, prediction=-1.5018409490585327)

.. versionadded:: 1.4.0
"""
4 changes: 2 additions & 2 deletions python/pyspark/mllib/recommendation.py
@@ -101,12 +101,12 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):

>>> model = ALS.train(ratings, 1, nonnegative=True, seed=10)
>>> model.predict(2, 2)
3.8...
3.73...

>>> df = sqlContext.createDataFrame([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)])
>>> model = ALS.train(df, 1, nonnegative=True, seed=10)
>>> model.predict(2, 2)
3.8...
3.73...

>>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10)
>>> model.predict(2, 2)
6 changes: 3 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -436,7 +436,7 @@ def sample(self, withReplacement, fraction, seed=None):
"""Returns a sampled subset of this :class:`DataFrame`.

>>> df.sample(False, 0.5, 42).count()
1
2
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
seed = seed if seed is not None else random.randint(0, sys.maxsize)
@@ -463,8 +463,8 @@ def sampleBy(self, col, fractions, seed=None):
+---+-----+
|key|count|
+---+-----+
| 0| 3|
| 1| 8|
| 0| 5|
| 1| 9|
+---+-----+

"""
@@ -24,12 +24,12 @@ import org.apache.spark.SparkFunSuite
class RandomSuite extends SparkFunSuite with ExpressionEvalHelper {

test("random") {
checkDoubleEvaluation(Rand(30), 0.7363714192755834 +- 0.001)
checkDoubleEvaluation(Randn(30), 0.5181478766595276 +- 0.001)
checkDoubleEvaluation(Rand(30), 0.31429268272540556 +- 0.001)
checkDoubleEvaluation(Randn(30), -0.4798519469521663 +- 0.001)
}

test("SPARK-9127 codegen with long seed") {
checkDoubleEvaluation(Rand(5419823303878592871L), 0.4061913198963727 +- 0.001)
checkDoubleEvaluation(Randn(5419823303878592871L), -0.24417152005343168 +- 0.001)
checkDoubleEvaluation(Rand(5419823303878592871L), 0.2304755080444375 +- 0.001)
checkDoubleEvaluation(Randn(5419823303878592871L), -1.2824262718225607 +- 0.001)
}
}
@@ -257,7 +257,9 @@ public void testSampleBy() {
DataFrame df = context.range(0, 100, 1, 2).select(col("id").mod(3).as("key"));
DataFrame sampled = df.stat().<Integer>sampleBy("key", ImmutableMap.of(0, 0.1, 1, 0.2), 0L);
Row[] actual = sampled.groupBy("key").count().orderBy("key").collect();
Row[] expected = {RowFactory.create(0, 5), RowFactory.create(1, 8)};
Assert.assertArrayEquals(expected, actual);
Assert.assertEquals(0, actual[0].getLong(0));
Assert.assertTrue(0 <= actual[0].getLong(1) && actual[0].getLong(1) <= 8);
Assert.assertEquals(1, actual[1].getLong(0));
Assert.assertTrue(2 <= actual[1].getLong(1) && actual[1].getLong(1) <= 13);
}
}
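
Where the asserted windows in testSampleBy come from (a Scala sketch, editorial aside, not part of the suite): range(0, 100) with key = id % 3 has 34 rows with key 0 and 33 with key 1, so sampling at 0.1 and 0.2 expects only about 3.4 and 6.6 rows; the test therefore pins each count to a loose interval of several standard deviations rather than an exact value.

// Back-of-the-envelope arithmetic for the asserted windows above.
val keyCounts = (0 until 100).groupBy(_ % 3).mapValues(_.size)  // 0 -> 34, 1 -> 33, 2 -> 33
Seq(0 -> 0.1, 1 -> 0.2).foreach { case (key, frac) =>
  val n = keyCounts(key)
  val mean = n * frac
  val stdev = math.sqrt(n * frac * (1 - frac))
  println(f"key=$key expected=$mean%.1f +- ${3 * stdev}%.1f (3 sigma)")
}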