From 8f0b6365274b437299f60fa25195c166750007fb Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Thu, 1 Jan 2015 14:36:02 +0800 Subject: [PATCH 1/3] Wait for all receivers to deregister and the receiver job to terminate when using graceful shutdown --- .../streaming/scheduler/JobGenerator.scala | 23 ++++------ .../streaming/scheduler/JobScheduler.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 42 +++++++++++++++---- .../spark/streaming/util/TimeoutUtils.scala | 36 ++++++++++++++++ 4 files changed, 77 insertions(+), 26 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 39b66e113076..9cfbd1904640 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.scheduler import akka.actor.{ActorRef, ActorSystem, Props, Actor} import org.apache.spark.{SparkException, SparkEnv, Logging} import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} -import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} +import org.apache.spark.streaming.util.{TimeoutUtils, ManualClock, RecurringTimer, Clock} import scala.util.{Failure, Success, Try} /** Event classes for JobGenerator */ @@ -93,27 +93,18 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { if (processReceivedData) { logInfo("Stopping JobGenerator gracefully") - val timeWhenStopStarted = System.currentTimeMillis() val stopTimeout = conf.getLong( "spark.streaming.gracefulStopTimeout", 10 * ssc.graph.batchDuration.milliseconds ) - val pollTime = 100 - - // To prevent graceful stop to get stuck permanently - def hasTimedOut = { - val timedOut = 
System.currentTimeMillis() - timeWhenStopStarted > stopTimeout - if (timedOut) { - logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")") - } - timedOut - } + // Wait until all the received blocks in the network input tracker has // been consumed by network input DStreams, and jobs have been generated with them logInfo("Waiting for all received blocks to be consumed for job generation") - while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) { - Thread.sleep(pollTime) + if (!TimeoutUtils.waitUntilDone(stopTimeout, + () => !jobScheduler.receiverTracker.hasUnallocatedBlocks)) { + logError("Timeout waiting for unallocated blocks in receiverTracker") } logInfo("Waited for all received blocks to be consumed for job generation") @@ -127,8 +118,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { lastProcessedBatch != null && lastProcessedBatch.milliseconds == stopTime } logInfo("Waiting for jobs to be processed and checkpoints to be written") - while (!hasTimedOut && !haveAllBatchesBeenProcessed) { - Thread.sleep(pollTime) + if (!TimeoutUtils.waitUntilDone(stopTimeout, () => haveAllBatchesBeenProcessed)) { + logError("Timeout waiting for all batches to be processed") } logInfo("Waited for jobs to be processed and checkpoints to be written") } else { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index cfa3cd8925c8..d2d241cb6964 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -72,7 +72,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { logDebug("Stopping JobScheduler") // First, stop receiving - receiverTracker.stop() + receiverTracker.stop(processAllReceivedData) // Second, stop generating jobs. 
If it has to process all received data, // then this will wait for all the processing through JobScheduler to be over. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 8dbb42a86e3b..7b4ba1ac2dd8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming.scheduler +import org.apache.spark.streaming.util.TimeoutUtils + import scala.collection.mutable.{HashMap, SynchronizedMap} import scala.language.existentials @@ -87,10 +89,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } /** Stop the receiver execution thread. */ - def stop() = synchronized { + def stop(graceful: Boolean) = synchronized { if (!receiverInputStreams.isEmpty && actor != null) { // First, stop the receivers - if (!skipReceiverLaunch) receiverExecutor.stop() + if (!skipReceiverLaunch) receiverExecutor.stop(graceful) // Finally, stop the actor ssc.env.actorSystem.stop(actor) @@ -208,6 +210,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** This thread class runs all the receivers on the cluster. */ class ReceiverLauncher { @transient val env = ssc.env + private var terminated = true + private val conf = ssc.conf @transient val thread = new Thread() { override def run() { try { @@ -223,22 +227,40 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false thread.start() } - def stop() { + def stop(graceful: Boolean) { // Send the stop signal to all the receivers stopReceivers() // Wait for the Spark job that runs the receivers to be over // That is, for the receivers to quit gracefully. 
+ val stopTimeout = conf.getLong( + "spark.streaming.gracefulStopTimeout", + 10 * ssc.graph.batchDuration.milliseconds) + + thread.join(10000) // Check if all the receivers have been deregistered or not - if (!receiverInfo.isEmpty) { - logWarning("All of the receivers have not deregistered, " + receiverInfo) + def done = { receiverInfo.isEmpty && terminated } + + if (graceful) { + TimeoutUtils.waitUntilDone(stopTimeout,() => done ) match { + case true => + logInfo("All of the receivers have deregistered successfully and job terminated") + case false => + logError("Timeout waiting for receivers to deregister") + } } else { - logInfo("All of the receivers have deregistered successfully") + if (!receiverInfo.isEmpty) { + logWarning("All of the receivers have not deregistered, " + receiverInfo) + } else { + logInfo("All of the receivers have deregistered successfully") + } } } + + /** * Get the receivers from the ReceiverInputDStreams, distributes them to the * worker nodes as a parallel collection, and runs them. @@ -285,16 +307,18 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Distribute the receivers and start them logInfo("Starting " + receivers.length + " receivers") + terminated = false ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver)) + terminated = true logInfo("All of the receivers have been terminated") } /** Stops the receivers. */ private def stopReceivers() { // Signal the receivers to stop - receiverInfo.values.flatMap { info => Option(info.actor)} - .foreach { _ ! StopReceiver } - logInfo("Sent stop signal to all " + receiverInfo.size + " receivers") + val receivers = receiverInfo.values.flatMap { info => Option(info.actor) } + receivers.foreach { _ ! 
StopReceiver } + logInfo("Sent stop signal to all " + receivers.size + " receivers") } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala new file mode 100644 index 000000000000..27e42b05db9b --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.util + + +private[streaming] object TimeoutUtils { + /** + * Waiting function with a timeout. + * Returns true if done, false for timeout + */ + def waitUntilDone(timeout:Long, done:() => Boolean): Boolean = { + var result = false + val pollTime = 100 + val timeWhenStopStarted = System.currentTimeMillis() + def hasTimedOut = System.currentTimeMillis() - timeWhenStopStarted > timeout + while(!hasTimedOut && !done()) { + Thread.sleep(pollTime) + } + if (done()) result = true + result + } +} From 7fceadfabbcbdfdcdf2b21b9d25c6b7360bc285c Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Sat, 10 Jan 2015 21:13:13 +0800 Subject: [PATCH 2/3] Wrap JobScheduler shutdown sequence inside a Future. 
--- .../streaming/scheduler/JobGenerator.scala | 18 +++-- .../streaming/scheduler/JobScheduler.scala | 67 +++++++++++++------ .../streaming/scheduler/ReceiverTracker.scala | 26 +++---- .../spark/streaming/util/TimeoutUtils.scala | 36 ---------- 4 files changed, 62 insertions(+), 85 deletions(-) delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 9cfbd1904640..0f40eee37627 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.scheduler import akka.actor.{ActorRef, ActorSystem, Props, Actor} import org.apache.spark.{SparkException, SparkEnv, Logging} import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} -import org.apache.spark.streaming.util.{TimeoutUtils, ManualClock, RecurringTimer, Clock} +import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} import scala.util.{Failure, Success, Try} /** Event classes for JobGenerator */ @@ -93,18 +93,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { if (processReceivedData) { logInfo("Stopping JobGenerator gracefully") - val stopTimeout = conf.getLong( - "spark.streaming.gracefulStopTimeout", - 10 * ssc.graph.batchDuration.milliseconds - ) // Wait until all the received blocks in the network input tracker has // been consumed by network input DStreams, and jobs have been generated with them logInfo("Waiting for all received blocks to be consumed for job generation") - if (!TimeoutUtils.waitUntilDone(stopTimeout, - () => !jobScheduler.receiverTracker.hasUnallocatedBlocks)) { - logError("Timeout waiting for unallocated blocks in receiverTracker") + while 
(jobScheduler.receiverTracker.hasUnallocatedBlocks) { + Thread.sleep(100) } logInfo("Waited for all received blocks to be consumed for job generation") @@ -117,10 +112,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { def haveAllBatchesBeenProcessed = { lastProcessedBatch != null && lastProcessedBatch.milliseconds == stopTime } + logInfo("Waiting for jobs to be processed and checkpoints to be written") - if (!TimeoutUtils.waitUntilDone(stopTimeout, () => haveAllBatchesBeenProcessed)) { - logError("Timeout waiting for all batches to be processed") + + while (!haveAllBatchesBeenProcessed) { + Thread.sleep(100) } + logInfo("Waited for jobs to be processed and checkpoints to be written") } else { logInfo("Stopping JobGenerator immediately") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index d2d241cb6964..f48c8b46dab8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -17,7 +17,8 @@ package org.apache.spark.streaming.scheduler -import scala.util.{Failure, Success, Try} +import scala.concurrent.{Future, ExecutionContext} +import scala.util.{Failure, Success} import scala.collection.JavaConversions._ import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors} import akka.actor.{ActorRef, Actor, Props} @@ -68,36 +69,60 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } def stop(processAllReceivedData: Boolean): Unit = synchronized { - if (eventActor == null) return // scheduler has already been stopped - logDebug("Stopping JobScheduler") + val shutdownExecutor = Executors.newFixedThreadPool(1) + implicit val context = ExecutionContext.fromExecutorService(shutdownExecutor) - // First, stop receiving - receiverTracker.stop(processAllReceivedData) + val shutdown = 
Future { + if (eventActor == null) return // scheduler has already been stopped + logDebug("Stopping JobScheduler") - // Second, stop generating jobs. If it has to process all received data, - // then this will wait for all the processing through JobScheduler to be over. - jobGenerator.stop(processAllReceivedData) + // First, stop receiving + receiverTracker.stop(processAllReceivedData) - // Stop the executor for receiving new jobs - logDebug("Stopping job executor") - jobExecutor.shutdown() + // Second, stop generating jobs. If it has to process all received data, + // then this will wait for all the processing through JobScheduler to be over. + jobGenerator.stop(processAllReceivedData) - // Wait for the queued jobs to complete if indicated + // Stop the executor for receiving new jobs + logDebug("Stopping job executor") + jobExecutor.shutdown() + + // Wait for the queued jobs to complete if indicated + val terminated = if (processAllReceivedData) { + jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time + } else { + jobExecutor.awaitTermination(2, TimeUnit.SECONDS) + } + if (!terminated) { + jobExecutor.shutdownNow() + } + logDebug("Stopped job executor") + + // Stop everything else + listenerBus.stop() + ssc.env.actorSystem.stop(eventActor) + eventActor = null + } + + shutdownExecutor.shutdown() + + // Wait for the JobScheduler shutdown sequence to finish val terminated = if (processAllReceivedData) { - jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time + val gracefulTimeout = ssc.conf.getLong( + "spark.streaming.gracefulStopTimeout", + 100 * ssc.graph.batchDuration.milliseconds + ) + shutdownExecutor.awaitTermination(gracefulTimeout, TimeUnit.MILLISECONDS) } else { - jobExecutor.awaitTermination(2, TimeUnit.SECONDS) + shutdownExecutor.awaitTermination(5, TimeUnit.SECONDS) } if (!terminated) { - jobExecutor.shutdownNow() + logWarning("Timeout waiting for JobScheduler to stop") + 
shutdownExecutor.shutdownNow() + } else { + logInfo("Stopped JobScheduler") } - logDebug("Stopped job executor") - // Stop everything else - listenerBus.stop() - ssc.env.actorSystem.stop(eventActor) - eventActor = null - logInfo("Stopped JobScheduler") } def submitJobSet(jobSet: JobSet) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 7b4ba1ac2dd8..46fe00b2ace6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -18,8 +18,6 @@ package org.apache.spark.streaming.scheduler -import org.apache.spark.streaming.util.TimeoutUtils - import scala.collection.mutable.{HashMap, SynchronizedMap} import scala.language.existentials @@ -211,7 +209,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false class ReceiverLauncher { @transient val env = ssc.env private var terminated = true - private val conf = ssc.conf @transient val thread = new Thread() { override def run() { try { @@ -233,10 +230,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Wait for the Spark job that runs the receivers to be over // That is, for the receivers to quit gracefully. 
- val stopTimeout = conf.getLong( - "spark.streaming.gracefulStopTimeout", - 10 * ssc.graph.batchDuration.milliseconds) - thread.join(10000) @@ -244,18 +237,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false def done = { receiverInfo.isEmpty && terminated } if (graceful) { - TimeoutUtils.waitUntilDone(stopTimeout,() => done ) match { - case true => - logInfo("All of the receivers have deregistered successfully and job terminated") - case false => - logError("Timeout waiting for receivers to deregister") + while (!done) { + Thread.sleep(100) } + } + + if (!receiverInfo.isEmpty) { + logWarning(s"All of the receivers have not deregistered, ${receiverInfo}") } else { - if (!receiverInfo.isEmpty) { - logWarning("All of the receivers have not deregistered, " + receiverInfo) - } else { - logInfo("All of the receivers have deregistered successfully") - } + logInfo("All of the receivers have deregistered successfully") } } @@ -318,7 +308,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Signal the receivers to stop val receivers = receiverInfo.values.flatMap { info => Option(info.actor) } receivers.foreach { _ ! StopReceiver } - logInfo("Sent stop signal to all " + receivers.size + " receivers") + logInfo(s"Sent stop signal to all ${receivers.size} receivers") } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala deleted file mode 100644 index 27e42b05db9b..000000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.util - - -private[streaming] object TimeoutUtils { - /** - * Waiting function with a timeout. - * Returns true if done, false for timeout - */ - def waitUntilDone(timeout:Long, done:() => Boolean): Boolean = { - var result = false - val pollTime = 100 - val timeWhenStopStarted = System.currentTimeMillis() - def hasTimedOut = System.currentTimeMillis() - timeWhenStopStarted > timeout - while(!hasTimedOut && !done()) { - Thread.sleep(pollTime) - } - if (done()) result = true - result - } -} From 1c50a26120d3aecf583c3fced4e92caab0d50a91 Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Sun, 25 Jan 2015 22:31:19 +0800 Subject: [PATCH 3/3] Add graceful shutdown unit test covering slow receiver onStop --- .../streaming/StreamingContextSuite.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9f352bdcb089..cfb04840c1c3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -205,6 +205,33 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } + test("stop slow receiver gracefully") { + val conf = new 
SparkConf().setMaster(master).setAppName(appName) + conf.set("spark.cleaner.ttl", "3600") + conf.set("spark.streaming.gracefulStopTimeout", "20000") + sc = new SparkContext(conf) + logInfo("==================================\n\n\n") + ssc = new StreamingContext(sc, Milliseconds(100)) + var runningCount = 0 + SlowTestReceiver.receivedAllRecords = false + //Create test receiver that sleeps in onStop() + val totalNumRecords = 15 + val recordsPerSecond = 1 + val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond)) + input.count().foreachRDD { rdd => + val count = rdd.first() + runningCount += count.toInt + logInfo("Count = " + count + ", Running count = " + runningCount) + } + ssc.start() + ssc.awaitTermination(500) + ssc.stop(stopSparkContext = false, stopGracefully = true) + logInfo("Running count = " + runningCount) + assert(runningCount > 0) + assert(runningCount == totalNumRecords) + Thread.sleep(100) + } + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) @@ -319,6 +346,38 @@ object TestReceiver { val counter = new AtomicInteger(1) } +/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */ +class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { + + var receivingThreadOption: Option[Thread] = None + + def onStart() { + val thread = new Thread() { + override def run() { + logInfo("Receiving started") + for(i <- 1 to totalRecords) { + Thread.sleep(recordsPerSecond * 1000) + store(i) + } + SlowTestReceiver.receivedAllRecords = true + logInfo(s"Received all $totalRecords records") + } + } + receivingThreadOption = Some(thread) + thread.start() + } + + def onStop() { + // Simulate slow receiver by waiting for all records to be produced + while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100) + // no cleanup to be done, the receiving thread should stop 
on its own + } +} + +object SlowTestReceiver { + var receivedAllRecords = false +} + /** Streaming application for testing DStream and RDD creation sites */ package object testPackage extends Assertions { def test() {