From 94f108be1300a7709c3bd1548ee1114a41665b0d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 25 Aug 2015 21:28:07 +0800 Subject: [PATCH 1/5] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping --- .../spark/streaming/receiver/BlockGenerator.scala | 10 ++++++++++ .../spark/streaming/receiver/BlockGeneratorSuite.scala | 7 ++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 421d60ae359f..a7d6242d932c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -140,6 +140,16 @@ private[streaming] class BlockGenerator( } } + // When we arrive here, no data will be added to `currentBuffer`. However, `currentBuffer` may + // not be empty. If so, we should wait until all data in `currentBuffer` is consumed, because + // `blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling + // `updateCurrentBuffer` for us. + var isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty } + while(!isCurrentBufferEmpty) { + Thread.sleep(blockIntervalMs) + isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty } + } + // Stop generating blocks and set the state for block pushing thread to start draining the queue logInfo("Stopping BlockGenerator") blockIntervalTimer.stop(interruptTimer = false) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala index a38cc603f219..2f11b255f110 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala @@ -184,9 +184,10 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { // Verify that the final data is present in the final generated block and // pushed before complete stop assert(blockGenerator.isStopped() === false) // generator has not stopped yet - clock.advance(blockIntervalMs) // force block generation - failAfter(1 second) { - thread.join() + eventually(timeout(10 seconds), interval(10 milliseconds)) { + // Keep calling `advance` to avoid blocking forever in `clock.waitTillTime` + clock.advance(blockIntervalMs) + assert(thread.isAlive === false) } assert(blockGenerator.isStopped() === true) // generator has finally been completely stopped assert(listener.pushedData === data, "All data not pushed by stop()") From a1c9354c964eab811e3307c17293bb911a811e10 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 22 Sep 2015 10:54:25 +0800 Subject: [PATCH 2/5] Make RecurringTimer call callback again after stopping --- .../spark/streaming/receiver/BlockGenerator.scala | 10 ---------- .../apache/spark/streaming/util/RecurringTimer.scala | 2 ++ 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index a7d6242d932c..421d60ae359f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -140,16 +140,6 @@ private[streaming] class BlockGenerator( } } - // When we arrive here, no data will be added to `currentBuffer`. However, `currentBuffer` may - // not be empty. If so, we should wait until all data in `currentBuffer` is consumed, because - // `blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling - // `updateCurrentBuffer` for us. - var isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty } - while(!isCurrentBufferEmpty) { - Thread.sleep(blockIntervalMs) - isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty } - } - // Stop generating blocks and set the state for block pushing thread to start draining the queue logInfo("Stopping BlockGenerator") blockIntervalTimer.stop(interruptTimer = false) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index dd32ad5ad811..07a84f529ead 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -99,6 +99,8 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: nextTime += period logDebug("Callback for " + name + " called at time " + prevTime) } + clock.waitTillTime(nextTime) + callback(nextTime) } catch { case e: InterruptedException => } From 193d1911c4f6c7716e1b7a58fb2275268a99bea3 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 22 Sep 2015 22:21:22 +0800 Subject: [PATCH 3/5] Fix RecurringTimer and add unit tests --- .../spark/streaming/util/RecurringTimer.scala | 22 +++-- .../streaming/util/RecurringTimerSuite.scala | 81 +++++++++++++++++++ 2 files changed, 96 insertions(+), 7 deletions(-) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index 07a84f529ead..300f6343a140 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -87,24 +87,32 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: prevTime } + private def triggerActionForNextInterval(): Unit = { + clock.waitTillTime(nextTime) + callback(nextTime) + prevTime = nextTime + nextTime += period + logDebug("Callback for " + name + " called at time " + prevTime) + } + /** * Repeatedly call the callback every interval. */ private def loop() { try { while (!stopped) { - clock.waitTillTime(nextTime) - callback(nextTime) - prevTime = nextTime - nextTime += period - logDebug("Callback for " + name + " called at time " + prevTime) + triggerActionForNextInterval() } - clock.waitTillTime(nextTime) - callback(nextTime) + triggerActionForNextInterval() } catch { case e: InterruptedException => } } + + /* + * Return whether `stop` is called. + */ + def isStopped: Boolean = stopped } private[streaming] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala new file mode 100644 index 000000000000..2b02a453aa1c --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import scala.collection.mutable +import scala.concurrent.duration._ + +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.ManualClock + +class RecurringTimerSuite extends SparkFunSuite { + + test("basic") { + val clock = new ManualClock() + val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long] + val timer = new RecurringTimer(clock, 100, time => { + results += time + }, "RecurringTimerSuite-basic") + timer.start(0) + eventually(timeout(10.seconds), interval(10.millis)) { + assert(results === Seq(0L)) + } + clock.advance(100) + eventually(timeout(10.seconds), interval(10.millis)) { + assert(results === Seq(0L, 100L)) + } + clock.advance(200) + eventually(timeout(10.seconds), interval(10.millis)) { + assert(results === Seq(0L, 100L, 200L, 300L)) + } + assert(timer.stop(interruptTimer = true) === 300L) + } + + test("SPARK-10224: call 'callback' after stopping") { + val clock = new ManualClock() + val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long] + val timer = new RecurringTimer(clock, 100, time => { + results += time + }, "RecurringTimerSuite-SPARK-10224") + timer.start(0) + eventually(timeout(10.seconds), interval(10.millis)) { + assert(results === Seq(0L)) + } + @volatile var lastTime = -1L + // Now RecurringTimer is waiting for the next interval + val t = new Thread { + override def run(): Unit = { + lastTime = timer.stop(interruptTimer = false) + } + } + t.start() + // Make sure the `stopped` field has been changed + eventually(timeout(10.seconds), interval(10.millis)) { + assert(timer.isStopped) + } + clock.advance(200) + // When RecurringTimer is awake from clock.waitTillTime, it will call `callback` once. + // Then it will find `stopped` is false and exit the loop, but it should call `callback` again + // before exiting its internal thread. + t.join() + assert(results === Seq(0L, 100L, 200L)) + assert(lastTime === 200L) + } +} From 429728fe0c70b8ef17096c0f7024b35551495265 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 22 Sep 2015 22:56:05 +0800 Subject: [PATCH 4/5] Fix a typo --- .../org/apache/spark/streaming/util/RecurringTimerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala index 2b02a453aa1c..4d094c66bf7b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala @@ -72,7 +72,7 @@ class RecurringTimerSuite extends SparkFunSuite { } clock.advance(200) // When RecurringTimer is awake from clock.waitTillTime, it will call `callback` once. - // Then it will find `stopped` is false and exit the loop, but it should call `callback` again + // Then it will find `stopped` is true and exit the loop, but it should call `callback` again // before exiting its internal thread. t.join() assert(results === Seq(0L, 100L, 200L)) From e8e490da14b360c3be39fb66d7cc20a01c06decf Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 23 Sep 2015 08:40:05 +0800 Subject: [PATCH 5/5] Remove isStopped and use PrivateMethod to access it --- .../apache/spark/streaming/util/RecurringTimer.scala | 9 +++------ .../spark/streaming/util/RecurringTimerSuite.scala | 12 +++++++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index 300f6343a140..0148cb51c6f0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -72,8 +72,10 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: /** * Stop the timer, and return the last time the callback was made. - * interruptTimer = true will interrupt the callback + * - interruptTimer = true will interrupt the callback * if it is in progress (not guaranteed to give correct time in this case). + * - interruptTimer = false guarantees that there will be at least one callback after `stop` has + * been called. */ def stop(interruptTimer: Boolean): Long = synchronized { if (!stopped) { @@ -108,11 +110,6 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: case e: InterruptedException => } } - - /* - * Return whether `stop` is called. - */ - def isStopped: Boolean = stopped } private[streaming] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala index 4d094c66bf7b..0544972d95c0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala @@ -20,12 +20,13 @@ package org.apache.spark.streaming.util import scala.collection.mutable import scala.concurrent.duration._ +import org.scalatest.PrivateMethodTester import org.scalatest.concurrent.Eventually._ import org.apache.spark.SparkFunSuite import org.apache.spark.util.ManualClock -class RecurringTimerSuite extends SparkFunSuite { +class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester { test("basic") { val clock = new ManualClock() @@ -60,21 +61,22 @@ class RecurringTimerSuite extends SparkFunSuite { } @volatile var lastTime = -1L // Now RecurringTimer is waiting for the next interval - val t = new Thread { + val thread = new Thread { override def run(): Unit = { lastTime = timer.stop(interruptTimer = false) } } - t.start() + thread.start() + val stopped = PrivateMethod[RecurringTimer]('stopped) // Make sure the `stopped` field has been changed eventually(timeout(10.seconds), interval(10.millis)) { - assert(timer.isStopped) + assert(timer.invokePrivate(stopped()) === true) } clock.advance(200) // When RecurringTimer is awake from clock.waitTillTime, it will call `callback` once. // Then it will find `stopped` is true and exit the loop, but it should call `callback` again // before exiting its internal thread. - t.join() + thread.join() assert(results === Seq(0L, 100L, 200L)) assert(lastTime === 200L) }