Skip to content

Commit

Permalink
Hybrid Java buffer release: Support both manual and auto release at t…
Browse files Browse the repository at this point in the history
…he same time (#34)

* Revert "Add AutoBufferLedger (#31)"

This reverts commit e48da37.

* Commit 1

* Commit 2

* Fix config builder visibility in Scala

* Commit 2 Fixup

* Commit 3

* Commit 3 Fixup
  • Loading branch information
zhztheplayer authored Sep 30, 2021
1 parent a11e628 commit 609f033
Show file tree
Hide file tree
Showing 42 changed files with 2,228 additions and 1,673 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public void retain(int increment) {
bufRefCnt.addAndGet(increment);
}

@Override
public boolean isOpen() {
return getRefCount() > 0;
}

@Override
public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.VisibleForTesting;

/**
Expand All @@ -30,6 +29,20 @@
* "-XX:MaxDirectMemorySize".
*/
public class DirectReservationListener implements ReservationListener {
private final Method methodReserve;
private final Method methodUnreserve;

private DirectReservationListener() {
try {
final Class<?> classBits = Class.forName("java.nio.Bits");
methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class);
methodReserve.setAccessible(true);
methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class);
methodUnreserve.setAccessible(true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static final DirectReservationListener INSTANCE = new DirectReservationListener();

Expand All @@ -42,22 +55,43 @@ public static DirectReservationListener instance() {
*/
@Override
public void reserve(long size) {
MemoryUtil.reserveDirectMemory(size);
try {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
}
methodReserve.invoke(null, (int) size, (int) size);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Unreserve bytes by invoking java.nio.java.Bitjava.nio.Bitss#unreserveMemory.
*/
@Override
public void unreserve(long size) {
MemoryUtil.unreserveDirectMemory(size);
try {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
}
methodUnreserve.invoke(null, (int) size, (int) size);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Get current reservation of jVM direct memory. Visible for testing.
*/
@VisibleForTesting
public long getCurrentDirectMemReservation() {
return MemoryUtil.getCurrentDirectMemReservation();
try {
final Class<?> classBits = Class.forName("java.nio.Bits");
final Field f = classBits.getDeclaredField("reservedMemory");
f.setAccessible(true);
return ((AtomicLong) f.get(null)).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.jni;

import org.apache.arrow.memory.*;

/**
* MemoryChunkManager implementation for native allocated memory.
*/
public class NativeUnderlyingMemory implements MemoryChunk {

private final int size;
private final long nativeInstanceId;
private final long address;

/**
* Constructor.
*
* @param size Size of underlying memory (in bytes)
* @param nativeInstanceId ID of the native instance
* @param address Address of underlying memory
*/
NativeUnderlyingMemory(int size, long nativeInstanceId, long address) {
this.size = size;
this.nativeInstanceId = nativeInstanceId;
this.address = address;
}

@Override
public long size() {
return size;
}

@Override
public long memoryAddress() {
return address;
}

@Override
public void destroy() {
JniWrapper.get().releaseBuffer(nativeInstanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.*;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.util.LargeMemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.compression.NoCompressionCodec;
Expand Down Expand Up @@ -126,10 +127,9 @@ public static ArrowRecordBatch deserializeUnsafe(
final byte[] refDecoded = Base64.getDecoder().decode(keyValue.value());
final long nativeBufferRef = ByteBuffer.wrap(refDecoded).order(ByteOrder.LITTLE_ENDIAN).getLong();
final int size = LargeMemoryUtil.checkedCastToInt(bufferMeta.length());
final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
size, nativeBufferRef, bufferMeta.offset());
ReferenceManager rm = am.createReferenceManager(allocator);
ArrowBuf buf = new ArrowBuf(rm, null, size, bufferMeta.offset());
final NativeUnderlyingMemory chunk = new NativeUnderlyingMemory(size, nativeBufferRef,
bufferMeta.offset());
ArrowBuf buf = allocator.buffer(chunk);
buffers.add(buf);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.arrow.memory;
package org.apache.arrow.dataset.jni;

import static org.junit.Assert.*;

import org.apache.arrow.memory.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -46,41 +47,41 @@ public void testReservation() {
final RootAllocator root = rootAllocator();

final int size = 512;
final AllocationManager am = new MockUnderlyingMemory(root, size);
final BufferLedger ledger = am.associate(root);
final MemoryChunk chunk = new MockUnderlyingMemory(size);
final ArrowBuf buffer = root.buffer(chunk);

assertEquals(size, root.getAllocatedMemory());

ledger.release();
buffer.close();
}

@Test
public void testBufferTransfer() {
final RootAllocator root = rootAllocator();

ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
BufferAllocator allocator1 = root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
BufferAllocator allocator2 = root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
assertEquals(0, allocator1.getAllocatedMemory());
assertEquals(0, allocator2.getAllocatedMemory());

final int size = 512;
final AllocationManager am = new MockUnderlyingMemory(allocator1, size);
final MemoryChunk chunk = new MockUnderlyingMemory(size);
final ArrowBuf buffer = allocator1.buffer(chunk);

final BufferLedger owningLedger = am.associate(allocator1);
assertEquals(size, owningLedger.getAccountedSize());
assertEquals(size, owningLedger.getSize());
assertEquals(size, buffer.getActualMemoryConsumed());
assertEquals(size, buffer.getPossibleMemoryConsumed());
assertEquals(size, allocator1.getAllocatedMemory());

final BufferLedger transferredLedger = am.associate(allocator2);
owningLedger.release(); // release previous owner
assertEquals(0, owningLedger.getAccountedSize());
assertEquals(size, owningLedger.getSize());
assertEquals(size, transferredLedger.getAccountedSize());
assertEquals(size, transferredLedger.getSize());
final ArrowBuf transferredBuffer = buffer.getReferenceManager().retain(buffer, allocator2);
buffer.close(); // release previous owner
assertEquals(0, buffer.getActualMemoryConsumed());
assertEquals(size, buffer.getPossibleMemoryConsumed());
assertEquals(size, transferredBuffer.getActualMemoryConsumed());
assertEquals(size, transferredBuffer.getPossibleMemoryConsumed());
assertEquals(0, allocator1.getAllocatedMemory());
assertEquals(size, allocator2.getAllocatedMemory());

transferredLedger.release();
transferredBuffer.close();
allocator1.close();
allocator2.close();
}
Expand All @@ -93,18 +94,18 @@ private static class MockUnderlyingMemory extends NativeUnderlyingMemory {
/**
* Constructor.
*/
MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) {
super(accountingAllocator, size, -1L, -1L);
MockUnderlyingMemory(int size) {
super(size, -1L, -1L);
}

@Override
protected void release0() {
System.out.println("Underlying memory released. Size: " + getSize());
public void destroy() {
System.out.println("Underlying memory released. Size: " + size());
}

@Override
protected long memoryAddress() {
throw new UnsupportedOperationException();
public long memoryAddress() {
return -1L;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ AllocationOutcome allocateBytes(long size) {
} else {
// Try again, but with details this time.
// Populating details only on failures avoids performance overhead in the common case (success case).
AllocationOutcomeDetails details = new AllocationOutcomeDetails();
AllocationOutcomeDetails details = new AllocationOutcomeDetails(this);
status = allocateBytesInternal(size, details);
return new AllocationOutcome(status, details);
}
Expand Down
Loading

0 comments on commit 609f033

Please sign in to comment.