@@ -55,7 +55,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
}


testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
test("OffsetSeqLog serialization - deserialization") {
withTempDir { temp =>
// use a non-existent directory to test whether the log creates the dir
val dir = new File(temp, "dir")
@@ -32,7 +32,6 @@ import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.UninterruptibleThread


/**
@@ -109,39 +108,12 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
override def add(batchId: Long, metadata: T): Boolean = {
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
if (fileManager.isLocalFileSystem) {
Thread.currentThread match {
case ut: UninterruptibleThread =>
// When using a local file system, "writeBatch" must be called on a
// [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
// while writing the batch file.
//
// This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084).
// If the user tries to stop a query, and the thread running "Shell.runCommand" is
// interrupted, then InterruptException will be dropped and the query will be still
// running. (Note: `writeBatch` creates a file using HDFS APIs and will call
// "Shell.runCommand" to set the file permission if using the local file system)
//
// Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
// allows us to disable interrupts here, in order to propagate the interrupt state
// correctly. Also see SPARK-19599.
ut.runUninterruptibly { writeBatch(batchId, metadata) }
case _ =>
throw new IllegalStateException(
"HDFSMetadataLog.add() on a local file system must be executed on " +
"a o.a.spark.util.UninterruptibleThread")
}
} else {
// For a distributed file system, such as HDFS or S3, if the network is broken, write
// operations may just hang until timeout. We should enable interrupts to allow stopping
// the query fast.
writeBatch(batchId, metadata)
}
writeBatch(batchId, metadata)
Contributor

Didn't we disable interrupts because, with local files, Hadoop used shell commands for file manipulation, which could hang when interrupted? Are we removing this now because that has been fixed in Hadoop?

Member Author

> Are we removing this now because that has been fixed in Hadoop?

Yes. We dropped support for Hadoop 2.5 and earlier versions.
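
For reference, the removed branch relied on Spark's internal UninterruptibleThread, whose runUninterruptibly defers delivery of Thread.interrupt() until the block finishes, so the interrupt is not swallowed inside Hadoop's Shell.runCommand (HADOOP-14084). A minimal sketch of that pattern, assuming Spark-internal code where org.apache.spark.util.UninterruptibleThread is visible; the thread name and write body are placeholders:

import org.apache.spark.util.UninterruptibleThread

// Sketch: perform a local-FS write on an UninterruptibleThread so an interrupt
// arriving mid-write is deferred instead of being lost in Shell.runCommand.
val writer = new UninterruptibleThread("metadata-writer") {
  override def run(): Unit = {
    runUninterruptibly {
      // ... create the batch file; interrupts are disabled in this block ...
    }
    // An interrupt received during the block is re-delivered here.
  }
}
writer.start()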

true
}
}

def writeTempBatch(metadata: T): Option[Path] = {
private def writeTempBatch(metadata: T): Option[Path] = {
while (true) {
val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
try {
@@ -327,9 +299,6 @@ object HDFSMetadataLog {

/** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
def delete(path: Path): Unit

/** Whether the file systme is a local FS. */
def isLocalFileSystem: Boolean
}

/**
@@ -374,13 +343,6 @@ object HDFSMetadataLog {
// ignore if file has already been deleted
}
}

override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
case _: local.LocalFs | _: local.RawLocalFs =>
// LocalFs = RawLocalFs + ChecksumFs
true
case _ => false
}
}

/**
@@ -437,12 +399,5 @@ object HDFSMetadataLog {
// ignore if file has already been deleted
}
}

override def isLocalFileSystem: Boolean = fs match {
case _: LocalFileSystem | _: RawLocalFileSystem =>
// LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
true
case _ => false
}
}
}
@@ -17,9 +17,9 @@

package org.apache.spark.sql.execution.streaming

import java.io.IOException
import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock

import scala.collection.mutable.ArrayBuffer
@@ -158,8 +157,7 @@ class StreamExecution(
}

/** Defines the internal state of execution */
@volatile
private var state: State = INITIALIZING
private val state = new AtomicReference[State](INITIALIZING)

@volatile
var lastExecution: QueryExecution = _
@@ -179,8 +178,8 @@

/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to avoid swallowing `InterruptException` when
* using [[HDFSMetadataLog]]. See SPARK-19599 for more details.
* [[org.apache.spark.util.UninterruptibleThread]] to work around KAFKA-1894: interrupting a
* running `KafkaConsumer` may cause an endless loop.
*/
val microBatchThread =
new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
@@ -201,10 +200,10 @@
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state != INITIALIZING
private def isInitialized: Boolean = state.get != INITIALIZING

/** Whether the query is currently active or not */
override def isActive: Boolean = state != TERMINATED
override def isActive: Boolean = state.get != TERMINATED

/** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
@@ -250,53 +249,56 @@
updateStatusMessage("Initializing sources")
// force initialization of the logical plan so that the sources can be created
logicalPlan
state = ACTIVE
// Unblock `awaitInitialization`
initializationLatch.countDown()

triggerExecutor.execute(() => {
startTrigger()

val isTerminated =
if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets()
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
if (state.compareAndSet(INITIALIZING, ACTIVE)) {
Member Author

Most changes here are whitespace-only changes. You can use https://github.com/apache/spark/pull/16947/files?w=1 to review it.
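
Why the compareAndSet rather than a plain assignment: stop() can set the state to TERMINATED before run() reaches this line, and the CAS lets run() lose that race cleanly instead of reviving a stopped query. A minimal standalone sketch of the transition logic, using hypothetical names rather than the actual StreamExecution members:

import java.util.concurrent.atomic.AtomicReference

object StateSketch {
  sealed trait State
  case object INITIALIZING extends State
  case object ACTIVE extends State
  case object TERMINATED extends State

  private val state = new AtomicReference[State](INITIALIZING)

  def run(): Unit = {
    // Start only if stop() has not already moved us to TERMINATED.
    if (state.compareAndSet(INITIALIZING, ACTIVE)) {
      // ... trigger loop; keeps running while the state stays ACTIVE ...
    }
    // else: stop() won the race; fall straight through to cleanup.
    state.set(TERMINATED)
  }

  // Unconditional write, so an in-flight run() observes TERMINATED.
  def stop(): Unit = state.set(TERMINATED)
}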

// Unblock `awaitInitialization`
initializationLatch.countDown()

triggerExecutor.execute(() => {
startTrigger()

val continueToRun =
if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets()
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
}
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch()
}
}

// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch()
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
}

// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
true
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
false
}
true
} else {
false
}

// Update committed offsets.
committedOffsets ++= availableOffsets
updateStatusMessage("Waiting for next trigger")
isTerminated
})
updateStatusMessage("Stopped")
// Update committed offsets.
committedOffsets ++= availableOffsets
updateStatusMessage("Waiting for next trigger")
continueToRun
})
updateStatusMessage("Stopped")
} else {
// `stop()` is already called. Let `finally` finish the cleanup.
}
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
updateStatusMessage("Stopped")
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
@@ -319,7 +321,7 @@
initializationLatch.countDown()

try {
state = TERMINATED
state.set(TERMINATED)
currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)

// Update metrics and status
@@ -563,7 +565,7 @@ class StreamExecution(
override def stop(): Unit = {
// Set the state to TERMINATED so that the batching thread knows that it was interrupted
// intentionally
state = TERMINATED
state.set(TERMINATED)
if (microBatchThread.isAlive) {
microBatchThread.interrupt()
microBatchThread.join()
@@ -156,7 +156,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
})
}

testWithUninterruptibleThread("compact") {
test("compact") {
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
defaultCompactInterval = 3,
@@ -174,7 +174,7 @@
})
}

testWithUninterruptibleThread("delete expired file") {
test("delete expired file") {
// Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = 0,
@@ -129,7 +129,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread("compact") {
test("compact") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withFileStreamSinkLog { sinkLog =>
for (batchId <- 0 to 10) {
@@ -149,7 +149,7 @@
}
}

testWithUninterruptibleThread("delete expired file") {
test("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically and one min batches to retain
withSQLConf(
@@ -57,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread("HDFSMetadataLog: basic") {
test("HDFSMetadataLog: basic") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use a non-existent directory to test whether the log creates the dir
val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -82,8 +82,7 @@
}
}

testWithUninterruptibleThread(
"HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
spark.conf.set(
s"fs.$scheme.impl",
classOf[FakeFileSystem].getName)
@@ -103,7 +102,7 @@
}
}

testWithUninterruptibleThread("HDFSMetadataLog: purge") {
test("HDFSMetadataLog: purge") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
@@ -128,7 +127,7 @@
}
}

testWithUninterruptibleThread("HDFSMetadataLog: restart") {
test("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
@@ -36,7 +36,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
}

testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") {
test("OffsetSeqLog - serialization - deserialization") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use a non-existent directory to test whether the log creates the dir
val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
@@ -1174,7 +1174,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
assert(map.isNewFile("b", 10))
}

testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
test("do not recheck that files exist during getBatch") {
withTempDir { temp =>
spark.conf.set(
s"fs.$scheme.impl",