Skip to content

Commit 94f108b

Browse files
committed
Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping
1 parent a0c0aae commit 94f108b

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,16 @@ private[streaming] class BlockGenerator(
140140
}
141141
}
142142

143+
// When we arrive here, no data will be added to `currentBuffer`. However, `currentBuffer` may
144+
// not be empty. If so, we should wait until all data in `currentBuffer` is consumed, because
145+
// `blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling
146+
// `updateCurrentBuffer` for us.
147+
var isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty }
148+
while(!isCurrentBufferEmpty) {
149+
Thread.sleep(blockIntervalMs)
150+
isCurrentBufferEmpty = synchronized { currentBuffer.isEmpty }
151+
}
152+
143153
// Stop generating blocks and set the state for block pushing thread to start draining the queue
144154
logInfo("Stopping BlockGenerator")
145155
blockIntervalTimer.stop(interruptTimer = false)

streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,10 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
184184
// Verify that the final data is present in the final generated block and
185185
// pushed before complete stop
186186
assert(blockGenerator.isStopped() === false) // generator has not stopped yet
187-
clock.advance(blockIntervalMs) // force block generation
188-
failAfter(1 second) {
189-
thread.join()
187+
eventually(timeout(10 seconds), interval(10 milliseconds)) {
188+
// Keep calling `advance` to avoid blocking forever in `clock.waitTillTime`
189+
clock.advance(blockIntervalMs)
190+
assert(thread.isAlive === false)
190191
}
191192
assert(blockGenerator.isStopped() === true) // generator has finally been completely stopped
192193
assert(listener.pushedData === data, "All data not pushed by stop()")

0 commit comments

Comments
 (0)