diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 6724af952505..0f78871ed35a 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -44,7 +44,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
 {
   implicit val defaultTimeout = timeout(10000 millis)
   val conf = new SparkConf()
-    .setMaster("local[2]")
+    .setMaster("local[4]")
     .setAppName("ContextCleanerSuite")
     .set("spark.cleaner.referenceTracking.blocking", "true")
     .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
@@ -232,7 +232,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     // Verify that checkpoints are NOT cleaned up if the config is not enabled
     sc.stop()
     val conf = new SparkConf()
-      .setMaster("local[2]")
+      .setMaster("local[4]")
       .setAppName("cleanupCheckpoint")
       .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
     sc = new SparkContext(conf)
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 915d7a1b8b16..5457a066d3c0 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -67,7 +67,7 @@ class HeartbeatReceiverSuite
   override def beforeEach(): Unit = {
     super.beforeEach()
     val conf = new SparkConf()
-      .setMaster("local[2]")
+      .setMaster("local[4]")
       .setAppName("test")
       .set("spark.dynamicAllocation.testing", "true")
     sc = spy(new SparkContext(conf))
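Note on the recurring change: `local[N]` is Spark's in-process master URL, where N is the number of worker threads and therefore the maximum number of concurrently running tasks; every hunk in this patch simply raises that thread count from 2 to 4. A minimal standalone sketch of what the setting means (class name and app name are hypothetical, not part of the patch):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalMasterExample {
  public static void main(String[] args) {
    // "local[4]" runs the driver, scheduler, and executor in one JVM
    // with 4 worker threads, i.e. up to 4 tasks in flight at once.
    SparkConf conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("LocalMasterExample");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    // In local mode the default parallelism tracks the thread count.
    System.out.println("default parallelism: " + jsc.defaultParallelism());
    jsc.stop();
  }
}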
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index a3490fc79e45..5b89eaae032a 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -47,7 +47,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
 
   test("local mode, FIFO scheduler") {
     val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local[2]", "test", conf)
+    sc = new SparkContext("local[4]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -58,7 +58,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
     conf.set("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local[2]", "test", conf)
+    sc = new SparkContext("local[4]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -115,7 +115,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   }
 
   test("job group") {
-    sc = new SparkContext("local[2]", "test")
+    sc = new SparkContext("local[4]", "test")
 
     // Add a listener to release the semaphore once any tasks are launched.
     val sem = new Semaphore(0)
@@ -145,7 +145,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   }
 
   test("inherited job group (SPARK-6629)") {
-    sc = new SparkContext("local[2]", "test")
+    sc = new SparkContext("local[4]", "test")
 
     // Add a listener to release the semaphore once any tasks are launched.
     val sem = new Semaphore(0)
@@ -180,7 +180,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   }
 
   test("job group with interruption") {
-    sc = new SparkContext("local[2]", "test")
+    sc = new SparkContext("local[4]", "test")
 
     // Add a listener to release the semaphore once any tasks are launched.
     val sem = new Semaphore(0)
@@ -215,7 +215,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // make sure the first stage is not finished until cancel is issued
     val sem1 = new Semaphore(0)
 
-    sc = new SparkContext("local[2]", "test")
+    sc = new SparkContext("local[4]", "test")
     sc.addSparkListener(new SparkListener {
       override def onTaskStart(taskStart: SparkListenerTaskStart) {
         sem1.release()
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 83906cff123b..21b2726d7e1d 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -132,8 +132,8 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
   test("SparkContext property overriding") {
     val conf = new SparkConf(false).setMaster("local").setAppName("My app")
-    sc = new SparkContext("local[2]", "My other app", conf)
-    assert(sc.master === "local[2]")
+    sc = new SparkContext("local[4]", "My other app", conf)
+    assert(sc.master === "local[4]")
     assert(sc.appName === "My other app")
   }
 
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index 58664e77d24a..ef5845a77c11 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -36,7 +36,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
 
   override def beforeAll() {
     super.beforeAll()
-    sc = new SparkContext("local[2]", "test")
+    sc = new SparkContext("local[4]", "test")
   }
 
   override def afterAll() {
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index 2802cd975292..5ff61b35c8bc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -28,7 +28,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
 
   override def beforeEach(): Unit = {
     super.beforeEach()
-    sc = new SparkContext("local[2]", "test")
+    sc = new SparkContext("local[4]", "test")
  }
 
   test("transform storage level") {
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index cfedb5a042a3..36ba0bda528d 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -29,7 +29,7 @@ public abstract class LocalJavaStreamingContext {
     @Before
     public void setUp() {
         SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
+            .setMaster("local[4]")
             .setAppName("test")
             .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
         ssc = new JavaStreamingContext(conf, new Duration(1000));
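Many of the streaming suites touched here also pin `spark.streaming.clock` to `org.apache.spark.util.ManualClock`, which replaces the wall clock that drives batch generation so the test harness can advance time explicitly; that is what keeps batch boundaries deterministic regardless of how many cores the master URL grants. A rough sketch of the setup pattern these suites share (class name hypothetical, settings taken from the patch):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ManualClockSetup {
  public static void main(String[] args) {
    // With the manual clock installed, no batch fires until the test
    // utilities advance the clock past a batch interval (1000 ms here).
    SparkConf conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("ManualClockSetup")
        .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
    ssc.stop();
  }
}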
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 1c93079497f6..cd6e4b78ec38 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -43,7 +43,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with
   @transient private var _sc: SparkContext = _
 
   val conf = new SparkConf()
-    .setMaster("local[2]")
+    .setMaster("local[4]")
     .setAppName(this.getClass.getSimpleName)
     .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
 
diff --git a/mllib/src/test/java/org/apache/spark/SharedSparkSession.java b/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
index 43779878890d..6c542bcec167 100644
--- a/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
+++ b/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
@@ -34,7 +34,7 @@ public abstract class SharedSparkSession implements Serializable {
   @Before
   public void setUp() throws IOException {
     spark = SparkSession.builder()
-      .master("local[2]")
+      .master("local[4]")
       .appName(getClass().getSimpleName())
       .getOrCreate();
     jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaStreamingLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaStreamingLogisticRegressionSuite.java
index 8c6bced52dd7..e57a7ce49857 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaStreamingLogisticRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaStreamingLogisticRegressionSuite.java
@@ -43,7 +43,7 @@ public class JavaStreamingLogisticRegressionSuite {
  @Before
   public void setUp() {
     SparkConf conf = new SparkConf()
-      .setMaster("local[2]")
+      .setMaster("local[4]")
       .setAppName("test")
       .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
     ssc = new JavaStreamingContext(conf, new Duration(1000));
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaStreamingKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaStreamingKMeansSuite.java
index d41fc0e4dca9..52b5e7a958ea 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaStreamingKMeansSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaStreamingKMeansSuite.java
@@ -42,7 +42,7 @@ public class JavaStreamingKMeansSuite {
   @Before
   public void setUp() {
     SparkConf conf = new SparkConf()
-      .setMaster("local[2]")
+      .setMaster("local[4]")
       .setAppName("test")
       .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
     ssc = new JavaStreamingContext(conf, new Duration(1000));
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
index ab554475d59a..bfcdbd2edbd4 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java
@@ -42,7 +42,7 @@ public class JavaStreamingLinearRegressionSuite {
   @Before
   public void setUp() {
     SparkConf conf = new SparkConf()
-      .setMaster("local[2]")
+      .setMaster("local[4]")
       .setAppName("test")
       .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
     ssc = new JavaStreamingContext(conf, new Duration(1000));
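Worth noting for the receiver-based streaming suites above and below: each streaming receiver permanently occupies one task slot for the lifetime of the stream, so a receiver test needs strictly more local threads than receivers (`local[2]` is the documented minimum for a single receiver). Moving to `local[4]` preserves that invariant with extra headroom rather than changing test semantics.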
diff --git a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java
index 1abaa39eadc2..eb1c9a300a8b 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java
@@ -51,7 +51,7 @@ public void setUp() {
     SparkConf conf = new SparkConf()
       .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
     spark = SparkSession.builder()
-      .master("local[2]")
+      .master("local[4]")
       .appName("JavaStatistics")
       .config(conf)
       .getOrCreate();
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index b923bacce23c..56df76bb81d6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -558,7 +558,7 @@ class ALSCleanerSuite extends SparkFunSuite {
       FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
     try {
       conf.set("spark.local.dir", localDir.getAbsolutePath)
-      val sc = new SparkContext("local[2]", "test", conf)
+      val sc = new SparkContext("local[4]", "test", conf)
       try {
         sc.setCheckpointDir(checkpointDir.getAbsolutePath)
         // Test checkpoint and clean parents
@@ -590,14 +590,14 @@ class ALSCleanerSuite extends SparkFunSuite {
       FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
     try {
       conf.set("spark.local.dir", localDir.getAbsolutePath)
-      val sc = new SparkContext("local[2]", "test", conf)
+      val sc = new SparkContext("local[4]", "test", conf)
       try {
         sc.setCheckpointDir(checkpointDir.getAbsolutePath)
         // Generate test data
         val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
         // Implicitly test the cleaning of parents during ALS training
         val spark = SparkSession.builder
-          .master("local[2]")
+          .master("local[4]")
           .appName("ALSCleanerSuite")
           .sparkContext(sc)
           .getOrCreate()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
index c90cb8ca1034..459d211b2b19 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
@@ -43,7 +43,7 @@ private[ml] object TreeTests extends SparkFunSuite {
       categoricalFeatures: Map[Int, Int],
       numClasses: Int): DataFrame = {
     val spark = SparkSession.builder()
-      .master("local[2]")
+      .master("local[4]")
       .appName("TreeTests")
       .sparkContext(data.sparkContext)
       .getOrCreate()
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 26b8600c32c1..412cde2a571c 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -342,7 +342,7 @@ class ReplSuite extends SparkFunSuite {
   }
 
   test("collecting objects of class defined in repl") {
-    val output = runInterpreter("local[2]",
+    val output = runInterpreter("local[4]",
       """
         |case class Foo(i: Int)
         |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 9262e938c2a6..d4e79b3f757d 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -374,7 +374,7 @@ class ReplSuite extends SparkFunSuite {
   }
 
   test("collecting objects of class defined in repl") {
-    val output = runInterpreter("local[2]",
+    val output = runInterpreter("local[4]",
       """
         |case class Foo(i: Int)
         |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 2f247ca3e8b7..e3f16c25c9fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) { self =>
   def this(sparkConf: SparkConf) {
-    this(new SparkContext("local[2]", "test-sql-context",
+    this(new SparkContext("local[4]", "test-sql-context",
       sparkConf.set("spark.sql.testkey", "true")))
   }
 
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 648a5abe0b89..4b8fbf54b675 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -368,7 +368,7 @@ public void testQueueStream() {
     ssc.stop();
     // Create a new JavaStreamingContext without checkpointing
     SparkConf conf = new SparkConf()
-        .setMaster("local[2]")
+        .setMaster("local[4]")
         .setAppName("test")
         .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
     ssc = new JavaStreamingContext(conf, new Duration(1000));
@@ -1244,7 +1244,15 @@ public void testCountByValue() {
     JavaTestUtils.attachTestOutputStream(counted);
     List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
-    Assert.assertEquals(expected, result);
+
+
+    for (int i = 0; i < result.size(); i++) {
+      List<Tuple2<String, Long>> expectedTupleList = expected.get(i);
+      List<Tuple2<String, Long>> resultTupleList = result.get(i);
+      Assert.assertTrue(resultTupleList.containsAll(expectedTupleList));
+    }
+
+    //Assert.assertEquals(expected, result);
   }
 
   @SuppressWarnings("unchecked")
@@ -1815,7 +1823,7 @@ public void testContextGetOrCreate() throws InterruptedException {
     ssc.stop();
 
     final SparkConf conf = new SparkConf()
-      .setMaster("local[2]")
+      .setMaster("local[4]")
       .setAppName("test")
       .set("newContext", "true");
 
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 091ccbfd85ca..ca2363fdaaa2 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -61,7 +61,7 @@ public void testReceiver() throws InterruptedException {
     final AtomicLong dataCounter = new AtomicLong(0);
 
     try {
-      JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
+      JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "test", new Duration(200));
       JavaReceiverInputDStream<String> input =
         ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
       JavaDStream<String> mapped = input.map(new Function<String, String>() {
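The `testCountByValue` hunk above replaces an exact `Assert.assertEquals` on the nested output lists with a per-batch `containsAll` check, making the comparison independent of the order in which tuples land within each batch (more likely to vary with 4 cores). Since `containsAll` by itself does not flag extra or duplicated tuples, a stricter order-insensitive check could compare the two lists as multisets; a sketch of such a helper (hypothetical, not part of the patch):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class MultisetCompare {
  private MultisetCompare() {}

  // True when both lists contain the same elements with the same
  // multiplicities, ignoring order.
  public static <T> boolean sameElements(List<T> a, List<T> b) {
    if (a.size() != b.size()) {
      return false;
    }
    // Count occurrences in the first list...
    Map<T, Integer> counts = new HashMap<>();
    for (T x : a) {
      counts.merge(x, 1, Integer::sum);
    }
    // ...then consume them with the second list.
    for (T y : b) {
      Integer c = counts.get(y);
      if (c == null) {
        return false;
      }
      if (c == 1) {
        counts.remove(y);
      } else {
        counts.put(y, c - 1);
      }
    }
    return counts.isEmpty();
  }
}

In the loop above, the assertion would then read `Assert.assertTrue(MultisetCompare.sameElements(expectedTupleList, resultTupleList));`.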
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index cfedb5a042a3..36ba0bda528d 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -29,7 +29,7 @@ public abstract class LocalJavaStreamingContext {
     @Before
     public void setUp() {
         SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
+            .setMaster("local[4]")
             .setAppName("test")
             .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
         ssc = new JavaStreamingContext(conf, new Duration(1000));
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 107c3f5dcc08..2cc0f4434039 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -49,7 +49,7 @@ class ReceivedBlockTrackerSuite
   var conf: SparkConf = null
 
   before {
-    conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
+    conf = new SparkConf().setMaster("local[4]").setAppName("ReceivedBlockTrackerSuite")
     checkpointDirectory = Utils.createTempDir()
   }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f1482e5c06cd..a384a8851cb0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.Utils
 
 class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {
 
-  val master = "local[2]"
+  val master = "local[4]"
   val appName = this.getClass.getSimpleName
   val batchDuration = Milliseconds(500)
   val sparkHome = "someDir"
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 0f957a1b5570..f19a9dfe5987 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -121,7 +121,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
   }
 
   test("receiver info reporting") {
-    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count)
 
@@ -146,7 +146,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
   }
 
   test("output operation reporting") {
-    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count())
     inputStream.foreachRDD(_.collect())
@@ -167,7 +167,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
   }
 
   test("don't call ssc.stop in listener") {
-    ssc = new StreamingContext("local[2]", "ssc", Milliseconds(1000))
+    ssc = new StreamingContext("local[4]", "ssc", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count)
 
@@ -175,7 +175,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
   }
 
   test("onBatchCompleted with successful batch") {
-    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count)
 
@@ -185,7 +185,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
   }
 
   test("onBatchCompleted with failed batch and one failed job") {
-    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD { _ =>
       throw new RuntimeException("This is a failed job")
@@ -200,7 +200,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
   }
 
   test("onBatchCompleted with failed batch and multiple failed jobs") {
-    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD { _ =>
       throw new RuntimeException("This is a failed job")
@@ -223,7 +223,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
   test("StreamingListener receives no events after stopping StreamingListenerBus") {
     val streamingListener = mock(classOf[StreamingListener])
 
-    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
     ssc.addStreamingListener(streamingListener)
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index fa975a146216..c2833c347f6d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -217,7 +217,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
   def framework: String = this.getClass.getSimpleName
 
   // Master for Spark context
-  def master: String = "local[2]"
+  def master: String = "local[4]"
 
   // Batch duration
   def batchDuration: Duration = Seconds(1)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index ce5a6e00fb2f..a7a808f23dd8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -33,7 +33,7 @@ class WriteAheadLogBackedBlockRDDSuite
   extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
 
   val conf = new SparkConf()
-    .setMaster("local[2]")
+    .setMaster("local[4]")
     .setAppName(this.getClass.getSimpleName)
 
   val hadoopConf = new Configuration()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
index a7e365649d3e..ce7e53da984d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
@@ -27,7 +27,7 @@ class InputInfoTrackerSuite extends SparkFunSuite with BeforeAndAfter {
   private var ssc: StreamingContext = _
 
   before {
-    val conf = new SparkConf().setMaster("local[2]").setAppName("DirectStreamTacker")
+    val conf = new SparkConf().setMaster("local[4]").setAppName("DirectStreamTacker")
     if (ssc == null) {
       ssc = new StreamingContext(conf, Duration(1000))
     }