diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java index a3c58ae025477..7dc2d38144296 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java @@ -67,6 +67,18 @@ public abstract class ColumnVector implements AutoCloseable { @Override public abstract void close(); + /** + * Cleans up memory for this column vector if it's not writable. The column vector is not usable + * after this. + * + * If this is a writable column vector, it is a no-op. + */ + public void closeIfNotWritable() { + // By default, we just call close() for all column vectors. If a column vector is writable, it + // should override this method and do nothing. + close(); + } + /** * Returns true if this column vector contains any null values. */ diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java index 9e859e77644ac..52e4115af336a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java @@ -45,6 +45,16 @@ public void close() { } } + /** + * Called to close all the columns if they are not writable. This is used to clean up memory + * allocated during columnar processing. + */ + public void closeIfNotWritable() { + for (ColumnVector c: columns) { + c.closeIfNotWritable(); + } + } + /** * Returns an iterator over the rows in this batch. */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index a8e4aad60c222..0fde85fd454c1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -87,6 +87,11 @@ public void close() { dictionary = null; } + @Override + public void closeIfNotWritable() { + // no-op + } + public void reserveAdditional(int additionalCapacity) { reserve(elementsAppended + additionalCapacity); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 3fec13a7f9ba9..ea559efc45f13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -194,9 +194,14 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w | $shouldStop | } | $idx = $numRows; + | $batch.closeIfNotWritable(); | $batch = null; | $nextBatchFuncName(); |} + |// clean up resources + |if ($batch != null) { + | $batch.close(); + |} """.stripMargin }