Commit b17fe4e
[SPARK-14131][STREAMING] SQL Improved fix for avoiding potential deadlocks in HDFSMetadataLog
## What changes were proposed in this pull request?

The current fix for the deadlock disables interrupts in the StreamExecution while getting offsets for all sources and while writing to any metadata log, to avoid potential deadlocks in HDFSMetadataLog (see the JIRA for more details). However, disabling interrupts can have unintended consequences in other sources. So I am making the fix narrower, by disabling interrupts only in the HDFSMetadataLog. This confines something as risky as disabling interrupts to the one place that needs it.

## How was this patch tested?

Existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14292 from tdas/SPARK-14131.

(cherry picked from commit c979c8b)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
1 parent 41e72f6 commit b17fe4e
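
For illustration, here is a minimal sketch (not part of the commit) of the pattern the narrowed fix relies on. It assumes Spark's `org.apache.spark.util.UninterruptibleThread` is on the classpath; `writeBatch` is a hypothetical stand-in for the real HDFS write inside HDFSMetadataLog:

```scala
import org.apache.spark.util.UninterruptibleThread

object DeadlockFreeWriteSketch {
  // Hypothetical stand-in for HDFSMetadataLog.writeBatch: creating a file via
  // the HDFS API may invoke Hadoop's "Shell.runCommand" to set permissions.
  def writeBatch(batchId: Long): Unit = ()

  def add(batchId: Long): Boolean = Thread.currentThread match {
    case ut: UninterruptibleThread =>
      // Interrupts arriving inside this block are deferred until it returns, so
      // "Shell.runCommand" is never interrupted mid-write (the HADOOP-10622 deadlock).
      ut.runUninterruptibly { writeBatch(batchId) }
      true
    case _ =>
      // Fail fast rather than risk the pre-Hadoop-2.5.0 deadlock.
      throw new IllegalStateException("add() must run on an UninterruptibleThread")
  }
}
```

Because `runUninterruptibly` only defers delivery, a pending interrupt is re-delivered as soon as the block exits, so the stream execution thread can still be stopped promptly.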

5 files changed: +80 −36 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 22 additions & 9 deletions
@@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.UninterruptibleThread


 /**
@@ -91,18 +92,30 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
     serializer.deserialize[T](ByteBuffer.wrap(bytes))
   }

+  /**
+   * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
+   * metadata has already been stored, this method will return `false`.
+   *
+   * Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]]
+   * so that interrupts can be disabled while writing the batch file. This is because there is a
+   * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread
+   * running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our
+   * case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the
+   * file permissions, and can get deadlocked if the stream execution thread is stopped by
+   * interrupt. Hence, we make sure that this method is called on [[UninterruptibleThread]] which
+   * allows us to disable interrupts here. Also see SPARK-14131.
+   */
   override def add(batchId: Long, metadata: T): Boolean = {
     get(batchId).map(_ => false).getOrElse {
-      // Only write metadata when the batch has not yet been written.
-      try {
-        writeBatch(batchId, serialize(metadata))
-        true
-      } catch {
-        case e: IOException if "java.lang.InterruptedException" == e.getMessage =>
-          // create may convert InterruptedException to IOException. Let's convert it back to
-          // InterruptedException so that this failure won't crash StreamExecution
-          throw new InterruptedException("Creating file is interrupted")
+      // Only write metadata when the batch has not yet been written
+      Thread.currentThread match {
+        case ut: UninterruptibleThread =>
+          ut.runUninterruptibly { writeBatch(batchId, serialize(metadata)) }
+        case _ =>
+          throw new IllegalStateException(
+            "HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
       }
+      true
     }
   }

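For context, a hypothetical caller-side sketch (not from the commit) of the contract the new add() imposes; the SparkSession setup, log path, and thread name are all invented:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog
import org.apache.spark.util.UninterruptibleThread

val spark = SparkSession.builder().master("local").getOrCreate()
val metadataLog = new HDFSMetadataLog[String](spark, "/tmp/metadata-log")

// Any thread that writes to the log must be an UninterruptibleThread,
// mirroring what StreamExecution's microBatchThread already is.
val writer = new UninterruptibleThread("metadata-writer") {
  override def run(): Unit = {
    // add() defers interrupts for the duration of the HDFS write; calling it
    // from a plain Thread would throw IllegalStateException instead.
    assert(metadataLog.add(0L, "offsets-for-batch-0"))
  }
}
writer.start()
writer.join()
```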
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 8 additions & 20 deletions
@@ -110,7 +110,11 @@ class StreamExecution(
   /* Get the call site in the caller thread; will pass this into the micro batch thread */
   private val callSite = Utils.getCallSite()

-  /** The thread that runs the micro-batches of this stream. */
+  /**
+   * The thread that runs the micro-batches of this stream. Note that this thread must be
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
+   * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
+   */
   private[sql] val microBatchThread =
     new UninterruptibleThread(s"stream execution thread for $name") {
       override def run(): Unit = {
@@ -269,19 +273,11 @@ class StreamExecution(
    * batchId counter is incremented and a new log entry is written with the newest offsets.
    */
   private def constructNextBatch(): Unit = {
-    // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
-    // If we interrupt some thread running Shell.runCommand, we may hit this issue.
-    // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
-    // to set the file permission, we should not interrupt "microBatchThread" when running this
-    // method. See SPARK-14131.
-    //
     // Check to see what new data is available.
     val hasNewData = {
       awaitBatchLock.lock()
       try {
-        val newData = microBatchThread.runUninterruptibly {
-          uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-        }
+        val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
         availableOffsets ++= newData

         if (dataAvailable) {
@@ -295,16 +291,8 @@ class StreamExecution(
       }
     }
     if (hasNewData) {
-      // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
-      // If we interrupt some thread running Shell.runCommand, we may hit this issue.
-      // As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set
-      // the file permission, we should not interrupt "microBatchThread" when running this method.
-      // See SPARK-14131.
-      microBatchThread.runUninterruptibly {
-        assert(
-          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
-          s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
-      }
+      assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+        s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
       awaitBatchLock.lock()

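Both removals here are the flip side of the guard added in HDFSMetadataLog.add() above: source.getOffset no longer runs with interrupts disabled (which, per the commit description, could have unintended consequences in other sources), while the offsetLog.add call remains safe because the log itself now defers interrupts during the write.
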
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -190,7 +190,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }

-  test("compact") {
+  testWithUninterruptibleThread("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         for (batchId <- 0 to 10) {
@@ -210,7 +210,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }

-  test("delete expired file") {
+  testWithUninterruptibleThread("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
     // deterministically
     withSQLConf(

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 6 additions & 4 deletions
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.sql.execution.streaming.FakeFileSystem._
 import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.UninterruptibleThread

 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {

@@ -56,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }

-  test("HDFSMetadataLog: basic") {
+  testWithUninterruptibleThread("HDFSMetadataLog: basic") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -81,7 +82,8 @@
     }
   }

-  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+  testWithUninterruptibleThread(
+    "HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
     spark.conf.set(
       s"fs.$scheme.impl",
       classOf[FakeFileSystem].getName)
@@ -101,7 +103,7 @@
     }
   }

-  test("HDFSMetadataLog: restart") {
+  testWithUninterruptibleThread("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -124,7 +126,7 @@
     val waiter = new Waiter
     val maxBatchId = 100
     for (id <- 0 until 10) {
-      new Thread() {
+      new UninterruptibleThread(s"HDFSMetadataLog: metadata directory collision - thread $id") {
         override def run(): Unit = waiter {
           val metadataLog =
             new HDFSMetadataLog[String](spark, temp.getAbsolutePath)

sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala

Lines changed: 42 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.util.UUID

 import scala.language.implicitConversions
 import scala.util.Try
+import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfterAll
@@ -34,7 +35,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.FilterExec
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{UninterruptibleThread, Utils}

 /**
  * Helper trait that should be extended by all SQL test suites.
@@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils
       }
     }
   }
+
+  /** Run a test on a separate [[UninterruptibleThread]]. */
+  protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false)
+    (body: => Unit): Unit = {
+    val timeoutMillis = 10000
+    @transient var ex: Throwable = null
+
+    def runOnThread(): Unit = {
+      val thread = new UninterruptibleThread(s"Testing thread for test $name") {
+        override def run(): Unit = {
+          try {
+            body
+          } catch {
+            case NonFatal(e) =>
+              ex = e
+          }
+        }
+      }
+      thread.setDaemon(true)
+      thread.start()
+      thread.join(timeoutMillis)
+      if (thread.isAlive) {
+        thread.interrupt()
+        // If this interrupt does not work, then this thread is most likely running something that
+        // is not interruptible. There is not much point to wait for the thread to terminate, and
+        // we rather let the JVM terminate the thread on exit.
+        fail(
+          s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" +
+            s" $timeoutMillis ms")
+      } else if (ex != null) {
+        throw ex
+      }
+    }
+
+    if (quietly) {
+      testQuietly(name) { runOnThread() }
+    } else {
+      test(name) { runOnThread() }
+    }
+  }
 }

 private[sql] object SQLTestUtils {

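As a hypothetical example of how the new helper is meant to be used (mirroring the test-suite changes above; the suite and test names are invented):

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog
import org.apache.spark.sql.test.SharedSQLContext

class MetadataWriteSuite extends SparkFunSuite with SharedSQLContext {
  testWithUninterruptibleThread("metadata log add runs uninterruptibly") {
    withTempDir { temp =>
      // The body runs on a daemon UninterruptibleThread with a 10-second cap;
      // a non-fatal failure inside it is rethrown on the main ScalaTest thread.
      val log = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
      assert(log.add(0, "batch0"))
    }
  }
}
```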