Skip to content

Commit e6e60e0

Browse files
committed
use long array
1 parent 1dae660 commit e6e60e0

File tree

4 files changed

+23
-22
lines changed

4 files changed

+23
-22
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -520,11 +520,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
520520

521521
/**
522522
* After writing array elements to the child column vector, call this method to set the offset and
523-
* length of the written array.
523+
* size of the written array.
524524
*/
525-
public void putArrayOffsetAndLength(int rowId, int offset, int length) {
526-
putInt(2 * rowId, offset);
527-
putInt(2 * rowId + 1, length);
525+
public void putArrayOffsetAndSize(int rowId, int offset, int size) {
526+
long offsetAndSize = ((long) offset << 32) | (size & 0xFFFFFFFFL);
527+
putLong(rowId, offsetAndSize);
528528
}
529529

530530
/**
@@ -548,8 +548,9 @@ public ColumnarBatch.Row getStruct(int rowId, int size) {
548548
* Returns the array at rowId.
549549
*/
550550
public final Array getArray(int rowId) {
551-
resultArray.offset = getInt(2 * rowId);
552-
resultArray.length = getInt(2 * rowId + 1);
551+
long offsetAndSize = getLong(rowId);
552+
resultArray.offset = (int) (offsetAndSize >> 32);
553+
resultArray.length = (int) offsetAndSize;
553554
return resultArray;
554555
}
555556

@@ -563,7 +564,7 @@ public final Array getArray(int rowId) {
563564
*/
564565
public int putByteArray(int rowId, byte[] value, int offset, int length) {
565566
int result = arrayData().appendBytes(length, value, offset);
566-
putArrayOffsetAndLength(rowId, result, length);
567+
putArrayOffsetAndSize(rowId, result, length);
567568
return result;
568569
}
569570

@@ -829,13 +830,13 @@ public final int appendDoubles(int length, double[] src, int offset) {
829830
public final int appendByteArray(byte[] value, int offset, int length) {
830831
int copiedOffset = arrayData().appendBytes(length, value, offset);
831832
reserve(elementsAppended + 1);
832-
putArrayOffsetAndLength(elementsAppended, copiedOffset, length);
833+
putArrayOffsetAndSize(elementsAppended, copiedOffset, length);
833834
return elementsAppended++;
834835
}
835836

836837
public final int appendArray(int length) {
837838
reserve(elementsAppended + 1);
838-
putArrayOffsetAndLength(elementsAppended, arrayData().elementsAppended, length);
839+
putArrayOffsetAndSize(elementsAppended, arrayData().elementsAppended, length);
839840
return elementsAppended++;
840841
}
841842

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ public void loadBytes(ColumnVector.Array array) {
401401
protected void reserveInternal(int newCapacity) {
402402
int oldCapacity = (this.data == 0L) ? 0 : capacity;
403403
if (this.resultArray != null) {
404-
// need 2 ints as offset and length for each array.
404+
// need a long as offset and length for each array.
405405
this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
406406
} else if (type instanceof ByteType || type instanceof BooleanType) {
407407
this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ public final class OnHeapColumnVector extends ColumnVector {
4242
// Array for each type. Only 1 is populated for any type.
4343
private byte[] byteData;
4444
private short[] shortData;
45+
private int[] intData;
4546
// This is not only used to store data for int column vector, but also can store offsets and
4647
// lengths for array column vector.
47-
private int[] intData;
4848
private long[] longData;
4949
private float[] floatData;
5050
private double[] doubleData;
@@ -374,11 +374,11 @@ public void loadBytes(ColumnVector.Array array) {
374374
@Override
375375
protected void reserveInternal(int newCapacity) {
376376
if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
377-
// need 2 ints as offset and length for each array.
378-
if (intData == null || intData.length < newCapacity * 2) {
379-
int[] newData = new int[newCapacity * 2];
380-
if (intData != null) System.arraycopy(intData, 0, newData, 0, intData.length);
381-
intData = newData;
377+
// need 1 long as offset and length for each array.
378+
if (longData == null || longData.length < newCapacity) {
379+
long[] newData = new long[newCapacity];
380+
if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
381+
longData = newData;
382382
}
383383
} else if (type instanceof BooleanType) {
384384
if (byteData == null || byteData.length < newCapacity) {

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
631631
assert(column.arrayData().elementsAppended == 17)
632632

633633
// Put the same "ll" at offset. This should not allocate more memory in the column.
634-
column.putArrayOffsetAndLength(idx, offset, 2)
634+
column.putArrayOffsetAndSize(idx, offset, 2)
635635
reference += "ll"
636636
idx += 1
637637
assert(column.arrayData().elementsAppended == 17)
@@ -667,10 +667,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
667667
}
668668

669669
// Populate it with arrays [0], [1, 2], [], [3, 4, 5]
670-
column.putArrayOffsetAndLength(0, 0, 1)
671-
column.putArrayOffsetAndLength(1, 1, 2)
672-
column.putArrayOffsetAndLength(2, 3, 0)
673-
column.putArrayOffsetAndLength(3, 3, 3)
670+
column.putArrayOffsetAndSize(0, 0, 1)
671+
column.putArrayOffsetAndSize(1, 1, 2)
672+
column.putArrayOffsetAndSize(2, 3, 0)
673+
column.putArrayOffsetAndSize(3, 3, 3)
674674

675675
val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
676676
val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
@@ -703,7 +703,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
703703
data.reserve(array.length)
704704
assert(data.capacity == array.length * 2)
705705
data.putInts(0, array.length, array, 0)
706-
column.putArrayOffsetAndLength(0, 0, array.length)
706+
column.putArrayOffsetAndSize(0, 0, array.length)
707707
assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
708708
=== array)
709709
}}

0 commit comments

Comments
 (0)