diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 35682756ed6c..e1d36858d4ee 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -515,7 +515,7 @@ public void loadBytes(ColumnVector.Array array) { // Split out the slow path. @Override protected void reserveInternal(int newCapacity) { - int oldCapacity = (this.data == 0L) ? 0 : capacity; + int oldCapacity = (nulls == 0L) ? 0 : capacity; if (this.resultArray != null) { this.lengthData = Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 998067a47033..f7b06c97f9db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456) assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67) } + + test("[SPARK-22092] off-heap column vector reallocation corrupts array data") { + val arrayType = ArrayType(IntegerType, true) + testVector = new OffHeapColumnVector(8, arrayType) + + val data = testVector.arrayData() + (0 until 8).foreach(i => data.putInt(i, i)) + (0 until 8).foreach(i => testVector.putArray(i, i, 1)) + + // Increase vector's capacity and reallocate the data to new bigger buffers. + testVector.reserve(16) + + // Check that none of the values got lost/overwritten. + val array = new ColumnVector.Array(testVector) + (0 until 8).foreach { i => + assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i)) + } + } + + test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") { + val structType = new StructType().add("int", IntegerType).add("double", DoubleType) + testVector = new OffHeapColumnVector(8, structType) + (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i)) + testVector.reserve(16) + (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0))) + } }