Skip to content

Commit 435ccff

Browse files
committed
Remove the Arrow allocation cleanup routine in TaskCompletionListen to avoid race condition
1 parent 439c695 commit 435ccff

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,13 @@ class ArrowPythonRunner(
7070
val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
7171
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
7272
s"stdout writer for $pythonExec", 0, Long.MaxValue)
73-
7473
val root = VectorSchemaRoot.create(arrowSchema, allocator)
75-
val arrowWriter = ArrowWriter.create(root)
76-
77-
context.addTaskCompletionListener { _ =>
78-
root.close()
79-
allocator.close()
80-
}
81-
82-
val writer = new ArrowStreamWriter(root, null, dataOut)
83-
writer.start()
8474

8575
Utils.tryWithSafeFinally {
76+
val arrowWriter = ArrowWriter.create(root)
77+
val writer = new ArrowStreamWriter(root, null, dataOut)
78+
writer.start()
79+
8680
while (inputIterator.hasNext) {
8781
val nextBatch = inputIterator.next()
8882

@@ -94,8 +88,18 @@ class ArrowPythonRunner(
9488
writer.writeBatch()
9589
arrowWriter.reset()
9690
}
97-
} {
9891
writer.end()
92+
} {
93+
// If we close root and allocator in TaskCompletionListener, there could be a race
94+
// condition where the writer thread keeps writing to the VectorSchemaRoot while
95+
// it's being closed by the TaskCompletion listener.
96+
// Closing root and allocator here is cleaner because root and allocator is owned
97+
// by the writer thread and is only visible to the writer thread.
98+
//
99+
// If the writer thread is interrupted by TaskCompletionListener, it should either
100+
// (1) in the try block, in which case it will get an InterruptedException when
101+
// performing io, and goes into the finally block or (2) in the finally block,
102+
// in which case it will ignore the interruption and close the resources.
99103
root.close()
100104
allocator.close()
101105
}

0 commit comments

Comments
 (0)