Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -554,13 +554,6 @@ public <T> T unwrap(Class<T> clazz) {
return clazz.cast(allocationManager);
}

// TODO: remove this in a future release.
if (clazz == UnsafeDirectLittleEndian.class) {
Preconditions.checkState(allocationManager instanceof NettyAllocationManager,
"Underlying memory was not allocated by Netty");
return clazz.cast(((NettyAllocationManager) allocationManager).getMemoryChunk());
}

throw new IllegalArgumentException("Unexpected unwrapping class: " + clazz);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,82 @@

package org.apache.arrow.memory;

import org.apache.arrow.memory.util.LargeMemoryUtil;

import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.buffer.UnsafeDirectLittleEndian;
import io.netty.util.internal.PlatformDependent;

/**
* The default implementation of AllocationManagerBase. The implementation is responsible for managing when memory
* The default implementation of {@link AllocationManager}. The implementation is responsible for managing when memory
* is allocated and returned to the Netty-based PooledByteBufAllocatorL.
*/
public class NettyAllocationManager extends AllocationManager {

public static final Factory FACTORY = new Factory();

/**
* The default cut-off value for switching allocation strategies.
* If the request size is not greater than the cut-off value, we will allocate memory by
* {@link PooledByteBufAllocatorL} APIs,
* otherwise, we will use {@link PlatformDependent} APIs.
*/
public static final int DEFAULT_ALLOCATION_CUTOFF_VALUE = Integer.MAX_VALUE;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

none of this is necessary, please remove. you can create a custom NettyAllocationManager for tests and specify a different value there. See https://github.com/apache/arrow/blob/master/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java#L393

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. Thank you for the good suggestion.

private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL();
static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty;
static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize();

private final int allocatedSize;
private final long allocatedSize;
private final UnsafeDirectLittleEndian memoryChunk;
private final long allocatedAddress;

/**
* The cut-off value for switching allocation strategies.
*/
private final int allocationCutOffValue;

NettyAllocationManager(BaseAllocator accountingAllocator, int requestedSize) {
NettyAllocationManager(BaseAllocator accountingAllocator, long requestedSize, int allocationCutOffValue) {
super(accountingAllocator);
this.memoryChunk = INNER_ALLOCATOR.allocate(requestedSize);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should remove this which effectively replaces all allocations done in Arrow Java, which is a big change. INNER_ALLOCATOR also uses a pool which has some benefits. Instead, can you just change requestedSize to be a long, then check if it is over the max Int size and only then use PlatformDependent.allocateMemory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revised. Thank you for the good suggestion.

this.allocatedSize = memoryChunk.capacity();
this.allocationCutOffValue = allocationCutOffValue;

if (requestedSize > allocationCutOffValue) {
this.memoryChunk = null;
this.allocatedAddress = PlatformDependent.allocateMemory(requestedSize);
this.allocatedSize = requestedSize;
} else {
this.memoryChunk = INNER_ALLOCATOR.allocate(requestedSize);
this.allocatedAddress = memoryChunk.memoryAddress();
this.allocatedSize = memoryChunk.capacity();
}
}

NettyAllocationManager(BaseAllocator accountingAllocator, long requestedSize) {
this(accountingAllocator, requestedSize, DEFAULT_ALLOCATION_CUTOFF_VALUE);
}

/**
* Get the underlying memory chunk managed by this AllocationManager.
* @return buffer
* @return the underlying memory chunk if the request size is not greater than the
* {@link NettyAllocationManager#allocationCutOffValue}, or null otherwise.
*
* @deprecated this method will be removed in a future release.
*/
@Deprecated
UnsafeDirectLittleEndian getMemoryChunk() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of removing this, could you deprecate and note it will be removed in future releases? also note that it will return null if allocated size is more than max int.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Reverted this accordingly.

return memoryChunk;
}

@Override
protected long memoryAddress() {
return memoryChunk.memoryAddress();
return allocatedAddress;
}

@Override
protected void release0() {
memoryChunk.release();
if (memoryChunk == null) {
PlatformDependent.freeMemory(allocatedAddress);
} else {
memoryChunk.release();
}
}

/**
Expand All @@ -79,7 +113,7 @@ private Factory() {}

@Override
public AllocationManager create(BaseAllocator accountingAllocator, long size) {
return new NettyAllocationManager(accountingAllocator, LargeMemoryUtil.checkedCastToInt(size));
return new NettyAllocationManager(accountingAllocator, size);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.memory;

import static org.junit.Assert.assertEquals;

import io.netty.buffer.ArrowBuf;

/**
* Integration test for large (more than 2GB) {@link io.netty.buffer.ArrowBuf}.
* To run this test, please make sure there is at least 4GB memory in the system.
* <p>
* Please note that this is not a standard test case, so please run it by manually invoking the
* main method.
* </p>
*/
public class TestLargeArrowBuf {

private static void testLargeArrowBuf(long bufSize) {
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
ArrowBuf largeBuf = allocator.buffer(bufSize)) {
assertEquals(bufSize, largeBuf.capacity());
System.out.println("Successfully allocated a buffer with capacity " + largeBuf.capacity());

for (long i = 0; i < bufSize / 8; i++) {
largeBuf.setLong(i * 8, i);

if ((i + 1) % 10000 == 0) {
System.out.println("Successfully written " + (i + 1) + " long words");
}
}
System.out.println("Successfully written " + (bufSize / 8) + " long words");

for (long i = 0; i < bufSize / 8; i++) {
long val = largeBuf.getLong(i * 8);
assertEquals(i, val);

if ((i + 1) % 10000 == 0) {
System.out.println("Successfully read " + (i + 1) + " long words");
}
}
System.out.println("Successfully read " + (bufSize / 8) + " long words");
}
System.out.println("Successfully released the large buffer.");
}

public static void main(String[] args) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be converted to a standard unit test now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.
The problem is that we set arrow.vector.max_allocation_bytes to 1048576 for every test case (to avoid OOM). Please see the pom.xml file.

So if we convert it to a test case, we cannot allocate too much memory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about adding an additional constructor for NettyAllocationManager that also takes a cut-off value that will determine if PlatformDependent.allocateMemory(requestedSize) is used with a default of Int.MAX_VALUE? Then you could create a new NettyAllocationManager for testing with a small cut-off value that can be used to run normal tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.
I have revised the code to make the cut-off value configurable, and added cases to test the scenarios when the request size is below/above the cut-off value. Please see if it looks good to you. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is not run as part of the normal test suite correct? Does it need to run manually? if so could you put that in the javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it needs to run manually.
I have updated the javadoc accordingly.

testLargeArrowBuf(4 * 1024 * 1024 * 1024L);
testLargeArrowBuf(Integer.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.memory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

import io.netty.buffer.ArrowBuf;

/**
* Test cases for {@link NettyAllocationManager}.
*/
public class TestNettyAllocationManager {

static int CUSTOMIZED_ALLOCATION_CUTOFF_VALUE = 1024;

private BaseAllocator createCustomizedAllocator() {
return new RootAllocator(BaseAllocator.configBuilder()
.allocationManagerFactory((accountingAllocator, requestedSize) ->
new NettyAllocationManager(
accountingAllocator, requestedSize, CUSTOMIZED_ALLOCATION_CUTOFF_VALUE)).build());
}

private void readWriteArrowBuf(ArrowBuf buffer) {
// write buffer
for (long i = 0; i < buffer.capacity() / 8; i++) {
buffer.setLong(i * 8, i);
}

// read buffer
for (long i = 0; i < buffer.capacity() / 8; i++) {
long val = buffer.getLong(i * 8);
assertEquals(i, val);
}
}

/**
* Test the allocation strategy for small buffers..
*/
@Test
public void testSmallBufferAllocation() {
final long bufSize = CUSTOMIZED_ALLOCATION_CUTOFF_VALUE - 512L;
try (BaseAllocator allocator = createCustomizedAllocator();
ArrowBuf buffer = allocator.buffer(bufSize)) {

assertTrue(buffer.getReferenceManager() instanceof BufferLedger);
BufferLedger bufferLedger = (BufferLedger) buffer.getReferenceManager();

// make sure we are using netty allocation manager
AllocationManager allocMgr = bufferLedger.getAllocationManager();
assertTrue(allocMgr instanceof NettyAllocationManager);
NettyAllocationManager nettyMgr = (NettyAllocationManager) allocMgr;

// for the small buffer allocation strategy, the chunk is not null
assertNotNull(nettyMgr.getMemoryChunk());

readWriteArrowBuf(buffer);
}
}

/**
* Test the allocation strategy for large buffers..
*/
@Test
public void testLargeBufferAllocation() {
final long bufSize = CUSTOMIZED_ALLOCATION_CUTOFF_VALUE + 1024L;
try (BaseAllocator allocator = createCustomizedAllocator();
ArrowBuf buffer = allocator.buffer(bufSize)) {
assertTrue(buffer.getReferenceManager() instanceof BufferLedger);
BufferLedger bufferLedger = (BufferLedger) buffer.getReferenceManager();

// make sure we are using netty allocation manager
AllocationManager allocMgr = bufferLedger.getAllocationManager();
assertTrue(allocMgr instanceof NettyAllocationManager);
NettyAllocationManager nettyMgr = (NettyAllocationManager) allocMgr;

// for the large buffer allocation strategy, the chunk is null
assertNull(nettyMgr.getMemoryChunk());

readWriteArrowBuf(buffer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected static int getValidityBufferSizeFromCount(final int valueCount) {
}

/* round up bytes for the validity buffer for the given valueCount */
private static long roundUp8ForValidityBuffer(int valueCount) {
private static long roundUp8ForValidityBuffer(long valueCount) {
return ((valueCount + 63) >> 6) << 3;
}

Expand All @@ -140,7 +140,7 @@ long computeCombinedBufferSize(int valueCount, int typeWidth) {
// for boolean type, value-buffer and validity-buffer are of same size.
bufferSize *= 2;
} else {
bufferSize += DataSizeRoundingUtil.roundUpTo8Multiple(valueCount * typeWidth);
bufferSize += DataSizeRoundingUtil.roundUpTo8Multiple((long) valueCount * typeWidth);
}
return BaseAllocator.nextPowerOfTwo(bufferSize);
}
Expand Down Expand Up @@ -170,16 +170,16 @@ DataAndValidityBuffers allocFixedDataAndValidityBufs(int valueCount, int typeWid
long bufferSize = computeCombinedBufferSize(valueCount, typeWidth);
assert bufferSize <= MAX_ALLOCATION_SIZE;

int validityBufferSize;
int dataBufferSize;
long validityBufferSize;
long dataBufferSize;
if (typeWidth == 0) {
validityBufferSize = dataBufferSize = (int) (bufferSize / 2);
validityBufferSize = dataBufferSize = bufferSize / 2;
} else {
// Due to roundup to power-of-2 allocation, the bufferSize could be greater than the
// requested size. Utilize the allocated buffer fully.;
int actualCount = (int) ((bufferSize * 8.0) / (8 * typeWidth + 1));
long actualCount = (long) ((bufferSize * 8.0) / (8 * typeWidth + 1));
do {
validityBufferSize = (int) roundUp8ForValidityBuffer(actualCount);
validityBufferSize = roundUp8ForValidityBuffer(actualCount);
dataBufferSize = DataSizeRoundingUtil.roundUpTo8Multiple(actualCount * typeWidth);
if (validityBufferSize + dataBufferSize <= bufferSize) {
break;
Expand All @@ -191,14 +191,14 @@ DataAndValidityBuffers allocFixedDataAndValidityBufs(int valueCount, int typeWid


/* allocate combined buffer */
ArrowBuf combinedBuffer = allocator.buffer((int) bufferSize);
ArrowBuf combinedBuffer = allocator.buffer(bufferSize);

/* slice into requested lengths */
ArrowBuf dataBuf = null;
ArrowBuf validityBuf = null;
int bufferOffset = 0;
long bufferOffset = 0;
for (int numBuffers = 0; numBuffers < 2; ++numBuffers) {
int len = (numBuffers == 0 ? dataBufferSize : validityBufferSize);
long len = (numBuffers == 0 ? dataBufferSize : validityBufferSize);
ArrowBuf buf = combinedBuffer.slice(bufferOffset, len);
buf.getReferenceManager().retain();
buf.readerIndex(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public long get(int index) throws IllegalStateException {
if (NULL_CHECKING_ENABLED && isSet(index) == 0) {
throw new IllegalStateException("Value at index is null");
}
return valueBuffer.getLong(index * TYPE_WIDTH);
return valueBuffer.getLong((long) index * TYPE_WIDTH);
}

/**
Expand All @@ -128,7 +128,7 @@ public void get(int index, NullableBigIntHolder holder) {
return;
}
holder.isSet = 1;
holder.value = valueBuffer.getLong(index * TYPE_WIDTH);
holder.value = valueBuffer.getLong((long) index * TYPE_WIDTH);
}

/**
Expand All @@ -141,7 +141,7 @@ public Long getObject(int index) {
if (isSet(index) == 0) {
return null;
} else {
return valueBuffer.getLong(index * TYPE_WIDTH);
return valueBuffer.getLong((long) index * TYPE_WIDTH);
}
}

Expand All @@ -153,7 +153,7 @@ public Long getObject(int index) {


private void setValue(int index, long value) {
valueBuffer.setLong(index * TYPE_WIDTH, value);
valueBuffer.setLong((long) index * TYPE_WIDTH, value);
}

/**
Expand Down Expand Up @@ -275,7 +275,7 @@ public void setSafe(int index, int isSet, long value) {
* @return value stored at the index.
*/
public static long get(final ArrowBuf buffer, final int index) {
return buffer.getLong(index * TYPE_WIDTH);
return buffer.getLong((long) index * TYPE_WIDTH);
}


Expand Down
Loading