diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 2749b6fe030..5411baf7bdf 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -133,7 +133,7 @@ private static String createErrorMsg(final BufferAllocator allocator, final int
    * @param val An integer value.
    * @return The closest power of two of that value.
    */
-  static int nextPowerOfTwo(int val) {
+  public static int nextPowerOfTwo(int val) {
     int highestBit = Integer.highestOneBit(val);
     if (highestBit == val) {
       return val;
@@ -142,6 +142,21 @@ static int nextPowerOfTwo(int val) {
     }
   }
 
+  /**
+   * Rounds up the provided value to the nearest power of two.
+   *
+   * @param val A long value.
+   * @return The closest power of two of that value.
+   */
+  public static long nextPowerOfTwo(long val) {
+    long highestBit = Long.highestOneBit(val);
+    if (highestBit == val) {
+      return val;
+    } else {
+      return highestBit << 1;
+    }
+  }
+
   public static StringBuilder indent(StringBuilder sb, int indent) {
     final char[] indentation = new char[indent * 2];
     Arrays.fill(indentation, ' ');
diff --git a/java/vector/src/main/codegen/templates/FixedValueVectors.java b/java/vector/src/main/codegen/templates/FixedValueVectors.java
index ffd8cad02e2..e07416ba984 100644
--- a/java/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/java/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -208,14 +208,22 @@ private void allocateBytes(final long size) {
    * @throws org.apache.arrow.memory.OutOfMemoryException if it can't allocate the new buffer
    */
   public void reAlloc() {
-    final long newAllocationSize = allocationSizeInBytes * 2L;
-    if (newAllocationSize > MAX_ALLOCATION_SIZE) {
+    long baseSize = allocationSizeInBytes;
+    final int currentBufferCapacity = data.capacity();
+    if (baseSize < (long)currentBufferCapacity) {
+      baseSize = (long)currentBufferCapacity;
+    }
+    long newAllocationSize = baseSize * 2L;
+    newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
+    if (newAllocationSize > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
     }
 
     logger.debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]", name, allocationSizeInBytes, newAllocationSize);
     final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
-    newBuf.setBytes(0, data, 0, data.capacity());
-    final int halfNewCapacity = newBuf.capacity() / 2;
-    newBuf.setZero(halfNewCapacity, halfNewCapacity);
+    newBuf.setBytes(0, data, 0, currentBufferCapacity);
+    /* zero everything after the copied bytes; the old capacity is not
+     * guaranteed to be exactly half of the new one */
+    newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
     newBuf.writerIndex(data.writerIndex());
diff --git a/java/vector/src/main/codegen/templates/VariableLengthVectors.java b/java/vector/src/main/codegen/templates/VariableLengthVectors.java
index c276f11d796..3934e74f11b 100644
--- a/java/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/java/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -370,13 +370,20 @@ public void reset() {
   }
 
   public void reAlloc() {
-    final long newAllocationSize = allocationSizeInBytes*2L;
+    long baseSize = allocationSizeInBytes;
+    final int currentBufferCapacity = data.capacity();
+    if (baseSize < (long)currentBufferCapacity) {
+      baseSize = (long)currentBufferCapacity;
+    }
+    long newAllocationSize = baseSize * 2L;
+    newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
     if (newAllocationSize > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
     }
 
     final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
-    newBuf.setBytes(0, data, 0, data.capacity());
+    newBuf.setBytes(0, data, 0, currentBufferCapacity);
     data.release();
     data = newBuf;
     allocationSizeInBytes = (int)newAllocationSize;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
index 8a60273e179..e1911169fb6 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
@@ -19,6 +19,7 @@
 package org.apache.arrow.vector;
 
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.holders.BitHolder;
@@ -208,7 +209,14 @@ private void allocateBytes(final long size) {
    * Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's buffer with new buffer, and release old one
    */
   public void reAlloc() {
-    final long newAllocationSize = allocationSizeInBytes * 2L;
+    long baseSize = allocationSizeInBytes;
+    final int currentBufferCapacity = data.capacity();
+    if (baseSize < (long)currentBufferCapacity) {
+      baseSize = (long)currentBufferCapacity;
+    }
+    long newAllocationSize = baseSize * 2L;
+    newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
     if (newAllocationSize > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
     }
@@ -216,7 +224,7 @@ public void reAlloc() {
     final int curSize = (int) newAllocationSize;
     final ArrowBuf newBuf = allocator.buffer(curSize);
     newBuf.setZero(0, newBuf.capacity());
-    newBuf.setBytes(0, data, 0, data.capacity());
+    newBuf.setBytes(0, data, 0, currentBufferCapacity);
     data.release();
     data = newBuf;
     allocationSizeInBytes = curSize;
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java
index 82e61be45c6..17fcf05fcd9 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
@@ -234,6 +235,198 @@ public void testSplitAndTransfer2() throws Exception {
     }
   }
 
+  @Test
+  public void testReallocAfterVectorTransfer1() {
+    try (final BitVector vector = new BitVector(EMPTY_SCHEMA_PATH, allocator)) {
+      vector.allocateNew(4096);
+      int valueCapacity = vector.getValueCapacity();
+      assertEquals(4096, valueCapacity);
+
+      final BitVector.Mutator mutator = vector.getMutator();
+      final BitVector.Accessor accessor = vector.getAccessor();
+
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          mutator.setToOne(i);
+        }
+      }
+
+      for (int i = 0; i < valueCapacity; i++) {
+        int val = accessor.get(i);
+        if ((i & 1) == 1) {
+          assertEquals("unexpected cleared bit at index: " + i, 1, val);
+        }
+        else {
+          assertEquals("unexpected set bit at index: " + i, 0, val);
+        }
+      }
+
+      /* trigger first realloc */
+      mutator.setSafeToOne(valueCapacity);
+      assertEquals(valueCapacity * 2, vector.getValueCapacity());
+
+      for (int i = valueCapacity; i < valueCapacity*2; i++) {
+        if ((i & 1) == 1) {
+          mutator.setToOne(i);
+        }
+      }
+
+      for (int i = 0; i < valueCapacity*2; i++) {
+        int val = accessor.get(i);
+        if (((i & 1) == 1) || (i == valueCapacity)) {
+          assertEquals("unexpected cleared bit at index: " + i, 1, val);
+        }
+        else {
+          assertEquals("unexpected set bit at index: " + i, 0, val);
+        }
+      }
+
+      /* trigger second realloc */
+      mutator.setSafeToOne(valueCapacity*2);
+      assertEquals(valueCapacity * 4, vector.getValueCapacity());
+
+      for (int i = valueCapacity*2; i < valueCapacity*4; i++) {
+        if ((i & 1) == 1) {
+          mutator.setToOne(i);
+        }
+      }
+
+      for (int i = 0; i < valueCapacity*4; i++) {
+        int val = accessor.get(i);
+        if (((i & 1) == 1) || (i == valueCapacity) || (i == valueCapacity*2)) {
+          assertEquals("unexpected cleared bit at index: " + i, 1, val);
+        }
+        else {
+          assertEquals("unexpected set bit at index: " + i, 0, val);
+        }
+      }
+
+      /* now transfer the vector */
+      TransferPair transferPair = vector.getTransferPair(allocator);
+      transferPair.transfer();
+      final BitVector toVector = (BitVector)transferPair.getTo();
+      final BitVector.Accessor toAccessor = toVector.getAccessor();
+      final BitVector.Mutator toMutator = toVector.getMutator();
+
+      assertEquals(valueCapacity * 4, toVector.getValueCapacity());
+
+      /* realloc the toVector */
+      toMutator.setSafeToOne(valueCapacity * 4);
+
+      for (int i = 0; i < toVector.getValueCapacity(); i++) {
+        int val = toAccessor.get(i);
+        if (i <= valueCapacity * 4) {
+          if (((i & 1) == 1) || (i == valueCapacity) ||
+              (i == valueCapacity*2) || (i == valueCapacity*4)) {
+            assertEquals("unexpected cleared bit at index: " + i, 1, val);
+          }
+          else {
+            assertEquals("unexpected set bit at index: " + i, 0, val);
+          }
+        }
+        else {
+          assertEquals("unexpected set bit at index: " + i, 0, val);
+        }
+      }
+
+      toVector.close();
+    }
+  }
+
+  @Test
+  public void testReallocAfterVectorTransfer2() {
+    try (final NullableBitVector vector = new NullableBitVector(EMPTY_SCHEMA_PATH, allocator)) {
+      vector.allocateNew(4096);
+      int valueCapacity = vector.getValueCapacity();
+      assertEquals(4096, valueCapacity);
+
+      final NullableBitVector.Mutator mutator = vector.getMutator();
+      final NullableBitVector.Accessor accessor = vector.getAccessor();
+
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          mutator.set(i, 1);
+        }
+      }
+
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
+        }
+        else {
+          assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
+        }
+      }
+
+      /* trigger first realloc */
+      mutator.setSafe(valueCapacity, 1, 1);
+      assertEquals(valueCapacity * 2, vector.getValueCapacity());
+
+      for (int i = valueCapacity; i < valueCapacity*2; i++) {
+        if ((i & 1) == 1) {
+          mutator.set(i, 1);
+        }
+      }
+
+      for (int i = 0; i < valueCapacity*2; i++) {
+        if (((i & 1) == 1) || (i == valueCapacity)) {
+          assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
+        }
+        else {
+          assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
+        }
+      }
+
+      /* trigger second realloc */
+      mutator.setSafe(valueCapacity*2, 1, 1);
+      assertEquals(valueCapacity * 4, vector.getValueCapacity());
+
+      for (int i = valueCapacity*2; i < valueCapacity*4; i++) {
+        if ((i & 1) == 1) {
+          mutator.set(i, 1);
+        }
+      }
+
+      for (int i = 0; i < valueCapacity*4; i++) {
+        if (((i & 1) == 1) || (i == valueCapacity) || (i == valueCapacity*2)) {
+          assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
+        }
+        else {
+          assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
+        }
+      }
+
+      /* now transfer the vector */
+      TransferPair transferPair = vector.getTransferPair(allocator);
+      transferPair.transfer();
+      final NullableBitVector toVector = (NullableBitVector)transferPair.getTo();
+      final NullableBitVector.Accessor toAccessor = toVector.getAccessor();
+      final NullableBitVector.Mutator toMutator = toVector.getMutator();
+
+      assertEquals(valueCapacity * 4, toVector.getValueCapacity());
+
+      /* realloc the toVector */
+      toMutator.setSafe(valueCapacity * 4, 1, 1);
+
+      for (int i = 0; i < toVector.getValueCapacity(); i++) {
+        if (i <= valueCapacity * 4) {
+          if (((i & 1) == 1) || (i == valueCapacity) ||
+              (i == valueCapacity*2) || (i == valueCapacity*4)) {
+            assertFalse("unexpected cleared bit at index: " + i, toAccessor.isNull(i));
+          }
+          else {
+            assertTrue("unexpected set bit at index: " + i, toAccessor.isNull(i));
+          }
+        }
+        else {
+          assertTrue("unexpected set bit at index: " + i, toAccessor.isNull(i));
+        }
+      }
+
+      toVector.close();
+    }
+  }
+
   @Test
   public void testBitVector() {
     // Create a new value vector for 1024 integers
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
index 57119bfdae2..a239861d9b3 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
@@ -44,6 +44,7 @@
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.TransferPair;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -889,9 +890,277 @@ public void testNullableVarType2() {
    * TODO:
    *
    * The realloc() related tests below should be moved up and we need to
-   * realloc related tests (edge cases) for more vector types.
+   * add realloc related tests (edge cases) for more vector types.
    */
 
+  @Test /* Float8Vector */
+  public void testReallocAfterVectorTransfer1() {
+    try (final Float8Vector vector = new Float8Vector(EMPTY_SCHEMA_PATH, allocator)) {
+      final Float8Vector.Mutator mutator = vector.getMutator();
+      final Float8Vector.Accessor accessor = vector.getAccessor();
+      final int initialDefaultCapacity = 4096;
+
+      /* use the default capacity; 4096*8 => 32KB */
+      vector.allocateNew();
+
+      assertEquals(initialDefaultCapacity, vector.getValueCapacity());
+
+      double baseValue = 100.375;
+
+      for (int i = 0; i < initialDefaultCapacity; i++) {
+        mutator.setSafe(i, baseValue + (double)i);
+      }
+
+      /* the above setSafe calls should not have triggered a realloc as
+       * we are within the capacity. check the vector contents
+       */
+      assertEquals(initialDefaultCapacity, vector.getValueCapacity());
+
+      for (int i = 0; i < initialDefaultCapacity; i++) {
+        double value = accessor.get(i);
+        assertEquals(baseValue + (double)i, value, 0);
+      }
+
+      /* this should trigger a realloc */
+      mutator.setSafe(initialDefaultCapacity, baseValue + (double)initialDefaultCapacity);
+      assertEquals(initialDefaultCapacity * 2, vector.getValueCapacity());
+
+      for (int i = initialDefaultCapacity + 1; i < (initialDefaultCapacity * 2); i++) {
+        mutator.setSafe(i, baseValue + (double)i);
+      }
+
+      for (int i = 0; i < (initialDefaultCapacity * 2); i++) {
+        double value = accessor.get(i);
+        assertEquals(baseValue + (double)i, value, 0);
+      }
+
+      /* this should trigger a realloc */
+      mutator.setSafe(initialDefaultCapacity * 2, baseValue + (double)(initialDefaultCapacity * 2));
+      assertEquals(initialDefaultCapacity * 4, vector.getValueCapacity());
+
+      for (int i = (initialDefaultCapacity * 2) + 1; i < (initialDefaultCapacity * 4); i++) {
+        mutator.setSafe(i, baseValue + (double)i);
+      }
+
+      for (int i = 0; i < (initialDefaultCapacity * 4); i++) {
+        double value = accessor.get(i);
+        assertEquals(baseValue + (double)i, value, 0);
+      }
+
+      /* at this point we are working with a 128KB buffer data for this
+       * vector. now let's transfer this vector
+       */
+
+      TransferPair transferPair = vector.getTransferPair(allocator);
+      transferPair.transfer();
+
+      Float8Vector toVector = (Float8Vector)transferPair.getTo();
+
+      /* now let's realloc the toVector */
+      toVector.reAlloc();
+      assertEquals(initialDefaultCapacity * 8, toVector.getValueCapacity());
+
+      final Float8Vector.Accessor toAccessor = toVector.getAccessor();
+
+      for (int i = 0; i < (initialDefaultCapacity * 8); i++) {
+        double value = toAccessor.get(i);
+        if (i < (initialDefaultCapacity * 4)) {
+          assertEquals(baseValue + (double)i, value, 0);
+        }
+        else {
+          assertEquals(0, value, 0);
+        }
+      }
+
+      toVector.close();
+    }
+  }
+
+  @Test /* NullableFloat8Vector */
+  public void testReallocAfterVectorTransfer2() {
+    try (final NullableFloat8Vector vector = new NullableFloat8Vector(EMPTY_SCHEMA_PATH, allocator)) {
+      final NullableFloat8Vector.Mutator mutator = vector.getMutator();
+      final NullableFloat8Vector.Accessor accessor = vector.getAccessor();
+      final int initialDefaultCapacity = 4096;
+
+      vector.allocateNew(initialDefaultCapacity);
+
+      assertEquals(initialDefaultCapacity, vector.getValueCapacity());
+
+      double baseValue = 100.375;
+
+      for (int i = 0; i < initialDefaultCapacity; i++) {
+        mutator.setSafe(i, baseValue + (double)i);
+      }
+
+      /* the above setSafe calls should not have triggered a realloc as
+       * we are within the capacity. check the vector contents
+       */
+      assertEquals(initialDefaultCapacity, vector.getValueCapacity());
+
+      for (int i = 0; i < initialDefaultCapacity; i++) {
+        double value = accessor.get(i);
+        assertEquals(baseValue + (double)i, value, 0);
+      }
+
+      /* this should trigger a realloc */
+      mutator.setSafe(initialDefaultCapacity, baseValue + (double)initialDefaultCapacity);
+      assertEquals(initialDefaultCapacity * 2, vector.getValueCapacity());
+
+      for (int i = initialDefaultCapacity + 1; i < (initialDefaultCapacity * 2); i++) {
+        mutator.setSafe(i, baseValue + (double)i);
+      }
+
+      for (int i = 0; i < (initialDefaultCapacity * 2); i++) {
+        double value = accessor.get(i);
+        assertEquals(baseValue + (double)i, value, 0);
+      }
+
+      /* this should trigger a realloc */
+      mutator.setSafe(initialDefaultCapacity * 2, baseValue + (double)(initialDefaultCapacity * 2));
+      assertEquals(initialDefaultCapacity * 4, vector.getValueCapacity());
+
+      for (int i = (initialDefaultCapacity * 2) + 1; i < (initialDefaultCapacity * 4); i++) {
+        mutator.setSafe(i, baseValue + (double)i);
+      }
+
+      for (int i = 0; i < (initialDefaultCapacity * 4); i++) {
+        double value = accessor.get(i);
+        assertEquals(baseValue + (double)i, value, 0);
+      }
+
+      /* at this point we are working with a 128KB buffer data for this
+       * vector. now let's transfer this vector
+       */
+
+      TransferPair transferPair = vector.getTransferPair(allocator);
+      transferPair.transfer();
+
+      NullableFloat8Vector toVector = (NullableFloat8Vector)transferPair.getTo();
+      final NullableFloat8Vector.Accessor toAccessor = toVector.getAccessor();
+
+      /* check toVector contents before realloc */
+      for (int i = 0; i < (initialDefaultCapacity * 4); i++) {
+        assertFalse("unexpected null value at index: " + i, toAccessor.isNull(i));
+        double value = toAccessor.get(i);
+        assertEquals("unexpected value at index: " + i, baseValue + (double)i, value, 0);
+      }
+
+      /* now let's realloc the toVector and check contents again */
+      toVector.reAlloc();
+      assertEquals(initialDefaultCapacity * 8, toVector.getValueCapacity());
+
+      for (int i = 0; i < (initialDefaultCapacity * 8); i++) {
+        if (i < (initialDefaultCapacity * 4)) {
+          assertFalse("unexpected null value at index: " + i, toAccessor.isNull(i));
+          double value = toAccessor.get(i);
+          assertEquals("unexpected value at index: " + i, baseValue + (double)i, value, 0);
+        }
+        else {
+          assertTrue("unexpected non-null value at index: " + i, toAccessor.isNull(i));
+        }
+      }
+
+      toVector.close();
+    }
+  }
+
+  @Test /* NullableVarCharVector */
+  public void testReallocAfterVectorTransfer3() {
+    try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
+      final NullableVarCharVector.Mutator mutator = vector.getMutator();
+      final NullableVarCharVector.Accessor accessor = vector.getAccessor();
+
+      /* 4096 values with 10 bytes per record */
+      vector.allocateNew(4096 * 10, 4096);
+      int valueCapacity = vector.getValueCapacity();
+
+      /* populate the vector */
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          mutator.set(i, STR1);
+        }
+        else {
+          mutator.set(i, STR2);
+        }
+      }
+
+      /* Check the vector output */
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          assertArrayEquals(STR1, accessor.get(i));
+        }
+        else {
+          assertArrayEquals(STR2, accessor.get(i));
+        }
+      }
+
+      /* trigger first realloc */
+      mutator.setSafe(valueCapacity, STR2, 0, STR2.length);
+
+      /* populate the remaining vector */
+      for (int i = valueCapacity; i < vector.getValueCapacity(); i++) {
+        if ((i & 1) == 1) {
+          mutator.set(i, STR1);
+        }
+        else {
+          mutator.set(i, STR2);
+        }
+      }
+
+      /* Check the vector output */
+      valueCapacity = vector.getValueCapacity();
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          assertArrayEquals(STR1, accessor.get(i));
+        }
+        else {
+          assertArrayEquals(STR2, accessor.get(i));
+        }
+      }
+
+      /* trigger second realloc */
+      mutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length);
+
+      /* populate the remaining vector */
+      for (int i = valueCapacity; i < vector.getValueCapacity(); i++) {
+        if ((i & 1) == 1) {
+          mutator.set(i, STR1);
+        }
+        else {
+          mutator.set(i, STR2);
+        }
+      }
+
+      /* Check the vector output */
+      valueCapacity = vector.getValueCapacity();
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          assertArrayEquals(STR1, accessor.get(i));
+        }
+        else {
+          assertArrayEquals(STR2, accessor.get(i));
+        }
+      }
+
+      /* we are potentially working with 4x the size of vector buffer
+       * that we initially started with. Now let's transfer the vector.
+       */
+
+      TransferPair transferPair = vector.getTransferPair(allocator);
+      transferPair.transfer();
+      NullableVarCharVector toVector = (NullableVarCharVector)transferPair.getTo();
+      NullableVarCharVector.Mutator toMutator = toVector.getMutator();
+
+      valueCapacity = toVector.getValueCapacity();
+
+      /* trigger a realloc of this toVector */
+      toMutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length);
+
+      toVector.close();
+    }
+  }
+
   @Test
   public void testReAllocNullableFixedWidthVector() {
     // Create a new value vector for 1024 integers