@@ -93,27 +93,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

if (processReceivedData) {
logInfo("Stopping JobGenerator gracefully")
val timeWhenStopStarted = System.currentTimeMillis()
val stopTimeout = conf.getLong(
"spark.streaming.gracefulStopTimeout",
10 * ssc.graph.batchDuration.milliseconds
)
val pollTime = 100

// To prevent the graceful stop from getting stuck permanently
def hasTimedOut = {
val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
if (timedOut) {
logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
}
timedOut
}


// Wait until all the received blocks in the network input tracker have
// been consumed by network input DStreams, and jobs have been generated with them
logInfo("Waiting for all received blocks to be consumed for job generation")
while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
Thread.sleep(pollTime)
while (jobScheduler.receiverTracker.hasUnallocatedBlocks) {
Contributor: extra space after while.

Thread.sleep(100)
Contributor: Please unify both occurrences of 100 in this function into a single variable, called pollInterval.

}
logInfo("Waited for all received blocks to be consumed for job generation")

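A minimal sketch of the unification the reviewer is asking for, using their suggested name pollInterval (the other identifiers come from the surrounding diff):

    // Hoist the repeated magic number into one named constant.
    val pollInterval = 100 // milliseconds between polls of scheduler state

    while (jobScheduler.receiverTracker.hasUnallocatedBlocks) {
      Thread.sleep(pollInterval)
    }
    // ... and later in the same method ...
    while (!haveAllBatchesBeenProcessed) {
      Thread.sleep(pollInterval)
    }
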
@@ -126,10 +112,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
def haveAllBatchesBeenProcessed = {
lastProcessedBatch != null && lastProcessedBatch.milliseconds == stopTime
}

logInfo("Waiting for jobs to be processed and checkpoints to be written")
while (!hasTimedOut && !haveAllBatchesBeenProcessed) {
Thread.sleep(pollTime)

Contributor: Extra line.

while (!haveAllBatchesBeenProcessed) {
Contributor: extra space after while.

Thread.sleep(100)
}

logInfo("Waited for jobs to be processed and checkpoints to be written")
} else {
logInfo("Stopping JobGenerator immediately")
@@ -17,7 +17,8 @@

package org.apache.spark.streaming.scheduler

import scala.util.{Failure, Success, Try}
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Failure, Success}
import scala.collection.JavaConversions._
import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
import akka.actor.{ActorRef, Actor, Props}
@@ -68,36 +69,60 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}

def stop(processAllReceivedData: Boolean): Unit = synchronized {
if (eventActor == null) return // scheduler has already been stopped
logDebug("Stopping JobScheduler")
val shutdownExecutor = Executors.newFixedThreadPool(1)
Contributor: Use org.apache.spark.util.Utils.newDaemonFixedThreadPool(...). It will have nice thread names and all.

Contributor: It's confusing to have another variable "shutdownExecutor" when there is a "jobExecutor". It might be a good idea to rename this to something like "shutdownThreadPool".

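A sketch combining those two suggestions; the thread-name prefix here is hypothetical, and Utils.newDaemonFixedThreadPool is a private[spark] helper (later Spark versions moved the equivalent to org.apache.spark.util.ThreadUtils):

    import org.apache.spark.util.Utils

    // One daemon thread with a descriptive name, per the review.
    val shutdownThreadPool = Utils.newDaemonFixedThreadPool(1, "streaming-scheduler-shutdown")
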
implicit val context = ExecutionContext.fromExecutorService(shutdownExecutor)
Contributor: Could you rename this to "execContext" or something that disambiguates it from the streaming context, which is referred to here and there in this code.


// First, stop receiving
receiverTracker.stop()
val shutdown = Future {
Contributor: shutdown --> shutdownFuture

if (eventActor == null) return // scheduler has already been stopped
logDebug("Stopping JobScheduler")

// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
jobGenerator.stop(processAllReceivedData)
// First, stop receiving
receiverTracker.stop(processAllReceivedData)

// Stop the executor for receiving new jobs
logDebug("Stopping job executor")
jobExecutor.shutdown()
// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
jobGenerator.stop(processAllReceivedData)

// Wait for the queued jobs to complete if indicated
// Stop the executor for receiving new jobs
logDebug("Stopping job executor")
jobExecutor.shutdown()

// Wait for the queued jobs to complete if indicated
val terminated = if (processAllReceivedData) {
jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time
} else {
jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
Contributor: Why 2 seconds?

Contributor: Never mind, that is how it was before.
}
if (!terminated) {
jobExecutor.shutdownNow()
}
logDebug("Stopped job executor")

// Stop everything else
listenerBus.stop()
ssc.env.actorSystem.stop(eventActor)
eventActor = null
}

Contributor: I don't understand the logic here. Since we created a future, shouldn't we be waiting for the future to complete? It's non-intuitive to wait on the executor. In fact, when gracefully shutting down, the necessary and sufficient condition to wait for is the completion of the future (which includes waiting for the job generator and job executor to shut down).

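A minimal sketch of the reviewer's alternative (not the code in this PR): block on the future itself rather than on the executor that runs it. shutdown and gracefulTimeout are the names used in the diff:

    import scala.concurrent.Await
    import scala.concurrent.duration._

    try {
      // The future completes only after the job generator and job executor have stopped.
      Await.ready(shutdown, gracefulTimeout.milliseconds)
      logInfo("Stopped JobScheduler")
    } catch {
      case _: java.util.concurrent.TimeoutException =>
        logWarning("Timeout waiting for JobScheduler to stop")
    }
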
shutdownExecutor.shutdown()

// Wait for the JobScheduler shutdown sequence to finish
val terminated = if (processAllReceivedData) {
jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time
val gracefulTimeout = ssc.conf.getLong(
"spark.streaming.gracefulStopTimeout",
100 * ssc.graph.batchDuration.milliseconds
Contributor: This number 100 seems quite arbitrary. We would like to expose this timeout, in which case there are two possibilities:

  • spark.streaming.gracefulStopTimeout - A fixed interval, say 10 seconds. The system will wait approximately that long.
  • spark.streaming.gracefulStopTimeoutMultiplier - A constant, say M. The system will wait for approximately (M * batch-duration).

Each has its merits. The first is easier to understand for developers and operations people, because it will not vary from app to app (different apps can have different batch intervals). The second is more logical, as it makes sense to scale the timeout with the batch duration - an app with 1-minute batches will obviously take longer to shut down than one with 2-second batches.

Contributor: Naah, I thought more about it. This is more or less fine. The default should be a multiple of the batch duration (so that it scales), but when the app sets it explicitly, it should be something the app developer intuitively understands. Though I recommend reducing it to 10 * batch duration.

)
shutdownExecutor.awaitTermination(gracefulTimeout, TimeUnit.MILLISECONDS)
} else {
jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
shutdownExecutor.awaitTermination(5, TimeUnit.SECONDS)
}
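A sketch of the default the reviewer ultimately recommends, i.e. the diff above with the multiplier reduced from 100 to 10:

    val gracefulTimeout = ssc.conf.getLong(
      "spark.streaming.gracefulStopTimeout",
      10 * ssc.graph.batchDuration.milliseconds
    )
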
if (!terminated) {
jobExecutor.shutdownNow()
logWarning("Timeout waiting for JobScheduler to stop")
shutdownExecutor.shutdownNow()
} else {
logInfo("Stopped JobScheduler")
}
logDebug("Stopped job executor")

// Stop everything else
listenerBus.stop()
ssc.env.actorSystem.stop(eventActor)
eventActor = null
logInfo("Stopped JobScheduler")
}

def submitJobSet(jobSet: JobSet) {
@@ -87,10 +87,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}

/** Stop the receiver execution thread. */
def stop() = synchronized {
def stop(graceful: Boolean) = synchronized {
if (!receiverInputStreams.isEmpty && actor != null) {
// First, stop the receivers
if (!skipReceiverLaunch) receiverExecutor.stop()
if (!skipReceiverLaunch) receiverExecutor.stop(graceful)

// Finally, stop the actor
ssc.env.actorSystem.stop(actor)
@@ -208,6 +208,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** This thread class runs all the receivers on the cluster. */
class ReceiverLauncher {
@transient val env = ssc.env
private var terminated = true
Contributor: Why is this initialized to true? Should it not be initialized to false? Then line 300 is not necessary.
Also, since this var is being modified and checked from multiple threads, it is best to mark it as volatile. And also transient.

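The initialization the reviewer is describing would look something like the following sketch; starting at false makes the explicit reset before runJob unnecessary:

    // @volatile for cross-thread visibility; @transient so the flag is not
    // serialized into closures shipped to the workers.
    @transient @volatile private var terminated = false
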
@transient val thread = new Thread() {
override def run() {
try {
Expand All @@ -223,22 +224,33 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
thread.start()
}

def stop() {
def stop(graceful: Boolean) {
// Send the stop signal to all the receivers
stopReceivers()

// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.

Contributor: Extra empty line.

thread.join(10000)

// Check if all the receivers have been deregistered or not
def done = { receiverInfo.isEmpty && terminated }

if (graceful) {
while (!done) {
Thread.sleep(100)
}
}

if (!receiverInfo.isEmpty) {
logWarning("All of the receivers have not deregistered, " + receiverInfo)
logWarning(s"All of the receivers have not deregistered, ${receiverInfo}")
} else {
logInfo("All of the receivers have deregistered successfully")
}
}


Contributor: Why the extra lines?


/**
* Gets the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
@@ -285,16 +297,18 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false

// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
terminated = false
ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
terminated = true
logInfo("All of the receivers have been terminated")
}

/** Stops the receivers. */
private def stopReceivers() {
// Signal the receivers to stop
receiverInfo.values.flatMap { info => Option(info.actor)}
.foreach { _ ! StopReceiver }
logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
val receivers = receiverInfo.values.flatMap { info => Option(info.actor) }
receivers.foreach { _ ! StopReceiver }
logInfo(s"Sent stop signal to all ${receivers.size} receivers")
}
}
}
@@ -205,6 +205,33 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}

test("stop slow receiver gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
conf.set("spark.cleaner.ttl", "3600")
conf.set("spark.streaming.gracefulStopTimeout", "20000")
sc = new SparkContext(conf)
logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
SlowTestReceiver.receivedAllRecords = false
// Create a test receiver that sleeps in onStop()
val totalNumRecords = 15
val recordsPerSecond = 1
val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond))
input.count().foreachRDD { rdd =>
val count = rdd.first()
runningCount += count.toInt
logInfo("Count = " + count + ", Running count = " + runningCount)
}
ssc.start()
ssc.awaitTermination(500)
ssc.stop(stopSparkContext = false, stopGracefully = true)
logInfo("Running count = " + runningCount)
assert(runningCount > 0)
assert(runningCount == totalNumRecords)
Thread.sleep(100)
}

test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
@@ -319,6 +346,38 @@ object TestReceiver {
val counter = new AtomicInteger(1)
}

/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {

var receivingThreadOption: Option[Thread] = None

def onStart() {
val thread = new Thread() {
override def run() {
logInfo("Receiving started")
for (i <- 1 to totalRecords) {
Thread.sleep(1000 / recordsPerSecond)
store(i)
}
SlowTestReceiver.receivedAllRecords = true
logInfo(s"Received all $totalRecords records")
}
}
receivingThreadOption = Some(thread)
thread.start()
}

def onStop() {
// Simulate slow receiver by waiting for all records to be produced
while (!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
// no cleanup to be done, the receiving thread should stop on its own
}
}

object SlowTestReceiver {
var receivedAllRecords = false
}

/** Streaming application for testing DStream and RDD creation sites */
package object testPackage extends Assertions {
def test() {