Skip to content

Commit e8e490d

Browse files
committed
Remove isStopped and use PrivateMethod to access it
1 parent 429728f commit e8e490d

File tree

2 files changed

+10
-11
lines changed

2 files changed

+10
-11
lines changed

streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,10 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
7272

7373
/**
7474
* Stop the timer, and return the last time the callback was made.
75-
* interruptTimer = true will interrupt the callback
75+
* - interruptTimer = true will interrupt the callback
7676
* if it is in progress (not guaranteed to give correct time in this case).
77+
* - interruptTimer = false guarantees that there will be at least one callback after `stop` has
78+
* been called.
7779
*/
7880
def stop(interruptTimer: Boolean): Long = synchronized {
7981
if (!stopped) {
@@ -108,11 +110,6 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
108110
case e: InterruptedException =>
109111
}
110112
}
111-
112-
/*
113-
* Return whether `stop` is called.
114-
*/
115-
def isStopped: Boolean = stopped
116113
}
117114

118115
private[streaming]

streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ package org.apache.spark.streaming.util
2020
import scala.collection.mutable
2121
import scala.concurrent.duration._
2222

23+
import org.scalatest.PrivateMethodTester
2324
import org.scalatest.concurrent.Eventually._
2425

2526
import org.apache.spark.SparkFunSuite
2627
import org.apache.spark.util.ManualClock
2728

28-
class RecurringTimerSuite extends SparkFunSuite {
29+
class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
2930

3031
test("basic") {
3132
val clock = new ManualClock()
@@ -60,21 +61,22 @@ class RecurringTimerSuite extends SparkFunSuite {
6061
}
6162
@volatile var lastTime = -1L
6263
// Now RecurringTimer is waiting for the next interval
63-
val t = new Thread {
64+
val thread = new Thread {
6465
override def run(): Unit = {
6566
lastTime = timer.stop(interruptTimer = false)
6667
}
6768
}
68-
t.start()
69+
thread.start()
70+
val stopped = PrivateMethod[RecurringTimer]('stopped)
6971
// Make sure the `stopped` field has been changed
7072
eventually(timeout(10.seconds), interval(10.millis)) {
71-
assert(timer.isStopped)
73+
assert(timer.invokePrivate(stopped()) === true)
7274
}
7375
clock.advance(200)
7476
// When RecurringTimer is awake from clock.waitTillTime, it will call `callback` once.
7577
// Then it will find `stopped` is true and exit the loop, but it should call `callback` again
7678
// before exiting its internal thread.
77-
t.join()
79+
thread.join()
7880
assert(results === Seq(0L, 100L, 200L))
7981
assert(lastTime === 200L)
8082
}

0 commit comments

Comments
 (0)