HDFSMetadataLog.scala

@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         case ut: UninterruptibleThread =>
           // When using a local file system, "writeBatch" must be called on a
           // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-          // while writing the batch file. This is because there is a potential dead-lock in
-          // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-          // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-          // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-          // the file permission if using the local file system, and can get deadlocked if the
-          // stream execution thread is stopped by interrupt. Hence, we make sure that
-          // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-          // interrupts here. Also see SPARK-14131.
-          ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+          // while writing the batch file.
+          //
+          // This is because Hadoop "Shell.runCommand" swallows InterruptedException (HADOOP-14084).
+          // If the user tries to stop a query, and the thread running "Shell.runCommand" is
+          // interrupted, then InterruptedException will be dropped and the query will still be
+          // running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+          // "Shell.runCommand" to set the file permission if using the local file system.)
+          //
+          // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
+          // allows us to disable interrupts here, in order to propagate the interrupt state
+          // correctly. Also see SPARK-19599.
+          ut.runUninterruptibly { writeBatch(batchId, metadata) }
         case _ =>
           throw new IllegalStateException(
             "HDFSMetadataLog.add() on a local file system must be executed on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // For a distributed file system, such as HDFS or S3, if the network is broken, write
         // operations may just hang until timeout. We should enable interrupts to allow stopping
         // the query fast.
-        writeBatch(batchId, metadata, serialize)
+        writeBatch(batchId, metadata)
       }
       true
     }
   }
 
-  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
-    var nextId = 0
+  def writeTempBatch(metadata: T): Option[Path] = {
     while (true) {
       val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
       try {
         val output = fileManager.create(tempPath)
         try {
-          writer(metadata, output)
+          serialize(metadata, output)
           return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
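The [[UninterruptibleThread]] requirement documented in the first hunk is easier to follow with a concrete model. Below is a minimal sketch of an interrupt-deferring thread; it is a simplified illustration of the idea, not Spark's actual `org.apache.spark.util.UninterruptibleThread`:

```scala
// Simplified sketch: interrupts that arrive inside runUninterruptibly are
// remembered and re-delivered after the critical section, instead of
// reaching a callee (such as Hadoop's Shell.runCommand) that would swallow
// the resulting InterruptedException.
class SketchUninterruptibleThread(name: String) extends Thread(name) {
  private val lock = new Object
  private var uninterruptible = false
  private var pendingInterrupt = false

  def runUninterruptibly[T](body: => T): T = {
    lock.synchronized { uninterruptible = true }
    try body finally lock.synchronized {
      uninterruptible = false
      if (pendingInterrupt) {
        pendingInterrupt = false
        super.interrupt() // re-deliver the interrupt we deferred
      }
    }
  }

  override def interrupt(): Unit = lock.synchronized {
    if (uninterruptible) pendingInterrupt = true // record, don't deliver yet
    else super.interrupt()
  }
}
```

The key property is that `interrupt()` during the critical section is recorded rather than delivered, so the batch write cannot lose the interrupt, and the stop request still reaches the thread once the write finishes.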
@@ -164,7 +166,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // big problem because it requires the attacker must have the permission to write the
         // metadata path. In addition, the old Streaming also have this issue, people can create
         // malicious checkpoint files to crash a Streaming application too.
-        nextId += 1
       }
     }
     None
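For readers outside the Spark codebase, the temp-file step above can be sketched with plain `java.nio` in place of Spark's `FileManager` (the `writeTemp` helper and `dir` parameter are illustrative names, not Spark's API). The random UUID in the file name is also what makes the deleted `nextId` retry counter unnecessary: collisions are practically impossible.

```scala
import java.io.OutputStream
import java.nio.file.{Files, Path}
import java.util.UUID

// Sketch: serialize into a hidden ".<uuid>.tmp" file so that a half-written
// batch can never be mistaken for a committed one.
def writeTemp(dir: Path, serialize: OutputStream => Unit): Path = {
  val tempPath = dir.resolve(s".${UUID.randomUUID}.tmp")
  val output = Files.newOutputStream(tempPath)
  try serialize(output) finally output.close()
  tempPath
}
```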
@@ -176,8 +177,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
    * valid behavior, we still need to prevent it from destroying the files.
    */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
-    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+  private def writeBatch(batchId: Long, metadata: T): Unit = {
+    val tempPath = writeTempBatch(metadata).getOrElse(
       throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
     try {
       // Try to commit the batch
@@ -195,12 +196,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // So throw an exception to tell the user this is not a valid behavior.
         throw new ConcurrentModificationException(
           s"Multiple HDFSMetadataLog are using $path", e)
-      case e: FileNotFoundException =>
-        // Sometimes, "create" will succeed when multiple writers are calling it at the same
-        // time. However, only one writer can call "rename" successfully, others will get
-        // FileNotFoundException because the first writer has removed it.
-        throw new ConcurrentModificationException(
-          s"Multiple HDFSMetadataLog are using $path", e)
     } finally {
       fileManager.delete(tempPath)
     }

> Member Author (inline comment on the removed `FileNotFoundException` branch):
> Removed it because we always create a temp file so it won't happen.
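The commit step that survives this change can be sketched the same way (`commitBatch` and the `java.nio` calls are stand-ins, not Spark's code): only one writer can rename its temp file to the final batch file, and a loser surfaces as the `ConcurrentModificationException` seen above rather than as silent corruption.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardCopyOption}
import java.util.ConcurrentModificationException

// Sketch: rename the temp file to "<batchId>". Only one writer can win the
// rename; a loser is reported as concurrent use of the same metadata path.
def commitBatch(dir: Path, batchId: Long, tempPath: Path): Unit = {
  try {
    val target = dir.resolve(batchId.toString)
    if (Files.exists(target)) { // racy pre-check, acceptable for a sketch
      throw new FileAlreadyExistsException(target.toString)
    }
    Files.move(tempPath, target, StandardCopyOption.ATOMIC_MOVE)
  } catch {
    case e: FileAlreadyExistsException =>
      throw new ConcurrentModificationException(
        s"Multiple HDFSMetadataLog are using $dir", e)
  } finally {
    Files.deleteIfExists(tempPath) // always clean up the temp file
  }
}
```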
StreamExecution.scala

@@ -179,8 +179,8 @@ class StreamExecution(
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
-   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
-   * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid swallowing `InterruptedException`
+   * when using [[HDFSMetadataLog]]. See SPARK-19599 for more details.
    */
   val microBatchThread =
     new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
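Hypothetical usage tying the two files together, reusing the `SketchUninterruptibleThread` stand-in from the earlier sketch (not Spark's class): the stream execution thread defers the interrupt issued by a stop request until the metadata write completes, so the interrupt is neither swallowed nor lost.

```scala
// The micro-batch loop runs on the interrupt-deferring thread and wraps the
// batch commit in runUninterruptibly, mirroring HDFSMetadataLog.add().
val microBatchThread = new SketchUninterruptibleThread("stream execution thread") {
  override def run(): Unit = {
    runUninterruptibly {
      // writeBatch(batchId, metadata) would run here; an interrupt arriving
      // now is deferred and delivered when this block exits.
    }
  }
}
microBatchThread.start()
microBatchThread.interrupt() // deferred while inside runUninterruptibly
microBatchThread.join()
```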