Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.spark.util
import java.util.concurrent._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}

Expand Down Expand Up @@ -86,4 +87,62 @@ private[spark] object ThreadUtils {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
Executors.newSingleThreadScheduledExecutor(threadFactory)
}

/**
* Run a piece of code in a new thread and return the result. Exception in the new thread is
* thrown in the caller thread with an adjusted stack trace that removes references to this
* method for clarity. The exception stack traces will be like the following
*
* SomeException: exception-message
* at CallerClass.body-method (sourcefile.scala)
* at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
* at CallerClass.caller-method (sourcefile.scala)
* ...
*/
def runInNewThread[T](
threadName: String,
isDaemon: Boolean = true)(body: => T): T = {
@volatile var exception: Option[Throwable] = None
@volatile var result: T = null.asInstanceOf[T]

val thread = new Thread(threadName) {
override def run(): Unit = {
try {
result = body
} catch {
case NonFatal(e) =>
exception = Some(e)
}
}
}
thread.setDaemon(isDaemon)
thread.start()
thread.join()

exception match {
case Some(realException) =>
// Remove the part of the stack that shows method calls into this helper method
// This means drop everything from the top until the stack element
// ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)

// Remove the part of the new thread stack that shows methods call from this helper method
val extraStackTrace = realException.getStackTrace.takeWhile(
! _.getClassName.contains(this.getClass.getSimpleName))

// Combine the two stack traces, with a place holder just specifying that there
// was a helper method used, without any further details of the helper
val placeHolderStackElem = new StackTraceElement(
s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
" ", "", -1)
val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace

// Update the stack trace and rethrow the exception in the caller thread
realException.setStackTrace(finalStackTrace)
throw realException
case None =>
result
}
}
}
24 changes: 23 additions & 1 deletion core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package org.apache.spark.util

import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random

import org.apache.spark.SparkFunSuite

Expand Down Expand Up @@ -66,4 +67,25 @@ class ThreadUtilsSuite extends SparkFunSuite {
val futureThreadName = Await.result(f, 10.seconds)
assert(futureThreadName === callerThreadName)
}

test("runInNewThread") {
import ThreadUtils._
assert(runInNewThread("thread-name") { Thread.currentThread().getName } === "thread-name")
assert(runInNewThread("thread-name") { Thread.currentThread().isDaemon } === true)
assert(
runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon } === false
)
val uniqueExceptionMessage = "test" + Random.nextInt()
val exception = intercept[IllegalArgumentException] {
runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage) }
}
assert(exception.asInstanceOf[IllegalArgumentException].getMessage === uniqueExceptionMessage)
assert(exception.getStackTrace.mkString("\n").contains(
"... run in separate thread using org.apache.spark.util.ThreadUtils ...") === true,
"stack trace does not contain expected place holder"
)
assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
"stack trace contains unexpected references to ThreadUtils"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}

/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
Expand Down Expand Up @@ -202,6 +202,12 @@ class StreamingContext private[streaming] (

private var shutdownHookRef: AnyRef = _

// The streaming scheduler and other threads started by the StreamingContext
// should not inherit jobs group and job descriptions from the thread that
// start the context. This configuration allows jobs group and job description
// to be cleared in threads related to streaming. See SPARK-10649.
sparkContext.conf.set("spark.localProperties.clone", "true")

conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)

/**
Expand Down Expand Up @@ -588,12 +594,20 @@ class StreamingContext private[streaming] (
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
sparkContext.setCallSite(startSite.get)
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
scheduler.start()

// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
Expand All @@ -618,6 +632,7 @@ class StreamingContext private[streaming] (
}
}


/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,38 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(ssc.scheduler.isStarted === false)
}

test("start should set job group and description of streaming jobs correctly") {
ssc = new StreamingContext(conf, batchDuration)
ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
val sc = ssc.sc

@volatile var jobGroupFound: String = ""
@volatile var jobDescFound: String = ""
@volatile var jobInterruptFound: String = ""
@volatile var allFound: Boolean = false

addInputStream(ssc).foreachRDD { rdd =>
jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
allFound = true
}
ssc.start()

eventually(timeout(10 seconds), interval(10 milliseconds)) {
assert(allFound === true)
}

// Verify streaming jobs have expected thread-local properties
assert(jobGroupFound === null)
assert(jobDescFound === null)
assert(jobInterruptFound === "false")

// Verify current thread's thread-local properties have not changed
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
}

test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
Expand Down