[SPARK-50463][SQL] Fix ConstantColumnVector with Columnar to Row conversion

### What changes were proposed in this pull request?

apache@800faf0 frees column vector resources between batches in columnar-to-row conversion. However, like `WritableColumnVector`, `ConstantColumnVector` should not free its resources between batches, because the same data is reused across every batch.
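
To make the contract concrete, here is a minimal Scala sketch (hypothetical and heavily simplified from Spark's actual Java classes) of why a constant vector must survive the between-batch cleanup call while an ordinary vector need not:

```scala
// Hypothetical, simplified model of the lifecycle contract; not Spark's real API.
abstract class ColumnVector extends AutoCloseable {
  def close(): Unit
  // Called between batches; by default frees the vector's resources eagerly.
  def closeIfFreeable(): Unit = close()
}

// Holds one value that every row in every batch reads, so the backing data
// must outlive any single batch.
final class ConstantColumnVector(private var value: String) extends ColumnVector {
  override def close(): Unit = value = null      // freed once, at end of execution
  override def closeIfFreeable(): Unit = ()      // no-op: later batches reuse the value
  def getString(rowId: Int): String = value      // same constant for every row
}
```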

### Why are the changes needed?

Without this change, a `ConstantColumnVector` holding string values fails when used with columnar-to-row conversion: for instance, reading a Parquet table partitioned by a string column over multiple batches.
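
A user-level repro along those lines, as a sketch: it assumes a live `SparkSession` named `spark`, a writable scratch path, and the batch-size knob `spark.sql.parquet.columnarReaderBatchSize` (the key behind `PARQUET_VECTORIZED_READER_BATCH_SIZE`):

```scala
// Force one-row batches so the partition column is read across many batches;
// before this fix the ConstantColumnVector's string data was freed after the
// first batch and the read failed.
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "1")
spark.range(0, 5)
  .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
  .write.mode("overwrite").partitionBy("partCol").parquet("/tmp/spark50463")
spark.read.parquet("/tmp/spark50463").select("partCol").collect()
```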

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test that failed before this change and passes with it.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49021 from richardc-db/col_to_row_const_col_vec_fix.

Authored-by: Richard Chen <r.chen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
richardc-db authored and dongjoon-hyun committed Dec 3, 2024
1 parent 6cd1334 commit 7b974ca
Showing 6 changed files with 37 additions and 12 deletions.
`ColumnVector.java`

```diff
@@ -69,14 +69,14 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract void close();
 
   /**
-   * Cleans up memory for this column vector if it's not writable. The column vector is not usable
-   * after this.
+   * Cleans up memory for this column vector if its resources are freeable between batches.
+   * The column vector is not usable after this.
    *
-   * If this is a writable column vector, it is a no-op.
+   * If this is a writable column vector or constant column vector, it is a no-op.
    */
-  public void closeIfNotWritable() {
-    // By default, we just call close() for all column vectors. If a column vector is writable, it
-    // should override this method and do nothing.
+  public void closeIfFreeable() {
+    // By default, we just call close() for all column vectors. If a column vector is writable or
+    // constant, it should override this method and do nothing.
     close();
   }
```

`ColumnarBatch.java`

```diff
@@ -46,12 +46,12 @@ public void close() {
   }
 
   /**
-   * Called to close all the columns if they are not writable. This is used to clean up memory
-   * allocated during columnar processing.
+   * Called to close all the columns if their resources are freeable between batches.
+   * This is used to clean up memory allocated during columnar processing.
    */
-  public void closeIfNotWritable() {
+  public void closeIfFreeable() {
     for (ColumnVector c: columns) {
-      c.closeIfNotWritable();
+      c.closeIfFreeable();
     }
   }
```

`ConstantColumnVector.java`

```diff
@@ -77,6 +77,11 @@ public ConstantColumnVector(int numRows, DataType type) {
     }
   }
 
+  public void closeIfFreeable() {
+    // no-op: `ConstantColumnVector`s reuse the data backing their value across multiple
+    // batches and are freed at the end of execution in `close`.
+  }
+
   @Override
   public void close() {
     stringData = null;
```

`WritableColumnVector.java`

```diff
@@ -97,7 +97,7 @@ public void close() {
   }
 
   @Override
-  public void closeIfNotWritable() {
+  public void closeIfFreeable() {
     // no-op
   }
```

`Columnar.scala`

```diff
@@ -194,7 +194,7 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
          |    $shouldStop
          |  }
          |  $idx = $numRows;
-         |  $batch.closeIfNotWritable();
+         |  $batch.closeIfFreeable();
          |  $batch = null;
          |  $nextBatchFuncName();
          |}
```

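The interpolated `$batch.closeIfFreeable()` runs once per batch in the generated loop. Conceptually the produced code behaves like the Scala sketch below (the real output is Java assembled by whole-stage codegen; `consumeRow` is a hypothetical stand-in for the downstream consume logic):

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical stand-in for the generated per-row consume code.
def consumeRow(batch: ColumnarBatch, rowId: Int): Unit = ()

def processBatches(batches: Iterator[ColumnarBatch]): Unit = {
  while (batches.hasNext) {
    val batch = batches.next()
    var idx = 0
    while (idx < batch.numRows()) {
      consumeRow(batch, idx)
      idx += 1
    }
    // Free per-batch resources before fetching the next batch; writable and
    // constant vectors override closeIfFreeable() to make this a no-op.
    batch.closeIfFreeable()
  }
}
```
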
`ParquetQuerySuite.scala`

```diff
@@ -473,6 +473,26 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
+  test("SPARK-50463: Partition values can be read over multiple batches") {
+    withTempDir { dir =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> "1") {
+        val path = dir.getAbsolutePath
+        spark.range(0, 5)
+          .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
+          .write
+          .format("parquet")
+          .mode("overwrite")
+          .partitionBy("partCol").save(path)
+        val df = spark.read.format("parquet").load(path).selectExpr("partCol")
+        val expected = spark.range(0, 5)
+          .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")
+          .collect()
+
+        checkAnswer(df, expected)
+      }
+    }
+  }
+
   test("SPARK-10301 requested schema clipping - same schema") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
```
