diff --git a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala index 8e0f82ddb889..110bd0a9a0c4 100644 --- a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala @@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo sc.stop() sc = null } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") } test("halting by voting") { diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9155159cf6ae..59359dc76d05 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -156,3 +156,41 @@ class RangePartitioner[K : Ordering : ClassTag, V]( false } } + +/** + * A [[org.apache.spark.Partitioner]] that partitions records into balanced partitions + * by allocating keys to partitions in a round robin fashion. + */ +class BalancedPartitioner[K : Ordering : ClassTag, V]( + partitions: Int, + @transient rdd: RDD[_ <: Product2[K,V]]) + extends Partitioner { + + // this array keeps track of keys assigned to a partition + // counts[0] refers to # of keys in partition 0 and so on + private val counts: Array[Int] = { + new Array[Int](numPartitions) + } + + def numPartitions = math.abs(partitions) + + var currPartition = 0 + + /* + * Pick current partition for the key in round robin manner + */ + def getPartition(key: Any): Int = { + val partition = currPartition + counts(partition) = counts(partition) + 1 + currPartition = (currPartition + 1) % numPartitions + partition + } + + override def equals(other: Any): Boolean = other match { + case r: BalancedPartitioner[_,_] => + (r.counts.sameElements(counts) + && r.currPartition == currPartition) + case _ => + false + } +} diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 191201582792..d37cd25d8119 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -65,8 +65,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); } static class ReverseIntComparator implements Comparator, Serializable { diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala index c645e4cbe813..4ab870e75177 100644 --- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala @@ -39,7 +39,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -77,7 +76,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -129,7 +127,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -182,7 +179,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 6b2571cd9295..95ba273f16a7 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -124,9 +124,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = new SecurityManager(conf)) - // Will be cleared by LocalSparkContext - System.setProperty("spark.driver.port", boundPort.toString) - val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 7c30626a0c42..6cbb54799e53 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -66,6 +66,34 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(descendingP4 != p4) } + test("BalancedPartitioner equality") { + // Make an RDD where all the elements are the same so that the partition range bounds + // are deterministically all the same. + val rdd = sc.parallelize(1.to(4000)).map(x => (x, x)) + + val p2 = new BalancedPartitioner(2, rdd) + val p4 = new BalancedPartitioner(4, rdd) + val anotherP4 = new BalancedPartitioner(4, rdd) + + assert(p2 === p2) + assert(p4 === p4) + assert(p2 != p4) + assert(p4 != p2) + assert(p4 === anotherP4) + assert(anotherP4 === p4) + } + + test("BalancedPartitioner getPartition") { + val rdd = sc.parallelize(1.to(2000)).map(x => (x, x)) + val partitioner = new BalancedPartitioner(4, rdd) + var expectedPartition = 0 + 1.to(2000).map { element => { + val partition = partitioner.getPartition(element) + assert(partition === expectedPartition) + expectedPartition = (expectedPartition + 1) % 4 + }} + } + test("RangePartitioner getPartition") { val rdd = sc.parallelize(1.to(2000)).map(x => (x, x)) // We have different behaviour of getPartition for partitions with less than 1000 and more than diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 00deecc1c3ca..81bd8257bc15 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -78,8 +78,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } after { - System.clearProperty("spark.driver.port") - if (store != null) { store.stop() store = null diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index feabca673348..8f4da1cd6a6d 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -57,8 +57,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); } @Test diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala index 51f02f94e00d..47594a800a3b 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala @@ -38,8 +38,6 @@ trait LocalSparkContext { f(sc) } finally { sc.stop() - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") } } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java index d75d3a6b2673..faa675b59cd5 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java @@ -42,7 +42,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List validationData, LogisticRegressionModel model) { diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java index c80b1134ed1b..3cf7c936629a 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -42,7 +42,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } private static final List POINTS = Arrays.asList( diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java index 667f76a1bd55..31b9f3e8d438 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java @@ -41,7 +41,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List validationData, SVMModel model) { diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java index 49a614bd90ca..aa75a0150c39 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java @@ -44,7 +44,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } @Test diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index b150334deb06..bf2365f82044 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -42,7 +42,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } static void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java index f725924a2d97..8950b48888b7 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java @@ -41,7 +41,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List validationData, LassoModel model) { diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java index 7151e553512b..ce88b1c88b71 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -41,7 +41,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List validationData, LinearRegressionModel model) { diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java index 03714ae7e4d0..7266eec23580 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java @@ -43,7 +43,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } double predictionError(List validationData, RidgeRegressionModel model) { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala index 212fbe9288f0..0d4868f3d9e4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala @@ -34,7 +34,6 @@ trait LocalSparkContext extends BeforeAndAfterAll { self: Suite => if (sc != null) { sc.stop() } - System.clearProperty("spark.driver.port") super.afterAll() } } diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 64f2eeb12b4f..685e2961be1c 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -49,10 +49,6 @@ def setUp(self): def tearDown(self): self.sc.stop() sys.path = self._old_sys_path - # To avoid Akka rebinding to the same port, since it doesn't unbind - # immediately on shutdown - self.sc._jvm.System.clearProperty("spark.driver.port") - class TestCheckpoint(PySparkTestCase): diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 566d96e16ed8..f996e39bf541 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -50,8 +50,6 @@ class ReplSuite extends FunSuite { if (interp.sparkContext != null) { interp.sparkContext.stop() } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") return out.toString } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 3ad66a3d7f45..18c114e92de1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -58,7 +58,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { // By clearing the port we force Spark to pick a new one. This allows us to rerun tests // without restarting the JVM. - System.clearProperty("spark.driver.port") System.clearProperty("spark.hostPort") override lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index c48a38590e06..5bc839664939 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -186,7 +186,6 @@ object MasterFailureTest extends Logging { setupCalled = true // Setup the streaming computation with the given operation - System.clearProperty("spark.driver.port") val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 849bbf129918..6e1f01900071 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -27,7 +27,6 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { - System.clearProperty("spark.driver.port"); System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); ssc.checkpoint("checkpoint"); @@ -37,8 +36,5 @@ public void setUp() { public void tearDown() { ssc.stop(); ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 25739956cb88..5790c577c761 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -368,7 +368,6 @@ class CheckpointSuite extends TestSuiteBase { "\n-------------------------------------------\n" ) ssc = new StreamingContext(checkpointDir) - System.clearProperty("spark.driver.port") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 8036f77c973a..cc178fba12c9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -153,8 +153,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Default after function for any streaming test suite. Override this // if you want to add your stuff to "after" (i.e., don't call after { } ) def afterFunction() { - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") System.clearProperty("spark.streaming.clock") }