Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private static String createErrorMsg(final BufferAllocator allocator, final int
* @param val An integer value.
* @return The closest power of two of that value.
*/
static int nextPowerOfTwo(int val) {
public static int nextPowerOfTwo(int val) {
int highestBit = Integer.highestOneBit(val);
if (highestBit == val) {
return val;
Expand All @@ -142,6 +142,21 @@ static int nextPowerOfTwo(int val) {
}
}

/**
* Rounds up the provided value to the nearest power of two.
*
* @param val A long value.
* @return The closest power of two of that value.
*/
public static long nextPowerOfTwo(long val) {
long highestBit = Long.highestOneBit(val);
if (highestBit == val) {
return val;
} else {
return highestBit << 1;
}
}

public static StringBuilder indent(StringBuilder sb, int indent) {
final char[] indentation = new char[indent * 2];
Arrays.fill(indentation, ' ');
Expand Down
13 changes: 10 additions & 3 deletions java/vector/src/main/codegen/templates/FixedValueVectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,21 @@ private void allocateBytes(final long size) {
* @throws org.apache.arrow.memory.OutOfMemoryException if it can't allocate the new buffer
*/
public void reAlloc() {
final long newAllocationSize = allocationSizeInBytes * 2L;
if (newAllocationSize > MAX_ALLOCATION_SIZE) {
long baseSize = allocationSizeInBytes;
final int currentBufferCapacity = data.capacity();
if (baseSize < (long)currentBufferCapacity) {
baseSize = (long)currentBufferCapacity;
}
long newAllocationSize = baseSize * 2L;
newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);

if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
}

logger.debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]", name, allocationSizeInBytes, newAllocationSize);
final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
newBuf.setBytes(0, data, 0, data.capacity());
newBuf.setBytes(0, data, 0, currentBufferCapacity);
final int halfNewCapacity = newBuf.capacity() / 2;
newBuf.setZero(halfNewCapacity, halfNewCapacity);
newBuf.writerIndex(data.writerIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,20 @@ public void reset() {
}

public void reAlloc() {
final long newAllocationSize = allocationSizeInBytes*2L;
long baseSize = allocationSizeInBytes;
final int currentBufferCapacity = data.capacity();
if (baseSize < (long)currentBufferCapacity) {
baseSize = (long)currentBufferCapacity;
}
long newAllocationSize = baseSize * 2L;
newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);

if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
}

final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
newBuf.setBytes(0, data, 0, data.capacity());
newBuf.setBytes(0, data, 0, currentBufferCapacity);
data.release();
data = newBuf;
allocationSizeInBytes = (int)newAllocationSize;
Expand Down
12 changes: 10 additions & 2 deletions java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.arrow.vector;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.BitHolder;
Expand Down Expand Up @@ -208,15 +209,22 @@ private void allocateBytes(final long size) {
* Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's buffer with new buffer, and release old one
*/
public void reAlloc() {
final long newAllocationSize = allocationSizeInBytes * 2L;
long baseSize = allocationSizeInBytes;
final int currentBufferCapacity = data.capacity();
if (baseSize < (long)currentBufferCapacity) {
baseSize = (long)currentBufferCapacity;
}
long newAllocationSize = baseSize * 2L;
newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);

if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
}

final int curSize = (int) newAllocationSize;
final ArrowBuf newBuf = allocator.buffer(curSize);
newBuf.setZero(0, newBuf.capacity());
newBuf.setBytes(0, data, 0, data.capacity());
newBuf.setBytes(0, data, 0, currentBufferCapacity);
data.release();
data = newBuf;
allocationSizeInBytes = curSize;
Expand Down
193 changes: 193 additions & 0 deletions java/vector/src/test/java/org/apache/arrow/vector/TestBitVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -234,6 +235,198 @@ public void testSplitAndTransfer2() throws Exception {
}
}

@Test
public void testReallocAfterVectorTransfer1() {
try (final BitVector vector = new BitVector(EMPTY_SCHEMA_PATH, allocator)) {
vector.allocateNew(4096);
int valueCapacity = vector.getValueCapacity();
assertEquals(4096, valueCapacity);

final BitVector.Mutator mutator = vector.getMutator();
final BitVector.Accessor accessor = vector.getAccessor();

for (int i = 0; i < valueCapacity; i++) {
if ((i & 1) == 1) {
mutator.setToOne(i);
}
}

for (int i = 0; i < valueCapacity; i++) {
int val = accessor.get(i);
if ((i & 1) == 1) {
assertEquals("unexpected cleared bit at index: " + i, 1, val);
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}

/* trigger first realloc */
mutator.setSafeToOne(valueCapacity);
assertEquals(valueCapacity * 2, vector.getValueCapacity());

for (int i = valueCapacity; i < valueCapacity*2; i++) {
if ((i & 1) == 1) {
mutator.setToOne(i);
}
}

for (int i = 0; i < valueCapacity*2; i++) {
int val = accessor.get(i);
if (((i & 1) == 1) || (i == valueCapacity)) {
assertEquals("unexpected cleared bit at index: " + i, 1, val);
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}

/* trigger second realloc */
mutator.setSafeToOne(valueCapacity*2);
assertEquals(valueCapacity * 4, vector.getValueCapacity());

for (int i = valueCapacity*2; i < valueCapacity*4; i++) {
if ((i & 1) == 1) {
mutator.setToOne(i);
}
}

for (int i = 0; i < valueCapacity*4; i++) {
int val = accessor.get(i);
if (((i & 1) == 1) || (i == valueCapacity) || (i == valueCapacity*2)) {
assertEquals("unexpected cleared bit at index: " + i, 1, val);
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}

/* now transfer the vector */
TransferPair transferPair = vector.getTransferPair(allocator);
transferPair.transfer();
final BitVector toVector = (BitVector)transferPair.getTo();
final BitVector.Accessor toAccessor = toVector.getAccessor();
final BitVector.Mutator toMutator = toVector.getMutator();

assertEquals(valueCapacity * 4, toVector.getValueCapacity());

/* realloc the toVector */
toMutator.setSafeToOne(valueCapacity * 4);

for (int i = 0; i < toVector.getValueCapacity(); i++) {
int val = toAccessor.get(i);
if (i <= valueCapacity * 4) {
if (((i & 1) == 1) || (i == valueCapacity) ||
(i == valueCapacity*2) || (i == valueCapacity*4)) {
assertEquals("unexpected cleared bit at index: " + i, 1, val);
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}
else {
assertEquals("unexpected set bit at index: " + i, 0, val);
}
}

toVector.close();
}
}

@Test
public void testReallocAfterVectorTransfer2() {
try (final NullableBitVector vector = new NullableBitVector(EMPTY_SCHEMA_PATH, allocator)) {
vector.allocateNew(4096);
int valueCapacity = vector.getValueCapacity();
assertEquals(4096, valueCapacity);

final NullableBitVector.Mutator mutator = vector.getMutator();
final NullableBitVector.Accessor accessor = vector.getAccessor();

for (int i = 0; i < valueCapacity; i++) {
if ((i & 1) == 1) {
mutator.set(i, 1);
}
}

for (int i = 0; i < valueCapacity; i++) {
if ((i & 1) == 1) {
assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
}
else {
assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
}
}

/* trigger first realloc */
mutator.setSafe(valueCapacity, 1, 1);
assertEquals(valueCapacity * 2, vector.getValueCapacity());

for (int i = valueCapacity; i < valueCapacity*2; i++) {
if ((i & 1) == 1) {
mutator.set(i, 1);
}
}

for (int i = 0; i < valueCapacity*2; i++) {
if (((i & 1) == 1) || (i == valueCapacity)) {
assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
}
else {
assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
}
}

/* trigger second realloc */
mutator.setSafe(valueCapacity*2, 1, 1);
assertEquals(valueCapacity * 4, vector.getValueCapacity());

for (int i = valueCapacity*2; i < valueCapacity*4; i++) {
if ((i & 1) == 1) {
mutator.set(i, 1);
}
}

for (int i = 0; i < valueCapacity*4; i++) {
if (((i & 1) == 1) || (i == valueCapacity) || (i == valueCapacity*2)) {
assertFalse("unexpected cleared bit at index: " + i, accessor.isNull(i));
}
else {
assertTrue("unexpected set bit at index: " + i, accessor.isNull(i));
}
}

/* now transfer the vector */
TransferPair transferPair = vector.getTransferPair(allocator);
transferPair.transfer();
final NullableBitVector toVector = (NullableBitVector)transferPair.getTo();
final NullableBitVector.Accessor toAccessor = toVector.getAccessor();
final NullableBitVector.Mutator toMutator = toVector.getMutator();

assertEquals(valueCapacity * 4, toVector.getValueCapacity());

/* realloc the toVector */
toMutator.setSafe(valueCapacity * 4, 1, 1);

for (int i = 0; i < toVector.getValueCapacity(); i++) {
if (i <= valueCapacity * 4) {
if (((i & 1) == 1) || (i == valueCapacity) ||
(i == valueCapacity*2) || (i == valueCapacity*4)) {
assertFalse("unexpected cleared bit at index: " + i, toAccessor.isNull(i));
}
else {
assertTrue("unexpected set bit at index: " + i, toAccessor.isNull(i));
}
}
else {
assertTrue("unexpected set bit at index: " + i, toAccessor.isNull(i));
}
}

toVector.close();
}
}

@Test
public void testBitVector() {
// Create a new value vector for 1024 integers
Expand Down
Loading