diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 39b66e113076..0f40eee37627 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -93,27 +93,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     if (processReceivedData) {
       logInfo("Stopping JobGenerator gracefully")
-      val timeWhenStopStarted = System.currentTimeMillis()
-      val stopTimeout = conf.getLong(
-        "spark.streaming.gracefulStopTimeout",
-        10 * ssc.graph.batchDuration.milliseconds
-      )
-      val pollTime = 100
-
-      // To prevent graceful stop to get stuck permanently
-      def hasTimedOut = {
-        val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
-        if (timedOut) {
-          logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
-        }
-        timedOut
-      }
+
       // Wait until all the received blocks in the network input tracker has
       // been consumed by network input DStreams, and jobs have been generated with them
       logInfo("Waiting for all received blocks to be consumed for job generation")
-      while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
-        Thread.sleep(pollTime)
+      while (jobScheduler.receiverTracker.hasUnallocatedBlocks) {
+        Thread.sleep(100)
       }
       logInfo("Waited for all received blocks to be consumed for job generation")
@@ -126,10 +112,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       def haveAllBatchesBeenProcessed = {
         lastProcessedBatch != null && lastProcessedBatch.milliseconds == stopTime
       }
+
       logInfo("Waiting for jobs to be processed and checkpoints to be written")
-      while (!hasTimedOut && !haveAllBatchesBeenProcessed) {
-        Thread.sleep(pollTime)
+
+      while (!haveAllBatchesBeenProcessed) {
+        Thread.sleep(100)
       }
+
       logInfo("Waited for jobs to be processed and checkpoints to be written")
     } else {
       logInfo("Stopping JobGenerator immediately")
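Note on the JobGenerator change above: the local `hasTimedOut` guard is gone, so both wait loops now poll their condition every 100 ms with no bound of their own; enforcing `spark.streaming.gracefulStopTimeout` moves up into `JobScheduler.stop` (next file, where the default also changes from 10x to 100x the batch duration). A minimal, self-contained sketch of that division of labor; `pollUntil` and `unallocatedBlocks` are illustrative names, not Spark APIs:

```scala
import java.util.concurrent.atomic.AtomicInteger

object PollUntilExample {
  // Blocks until `condition` holds, checking every `pollTimeMs` milliseconds.
  // No timeout here: the caller is expected to bound the overall wait.
  def pollUntil(condition: => Boolean, pollTimeMs: Long = 100): Unit = {
    while (!condition) {
      Thread.sleep(pollTimeMs)
    }
  }

  def main(args: Array[String]): Unit = {
    val unallocatedBlocks = new AtomicInteger(5) // stand-in for tracker state
    new Thread(new Runnable {
      def run(): Unit = {
        while (unallocatedBlocks.get > 0) {
          Thread.sleep(50)
          unallocatedBlocks.decrementAndGet()
        }
      }
    }).start()
    pollUntil(unallocatedBlocks.get == 0) // loops forever if the producer stalls
    println("All received blocks consumed")
  }
}
```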
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index cfa3cd8925c8..f48c8b46dab8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.streaming.scheduler
 
-import scala.util.{Failure, Success, Try}
+import scala.concurrent.{Future, ExecutionContext}
+import scala.util.{Failure, Success}
 import scala.collection.JavaConversions._
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
 import akka.actor.{ActorRef, Actor, Props}
@@ -68,36 +69,60 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   def stop(processAllReceivedData: Boolean): Unit = synchronized {
-    if (eventActor == null) return // scheduler has already been stopped
-    logDebug("Stopping JobScheduler")
+    val shutdownExecutor = Executors.newFixedThreadPool(1)
+    implicit val context = ExecutionContext.fromExecutorService(shutdownExecutor)
 
-    // First, stop receiving
-    receiverTracker.stop()
+    val shutdown = Future {
+      if (eventActor == null) return // scheduler has already been stopped
+      logDebug("Stopping JobScheduler")
 
-    // Second, stop generating jobs. If it has to process all received data,
-    // then this will wait for all the processing through JobScheduler to be over.
-    jobGenerator.stop(processAllReceivedData)
+      // First, stop receiving
+      receiverTracker.stop(processAllReceivedData)
 
-    // Stop the executor for receiving new jobs
-    logDebug("Stopping job executor")
-    jobExecutor.shutdown()
+      // Second, stop generating jobs. If it has to process all received data,
+      // then this will wait for all the processing through JobScheduler to be over.
+      jobGenerator.stop(processAllReceivedData)
 
-    // Wait for the queued jobs to complete if indicated
+      // Stop the executor for receiving new jobs
+      logDebug("Stopping job executor")
+      jobExecutor.shutdown()
+
+      // Wait for the queued jobs to complete if indicated
+      val terminated = if (processAllReceivedData) {
+        jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time
+      } else {
+        jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
+      }
+      if (!terminated) {
+        jobExecutor.shutdownNow()
+      }
+      logDebug("Stopped job executor")
+
+      // Stop everything else
+      listenerBus.stop()
+      ssc.env.actorSystem.stop(eventActor)
+      eventActor = null
+    }
+
+    shutdownExecutor.shutdown()
+
+    // Wait for the JobScheduler shutdown sequence to finish
     val terminated = if (processAllReceivedData) {
-      jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time
+      val gracefulTimeout = ssc.conf.getLong(
+        "spark.streaming.gracefulStopTimeout",
+        100 * ssc.graph.batchDuration.milliseconds
+      )
+      shutdownExecutor.awaitTermination(gracefulTimeout, TimeUnit.MILLISECONDS)
     } else {
-      jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
+      shutdownExecutor.awaitTermination(5, TimeUnit.SECONDS)
    }
     if (!terminated) {
-      jobExecutor.shutdownNow()
+      logWarning("Timeout waiting for JobScheduler to stop")
+      shutdownExecutor.shutdownNow()
+    } else {
+      logInfo("Stopped JobScheduler")
    }
-    logDebug("Stopped job executor")
-
-    // Stop everything else
-    listenerBus.stop()
-    ssc.env.actorSystem.stop(eventActor)
-    eventActor = null
-    logInfo("Stopped JobScheduler")
  }
 
   def submitJobSet(jobSet: JobSet) {
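The restructured `JobScheduler.stop` runs the whole shutdown sequence in a `Future` on a dedicated single-thread executor, then bounds the wait with `awaitTermination`. One thing worth double-checking: the `return` inside the `Future` body does not exit `stop`; Scala compiles a `return` in a closure into a thrown `scala.runtime.NonLocalReturnControl`, which here would surface on the executor thread, so the early-exit check is safer done before entering the `Future`. A stripped-down sketch of the timeout pattern (the sleep stands in for the real shutdown steps; all names are illustrative):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

object BoundedShutdownExample {
  def main(args: Array[String]): Unit = {
    val shutdownExecutor = Executors.newFixedThreadPool(1)
    implicit val context = ExecutionContext.fromExecutorService(shutdownExecutor)

    Future {
      // Stand-in for: stop receivers, stop job generation, drain the job
      // executor, stop the listener bus and the event actor.
      Thread.sleep(1000)
      println("Shutdown sequence finished")
    }

    // shutdown() rejects new tasks but lets the queued Future run to completion.
    shutdownExecutor.shutdown()
    val gracefulTimeoutMs = 2000L // stand-in for spark.streaming.gracefulStopTimeout
    val terminated = shutdownExecutor.awaitTermination(gracefulTimeoutMs, TimeUnit.MILLISECONDS)
    if (!terminated) {
      println("Timeout waiting for shutdown")
      shutdownExecutor.shutdownNow() // interrupt the in-flight sequence
    }
  }
}
```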
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 8dbb42a86e3b..46fe00b2ace6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -87,10 +87,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   }
 
   /** Stop the receiver execution thread. */
-  def stop() = synchronized {
+  def stop(graceful: Boolean) = synchronized {
     if (!receiverInputStreams.isEmpty && actor != null) {
       // First, stop the receivers
-      if (!skipReceiverLaunch) receiverExecutor.stop()
+      if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
 
       // Finally, stop the actor
       ssc.env.actorSystem.stop(actor)
@@ -208,6 +208,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   /** This thread class runs all the receivers on the cluster. */
   class ReceiverLauncher {
     @transient val env = ssc.env
+    private var terminated = true
    @transient val thread = new Thread() {
       override def run() {
         try {
@@ -223,22 +224,33 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       thread.start()
     }
 
-    def stop() {
+    def stop(graceful: Boolean) {
       // Send the stop signal to all the receivers
       stopReceivers()
 
       // Wait for the Spark job that runs the receivers to be over
       // That is, for the receivers to quit gracefully.
+      thread.join(10000)
 
       // Check if all the receivers have been deregistered or not
+      def done = { receiverInfo.isEmpty && terminated }
+
+      if (graceful) {
+        while (!done) {
+          Thread.sleep(100)
+        }
+      }
+
       if (!receiverInfo.isEmpty) {
-        logWarning("All of the receivers have not deregistered, " + receiverInfo)
+        logWarning(s"All of the receivers have not deregistered, ${receiverInfo}")
       } else {
         logInfo("All of the receivers have deregistered successfully")
       }
     }
+
+
     /**
      * Get the receivers from the ReceiverInputDStreams, distributes them to the
      * worker nodes as a parallel collection, and runs them.
      */
@@ -285,16 +297,18 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
       // Distribute the receivers and start them
       logInfo("Starting " + receivers.length + " receivers")
+      terminated = false
       ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
+      terminated = true
       logInfo("All of the receivers have been terminated")
     }
 
     /** Stops the receivers. */
     private def stopReceivers() {
       // Signal the receivers to stop
-      receiverInfo.values.flatMap { info => Option(info.actor)}
-        .foreach { _ ! StopReceiver }
-      logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
+      val receivers = receiverInfo.values.flatMap { info => Option(info.actor) }
+      receivers.foreach { _ ! StopReceiver }
+      logInfo(s"Sent stop signal to all ${receivers.size} receivers")
     }
   }
 }
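`ReceiverTracker.stop` and `ReceiverLauncher.stop` now take a `graceful` flag: the launcher joins its thread for up to 10 seconds, and in graceful mode spins until every receiver has deregistered and the receiver-running job has returned. Note that `terminated` is a plain `var` written by the launcher thread and read by the stopping thread; the sketch below marks the equivalent flag `@volatile` for cross-thread visibility. All names are illustrative, not Spark APIs:

```scala
import java.util.concurrent.ConcurrentHashMap

object GracefulStopSketch {
  @volatile private var terminated = true
  private val receiverInfo = new ConcurrentHashMap[Int, String]()

  // Stand-in for the launcher thread that runs the receivers as a Spark job.
  private val launcherThread = new Thread(new Runnable {
    def run(): Unit = {
      terminated = false
      receiverInfo.put(1, "receiver-1") // receiver registers
      Thread.sleep(500)                 // receivers do their work
      receiverInfo.remove(1)            // receiver deregisters on exit
      terminated = true
    }
  })

  def stop(graceful: Boolean): Unit = {
    // (The real code first sends StopReceiver to every registered receiver.)
    launcherThread.join(10000) // bounded wait for the receiver job
    def done = receiverInfo.isEmpty && terminated
    if (graceful) {
      while (!done) Thread.sleep(100) // unbounded; the outer timeout caps it
    }
  }

  def main(args: Array[String]): Unit = {
    launcherThread.start()
    stop(graceful = true)
    println(s"All receivers deregistered: ${receiverInfo.isEmpty}")
  }
}
```

As in JobGenerator, the graceful spin here has no bound of its own and relies on the `awaitTermination` timeout in `JobScheduler.stop`.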
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9f352bdcb089..cfb04840c1c3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -205,6 +205,33 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     }
   }
 
+  test("stop slow receiver gracefully") {
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+    conf.set("spark.cleaner.ttl", "3600")
+    conf.set("spark.streaming.gracefulStopTimeout", "20000")
+    sc = new SparkContext(conf)
+    logInfo("==================================\n\n\n")
+    ssc = new StreamingContext(sc, Milliseconds(100))
+    var runningCount = 0
+    SlowTestReceiver.receivedAllRecords = false
+    // Create a test receiver that sleeps in onStop()
+    val totalNumRecords = 15
+    val recordsPerSecond = 1
+    val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond))
+    input.count().foreachRDD { rdd =>
+      val count = rdd.first()
+      runningCount += count.toInt
+      logInfo("Count = " + count + ", Running count = " + runningCount)
+    }
+    ssc.start()
+    ssc.awaitTermination(500)
+    ssc.stop(stopSparkContext = false, stopGracefully = true)
+    logInfo("Running count = " + runningCount)
+    assert(runningCount > 0)
+    assert(runningCount == totalNumRecords)
+    Thread.sleep(100)
+  }
+
   test("awaitTermination") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
@@ -319,6 +346,38 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }
 
+/** Custom receiver for testing whether a slow receiver can be shut down gracefully or not */
+class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+    val thread = new Thread() {
+      override def run() {
+        logInfo("Receiving started")
+        for (i <- 1 to totalRecords) {
+          Thread.sleep(recordsPerSecond * 1000)
+          store(i)
+        }
+        SlowTestReceiver.receivedAllRecords = true
+        logInfo(s"Received all $totalRecords records")
+      }
+    }
+    receivingThreadOption = Some(thread)
+    thread.start()
+  }
+
+  def onStop() {
+    // Simulate a slow receiver by waiting for all records to be produced
+    while (!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
+    // No cleanup to be done; the receiving thread should stop on its own
+  }
+}
+
+object SlowTestReceiver {
+  var receivedAllRecords = false
+}
+
 /** Streaming application for testing DStream and RDD creation sites */
 package object testPackage extends Assertions {
   def test() {
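For the record, the timing that makes this test work, using the constants from the diff: the receiver produces one record per second and its `onStop` blocks until all records are out, so a graceful stop has to keep the pipeline alive for roughly 15 seconds, longer than the new 10-second default timeout (100 x the 100 ms batch duration) but inside the 20-second override the test sets. A REPL-style check of that arithmetic:

```scala
// Timing arithmetic behind the test's 20 s override (values from the diff).
val batchDurationMs = 100L                           // Milliseconds(100)
val defaultGracefulTimeoutMs = 100 * batchDurationMs // JobScheduler default: 10 s
val totalNumRecords = 15
val recordsPerSecond = 1
val sleepPerRecordMs = recordsPerSecond * 1000L      // SlowTestReceiver's delay
val timeToReceiveAllMs = totalNumRecords * sleepPerRecordMs // ~15 s

assert(timeToReceiveAllMs > defaultGracefulTimeoutMs) // the 10 s default would expire
assert(timeToReceiveAllMs < 20000L)                   // the 20 s override suffices
```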