From 81bcc48e0c608a2c98369c7598d5040d3b39197e Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Mon, 13 Feb 2017 16:18:36 -0800
Subject: [PATCH 1/3] Clean up HDFSMetadataLog for Hadoop 2.6

---
 .../sql/kafka010/KafkaSourceOffsetSuite.scala |  2 +-
 .../execution/streaming/HDFSMetadataLog.scala | 59 ++-----------------
 .../execution/streaming/StreamExecution.scala |  4 +-
 .../CompactibleFileStreamLogSuite.scala       |  4 +-
 .../streaming/FileStreamSinkLogSuite.scala    |  4 +-
 .../streaming/HDFSMetadataLogSuite.scala      |  9 ++-
 .../streaming/OffsetSeqLogSuite.scala         |  2 +-
 .../sql/streaming/FileStreamSourceSuite.scala |  2 +-
 8 files changed, 18 insertions(+), 68 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 10b35c74f473..efec51d09745 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -55,7 +55,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
 
   }
 
-  testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
+  test("OffsetSeqLog serialization - deserialization") {
     withTempDir { temp =>
       // use non-existent directory to test whether log make the dir
       val dir = new File(temp, "dir")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bfdc2cb0ac5b..ef583b2af523 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -109,43 +109,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   override def add(batchId: Long, metadata: T): Boolean = {
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
-      if (fileManager.isLocalFileSystem) {
-        Thread.currentThread match {
-          case ut: UninterruptibleThread =>
-            // When using a local file system, "writeBatch" must be called on a
-            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-            // while writing the batch file. This is because there is a potential dead-lock in
-            // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-            // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-            // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-            // the file permission if using the local file system, and can get deadlocked if the
-            // stream execution thread is stopped by interrupt. Hence, we make sure that
-            // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-            // interrupts here. Also see SPARK-14131.
-            ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
-          case _ =>
-            throw new IllegalStateException(
-              "HDFSMetadataLog.add() on a local file system must be executed on " +
-                "a o.a.spark.util.UninterruptibleThread")
-        }
-      } else {
-        // For a distributed file system, such as HDFS or S3, if the network is broken, write
-        // operations may just hang until timeout. We should enable interrupts to allow stopping
-        // the query fast.
-        writeBatch(batchId, metadata, serialize)
-      }
+      writeBatch(batchId, metadata)
       true
     }
   }
-  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
-    var nextId = 0
+  def writeTempBatch(metadata: T): Option[Path] = {
     while (true) {
       val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
       try {
         val output = fileManager.create(tempPath)
         try {
-          writer(metadata, output)
+          serialize(metadata, output)
           return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
         }
@@ -164,7 +139,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           // big problem because it requires the attacker must have the permission to write the
           // metadata path. In addition, the old Streaming also have this issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
-          nextId += 1
       }
     }
     None
@@ -176,8 +150,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
    * valid behavior, we still need to prevent it from destroying the files.
    */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
-    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+  private def writeBatch(batchId: Long, metadata: T): Unit = {
+    val tempPath = writeTempBatch(metadata).getOrElse(
       throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
     try {
       // Try to commit the batch
@@ -195,12 +169,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // So throw an exception to tell the user this is not a valid behavior.
         throw new ConcurrentModificationException(
           s"Multiple HDFSMetadataLog are using $path", e)
-      case e: FileNotFoundException =>
-        // Sometimes, "create" will succeed when multiple writers are calling it at the same
-        // time. However, only one writer can call "rename" successfully, others will get
-        // FileNotFoundException because the first writer has removed it.
-        throw new ConcurrentModificationException(
-          s"Multiple HDFSMetadataLog are using $path", e)
     } finally {
       fileManager.delete(tempPath)
     }
@@ -332,9 +300,6 @@ object HDFSMetadataLog {
 
     /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
     def delete(path: Path): Unit
-
-    /** Whether the file systme is a local FS. */
-    def isLocalFileSystem: Boolean
   }
 
   /**
@@ -379,13 +344,6 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
-
-    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
-      case _: local.LocalFs | _: local.RawLocalFs =>
-        // LocalFs = RawLocalFs + ChecksumFs
-        true
-      case _ => false
-    }
   }
 
   /**
@@ -442,12 +400,5 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
-
-    override def isLocalFileSystem: Boolean = fs match {
-      case _: LocalFileSystem | _: RawLocalFileSystem =>
-        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
-        true
-      case _ => false
-    }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3149ef04f7d1..561ac1e858a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -179,8 +179,8 @@ class StreamExecution(
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
-   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
-   * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential endless loop in
+   * `KafkaConsumer`. See KAFKA-1894 for more details.
    */
   val microBatchThread =
     new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 435d874d75b9..24d92a96237e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -156,7 +156,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       })
   }
 
-  testWithUninterruptibleThread("compact") {
+  test("compact") {
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
@@ -174,7 +174,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       })
   }
 
-  testWithUninterruptibleThread("delete expired file") {
+  test("delete expired file") {
     // Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = 0,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 7e0de5e2657b..340d2945acd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -129,7 +129,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("compact") {
+  test("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         for (batchId <- 0 to 10) {
@@ -149,7 +149,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("delete expired file") {
+  test("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
     // deterministically and one min batches to retain
     withSQLConf(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index d556861a487f..55750b920298 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -57,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("HDFSMetadataLog: basic") {
+  test("HDFSMetadataLog: basic") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -82,8 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread(
-    "HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
+  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
     spark.conf.set(
       s"fs.$scheme.impl",
       classOf[FakeFileSystem].getName)
@@ -103,7 +102,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("HDFSMetadataLog: purge") {
+  test("HDFSMetadataLog: purge") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -128,7 +127,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("HDFSMetadataLog: restart") {
+  test("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index bb4274a162e8..5ae8b2484d2e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -36,7 +36,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
       OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
   }
 
-  testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") {
+  test("OffsetSeqLog - serialization - deserialization") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 8a9fa94bea60..5110d89c85b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1174,7 +1174,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(map.isNewFile("b", 10))
   }
 
-  testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
+  test("do not recheck that files exist during getBatch") {
     withTempDir { temp =>
       spark.conf.set(
         s"fs.$scheme.impl",

From 14a199c2df29825e506d6373a4725215503c8145 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 14 Feb 2017 14:11:49 -0800
Subject: [PATCH 2/3] Debug

---
 .../sql/streaming/StreamingQueryManagerSuite.scala | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index f05e9d1fda73..94c817144858 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -49,6 +49,16 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
   }
 
   after {
+    val activeQueries = spark.streams.active
+    if (activeQueries.nonEmpty) {
+      for (query <- activeQueries) {
+        val stackTrace =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.microBatchThread.getStackTrace
+        // scalastyle:off
+        println(stackTrace.mkString("\n"))
+        // scalastyle:on
+      }
+    }
     assert(spark.streams.active.isEmpty)
     spark.streams.resetTerminated()
   }

From 38444ea7d89c2626603cb1377735d9a83825872c Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 15 Feb 2017 13:22:06 -0800
Subject: [PATCH 3/3] Restore the workaround

---
 .../sql/kafka010/KafkaSourceOffsetSuite.scala |  2 +-
 .../execution/streaming/HDFSMetadataLog.scala | 46 ++++++++++++++++++-
 .../execution/streaming/StreamExecution.scala |  4 +-
 .../CompactibleFileStreamLogSuite.scala       |  4 +-
 .../streaming/FileStreamSinkLogSuite.scala    |  4 +-
 .../streaming/HDFSMetadataLogSuite.scala      |  9 ++--
 .../streaming/OffsetSeqLogSuite.scala         |  2 +-
 .../sql/streaming/FileStreamSourceSuite.scala |  2 +-
 .../StreamingQueryManagerSuite.scala          | 10 ----
 9 files changed, 59 insertions(+), 24 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index efec51d09745..10b35c74f473 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -55,7 +55,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
 
   }
 
-  test("OffsetSeqLog serialization - deserialization") {
+  testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
     withTempDir { temp =>
       // use non-existent directory to test whether log make the dir
       val dir = new File(temp, "dir")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index ef583b2af523..3155ce04a110 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -109,7 +109,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   override def add(batchId: Long, metadata: T): Boolean = {
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
-      writeBatch(batchId, metadata)
+      if (fileManager.isLocalFileSystem) {
+        Thread.currentThread match {
+          case ut: UninterruptibleThread =>
+            // When using a local file system, "writeBatch" must be called on a
+            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
+            // while writing the batch file.
+            //
+            // This is because Hadoop "Shell.runCommand" swallows InterruptedException (HADOOP-14084).
+            // If the user tries to stop a query, and the thread running "Shell.runCommand" is
+            // interrupted, then InterruptedException will be dropped and the query will still be
+            // running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+            // "Shell.runCommand" to set the file permission if using the local file system)
+            //
+            // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
+            // allows us to disable interrupts here, in order to propagate the interrupt state
+            // correctly. Also see SPARK-19599.
+            ut.runUninterruptibly { writeBatch(batchId, metadata) }
+          case _ =>
+            throw new IllegalStateException(
+              "HDFSMetadataLog.add() on a local file system must be executed on " +
+                "a o.a.spark.util.UninterruptibleThread")
+        }
+      } else {
+        // For a distributed file system, such as HDFS or S3, if the network is broken, write
+        // operations may just hang until timeout. We should enable interrupts to allow stopping
+        // the query fast.
+        writeBatch(batchId, metadata)
+      }
       true
     }
   }
@@ -300,6 +327,9 @@ object HDFSMetadataLog {
 
     /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
     def delete(path: Path): Unit
+
+    /** Whether the file system is a local FS. */
+    def isLocalFileSystem: Boolean
   }
 
   /**
@@ -344,6 +374,13 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
+
+    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
+      case _: local.LocalFs | _: local.RawLocalFs =>
+        // LocalFs = RawLocalFs + ChecksumFs
+        true
+      case _ => false
+    }
   }
 
   /**
@@ -400,5 +437,12 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
+
+    override def isLocalFileSystem: Boolean = fs match {
+      case _: LocalFileSystem | _: RawLocalFileSystem =>
+        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
+        true
+      case _ => false
+    }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 561ac1e858a5..239d49b08a2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -179,8 +179,8 @@ class StreamExecution(
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
-   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential endless loop in
-   * `KafkaConsumer`. See KAFKA-1894 for more details.
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid swallowing `InterruptedException`
+   * when using [[HDFSMetadataLog]]. See SPARK-19599 for more details.
    */
   val microBatchThread =
     new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 24d92a96237e..435d874d75b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -156,7 +156,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       })
   }
 
-  test("compact") {
+  testWithUninterruptibleThread("compact") {
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
@@ -174,7 +174,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       })
   }
 
-  test("delete expired file") {
+  testWithUninterruptibleThread("delete expired file") {
     // Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = 0,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 340d2945acd4..7e0de5e2657b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -129,7 +129,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("compact") {
+  testWithUninterruptibleThread("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         for (batchId <- 0 to 10) {
@@ -149,7 +149,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("delete expired file") {
+  testWithUninterruptibleThread("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
     // deterministically and one min batches to retain
     withSQLConf(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 55750b920298..d556861a487f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -57,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: basic") {
+  testWithUninterruptibleThread("HDFSMetadataLog: basic") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -82,7 +82,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+  testWithUninterruptibleThread(
+    "HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
     spark.conf.set(
       s"fs.$scheme.impl",
       classOf[FakeFileSystem].getName)
@@ -102,7 +103,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: purge") {
+  testWithUninterruptibleThread("HDFSMetadataLog: purge") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -127,7 +128,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("HDFSMetadataLog: restart") {
+  testWithUninterruptibleThread("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 5ae8b2484d2e..bb4274a162e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -36,7 +36,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
       OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
   }
 
-  test("OffsetSeqLog - serialization - deserialization") {
+  testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 5110d89c85b1..8a9fa94bea60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1174,7 +1174,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(map.isNewFile("b", 10))
   }
 
-  test("do not recheck that files exist during getBatch") {
+  testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
     withTempDir { temp =>
       spark.conf.set(
         s"fs.$scheme.impl",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 94c817144858..f05e9d1fda73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -49,16 +49,6 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
   }
 
   after {
-    val activeQueries = spark.streams.active
-    if (activeQueries.nonEmpty) {
-      for (query <- activeQueries) {
-        val stackTrace =
-          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.microBatchThread.getStackTrace
-        // scalastyle:off
-        println(stackTrace.mkString("\n"))
-        // scalastyle:on
-      }
-    }
     assert(spark.streams.active.isEmpty)
     spark.streams.resetTerminated()
   }