@@ -44,7 +44,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
{
implicit val defaultTimeout = timeout(10000 millis)
val conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
.set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
@@ -232,7 +232,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
// Verify that checkpoints are NOT cleaned up if the config is not enabled
sc.stop()
val conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("cleanupCheckpoint")
.set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
sc = new SparkContext(conf)
@@ -67,7 +67,7 @@ class HeartbeatReceiverSuite
override def beforeEach(): Unit = {
super.beforeEach()
val conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("test")
.set("spark.dynamicAllocation.testing", "true")
sc = spy(new SparkContext(conf))
12 changes: 6 additions & 6 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -47,7 +47,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft

test("local mode, FIFO scheduler") {
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[2]", "test", conf)
sc = new SparkContext("local[4]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
@@ -58,7 +58,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
conf.set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local[2]", "test", conf)
sc = new SparkContext("local[4]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
@@ -115,7 +115,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}

test("job group") {
sc = new SparkContext("local[2]", "test")
sc = new SparkContext("local[4]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
@@ -145,7 +145,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}

test("inherited job group (SPARK-6629)") {
sc = new SparkContext("local[2]", "test")
sc = new SparkContext("local[4]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
@@ -180,7 +180,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}

test("job group with interruption") {
sc = new SparkContext("local[2]", "test")
sc = new SparkContext("local[4]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
@@ -215,7 +215,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
// make sure the first stage is not finished until cancel is issued
val sem1 = new Semaphore(0)

sc = new SparkContext("local[2]", "test")
sc = new SparkContext("local[4]", "test")
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem1.release()
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -132,8 +132,8 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst

test("SparkContext property overriding") {
val conf = new SparkConf(false).setMaster("local").setAppName("My app")
sc = new SparkContext("local[2]", "My other app", conf)
assert(sc.master === "local[2]")
sc = new SparkContext("local[4]", "My other app", conf)
assert(sc.master === "local[4]")
assert(sc.appName === "My other app")
}

@@ -36,7 +36,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim

override def beforeAll() {
super.beforeAll()
sc = new SparkContext("local[2]", "test")
sc = new SparkContext("local[4]", "test")
}

override def afterAll() {
@@ -28,7 +28,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {

override def beforeEach(): Unit = {
super.beforeEach()
sc = new SparkContext("local[2]", "test")
sc = new SparkContext("local[4]", "test")
}

test("transform storage level") {
@@ -29,7 +29,7 @@ public abstract class LocalJavaStreamingContext {
@Before
public void setUp() {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("test")
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
@@ -43,7 +43,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with
@transient private var _sc: SparkContext = _

val conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")

@@ -34,7 +34,7 @@ public abstract class SharedSparkSession implements Serializable {
@Before
public void setUp() throws IOException {
spark = SparkSession.builder()
.master("local[2]")
.master("local[4]")
.appName(getClass().getSimpleName())
.getOrCreate();
jsc = new JavaSparkContext(spark.sparkContext());
@@ -43,7 +43,7 @@ public class JavaStreamingLogisticRegressionSuite {
@Before
public void setUp() {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("test")
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
@@ -42,7 +42,7 @@ public class JavaStreamingKMeansSuite {
@Before
public void setUp() {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("test")
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
@@ -42,7 +42,7 @@ public class JavaStreamingLinearRegressionSuite {
@Before
public void setUp() {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("test")
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
@@ -51,7 +51,7 @@ public void setUp() {
SparkConf conf = new SparkConf()
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
spark = SparkSession.builder()
.master("local[2]")
.master("local[4]")
.appName("JavaStatistics")
.config(conf)
.getOrCreate();
@@ -558,7 +558,7 @@ class ALSCleanerSuite extends SparkFunSuite {
FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
try {
conf.set("spark.local.dir", localDir.getAbsolutePath)
val sc = new SparkContext("local[2]", "test", conf)
val sc = new SparkContext("local[4]", "test", conf)
try {
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Test checkpoint and clean parents
@@ -590,14 +590,14 @@ class ALSCleanerSuite extends SparkFunSuite {
FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
try {
conf.set("spark.local.dir", localDir.getAbsolutePath)
val sc = new SparkContext("local[2]", "test", conf)
val sc = new SparkContext("local[4]", "test", conf)
try {
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Generate test data
val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
// Implicitly test the cleaning of parents during ALS training
val spark = SparkSession.builder
.master("local[2]")
.master("local[4]")
.appName("ALSCleanerSuite")
.sparkContext(sc)
.getOrCreate()
@@ -43,7 +43,7 @@ private[ml] object TreeTests extends SparkFunSuite {
categoricalFeatures: Map[Int, Int],
numClasses: Int): DataFrame = {
val spark = SparkSession.builder()
.master("local[2]")
.master("local[4]")
.appName("TreeTests")
.sparkContext(data.sparkContext)
.getOrCreate()
@@ -342,7 +342,7 @@ class ReplSuite extends SparkFunSuite {
}

test("collecting objects of class defined in repl") {
val output = runInterpreter("local[2]",
val output = runInterpreter("local[4]",
"""
|case class Foo(i: Int)
|val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
@@ -374,7 +374,7 @@ class ReplSuite extends SparkFunSuite {
}

test("collecting objects of class defined in repl") {
val output = runInterpreter("local[2]",
val output = runInterpreter("local[4]",
"""
|case class Foo(i: Int)
|val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
*/
private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) { self =>
def this(sparkConf: SparkConf) {
this(new SparkContext("local[2]", "test-sql-context",
this(new SparkContext("local[4]", "test-sql-context",
sparkConf.set("spark.sql.testkey", "true")))
}

@@ -368,7 +368,7 @@ public void testQueueStream() {
ssc.stop();
// Create a new JavaStreamingContext without checkpointing
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("test")
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
@@ -1244,7 +1244,15 @@ public void testCountByValue() {
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);

Assert.assertEquals(expected, result);


for (int i=0;i<expected.size();i++) {
Review comment (Member):
This only asserts that the actual result is a superset of the expected results. It won't catch the case that it contains unexpected values. You could check both ways.
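For illustration only, a minimal sketch of what checking both ways could look like. It assumes the expected and result lists and the JUnit Assert and scala.Tuple2 imports already present in this test suite, and is not part of the actual patch:

for (int i = 0; i < expected.size(); i++) {
  List<Tuple2<String, Long>> expectedTupleList = expected.get(i);
  List<Tuple2<String, Long>> resultTupleList = result.get(i);
  // Check containment in both directions: the result must hold every
  // expected tuple, and must not hold any tuple that was not expected.
  Assert.assertTrue(resultTupleList.containsAll(expectedTupleList));
  Assert.assertTrue(expectedTupleList.containsAll(resultTupleList));
}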

List<Tuple2<String,Long>> expectedTupleList = expected.get(i);
List<Tuple2<String,Long>> resultTupleList = result.get(i);
Assert.assertTrue(resultTupleList.containsAll(expectedTupleList));
}

//Assert.assertEquals(expected, result);
Review comment (@HyukjinKwon, Member, Nov 28, 2016):
BTW, I think we might not need to leave this commented-out line, since it can be tracked down via the blame button or the file history.

}

@SuppressWarnings("unchecked")
@@ -1815,7 +1823,7 @@ public void testContextGetOrCreate() throws InterruptedException {
ssc.stop();

final SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("test")
.set("newContext", "true");

@@ -61,7 +61,7 @@ public void testReceiver() throws InterruptedException {
final AtomicLong dataCounter = new AtomicLong(0);

try {
JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "test", new Duration(200));
JavaReceiverInputDStream<String> input =
ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
JavaDStream<String> mapped = input.map(new Function<String, String>() {
@@ -29,7 +29,7 @@ public abstract class LocalJavaStreamingContext {
@Before
public void setUp() {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName("test")
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
@@ -49,7 +49,7 @@ class ReceivedBlockTrackerSuite
var conf: SparkConf = null

before {
conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
conf = new SparkConf().setMaster("local[4]").setAppName("ReceivedBlockTrackerSuite")
checkpointDirectory = Utils.createTempDir()
}

@@ -42,7 +42,7 @@ import org.apache.spark.util.Utils

class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {

val master = "local[2]"
val master = "local[4]"
val appName = this.getClass.getSimpleName
val batchDuration = Milliseconds(500)
val sparkHome = "someDir"
@@ -121,7 +121,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}

test("receiver info reporting") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)

@@ -146,7 +146,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}

test("output operation reporting") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count())
inputStream.foreachRDD(_.collect())
@@ -167,15 +167,15 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}

test("don't call ssc.stop in listener") {
ssc = new StreamingContext("local[2]", "ssc", Milliseconds(1000))
ssc = new StreamingContext("local[4]", "ssc", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)

startStreamingContextAndCallStop(ssc)
}

test("onBatchCompleted with successful batch") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)

@@ -185,7 +185,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}

test("onBatchCompleted with failed batch and one failed job") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD { _ =>
throw new RuntimeException("This is a failed job")
@@ -200,7 +200,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}

test("onBatchCompleted with failed batch and multiple failed jobs") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD { _ =>
throw new RuntimeException("This is a failed job")
@@ -223,7 +223,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
test("StreamingListener receives no events after stopping StreamingListenerBus") {
val streamingListener = mock(classOf[StreamingListener])

ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
ssc = new StreamingContext("local[4]", "test", Milliseconds(1000))
ssc.addStreamingListener(streamingListener)
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)
@@ -217,7 +217,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
def framework: String = this.getClass.getSimpleName

// Master for Spark context
def master: String = "local[2]"
def master: String = "local[4]"

// Batch duration
def batchDuration: Duration = Seconds(1)
@@ -33,7 +33,7 @@ class WriteAheadLogBackedBlockRDDSuite
extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfterEach {

val conf = new SparkConf()
.setMaster("local[2]")
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)

val hadoopConf = new Configuration()
@@ -27,7 +27,7 @@ class InputInfoTrackerSuite extends SparkFunSuite with BeforeAndAfter {
private var ssc: StreamingContext = _

before {
val conf = new SparkConf().setMaster("local[2]").setAppName("DirectStreamTacker")
val conf = new SparkConf().setMaster("local[4]").setAppName("DirectStreamTacker")
if (ssc == null) {
ssc = new StreamingContext(conf, Duration(1000))
}