Skip to content

Commit

Permalink
address Lucifer's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Ngone51 committed Jan 9, 2025
1 parent a962e83 commit 721e294
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,7 @@ private[spark] class TaskContextImpl(

TaskContext.synchronized {
interruptIfRequired()

if (_interruptible) {
_interruptible = false
}
_interruptible = false
}
try {
val resource = resourceBuilder
Expand Down
6 changes: 1 addition & 5 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,9 @@ private[spark] abstract class Task[T](
taskThread.interrupt()
}
} else {
val threadToInterrupt = if (interruptThread && taskThread != null) {
Some(taskThread)
} else {
None
}
logInfo(log"Task ${MDC(LogKeys.TASK_ID, context.taskAttemptId())} " +
log"is currently not interruptible. ")
val threadToInterrupt = if (interruptThread) Option(taskThread) else None
context.pendingInterrupt(threadToInterrupt, reason)
}
}
Expand Down
22 changes: 17 additions & 5 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -739,11 +739,23 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
open()

private def dumpFile(typeName: String): Unit = {
val file = new File(dir, s"$typeName.$fileHint")
val out = new FileOutputStream(file)
val objOut = new ObjectOutputStream(out)
objOut.writeBoolean(true)
objOut.close()
var fileOut: FileOutputStream = null
var objOut: ObjectOutputStream = null
try {
val file = new File(dir, s"$typeName.$fileHint")
fileOut = new FileOutputStream(file)
objOut = new ObjectOutputStream(fileOut)
objOut.writeBoolean(true)
objOut.flush()
} finally {
if (fileOut != null) {
fileOut.close()
}
if (objOut != null) {
objOut.close()
}
}

}

private def open(): Unit = {
Expand Down

0 comments on commit 721e294

Please sign in to comment.