HDFSMetadataLog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.UninterruptibleThread


/**
@@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
serializer.deserialize[T](ByteBuffer.wrap(bytes))
}

/**
* Store the metadata for the specified batchId and return `true` if successful. If the batchId's
* metadata has already been stored, this method will return `false`.
*
* Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]]
* so that interrupts can be disabled while writing the batch file. This is because there is a
* potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread
* running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our
* case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the
* file permissions, and can get deadlocked if the stream execution thread is stopped by
* interrupt. Hence, we make sure that this method is called on [[UninterruptibleThread]] which
* allows us to disable interrupts here. Also see SPARK-14131.
*/
override def add(batchId: Long, metadata: T): Boolean = {
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written.
try {
writeBatch(batchId, serialize(metadata))
true
} catch {
case e: IOException if "java.lang.InterruptedException" == e.getMessage =>
// create may convert InterruptedException to IOException. Let's convert it back to
// InterruptedException so that this failure won't crash StreamExecution
throw new InterruptedException("Creating file is interrupted")
// Only write metadata when the batch has not yet been written
Thread.currentThread match {
case ut: UninterruptibleThread =>
ut.runUninterruptibly { writeBatch(batchId, serialize(metadata)) }
case _ =>
throw new IllegalStateException(
"HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
}
true
}
}
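
For illustration, a minimal, self-contained sketch (not code from this PR) of the contract add() now enforces: the metadata write only happens inside runUninterruptibly, which only an UninterruptibleThread can supply. The object and helper names here are placeholders.

    import org.apache.spark.util.UninterruptibleThread

    object UninterruptibleWriteSketch {
      // Mirrors the Thread.currentThread check in add() above.
      def writeUninterruptibly(write: () => Unit): Unit = Thread.currentThread match {
        case ut: UninterruptibleThread =>
          // Interrupts delivered while this block runs are deferred until it returns,
          // so the underlying file write cannot be interrupted midway.
          ut.runUninterruptibly { write() }
        case _ =>
          throw new IllegalStateException("must be called on an UninterruptibleThread")
      }

      def main(args: Array[String]): Unit = {
        val writer = new UninterruptibleThread("demo-metadata-writer") {
          override def run(): Unit = writeUninterruptibly(() => println("writing batch file"))
        }
        writer.start()
        writer.join()
      }
    }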

StreamExecution.scala
@@ -110,7 +110,11 @@ class StreamExecution(
/* Get the call site in the caller thread; will pass this into the micro batch thread */
private val callSite = Utils.getCallSite()

/** The thread that runs the micro-batches of this stream. */
/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
* [[HDFSMetadataLog]]. See SPARK-14131 for more details.
*/
private[sql] val microBatchThread =
new UninterruptibleThread(s"stream execution thread for $name") {
override def run(): Unit = {
@@ -269,19 +273,11 @@
* batchId counter is incremented and a new log entry is written with the newest offsets.
*/
private def constructNextBatch(): Unit = {
// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
// If we interrupt some thread running Shell.runCommand, we may hit this issue.
// As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
// to set the file permission, we should not interrupt "microBatchThread" when running this
// method. See SPARK-14131.
//
// Check to see what new data is available.
val hasNewData = {
awaitBatchLock.lock()
try {
val newData = microBatchThread.runUninterruptibly {
uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
}
val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
Contributor:
Just a single line but takes a while to figure out what it does. I'd rewrite it to:

uniqueSources.map(s => (s, s.getOffset))...

and would do more transformations depending on the types (didn't check in an IDE). Just an idea to untangle the knots :)

Contributor Author:

Your snippet converts it to Seq[(Source, Option[Offset])]. I find it more tedious to extract Seq[(Source, Offset)] from it.

uniqueSources.map(s => (s, s.getOffset)).filter(_._2.nonEmpty).map { case (k, v) => (k, v.get)}

Contributor:

Gave it a longer thought. I'm not using for-comprehensions very often, but when I do... What do you think about this?

        val newData = for {
          source <- uniqueSources
          offset <- source.getOffset
        } yield (source, offset)
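
For reference, a toy, self-contained comparison (Source, Offset, and getOffset below are stand-ins, not the streaming classes) showing that the three formulations discussed here produce the same Seq[(Source, Offset)]:

    object OffsetCollectSketch {
      case class Source(name: String)
      case class Offset(value: Long)

      val uniqueSources: Seq[Source] = Seq(Source("a"), Source("b"), Source("c"))
      // Pretend source "b" has no data available yet.
      def getOffset(s: Source): Option[Offset] =
        if (s.name == "b") None else Some(Offset(s.name.length.toLong))

      def main(args: Array[String]): Unit = {
        // 1. The one-liner in the diff.
        val viaFlatMap = uniqueSources.flatMap(s => getOffset(s).map(o => s -> o))
        // 2. map + filter + map, as in the reply above.
        val viaMapFilter = uniqueSources.map(s => (s, getOffset(s)))
          .filter(_._2.nonEmpty).map { case (k, v) => (k, v.get) }
        // 3. The suggested for-comprehension (desugars to the same flatMap/map).
        val viaFor = for {
          source <- uniqueSources
          offset <- getOffset(source)
        } yield (source, offset)

        assert(viaFlatMap == viaMapFilter && viaMapFilter == viaFor)
      }
    }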

availableOffsets ++= newData

if (dataAvailable) {
@@ -295,16 +291,8 @@
}
}
if (hasNewData) {
// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
// If we interrupt some thread running Shell.runCommand, we may hit this issue.
// As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set
// the file permission, we should not interrupt "microBatchThread" when running this method.
// See SPARK-14131.
microBatchThread.runUninterruptibly {
assert(
offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
}
assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId.")
} else {
awaitBatchLock.lock()
FileStreamSinkLogSuite.scala
@@ -190,7 +190,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("compact") {
testWithUninterruptibleThread("compact") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withFileStreamSinkLog { sinkLog =>
for (batchId <- 0 to 10) {
@@ -210,7 +210,7 @@
}
}

test("delete expired file") {
testWithUninterruptibleThread("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically
withSQLConf(
HDFSMetadataLogSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.sql.execution.streaming.FakeFileSystem._
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.UninterruptibleThread

class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {

@@ -56,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("HDFSMetadataLog: basic") {
testWithUninterruptibleThread("HDFSMetadataLog: basic") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use a non-existent directory to test whether the log makes the dir
val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -81,7 +82,8 @@
}
}

testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
testWithUninterruptibleThread(
"HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
spark.conf.set(
s"fs.$scheme.impl",
classOf[FakeFileSystem].getName)
@@ -101,7 +103,7 @@
}
}

test("HDFSMetadataLog: restart") {
testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
@@ -124,7 +126,7 @@
val waiter = new Waiter
val maxBatchId = 100
for (id <- 0 until 10) {
new Thread() {
new UninterruptibleThread(s"HDFSMetadataLog: metadata directory collision - thread $id") {
override def run(): Unit = waiter {
val metadataLog =
new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
SQLTestUtils.scala
@@ -22,6 +22,7 @@ import java.util.UUID

import scala.language.implicitConversions
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.scalatest.BeforeAndAfterAll
@@ -34,7 +35,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.util.Utils
import org.apache.spark.util.{UninterruptibleThread, Utils}

/**
* Helper trait that should be extended by all SQL test suites.
@@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils
}
}
}

/** Run a test on a separate [[UninterruptibleThread]]. */
protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false)
(body: => Unit): Unit = {
val timeoutMillis = 10000
@transient var ex: Throwable = null

def runOnThread(): Unit = {
val thread = new UninterruptibleThread(s"Testing thread for test $name") {
override def run(): Unit = {
try {
body
} catch {
case NonFatal(e) =>
ex = e
Contributor:

Will it work?! You're on another thread here and closing over ex.

Contributor Author:

my bad. ex needs to be transient.
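
For what it's worth, a standalone sketch (not part of this PR) of the capture-and-rethrow pattern being questioned here. In this shape the read is safe because Thread.join() establishes a happens-before edge with everything the worker did; @volatile states the cross-thread intent explicitly, whereas @transient by itself only affects serialization:

    import scala.util.control.NonFatal

    object CrossThreadExceptionSketch {
      @volatile private var captured: Throwable = null

      def main(args: Array[String]): Unit = {
        val worker = new Thread("failing-worker") {
          override def run(): Unit = {
            try throw new RuntimeException("boom")
            catch { case NonFatal(e) => captured = e }
          }
        }
        worker.start()
        // join() guarantees the worker's write to `captured` is visible below.
        worker.join()
        assert(captured != null && captured.getMessage == "boom")
      }
    }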

}
}
}
thread.setDaemon(true)
thread.start()
thread.join(timeoutMillis)
if (thread.isAlive) {
thread.interrupt()
// If this interrupt does not work, then this thread is most likely running something that
// is not interruptible. There is not much point in waiting for the thread to terminate, and
// we would rather let the JVM terminate the thread on exit.
fail(
s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" +
s" $timeoutMillis ms")
} else if (ex != null) {
throw ex
}
}

if (quietly) {
Contributor:
I'd appreciate your comment on the following alternative:

val f = if (quietly) testQuietly else test
f(name) { runOnThread() }

?

Contributor Author:

It's more Scala-ish, but slightly non-intuitive to read. Maybe rename f to testingFunc.

Contributor Author (@tdas, Jul 25, 2016):

It does not compile that easily because test and testQuietly have different param signatures, and the code gets complicated trying to make it work.

val f = if (quietly) testQuietly(name) _ else test(name) _
    f {
      runOnThread()
    }

This is hard to understand. So I am keeping it as is.

Contributor (@jaceklaskowski, Jul 25, 2016):

What about this?

val testingFunc = if (quietly) testQuietly(name) else test(name)
testingFunc(runOnThread())

testQuietly(name) { runOnThread() }
} else {
test(name) { runOnThread() }
}
}
}

private[sql] object SQLTestUtils {
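
A hypothetical usage sketch of the new helper; the suite name and test bodies are illustrative and simply mirror how the suites above adopt it:

    import org.apache.spark.SparkFunSuite
    import org.apache.spark.sql.test.SharedSQLContext
    import org.apache.spark.util.UninterruptibleThread

    class UninterruptibleHelperUsageSuite extends SparkFunSuite with SharedSQLContext {

      // The body runs on a dedicated UninterruptibleThread; the test fails if it
      // does not finish within the helper's 10 second timeout.
      testWithUninterruptibleThread("body runs on an UninterruptibleThread") {
        assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
      }

      // quietly = true routes the body through testQuietly instead of test.
      testWithUninterruptibleThread("quiet variant", quietly = true) {
        assert(1 + 1 == 2)
      }
    }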