Skip to content

Commit e103e8a

Browse files
committed
Move ReceiverTracker.stop into ReceiverTracker.stop
1 parent f637142 commit e103e8a

File tree

1 file changed

+32
-27
lines changed

1 file changed

+32
-27
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,30 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
106106
if (isTrackerStarted) {
107107
// First, stop the receivers
108108
trackerState = Stopping
109-
if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
109+
if (!skipReceiverLaunch) {
110+
// Send the stop signal to all the receivers
111+
endpoint.askWithRetry[Boolean](StopAllReceivers)
112+
113+
// Wait for the Spark job that runs the receivers to be over
114+
// That is, for the receivers to quit gracefully.
115+
receiverExecutor.awaitTermination(10000)
116+
117+
if (graceful) {
118+
val pollTime = 100
119+
logInfo("Waiting for receiver job to terminate gracefully")
120+
while (receiverInfo.nonEmpty || receiverExecutor.running) {
121+
Thread.sleep(pollTime)
122+
}
123+
logInfo("Waited for receiver job to terminate gracefully")
124+
}
125+
126+
// Check if all the receivers have been deregistered or not
127+
if (receiverInfo.nonEmpty) {
128+
logWarning("Not all of the receivers have deregistered, " + receiverInfo)
129+
} else {
130+
logInfo("All of the receivers have deregistered successfully")
131+
}
132+
}
110133

111134
// Finally, stop the endpoint
112135
ssc.env.rpcEnv.stop(endpoint)
@@ -269,7 +292,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
269292
/** This thread class runs all the receivers on the cluster. */
270293
class ReceiverLauncher {
271294
@transient val env = ssc.env
272-
@volatile @transient private var running = false
295+
@volatile @transient var running = false
273296
@transient val thread = new Thread() {
274297
override def run() {
275298
try {
@@ -285,31 +308,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
285308
thread.start()
286309
}
287310

288-
def stop(graceful: Boolean) {
289-
// Send the stop signal to all the receivers
290-
endpoint.askWithRetry[Boolean](StopAllReceivers)
291-
292-
// Wait for the Spark job that runs the receivers to be over
293-
// That is, for the receivers to quit gracefully.
294-
thread.join(10000)
295-
296-
if (graceful) {
297-
val pollTime = 100
298-
logInfo("Waiting for receiver job to terminate gracefully")
299-
while (receiverInfo.nonEmpty || running) {
300-
Thread.sleep(pollTime)
301-
}
302-
logInfo("Waited for receiver job to terminate gracefully")
303-
}
304-
305-
// Check if all the receivers have been deregistered or not
306-
if (receiverInfo.nonEmpty) {
307-
logWarning("Not all of the receivers have deregistered, " + receiverInfo)
308-
} else {
309-
logInfo("All of the receivers have deregistered successfully")
310-
}
311-
}
312-
313311
/**
314312
* Get the list of executors excluding driver
315313
*/
@@ -402,6 +400,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
402400
}
403401
}
404402

403+
/**
404+
* Wait until the Spark job that runs the receivers is terminated, or return when
405+
* `milliseconds` elapses
406+
*/
407+
def awaitTermination(milliseconds: Long): Unit = {
408+
thread.join(milliseconds)
409+
}
405410
}
406411

407412
/** Check if tracker has been marked for starting */

0 commit comments

Comments
 (0)