Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;

/**
* The in-memory representation of FlightData used to manage a stream of Arrow messages.
Expand Down Expand Up @@ -333,14 +334,14 @@ private InputStream asInputStream(BufferAllocator allocator) {

if (appMetadata != null && appMetadata.capacity() > 0) {
// Must call slice() as CodedOutputStream#writeByteBuffer writes -capacity- bytes, not -limit- bytes
cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, appMetadata.asNettyBuffer().nioBuffer().slice());
cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, appMetadata.nioBuffer().slice());
}

cos.writeTag(FlightData.DATA_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
int size = 0;
List<ByteBuf> allBufs = new ArrayList<>();
for (ArrowBuf b : bufs) {
allBufs.add(b.asNettyBuffer());
allBufs.add(Unpooled.wrappedBuffer(b.nioBuffer()).retain());
size += b.readableBytes();
// [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable from C++.
if (b.readableBytes() % 8 != 0) {
Expand All @@ -349,19 +350,19 @@ private InputStream asInputStream(BufferAllocator allocator) {
size += paddingBytes;
allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain());
}
// gRPC/Netty will decrement the reference count (via the ByteBufInputStream below) when written, so increment
// the reference count
b.getReferenceManager().retain();
}
// rawvarint is used for length definition.
cos.writeUInt32NoTag(size);
cos.flush();

ArrowBuf initialBuf = allocator.buffer(baos.size());
ByteBuf initialBuf = Unpooled.buffer(baos.size());
initialBuf.writeBytes(baos.toByteArray());
final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true,
final CompositeByteBuf bb = new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, true,
Math.max(2, bufs.size() + 1),
ImmutableList.<ByteBuf>builder().add(initialBuf.asNettyBuffer()).addAll(allBufs).build());
ImmutableList.<ByteBuf>builder()
.add(initialBuf)
.addAll(allBufs)
.build());
final ByteBufInputStream is = new DrainableByteBufInputStream(bb);
return is;
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public ByteBuf copy(int index, int length) {
@Override
public ByteBuf capacity(int newCapacity) {
if (newCapacity > capacity()) {
ByteBuf newBuf = allocator.buffer(newCapacity).asNettyBuffer();
ByteBuf newBuf = NettyArrowBuf.unwrapBuffer(allocator.buffer(newCapacity));
newBuf.writeBytes(buffer, 0, buffer.capacity());
newBuf.readerIndex(buffer.readerIndex());
newBuf.writerIndex(buffer.writerIndex());
Expand Down
44 changes: 31 additions & 13 deletions java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.ArrowByteBufAllocator;
import org.apache.arrow.memory.BoundsChecking;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;

import io.netty.util.internal.PlatformDependent;
Expand All @@ -48,17 +49,17 @@ public class NettyArrowBuf extends AbstractByteBuf implements AutoCloseable {
/**
* Constructs a new instance.
*
* @param arrowBuf The buffer to wrap.
* @param arrowByteBufAllocator The allocator for the buffer (assumed to be {@link ArrowByteBufAllocator}).
* @param length The length of this buffer.
* @param arrowBuf The buffer to wrap.
* @param bufferAllocator The allocator for the buffer.
* @param length The length of this buffer.
*/
public NettyArrowBuf(
final ArrowBuf arrowBuf,
final ByteBufAllocator arrowByteBufAllocator,
final BufferAllocator bufferAllocator,
final int length) {
super(length);
this.arrowBuf = arrowBuf;
this.arrowByteBufAllocator = (ArrowByteBufAllocator) arrowByteBufAllocator;
this.arrowByteBufAllocator = new ArrowByteBufAllocator(bufferAllocator);
this.length = length;
this.address = arrowBuf.memoryAddress();
}
Expand Down Expand Up @@ -164,12 +165,12 @@ public int capacity() {

@Override
public NettyArrowBuf slice() {
return arrowBuf.slice(readerIndex, writerIndex - readerIndex).asNettyBuffer();
return unwrapBuffer(arrowBuf.slice(readerIndex, writerIndex - readerIndex));
}

@Override
public NettyArrowBuf slice(int index, int length) {
return arrowBuf.slice(index, length).asNettyBuffer();
return unwrapBuffer(arrowBuf.slice(index, length));
}

@Override
Expand Down Expand Up @@ -252,6 +253,7 @@ public ByteBuffer nioBuffer(long index, int length) {

/**
* Get this ArrowBuf as a direct {@link ByteBuffer}.
*
* @return ByteBuffer
*/
private ByteBuffer getDirectBuffer(long index) {
Expand Down Expand Up @@ -284,8 +286,9 @@ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {

/**
* Determine if the requested {@code index} and {@code length} will fit within {@code capacity}.
* @param index The starting index.
* @param length The length which will be utilized (starting from {@code index}).
*
* @param index The starting index.
* @param length The length which will be utilized (starting from {@code index}).
* @param capacity The capacity that {@code index + length} is allowed to be within.
* @return {@code true} if the requested {@code index} and {@code length} will fit within {@code capacity}.
* {@code false} if this would result in an index out of bounds exception.
Expand Down Expand Up @@ -368,7 +371,7 @@ public int getBytes(int index, FileChannel out, long position, int length) throw
if (length == 0) {
return 0;
} else {
final ByteBuffer tmpBuf = getDirectBuffer(index );
final ByteBuffer tmpBuf = getDirectBuffer(index);
tmpBuf.clear().limit(length);
return out.write(tmpBuf, position);
}
Expand Down Expand Up @@ -404,7 +407,7 @@ protected int _getUnsignedMediumLE(int index) {
this.chk(index, 3);
long addr = this.addr(index);
return PlatformDependent.getByte(addr) & 255 |
(Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) & '\uffff') << 8;
(Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) & '\uffff') << 8;
}


Expand Down Expand Up @@ -516,7 +519,8 @@ private long addr(long index) {
/**
* Helper function to do bounds checking at a particular
* index for particular length of data.
* @param index index (0 based relative to this ArrowBuf)
*
* @param index index (0 based relative to this ArrowBuf)
* @param fieldLength provided length of data for get/set
*/
private void chk(long index, long fieldLength) {
Expand All @@ -529,7 +533,7 @@ private void chk(long index, long fieldLength) {
}
if (index < 0 || index > capacity() - fieldLength) {
throw new IndexOutOfBoundsException(String.format(
"index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
"index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
}
}
}
Expand Down Expand Up @@ -601,4 +605,18 @@ public NettyArrowBuf setLong(int index, long value) {
arrowBuf.setLong(index, value);
return this;
}

/**
* unwrap arrow buffer into a netty buffer.
*/
public static NettyArrowBuf unwrapBuffer(ArrowBuf buf) {
final NettyArrowBuf nettyArrowBuf = new NettyArrowBuf(
buf,
buf.getReferenceManager().getAllocator(),
checkedCastToInt(buf.capacity()));
nettyArrowBuf.readerIndex(checkedCastToInt(buf.readerIndex()));
nettyArrowBuf.writerIndex(checkedCastToInt(buf.writerIndex()));
return nettyArrowBuf;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,7 @@ public interface Factory {
* @return The created AllocationManager used by this allocator
*/
AllocationManager create(BaseAllocator accountingAllocator, long size);

ArrowBuf empty();
}
}
44 changes: 15 additions & 29 deletions java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.Preconditions;

import io.netty.buffer.NettyArrowBuf;
import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.util.internal.PlatformDependent;

/**
* ArrowBuf serves as a facade over underlying memory by providing
* several access APIs to read/write data into a chunk of direct
Expand Down Expand Up @@ -130,22 +126,6 @@ private void ensureAccessible() {
}
}

/**
* Get a wrapper buffer to comply with Netty interfaces and
* can be used in RPC/RPC allocator code.
* @return netty compliant {@link NettyArrowBuf}
*/
public NettyArrowBuf asNettyBuffer() {

final NettyArrowBuf nettyArrowBuf = new NettyArrowBuf(
this,
referenceManager.getAllocator().getAsByteBufAllocator(),
checkedCastToInt(length));
nettyArrowBuf.readerIndex(checkedCastToInt(readerIndex));
nettyArrowBuf.writerIndex(checkedCastToInt(writerIndex));
return nettyArrowBuf;
}

/**
* Get reference manager for this ArrowBuf.
* @return user provided implementation of {@link ReferenceManager}
Expand Down Expand Up @@ -227,13 +207,25 @@ public ArrowBuf slice(long index, long length) {
return newBuf;
}

/**
* Make a nio byte buffer from this arrowbuf.
*/
public ByteBuffer nioBuffer() {
return asNettyBuffer().nioBuffer();
return nioBuffer(readerIndex, checkedCastToInt(readableBytes()));
}


/**
* Make a nio byte buffer from this ArrowBuf.
*/
public ByteBuffer nioBuffer(long index, int length) {
return length == 0 ? ByteBuffer.allocateDirect(0) :
PlatformDependent.directBuffer(memoryAddress() + index, length);
chk(index, length);
return getDirectBuffer(index, length);
}

private ByteBuffer getDirectBuffer(long index, int length) {
long address = addr(index);
return MemoryUtil.directBuffer(address, length);
}

public long memoryAddress() {
Expand Down Expand Up @@ -1244,10 +1236,4 @@ public ArrowBuf setIndex(int readerIndex, int writerIndex) {
}
}

/**
* Create an empty ArrowBuf with length.
*/
public static ArrowBuf empty(long length) {
return new ArrowBuf(ReferenceManager.NO_OP, null, length, new PooledByteBufAllocatorL().empty.memoryAddress());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.ExpandableByteBuf;
import io.netty.buffer.NettyArrowBuf;

/**
* An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC
Expand Down Expand Up @@ -56,7 +57,7 @@ public ByteBuf buffer() {

@Override
public ByteBuf buffer(int initialCapacity) {
return new ExpandableByteBuf(allocator.buffer(initialCapacity).asNettyBuffer(), allocator);
return new ExpandableByteBuf(NettyArrowBuf.unwrapBuffer(allocator.buffer(initialCapacity)), allocator);
}

@Override
Expand Down Expand Up @@ -86,7 +87,7 @@ public ByteBuf directBuffer() {

@Override
public ByteBuf directBuffer(int initialCapacity) {
return allocator.buffer(initialCapacity).asNettyBuffer();
return NettyArrowBuf.unwrapBuffer(allocator.buffer(initialCapacity));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator {
private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
final AllocationListener listener;
private final BaseAllocator parentAllocator;
private final ArrowByteBufAllocator thisAsByteBufAllocator;
private final Map<BaseAllocator, Object> childAllocators;
private final ArrowBuf empty;
// members used purely for debugging
Expand Down Expand Up @@ -107,7 +106,6 @@ protected BaseAllocator(
this.parentAllocator = parentAllocator;
this.name = name;

this.thisAsByteBufAllocator = new ArrowByteBufAllocator(this);
this.childAllocators = Collections.synchronizedMap(new IdentityHashMap<>());

if (DEBUG) {
Expand Down Expand Up @@ -239,7 +237,7 @@ public ArrowBuf buffer(final long initialRequestSize) {
}

private ArrowBuf createEmpty() {
return new ArrowBuf(ReferenceManager.NO_OP, null, 0, NettyAllocationManager.EMPTY.memoryAddress());
return allocationManagerFactory.empty();
}

@Override
Expand All @@ -249,7 +247,7 @@ public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) {
Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative");

if (initialRequestSize == 0) {
return empty;
return getEmpty();
}

// round the request size according to the rounding policy
Expand Down Expand Up @@ -313,11 +311,6 @@ private AllocationManager newAllocationManager(BaseAllocator accountingAllocator
return allocationManagerFactory.create(accountingAllocator, size);
}

@Override
public ArrowByteBufAllocator getAsByteBufAllocator() {
return thisAsByteBufAllocator;
}

@Override
public BufferAllocator newChildAllocator(
final String name,
Expand Down Expand Up @@ -756,7 +749,7 @@ long getMaxAllocation() {
*/
@Value.Default
RoundingPolicy getRoundingPolicy() {
return DefaultRoundingPolicy.INSTANCE;
return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import java.util.Collection;

import io.netty.buffer.ByteBufAllocator;

/**
* Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods.
*/
Expand Down Expand Up @@ -51,16 +49,6 @@ public interface BufferAllocator extends AutoCloseable {
*/
ArrowBuf buffer(long size, BufferManager manager);

/**
* Returns the allocator this allocator falls back to when it needs more memory.
*
* @return the underlying allocator used by this allocator
*
* @deprecated This method may be removed in a future release.
*/
@Deprecated
ByteBufAllocator getAsByteBufAllocator();

/**
* Create a new child allocator.
*
Expand Down
Loading