From 93cd3141680174b2a06a40dbe39cbaaa37a21695 Mon Sep 17 00:00:00 2001 From: Lars Albertsson Date: Fri, 6 Jun 2014 22:59:12 +0200 Subject: [PATCH] Mend StreamingContext stop() followed by awaitTermination(). --- .../org/apache/spark/streaming/ContextWaiter.scala | 1 + .../spark/streaming/StreamingContextSuite.scala | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala index 86753360a07e4..a0aeacbc733bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala @@ -27,6 +27,7 @@ private[streaming] class ContextWaiter { } def notifyStop() = synchronized { + stopped = true notifyAll() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index cd86019f63e7e..7b33d3b235466 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -223,6 +223,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } + test("awaitTermination after stop") { + ssc = new StreamingContext(master, appName, batchDuration) + val inputStream = addInputStream(ssc) + inputStream.map(x => x).register() + + failAfter(10000 millis) { + ssc.start() + ssc.stop() + ssc.awaitTermination() + } + } + test("awaitTermination with error in task") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc)