From 122d378675bb2365dd4e8fd5270df3ca6e554d55 Mon Sep 17 00:00:00 2001 From: Vinod K C Date: Thu, 7 May 2015 12:45:01 +0530 Subject: [PATCH 1/5] Fixed validation of relativeSD in countApproxDistinct --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 ++++--- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index b3b60578c92e8..dd22bd38d55a8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1151,8 +1151,8 @@ abstract class RDD[T: ClassTag]( */ @Experimental def countApproxDistinct(p: Int, sp: Int): Long = withScope { - require(p >= 4, s"p ($p) must be at least 4") - require(sp <= 32, s"sp ($sp) cannot be greater than 32") + require(p >= 4, s"p ($p) must be >= 4") + require(sp <= 32, s"sp ($sp) must be <= 32") require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") val zeroCounter = new HyperLogLogPlus(p, sp) aggregate(zeroCounter)( @@ -1177,8 +1177,9 @@ abstract class RDD[T: ClassTag]( * It must be greater than 0.000017. 
*/ def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope { + require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017") val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt - countApproxDistinct(p, 0) + countApproxDistinct(if (p < 4) 4 else p, 0) } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index ef8c36a28655b..afc11bdc4d6ab 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -89,6 +89,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { val simpleRdd = sc.makeRDD(uniformDistro, 10) assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2) assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1) + assert(error(simpleRdd.countApproxDistinct(0.02), size) < 0.1) + assert(error(simpleRdd.countApproxDistinct(0.5), size) < 0.22) } test("SparkContext.union") { From b1b00a3e547bc657ceee92afb719bbb10056eb9e Mon Sep 17 00:00:00 2001 From: Vinod K C Date: Fri, 8 May 2015 11:15:05 +0530 Subject: [PATCH 2/5] Removed relativeSD validation from python API; RDD.scala will do validation --- python/pyspark/rdd.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d254deb527d10..d5d9d8b87a671 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2237,10 +2237,7 @@ def countApproxDistinct(self, relativeSD=0.05): >>> 16 < n < 24 True """ - if relativeSD < 0.000017: - raise ValueError("relativeSD should be greater than 0.000017") - if relativeSD > 0.37: - raise ValueError("relativeSD should be smaller than 0.37") + # the hash space in Java is 2^32 hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF) return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD) From 8ddbfae8cd213dfeaabc6ea49421bf10f69a48c1 Mon Sep 17 00:00:00 2001 From: 
Vinod K C Date: Fri, 8 May 2015 11:19:38 +0530 Subject: [PATCH 3/5] Remove blank line --- python/pyspark/rdd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d5d9d8b87a671..42a452fa56907 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2237,7 +2237,6 @@ def countApproxDistinct(self, relativeSD=0.05): >>> 16 < n < 24 True """ - # the hash space in Java is 2^32 hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF) return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD) From 799976eb79c15c17e4d6a8310f6bf717a0f55042 Mon Sep 17 00:00:00 2001 From: Vinod K C Date: Fri, 8 May 2015 12:52:34 +0530 Subject: [PATCH 4/5] Removed testcase to assert IAE when relativeSD>0.37 --- python/pyspark/tests.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index ea63a396da5b8..09de4d159fdcf 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -644,7 +644,6 @@ def test_count_approx_distinct(self): self.assertTrue(18 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 22) self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001)) - self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5)) def test_histogram(self): # empty From 3a3d59c9a35035dccbac64602d579d503cb35999 Mon Sep 17 00:00:00 2001 From: Vinod K C Date: Fri, 8 May 2015 14:55:58 +0530 Subject: [PATCH 5/5] Reverted removal of validation relativeSD<0.000017 --- python/pyspark/rdd.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 42a452fa56907..545c5ad20cb96 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2237,6 +2237,8 @@ def countApproxDistinct(self, relativeSD=0.05): >>> 16 < n < 24 True """ + if relativeSD < 0.000017: + raise ValueError("relativeSD should be greater than 0.000017") # the hash space in Java is 2^32 hashRDD = self.map(lambda x: portable_hash(x) & 
0xFFFFFFFF) return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)