
Commit 4dedd39

advancedxy authored and dongjoon-hyun committed
[SPARK-26713][CORE][2.4] Interrupt pipe IO threads in PipedRDD when task is finished
### What changes were proposed in this pull request?

Manually release the stdin writer and stderr reader threads when the task is finished. This is the backport of #23638, including #25049.

### Why are the changes needed?

This is a bug fix. PipedRDD's IO threads may hang even when the corresponding task is already finished. Without this fix, this would leak resources (memory especially).

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a new test.

Closes #25825 from advancedxy/SPARK-26713_for_2.4.

Authored-by: Xianjin YE <advancedxy@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 68e29ba commit 4dedd39

File tree

2 files changed (+53, -4 lines)
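To make the failure mode concrete before reading the diff, the scenario described in the new code comment can be reproduced with a pipe whose output iterator is never consumed. A hedged, illustrative spark-shell sketch follows (it assumes a running SparkContext `sc`; the `abnormalRDD` name and the `count()` action are illustrative, not part of the commit):

```scala
// Reproduction sketch of the leak fixed here (assumes a spark-shell `sc`).
// The child `cat` process is started when the task runs, but the piped output
// iterator is never read, so before this fix the "stdin writer" and
// "stderr reader" threads could keep running after the task finished.
val pipeRDD = sc.range(1, 100).pipe(Seq("cat"))
val abnormalRDD = pipeRDD.mapPartitions(_ => Iterator.empty)
abnormalRDD.count() // tasks complete without ever consuming PipedRDD's iterator
```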

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala

Lines changed: 29 additions & 4 deletions
@@ -113,7 +113,7 @@ private[spark] class PipedRDD[T: ClassTag](
     val childThreadException = new AtomicReference[Throwable](null)
 
     // Start a thread to print the process's stderr to ours
-    new Thread(s"stderr reader for $command") {
+    val stderrReaderThread = new Thread(s"${PipedRDD.STDERR_READER_THREAD_PREFIX} $command") {
       override def run(): Unit = {
         val err = proc.getErrorStream
         try {
@@ -128,10 +128,11 @@ private[spark] class PipedRDD[T: ClassTag](
           err.close()
         }
       }
-    }.start()
+    }
+    stderrReaderThread.start()
 
     // Start a thread to feed the process input from our parent's iterator
-    new Thread(s"stdin writer for $command") {
+    val stdinWriterThread = new Thread(s"${PipedRDD.STDIN_WRITER_THREAD_PREFIX} $command") {
       override def run(): Unit = {
         TaskContext.setTaskContext(context)
         val out = new PrintWriter(new BufferedWriter(
@@ -156,7 +157,28 @@ private[spark] class PipedRDD[T: ClassTag](
           out.close()
         }
       }
-    }.start()
+    }
+    stdinWriterThread.start()
+
+    // interrupts stdin writer and stderr reader threads when the corresponding task is finished.
+    // Otherwise, these threads could outlive the task's lifetime. For example:
+    //   val pipeRDD = sc.range(1, 100).pipe(Seq("cat"))
+    //   val abnormalRDD = pipeRDD.mapPartitions(_ => Iterator.empty)
+    // the iterator generated by PipedRDD is never involved. If the parent RDD's iterator takes a
+    // long time to generate(ShuffledRDD's shuffle operation for example), the stdin writer thread
+    // may consume significant memory and CPU time even if task is already finished.
+    context.addTaskCompletionListener[Unit] { _ =>
+      if (proc.isAlive) {
+        proc.destroy()
+      }
+
+      if (stdinWriterThread.isAlive) {
+        stdinWriterThread.interrupt()
+      }
+      if (stderrReaderThread.isAlive) {
+        stderrReaderThread.interrupt()
+      }
+    }
 
     // Return an iterator that read lines from the process's stdout
     val lines = Source.fromInputStream(proc.getInputStream)(encoding).getLines
@@ -219,4 +241,7 @@ private object PipedRDD {
     }
     buf
   }
+
+  val STDIN_WRITER_THREAD_PREFIX = "stdin writer for"
+  val STDERR_READER_THREAD_PREFIX = "stderr reader for"
 }
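In isolation, the cleanup added above boils down to keeping a handle on each helper thread and registering a task-completion listener that interrupts whatever is still alive. A minimal sketch of that pattern, assuming it runs inside a task body where `TaskContext.get()` is non-null (the thread name and loop body are illustrative, not the committed code):

```scala
import org.apache.spark.TaskContext

// Keep a named handle to the helper thread instead of starting it anonymously.
val worker = new Thread("stdin writer for some-command") {
  override def run(): Unit = {
    try {
      while (!Thread.currentThread().isInterrupted) {
        // ... feed the child process here (illustrative placeholder) ...
        Thread.sleep(100)
      }
    } catch {
      case _: InterruptedException => // exit promptly once the task ends
    }
  }
}
worker.start()

// When the task completes, interrupt the helper thread if it is still alive,
// mirroring the listener registered in the diff above.
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
  if (worker.isAlive) {
    worker.interrupt()
  }
}
```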

core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala

Lines changed: 24 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import java.io.File
 
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.io.Codec
 
@@ -83,6 +84,29 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
+  test("stdin writer thread should be exited when task is finished") {
+    assume(TestUtils.testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 1).map { x =>
+      val obj = new Object()
+      obj.synchronized {
+        obj.wait() // make the thread waits here.
+      }
+      x
+    }
+
+    val piped = nums.pipe(Seq("cat"))
+
+    val result = piped.mapPartitions(_ => Array.emptyIntArray.iterator)
+
+    assert(result.collect().length === 0)
+
+    // collect stderr writer threads
+    val stderrWriterThread = Thread.getAllStackTraces.keySet().asScala
+      .find { _.getName.startsWith(PipedRDD.STDIN_WRITER_THREAD_PREFIX) }
+
+    assert(stderrWriterThread.isEmpty)
+  }
+
   test("advanced pipe") {
     assume(TestUtils.testCommandAvailable("cat"))
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
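The test above locates leaked threads by scanning all live JVM threads for the new name prefixes. A hedged, ad-hoc variant of the same check (usable in local mode or anywhere the pipe threads live in the current JVM; the hard-coded prefixes mirror the constants added in `object PipedRDD`):

```scala
import scala.collection.JavaConverters._

// List any live PipedRDD helper threads by the name prefixes introduced above.
val pipeThreads = Thread.getAllStackTraces.keySet().asScala.filter { t =>
  t.getName.startsWith("stdin writer for") || t.getName.startsWith("stderr reader for")
}
pipeThreads.foreach(t => println(s"still-running pipe thread: ${t.getName}"))
```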
