From bfce9479239b6c243cbdac29321b2f18fd29447d Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 9 Sep 2019 13:45:24 +0900 Subject: [PATCH 1/2] [SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow receiver has been initialized, but with hard timeout --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index c4424b3cff87..5e4568d86504 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -368,6 +368,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL } ssc.start() ssc.awaitTerminationOrTimeout(500) + eventually(timeout(10.seconds), interval(10.millis)) { + assert(SlowTestReceiver.initialized) + } ssc.stop(stopSparkContext = false, stopGracefully = true) logInfo("Running count = " + runningCount) assert(runningCount > 0) @@ -974,6 +977,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) } receivingThreadOption = Some(thread) thread.start() + SlowTestReceiver.initialized = true } def onStop() { @@ -986,6 +990,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) } object SlowTestReceiver { + var initialized = false var receivedAllRecords = false } From b5e39c5e6ba7fa372b9ad0632791d8759bd2190f Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 11 Sep 2019 10:05:45 +0900 Subject: [PATCH 2/2] Move unnecessarily exposed flag --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 5e4568d86504..336ee48735b2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -356,7 +356,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL logInfo("==================================\n\n\n") ssc = new StreamingContext(sc, Milliseconds(100)) var runningCount = 0 - SlowTestReceiver.receivedAllRecords = false // Create test receiver that sleeps in onStop() val totalNumRecords = 15 val recordsPerSecond = 1 @@ -962,6 +961,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { var receivingThreadOption: Option[Thread] = None + @volatile var receivedAllRecords = false def onStart() { val thread = new Thread() { @@ -971,7 +971,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) Thread.sleep(1000 / recordsPerSecond) store(i) } - SlowTestReceiver.receivedAllRecords = true + receivedAllRecords = true logInfo(s"Received all $totalRecords records") } } @@ -982,7 +982,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) def onStop() { // Simulate slow receiver by waiting for all records to be produced - while (!SlowTestReceiver.receivedAllRecords) { + while (!receivedAllRecords) { Thread.sleep(100) } // no clean to be done, the receiving thread should stop on it own @@ -991,7 +991,6 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) object SlowTestReceiver { var initialized = false - var receivedAllRecords = false } /** Streaming application for testing DStream and RDD creation sites */