From c21f57c19ed35f30d586e195e779c7dd498be88c Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 25 Jul 2022 10:38:44 +0800 Subject: [PATCH] fix memory leakage in native columntorow Signed-off-by: Yuan Zhou --- .../intel/oap/execution/ArrowColumnarToRowExec.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala index 6a34b882c..0e65abd2b 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.{CodegenSupport, ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.types._ +import org.apache.spark.TaskContext import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -146,13 +147,21 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransit var rowId = 0 val row = new UnsafeRow(batch.numCols()) var closed = false + + TaskContext.get().addTaskCompletionListener[Unit](_ => { + if (!closed) { + jniWrapper.nativeClose(info.instanceID) + closed = true + } + }) + override def hasNext: Boolean = { val result = rowId < batch.numRows() if (!result && !closed) { jniWrapper.nativeClose(info.instanceID) closed = true } - return result + result } override def next: UnsafeRow = {