Commit bb26bdb

dongjoon-hyun authored and cloud-fan committed
[SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader
This PR aims to resolve an open file leakage issue reported at [SPARK-23390](https://issues.apache.org/jira/browse/SPARK-23390) by moving the listener registration position. Currently, the sequence is as follows.

1. Create `batchReader`.
2. `batchReader.initialize` opens an ORC file.
3. `batchReader.initBatch` may take a long time to allocate memory in some environments and can fail.
4. `Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))`

If step 2 or 3 throws after the file has been opened, step 4 is never reached, so nothing closes the file. This PR moves 4 before 2 and 3. To sum up, the new sequence is 1 -> 4 -> 2 -> 3.

Tested manually. The following test case intentionally triggers an OOM to cause a leaked filesystem connection in the current code base. With this patch, the leak doesn't occur.

```scala
// This should be tested manually because it raises OOM intentionally
// in order to cause `Leaked filesystem connection`.
test("SPARK-23399 Register a task completion listener first for OrcColumnarBatchReader") {
  withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
    withTempDir { dir =>
      val basePath = dir.getCanonicalPath
      Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
      Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
      val df = spark.read.orc(
        new Path(basePath, "first").toString,
        new Path(basePath, "second").toString)
      val e = intercept[SparkException] {
        df.collect()
      }
      assert(e.getCause.isInstanceOf[OutOfMemoryError])
    }
  }
}
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20590 from dongjoon-hyun/SPARK-23399.

(cherry picked from commit 357babd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 4f6a457 commit bb26bdb

1 file changed, +6 -2 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 6 additions & 2 deletions
@@ -187,6 +187,12 @@ class OrcFileFormat
           if (enableVectorizedReader) {
             val batchReader = new OrcColumnarBatchReader(
               enableOffHeapColumnVector && taskContext.isDefined, copyToSpark)
+            // SPARK-23399 Register a task completion listener first to call `close()` in all cases.
+            // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM)
+            // after opening a file.
+            val iter = new RecordReaderIterator(batchReader)
+            Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
             batchReader.initialize(fileSplit, taskAttemptContext)
             batchReader.initBatch(
               reader.getSchema,
@@ -195,8 +201,6 @@ class OrcFileFormat
               partitionSchema,
               file.partitionValues)
 
-            val iter = new RecordReaderIterator(batchReader)
-            Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
             iter.asInstanceOf[Iterator[InternalRow]]
           } else {
             val orcRecordReader = new OrcInputFormat[OrcStruct]
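
The effect of the reordering can be illustrated outside Spark with a minimal sketch. Everything below is hypothetical and is not Spark code: `FakeTaskContext` stands in for `TaskContext`, `FakeReader` stands in for `OrcColumnarBatchReader`, and an `IllegalStateException` simulates the allocation failure in place of a real OOM. The sketch also assumes that `close()` is safe to call on a reader that was never fully initialized, which the early registration relies on.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a task context: runs registered listeners when the task ends.
final class FakeTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addCompletionListener(f: () => Unit): Unit = { listeners += f }
  def markTaskCompleted(): Unit = { listeners.foreach(f => f()) }
}

// Hypothetical stand-in for a reader that opens a file and then allocates a batch.
final class FakeReader(failInInitBatch: Boolean) {
  var isOpen: Boolean = false
  def initialize(): Unit = { isOpen = true } // "opens the file"
  def initBatch(): Unit = {
    if (failInInitBatch) throw new IllegalStateException("simulated allocation failure")
  }
  // Assumed to be safe even if `initialize` never ran.
  def close(): Unit = { isOpen = false }
}

object ListenerOrdering {
  // Returns true if the reader is still open (leaked) after the task completes.
  def leaksAfterTask(registerFirst: Boolean): Boolean = {
    val ctx = new FakeTaskContext
    val reader = new FakeReader(failInInitBatch = true)
    try {
      // New order (1 -> 4 -> 2 -> 3): register the cleanup before anything can fail.
      if (registerFirst) ctx.addCompletionListener(() => reader.close())
      reader.initialize()
      reader.initBatch()
      // Old order (1 -> 2 -> 3 -> 4): never reached when initBatch throws.
      if (!registerFirst) ctx.addCompletionListener(() => reader.close())
    } catch {
      case _: IllegalStateException => () // the task fails here
    } finally {
      ctx.markTaskCompleted()
    }
    reader.isOpen
  }
}
```

With the old order, `ListenerOrdering.leaksAfterTask(registerFirst = false)` reports a leak because the listener is never registered. With the new order, the listener is registered before the reader is initialized, so it runs and closes the reader when the task completes.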
