diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java
index fdec337e85d39..bfb252ce75388 100644
--- a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java
@@ -78,6 +78,11 @@ public void retain(int increment) {
bufRefCnt.addAndGet(increment);
}
+ @Override
+ public boolean isOpen() {
+ return getRefCount() > 0;
+ }
+
@Override
public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
retain();
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java
index 57ea5f19a7e02..72a1cadcf69b9 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java
@@ -21,7 +21,6 @@
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.VisibleForTesting;
/**
@@ -30,6 +29,20 @@
* "-XX:MaxDirectMemorySize".
*/
public class DirectReservationListener implements ReservationListener {
+ private final Method methodReserve;
+ private final Method methodUnreserve;
+
+ private DirectReservationListener() {
+ try {
+ final Class> classBits = Class.forName("java.nio.Bits");
+ methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class);
+ methodReserve.setAccessible(true);
+ methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class);
+ methodUnreserve.setAccessible(true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
private static final DirectReservationListener INSTANCE = new DirectReservationListener();
@@ -42,7 +55,14 @@ public static DirectReservationListener instance() {
*/
@Override
public void reserve(long size) {
- MemoryUtil.reserveDirectMemory(size);
+ try {
+ if (size > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
+ }
+ methodReserve.invoke(null, (int) size, (int) size);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -50,7 +70,14 @@ public void reserve(long size) {
*/
@Override
public void unreserve(long size) {
- MemoryUtil.unreserveDirectMemory(size);
+ try {
+ if (size > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
+ }
+ methodUnreserve.invoke(null, (int) size, (int) size);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -58,6 +85,13 @@ public void unreserve(long size) {
*/
@VisibleForTesting
public long getCurrentDirectMemReservation() {
- return MemoryUtil.getCurrentDirectMemReservation();
+ try {
+ final Class> classBits = Class.forName("java.nio.Bits");
+ final Field f = classBits.getDeclaredField("reservedMemory");
+ f.setAccessible(true);
+ return ((AtomicLong) f.get(null)).get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeUnderlyingMemory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeUnderlyingMemory.java
new file mode 100644
index 0000000000000..ea1c8bb53a6c8
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeUnderlyingMemory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.jni;
+
+import org.apache.arrow.memory.*;
+
+/**
+ * MemoryChunkManager implementation for native allocated memory.
+ */
+public class NativeUnderlyingMemory implements MemoryChunk {
+
+ private final int size;
+ private final long nativeInstanceId;
+ private final long address;
+
+ /**
+ * Constructor.
+ *
+ * @param size Size of underlying memory (in bytes)
+ * @param nativeInstanceId ID of the native instance
+ * @param address Address of underlying memory
+ */
+ NativeUnderlyingMemory(int size, long nativeInstanceId, long address) {
+ this.size = size;
+ this.nativeInstanceId = nativeInstanceId;
+ this.address = address;
+ }
+
+ @Override
+ public long size() {
+ return size;
+ }
+
+ @Override
+ public long memoryAddress() {
+ return address;
+ }
+
+ @Override
+ public void destroy() {
+ JniWrapper.get().releaseBuffer(nativeInstanceId);
+ }
+}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java
index e1b6766f6ad60..0d8ddf619141b 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java
@@ -35,7 +35,8 @@
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.RecordBatch;
-import org.apache.arrow.memory.*;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.util.LargeMemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.compression.NoCompressionCodec;
@@ -126,10 +127,9 @@ public static ArrowRecordBatch deserializeUnsafe(
final byte[] refDecoded = Base64.getDecoder().decode(keyValue.value());
final long nativeBufferRef = ByteBuffer.wrap(refDecoded).order(ByteOrder.LITTLE_ENDIAN).getLong();
final int size = LargeMemoryUtil.checkedCastToInt(bufferMeta.length());
- final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
- size, nativeBufferRef, bufferMeta.offset());
- ReferenceManager rm = am.createReferenceManager(allocator);
- ArrowBuf buf = new ArrowBuf(rm, null, size, bufferMeta.offset());
+ final NativeUnderlyingMemory chunk = new NativeUnderlyingMemory(size, nativeBufferRef,
+ bufferMeta.offset());
+ ArrowBuf buf = allocator.buffer(chunk);
buffers.add(buf);
}
diff --git a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
index 61cf3072203de..e69de29bb2d1d 100644
--- a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
+++ b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import org.apache.arrow.dataset.jni.JniWrapper;
-
-/**
- * AllocationManager implementation for native allocated memory.
- */
-public class NativeUnderlyingMemory extends AllocationManager {
-
- private final int size;
- private final long nativeInstanceId;
- private final long address;
-
- /**
- * Constructor.
- *
- * @param accountingAllocator The accounting allocator instance
- * @param size Size of underlying memory (in bytes)
- * @param nativeInstanceId ID of the native instance
- */
- public NativeUnderlyingMemory(BufferAllocator accountingAllocator, int size, long nativeInstanceId, long address) {
- super(accountingAllocator);
- this.size = size;
- this.nativeInstanceId = nativeInstanceId;
- this.address = address;
- // pre-allocate bytes on accounting allocator
- final AllocationListener listener = accountingAllocator.getListener();
- try (final AllocationReservation reservation = accountingAllocator.newReservation()) {
- listener.onPreAllocation(size);
- reservation.reserve(size);
- listener.onAllocation(size);
- } catch (Exception e) {
- release0();
- throw e;
- }
- }
-
- /**
- * Alias to constructor.
- */
- public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int size, long nativeInstanceId,
- long address) {
- return new NativeUnderlyingMemory(bufferAllocator, size, nativeInstanceId, address);
- }
-
- public ReferenceManager createReferenceManager(BufferAllocator allocator) {
- return super.associate(allocator).newReferenceManager();
- }
-
- @Override
- protected void release0() {
- JniWrapper.get().releaseBuffer(nativeInstanceId);
- }
-
- @Override
- public long getSize() {
- return size;
- }
-
- @Override
- protected long memoryAddress() {
- return address;
- }
-}
diff --git a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeUnderlyingMemory.java
similarity index 63%
rename from java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java
rename to java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeUnderlyingMemory.java
index c81868e42b279..90f195c3417b9 100644
--- a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeUnderlyingMemory.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.memory;
+package org.apache.arrow.dataset.jni;
import static org.junit.Assert.*;
+import org.apache.arrow.memory.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -46,41 +47,41 @@ public void testReservation() {
final RootAllocator root = rootAllocator();
final int size = 512;
- final AllocationManager am = new MockUnderlyingMemory(root, size);
- final BufferLedger ledger = am.associate(root);
+ final MemoryChunk chunk = new MockUnderlyingMemory(size);
+ final ArrowBuf buffer = root.buffer(chunk);
assertEquals(size, root.getAllocatedMemory());
- ledger.release();
+ buffer.close();
}
@Test
public void testBufferTransfer() {
final RootAllocator root = rootAllocator();
- ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
- ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
+ BufferAllocator allocator1 = root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
+ BufferAllocator allocator2 = root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
assertEquals(0, allocator1.getAllocatedMemory());
assertEquals(0, allocator2.getAllocatedMemory());
final int size = 512;
- final AllocationManager am = new MockUnderlyingMemory(allocator1, size);
+ final MemoryChunk chunk = new MockUnderlyingMemory(size);
+ final ArrowBuf buffer = allocator1.buffer(chunk);
- final BufferLedger owningLedger = am.associate(allocator1);
- assertEquals(size, owningLedger.getAccountedSize());
- assertEquals(size, owningLedger.getSize());
+ assertEquals(size, buffer.getActualMemoryConsumed());
+ assertEquals(size, buffer.getPossibleMemoryConsumed());
assertEquals(size, allocator1.getAllocatedMemory());
- final BufferLedger transferredLedger = am.associate(allocator2);
- owningLedger.release(); // release previous owner
- assertEquals(0, owningLedger.getAccountedSize());
- assertEquals(size, owningLedger.getSize());
- assertEquals(size, transferredLedger.getAccountedSize());
- assertEquals(size, transferredLedger.getSize());
+ final ArrowBuf transferredBuffer = buffer.getReferenceManager().retain(buffer, allocator2);
+ buffer.close(); // release previous owner
+ assertEquals(0, buffer.getActualMemoryConsumed());
+ assertEquals(size, buffer.getPossibleMemoryConsumed());
+ assertEquals(size, transferredBuffer.getActualMemoryConsumed());
+ assertEquals(size, transferredBuffer.getPossibleMemoryConsumed());
assertEquals(0, allocator1.getAllocatedMemory());
assertEquals(size, allocator2.getAllocatedMemory());
- transferredLedger.release();
+ transferredBuffer.close();
allocator1.close();
allocator2.close();
}
@@ -93,18 +94,18 @@ private static class MockUnderlyingMemory extends NativeUnderlyingMemory {
/**
* Constructor.
*/
- MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) {
- super(accountingAllocator, size, -1L, -1L);
+ MockUnderlyingMemory(int size) {
+ super(size, -1L, -1L);
}
@Override
- protected void release0() {
- System.out.println("Underlying memory released. Size: " + getSize());
+ public void destroy() {
+ System.out.println("Underlying memory released. Size: " + size());
}
@Override
- protected long memoryAddress() {
- throw new UnsupportedOperationException();
+ public long memoryAddress() {
+ return -1L;
}
}
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java
index 42dac7b8c6015..41707e7ef0aa1 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java
@@ -97,7 +97,7 @@ AllocationOutcome allocateBytes(long size) {
} else {
// Try again, but with details this time.
// Populating details only on failures avoids performance overhead in the common case (success case).
- AllocationOutcomeDetails details = new AllocationOutcomeDetails();
+ AllocationOutcomeDetails details = new AllocationOutcomeDetails(this);
status = allocateBytesInternal(size, details);
return new AllocationOutcome(status, details);
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
deleted file mode 100644
index 4bba7947ff8bd..0000000000000
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.arrow.util.Preconditions;
-
-/**
- * The abstract base class of AllocationManager.
- *
- *
Manages the relationship between one or more allocators and a particular UDLE. Ensures that
- * one allocator owns the
- * memory that multiple allocators may be referencing. Manages a BufferLedger between each of its
- * associated allocators.
- *
- *
The only reason that this isn't package private is we're forced to put ArrowBuf in Netty's
- * package which need access
- * to these objects or methods.
- *
- *
Threading: AllocationManager manages thread-safety internally. Operations within the context
- * of a single BufferLedger
- * are lockless in nature and can be leveraged by multiple threads. Operations that cross the
- * context of two ledgers
- * will acquire a lock on the AllocationManager instance. Important note, there is one
- * AllocationManager per
- * UnsafeDirectLittleEndian buffer allocation. As such, there will be thousands of these in a
- * typical query. The
- * contention of acquiring a lock on AllocationManager should be very low.
- */
-public abstract class AllocationManager {
-
- private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
-
- private final BufferAllocator root;
- private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
- // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap
- // see JIRA for details
- private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>();
- private final long amCreationTime = System.nanoTime();
-
- // The ReferenceManager created at the time of creation of this AllocationManager
- // is treated as the owning reference manager for the underlying chunk of memory
- // managed by this allocation manager
- private volatile BufferLedger owningLedger;
- private volatile long amDestructionTime = 0;
-
- protected AllocationManager(BufferAllocator accountingAllocator) {
- Preconditions.checkNotNull(accountingAllocator);
- accountingAllocator.assertOpen();
-
- this.root = accountingAllocator.getRoot();
-
- // we do a no retain association since our creator will want to retrieve the newly created
- // ledger and will create a reference count at that point
- this.owningLedger = associate(accountingAllocator, false);
- }
-
- BufferLedger getOwningLedger() {
- return owningLedger;
- }
-
- void setOwningLedger(final BufferLedger ledger) {
- this.owningLedger = ledger;
- }
-
- /**
- * Associate the existing underlying buffer with a new allocator. This will increase the
- * reference count on the corresponding buffer ledger by 1
- *
- * @param allocator The target allocator to associate this buffer with.
- * @return The reference manager (new or existing) that associates the underlying
- * buffer to this new ledger.
- */
- BufferLedger associate(final BufferAllocator allocator) {
- return associate(allocator, true);
- }
-
- private BufferLedger associate(final BufferAllocator allocator, final boolean retain) {
- allocator.assertOpen();
- Preconditions.checkState(root == allocator.getRoot(),
- "A buffer can only be associated between two allocators that share the same root");
-
- synchronized (this) {
- BufferLedger ledger = map.get(allocator);
- if (ledger != null) {
- if (retain) {
- // bump the ref count for the ledger
- ledger.increment();
- }
- return ledger;
- }
-
- ledger = allocator.getBufferLedgerFactory().create(allocator, this);
-
- if (retain) {
- // the new reference manager will have a ref count of 1
- ledger.increment();
- }
-
- // store the mapping for
- BufferLedger oldLedger = map.put(ledger);
- Preconditions.checkState(oldLedger == null,
- "Detected inconsistent state: A reference manager already exists for this allocator");
-
- if (allocator instanceof BaseAllocator) {
- // needed for debugging only: keep a pointer to reference manager inside allocator
- // to dump state, verify allocator state etc
- ((BaseAllocator) allocator).associateLedger(ledger);
- }
- return ledger;
- }
- }
-
- /**
- * The way that a particular ReferenceManager (BufferLedger) communicates back to the
- * AllocationManager that it no longer needs to hold a reference to a particular
- * piece of memory. Reference manager needs to hold a lock to invoke this method
- * It is called when the shared refcount of all the ArrowBufs managed by the
- * calling ReferenceManager drops to 0.
- */
- void release(final BufferLedger ledger) {
- final BufferAllocator allocator = ledger.getAllocator();
- allocator.assertOpen();
-
- // remove the mapping for the allocator
- // of calling BufferLedger
- Preconditions.checkState(map.containsKey(allocator),
- "Expecting a mapping for allocator and reference manager");
- final BufferLedger oldLedger = map.remove(allocator);
-
- BufferAllocator oldAllocator = oldLedger.getAllocator();
- if (oldAllocator instanceof BaseAllocator) {
- // needed for debug only: tell the allocator that AllocationManager is removing a
- // reference manager associated with this particular allocator
- ((BaseAllocator) oldAllocator).dissociateLedger(oldLedger);
- }
-
- if (oldLedger == owningLedger) {
- // the release call was made by the owning reference manager
- if (map.isEmpty()) {
- // the only mapping was for the owner
- // which now has been removed, it implies we can safely destroy the
- // underlying memory chunk as it is no longer being referenced
- oldAllocator.releaseBytes(getSize());
- // free the memory chunk associated with the allocation manager
- release0();
- oldAllocator.getListener().onRelease(getSize());
- amDestructionTime = System.nanoTime();
- owningLedger = null;
- } else {
- // since the refcount dropped to 0 for the owning reference manager and allocation
- // manager will no longer keep a mapping for it, we need to change the owning
- // reference manager to whatever the next available
- // mapping exists.
- BufferLedger newOwningLedger = map.getNextValue();
- // we'll forcefully transfer the ownership and not worry about whether we
- // exceeded the limit since this consumer can't do anything with this.
- oldLedger.transferBalance(newOwningLedger);
- }
- } else {
- // the release call was made by a non-owning reference manager, so after remove there have
- // to be 1 or more mappings
- Preconditions.checkState(map.size() > 0,
- "The final removal of reference manager should be connected to owning reference manager");
- }
- }
-
- /**
- * Return the size of underlying chunk of memory managed by this Allocation Manager.
- *
- *
The underlying memory chunk managed can be different from the original requested size.
- *
- * @return size of underlying memory chunk
- */
- public abstract long getSize();
-
- /**
- * Return the absolute memory address pointing to the fist byte of underlying memory chunk.
- */
- protected abstract long memoryAddress();
-
- /**
- * Release the underlying memory chunk.
- */
- protected abstract void release0();
-
- /**
- * A factory interface for creating {@link AllocationManager}.
- * One may extend this interface to use a user-defined AllocationManager implementation.
- */
- public interface Factory {
-
- /**
- * Create an {@link AllocationManager}.
- *
- * @param accountingAllocator The allocator that are expected to be associated with newly created AllocationManager.
- * Currently it is always equivalent to "this"
- * @param size Size (in bytes) of memory managed by the AllocationManager
- * @return The created AllocationManager used by this allocator
- */
- AllocationManager create(BufferAllocator accountingAllocator, long size);
-
- ArrowBuf empty();
- }
-}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationOutcomeDetails.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationOutcomeDetails.java
index 6499ce84b1a10..34b7395e3fee1 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationOutcomeDetails.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationOutcomeDetails.java
@@ -24,10 +24,11 @@
* Captures details of allocation for each accountant in the hierarchical chain.
*/
public class AllocationOutcomeDetails {
- Deque allocEntries;
+ final Accountant accountant;
+ final Deque allocEntries = new ArrayDeque<>();
- AllocationOutcomeDetails() {
- allocEntries = new ArrayDeque<>();
+ AllocationOutcomeDetails(Accountant accountant) {
+ this.accountant = accountant;
}
void pushEntry(Accountant accountant, long totalUsedBeforeAllocation, long requestedSize,
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java
index 43a271464a5d6..12c13d6717416 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java
@@ -50,7 +50,7 @@
* The management (allocation, deallocation, reference counting etc) for
* the memory chunk is not done by ArrowBuf.
* Default implementation of ReferenceManager, allocation is in
- * {@link BaseAllocator}, {@link BufferLedger} and {@link AllocationManager}
+ * {@link BaseAllocator}, {@link BufferLedger} and {@link MemoryChunkManager}
*
*/
public final class ArrowBuf implements AutoCloseable {
@@ -101,6 +101,15 @@ public int refCnt() {
return referenceManager.getRefCount();
}
+ /**
+ * Whether this ArrowBuf is open for read or write.
+ *
+ * @return true if currently accessible, false if not
+ */
+ public boolean isOpen() {
+ return referenceManager.isOpen();
+ }
+
/**
* Allows a function to determine whether not reading a particular string of bytes is valid.
*
@@ -121,8 +130,8 @@ public void checkBytes(long start, long end) {
* For get/set operations, reference count should be >= 1.
*/
private void ensureAccessible() {
- if (this.refCnt() == 0) {
- throw new IllegalStateException("Ref count should be >= 1 for accessing the ArrowBuf");
+ if (!isOpen()) {
+ throw new IllegalStateException("ArrowBuf should be open for accessing the ArrowBuf");
}
}
@@ -264,7 +273,7 @@ public boolean equals(Object obj) {
*
* If the starting virtual address of chunk is MAR, then memory
* address of this ArrowBuf is MAR + offset -- this is what is stored
- * in variable addr. See the BufferLedger and AllocationManager code
+ * in variable addr. See the BufferLedger and MemoryChunkManager code
* for the implementation of ReferenceManager that manages a
* chunk of memory and creates ArrowBuf with access to a range of
* bytes within the chunk (or the entire chunk)
@@ -1094,15 +1103,6 @@ public long getId() {
return id;
}
- /**
- * Create a logger of this {@link ArrowBuf}.
- *
- * @return the newly created logger
- */
- Logger createLogger() {
- return new Logger(id, memoryAddress(), length, historicalLog);
- }
-
/**
* Prints information of this buffer into sb at the given
* indentation and verbosity level.
@@ -1112,7 +1112,12 @@ Logger createLogger() {
*
*/
public void print(StringBuilder sb, int indent, Verbosity verbosity) {
- new Logger(id, addr, length, historicalLog).print(sb, indent, verbosity);
+ CommonUtil.indent(sb, indent).append(toString());
+
+ if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) {
+ sb.append("\n");
+ historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
+ }
}
/**
@@ -1222,36 +1227,4 @@ public ArrowBuf setIndex(int readerIndex, int writerIndex) {
}
}
- /**
- * Create a logger for an {@link ArrowBuf}. This is currently used in debugging or historical logging
- * in code of {@link BufferLedger} to avoid directly holding a strong reference to {@link ArrowBuf}.
- * So that GC could be able to involved in auto cleaning logic in {@link AutoBufferLedger}.
- */
- static class Logger {
- private final long id;
- private final long addr;
- private final long length;
- private final HistoricalLog historicalLog;
-
- public Logger(long id, long addr, long length, HistoricalLog historicalLog) {
- this.id = id;
- this.addr = addr;
- this.length = length;
- this.historicalLog = historicalLog;
- }
-
- public void print(StringBuilder sb, int indent, Verbosity verbosity) {
- CommonUtil.indent(sb, indent).append(toString());
-
- if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) {
- sb.append("\n");
- historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
- }
- }
-
- @Override
- public String toString() {
- return String.format("ArrowBuf.Logger[%d], address:%d, length:%d", id, addr, length);
- }
- }
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
deleted file mode 100644
index cc61cb90cbe80..0000000000000
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import sun.misc.Cleaner;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * An alternative implementation of {@link BufferLedger}. The reference is auto managed by JVM garbage collector
- * comparing to {@link LegacyBufferLedger}. Explicit calls to reference management methods such as
- * {@link #retain()} and {@link #release()} will be ignored.
- *
- *
- * Note when this implementation, the accurate release time of the underlying {@link AllocationManager} may become
- * unpredictable because we are relying GC to do clean-up. As a result, it's recommended to specify very large
- * allocation limit (e.g. {@link Integer#MAX_VALUE}) to the corresponding {@link BufferAllocator} to avoid
- * unexpected allocation failures.
- *
- *
- *
- * Also, to let the GC be aware of these allocations when off-heap based
- * {@link AllocationManager}s are used, it's required to also add the allocated sizes to JVM direct
- * memory counter (which can be limited by specifying JVM option "-XX:MaxDirectMemorySize"). To
- * achieve this one can simply set allocator's {@link AllocationListener} to
- * {@link DirectAllocationListener}.
- * JVM should ensure that garbage collection will be performed once total reservation reached the limit.
- *
- */
-public class AutoBufferLedger extends BufferLedger {
-
- public static class Factory implements BufferLedger.Factory, AutoCloseable {
- private AutoBufferLedger tail = null;
-
- @Override
- public BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager) {
- return new AutoBufferLedger(allocator, allocationManager, this);
- }
-
- private void link(AutoBufferLedger ledger) {
- synchronized (this) {
- if (ledger.next != null || ledger.prev != null) {
- throw new IllegalStateException("already linked");
- }
- if (tail == null) {
- tail = ledger;
- return;
- }
- tail.next = ledger;
- ledger.prev = tail;
- tail = ledger;
- }
- }
-
- private void unlink(AutoBufferLedger ledger) {
- synchronized (this) {
- if (ledger.next == ledger) {
- return;
- }
- if (ledger.prev == ledger) {
- throw new IllegalStateException();
- }
- if (ledger == tail) {
- tail = ledger.prev;
- }
- if (ledger.prev != null) {
- ledger.prev.next = ledger.next;
- }
- if (ledger.next != null) {
- ledger.next.prev = ledger.prev;
- }
- ledger.prev = ledger;
- ledger.next = ledger;
- }
- }
-
- @Override
- public void close() {
- synchronized (this) {
- while (tail != null) {
- final AutoBufferLedger tmp = tail.prev;
- tail.destruct();
- tail = tmp;
- }
- }
- }
- }
-
- public static Factory newFactory() {
- return new Factory();
- }
-
- private volatile long lDestructionTime = 0;
- private final AtomicInteger refCount = new AtomicInteger(0);
- private final AtomicBoolean destructed = new AtomicBoolean(false);
- private final Factory factory;
-
- private AutoBufferLedger prev = null;
- private AutoBufferLedger next = null;
-
- AutoBufferLedger(BufferAllocator allocator, AllocationManager allocationManager,
- Factory factory) {
- super(allocator, allocationManager);
- this.factory = factory;
- factory.link(this);
- }
-
- @Override
- protected long getDestructionTime() {
- return lDestructionTime;
- }
-
- @Override
- protected ReferenceManager newReferenceManager() {
- reserve0();
- final ReferenceManager rm = new BaseReferenceManager(this);
- Cleaner.create(rm, new LedgerDeallocator());
- return rm;
- }
-
- @Override
- public int getRefCount() {
- return refCount.get();
- }
-
- @Override
- protected void increment() {
- // no-op
- }
-
- @Override
- public boolean release() {
- return false;
- }
-
- @Override
- public boolean release(int decrement) {
- return false;
- }
-
- @Override
- public void retain() {
-
- }
-
- @Override
- public void retain(int increment) {
-
- }
-
- private void reserve0() {
- if (refCount.getAndAdd(1) == 0) {
- // no-op
- }
- }
-
- private void release0() {
- if (refCount.addAndGet(-1) == 0) {
- destruct();
- }
- }
-
- private void destruct() {
- if (!destructed.compareAndSet(false, true)) {
- return;
- }
- synchronized (getAllocationManager()) {
- final AllocationManager am = getAllocationManager();
- lDestructionTime = System.nanoTime();
- am.release(this);
- }
- factory.unlink(this);
- }
-
- /**
- * Release hook will be invoked by JVM cleaner.
- *
- * @see #newReferenceManager()
- */
- private class LedgerDeallocator implements Runnable {
-
- private LedgerDeallocator() {
- }
-
- @Override
- public void run() {
- release0();
- }
- }
-}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 9af1cdf3e0255..1836d62df9cc9 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -45,7 +45,7 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator {
public static final boolean DEBUG;
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
- // Initialize this before DEFAULT_CONFIG as DEFAULT_CONFIG will eventually initialize the allocation manager,
+ // Initialize this before DEFAULT_CONFIG as DEFAULT_CONFIG will eventually initialize the MemoryChunkManager,
// which in turn allocates an ArrowBuf, which requires DEBUG to have been properly initialized
static {
// the system property takes precedence.
@@ -73,8 +73,8 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator {
private final IdentityHashMap reservations;
private final HistoricalLog historicalLog;
private final RoundingPolicy roundingPolicy;
- private final AllocationManager.Factory allocationManagerFactory;
- private final BufferLedger.Factory bufferLedgerFactory;
+ private final MemoryChunkAllocator memoryChunkAllocator;
+ private final MemoryChunkManager.Factory memoryChunkManagerFactory;
private volatile boolean isClosed = false; // the allocator has been closed
@@ -94,8 +94,8 @@ protected BaseAllocator(
super(parentAllocator, name, config.getInitReservation(), config.getMaxAllocation());
this.listener = config.getListener();
- this.allocationManagerFactory = config.getAllocationManagerFactory();
- this.bufferLedgerFactory = config.getBufferLedgerFactory();
+ this.memoryChunkAllocator = config.getMemoryChunkAllocator();
+ this.memoryChunkManagerFactory = config.getMemoryChunkManagerFactory();
if (parentAllocator != null) {
this.root = parentAllocator.root;
@@ -143,11 +143,6 @@ public Collection getChildAllocators() {
}
}
- @Override
- public BufferLedger.Factory getBufferLedgerFactory() {
- return bufferLedgerFactory;
- }
-
private static String createErrorMsg(final BufferAllocator allocator, final long rounded, final long requested) {
if (rounded != requested) {
return String.format(
@@ -185,7 +180,7 @@ public ArrowBuf getEmpty() {
}
/**
- * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that
+ * For debug/verification purposes only. Allows an MemoryChunkManager to tell the allocator that
* we have a new ledger
* associated with this allocator.
*/
@@ -199,7 +194,7 @@ void associateLedger(BufferLedger ledger) {
}
/**
- * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that
+ * For debug/verification purposes only. Allows an MemoryChunkManager to tell the allocator that
* we are removing a
* ledger associated with this allocator
*/
@@ -242,17 +237,37 @@ private void childClosed(final BaseAllocator childAllocator) {
@Override
public ArrowBuf buffer(final long initialRequestSize) {
- assertOpen();
-
- return buffer(initialRequestSize, null);
+ return buffer(initialRequestSize, memoryChunkAllocator, true, null);
}
private ArrowBuf createEmpty() {
- return allocationManagerFactory.empty();
+ return memoryChunkAllocator.empty();
}
@Override
public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) {
+ return buffer(initialRequestSize, memoryChunkAllocator, true, manager);
+ }
+
+ @Override
+ public ArrowBuf buffer(MemoryChunk chunk) {
+ final long size = chunk.size();
+ return buffer(size, new MemoryChunkAllocator() {
+ @Override
+ public MemoryChunk allocate(long requestedSize) {
+ Preconditions.checkState(requestedSize == size);
+ return chunk;
+ }
+
+ @Override
+ public ArrowBuf empty() {
+ throw new UnsupportedOperationException();
+ }
+ }, false, null);
+ }
+
+ private ArrowBuf buffer(final long initialRequestSize, MemoryChunkAllocator chunkAllocator, boolean round,
+ BufferManager manager) {
assertOpen();
Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative");
@@ -262,7 +277,7 @@ public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) {
}
// round the request size according to the rounding policy
- final long actualRequestSize = roundingPolicy.getRoundedSize(initialRequestSize);
+ final long actualRequestSize = round ? roundingPolicy.getRoundedSize(initialRequestSize) : initialRequestSize;
listener.onPreAllocation(actualRequestSize);
@@ -280,7 +295,7 @@ public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) {
boolean success = false;
try {
- ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
+ ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, chunkAllocator, manager);
success = true;
listener.onAllocation(actualRequestSize);
return buffer;
@@ -299,10 +314,11 @@ public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) {
*/
private ArrowBuf bufferWithoutReservation(
final long size,
+ MemoryChunkAllocator chunkAllocator,
BufferManager bufferManager) throws OutOfMemoryException {
assertOpen();
- final AllocationManager manager = newAllocationManager(size);
+ final MemoryChunkManager manager = newMemoryChunkManager(chunkAllocator, size);
final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required)
final ArrowBuf buffer = ledger.newArrowBuf(size, bufferManager);
@@ -313,13 +329,9 @@ private ArrowBuf bufferWithoutReservation(
return buffer;
}
- private AllocationManager newAllocationManager(long size) {
- return newAllocationManager(this, size);
- }
-
-
- private AllocationManager newAllocationManager(BaseAllocator accountingAllocator, long size) {
- return allocationManagerFactory.create(accountingAllocator, size);
+ private MemoryChunkManager newMemoryChunkManager(MemoryChunkAllocator chunkAllocator,
+ long size) {
+ return memoryChunkManagerFactory.create(chunkAllocator.allocate(size));
}
@Override
@@ -349,8 +361,8 @@ public BufferAllocator newChildAllocator(
.initReservation(initReservation)
.maxAllocation(maxAllocation)
.roundingPolicy(roundingPolicy)
- .allocationManagerFactory(allocationManagerFactory)
- .bufferLedgerFactory(bufferLedgerFactory)
+ .memoryChunkManagerFactory(memoryChunkManagerFactory)
+ .memoryChunkAllocator(memoryChunkAllocator)
.build());
if (DEBUG) {
@@ -386,6 +398,8 @@ public synchronized void close() {
isClosed = true;
+ memoryChunkManagerFactory.close();
+
StringBuilder outstandingChildAllocators = new StringBuilder();
if (DEBUG) {
synchronized (DEBUG_LOCK) {
@@ -494,7 +508,7 @@ private void hist(String noteFormat, Object... args) {
* @throws IllegalStateException when any problems are found
*/
void verifyAllocator() {
- final IdentityHashMap seen = new IdentityHashMap<>();
+ final IdentityHashMap seen = new IdentityHashMap<>();
verifyAllocator(seen);
}
@@ -508,7 +522,7 @@ void verifyAllocator() {
* @throws IllegalStateException when any problems are found
*/
private void verifyAllocator(
- final IdentityHashMap buffersSeen) {
+ final IdentityHashMap buffersSeen) {
// The remaining tests can only be performed if we're in debug mode.
if (!DEBUG) {
return;
@@ -555,7 +569,7 @@ private void verifyAllocator(
continue;
}
- final AllocationManager am = ledger.getAllocationManager();
+ final MemoryChunkManager am = ledger.getMemoryChunkManager();
/*
* Even when shared, ArrowBufs are rewrapped, so we should never see the same instance
* twice.
@@ -679,7 +693,7 @@ private void dumpBuffers(final StringBuilder sb, final Set ledgerS
if (!ledger.isOwningLedger()) {
continue;
}
- final AllocationManager am = ledger.getAllocationManager();
+ final MemoryChunkManager am = ledger.getMemoryChunkManager();
sb.append("UnsafeDirectLittleEndian[identityHashCode == ");
sb.append(Integer.toString(System.identityHashCode(am)));
sb.append("] size ");
@@ -734,19 +748,19 @@ public RoundingPolicy getRoundingPolicy() {
@Value.Immutable
public abstract static class Config {
/**
- * Factory for creating {@link AllocationManager} instances.
+ * Factory for creating {@link MemoryChunkManager} instances.
*/
@Value.Default
- AllocationManager.Factory getAllocationManagerFactory() {
- return DefaultAllocationManagerOption.getDefaultAllocationManagerFactory();
+ MemoryChunkManager.Factory getMemoryChunkManagerFactory() {
+ return MemoryChunkManager.FACTORY;
}
/**
- * Factory for creating {@link BufferLedger} instances.
+ * Factory for creating {@link MemoryChunk} instances.
*/
@Value.Default
- BufferLedger.Factory getBufferLedgerFactory() {
- return LegacyBufferLedger.FACTORY;
+ MemoryChunkAllocator getMemoryChunkAllocator() {
+ return DefaultMemoryChunkAllocatorOption.getDefaultMemoryChunkAllocator();
}
/**
@@ -932,7 +946,8 @@ private ArrowBuf allocate(int nBytes) {
* as well, so we need to return the same number back to avoid double-counting them.
*/
try {
- final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
+ final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes,
+ memoryChunkAllocator, null);
listener.onAllocation(nBytes);
if (DEBUG) {
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java
deleted file mode 100644
index be72223ce8610..0000000000000
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-/**
- * Standard implementation of {@link ReferenceManager} backed by a
- * {@link BufferLedger}.
- */
-public class BaseReferenceManager implements ReferenceManager {
- private final BufferLedger ledger;
- private final BufferAllocator allocator;
- private final AllocationManager allocationManager;
-
- public BaseReferenceManager(BufferLedger ledger) {
- this.ledger = ledger;
- this.allocator = ledger.getAllocator();
- this.allocationManager = ledger.getAllocationManager();
- }
-
- @Override
- public int getRefCount() {
- return ledger.getRefCount();
- }
-
- @Override
- public boolean release() {
- return ledger.release();
- }
-
- @Override
- public boolean release(int decrement) {
- return ledger.release(decrement);
- }
-
- @Override
- public void retain() {
- ledger.retain();
- }
-
- @Override
- public void retain(int increment) {
- ledger.retain(increment);
- }
-
- /**
- * Derive a new ArrowBuf from a given source ArrowBuf. The new derived
- * ArrowBuf will share the same reference count as rest of the ArrowBufs
- * associated with this ledger. This operation is typically used for
- * slicing -- creating new ArrowBufs from a compound ArrowBuf starting at
- * a particular index in the underlying memory and having access to a
- * particular length (in bytes) of data in memory chunk.
- *
- * This method is also used as a helper for transferring ownership and retain to target
- * allocator.
- *
- * @param sourceBuffer source ArrowBuf
- * @param index index (relative to source ArrowBuf) new ArrowBuf should be
- * derived from
- * @param length length (bytes) of data in underlying memory that derived buffer will
- * have access to in underlying memory
- * @return derived buffer
- */
- @Override
- public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
- return ledger.deriveBuffer(sourceBuffer, index, length);
- }
-
- /**
- * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of
- * memory ownership and accounting. This has no impact on the reference counting for the current
- * ArrowBuf except in the situation where the passed in Allocator is the same as the current buffer.
- *
- * This operation has no impact on the reference count of this ArrowBuf. The newly created
- * ArrowBuf with either have a reference count of 1 (in the case that this is the first time this
- * memory is being associated with the target allocator or in other words allocation manager currently
- * doesn't hold a mapping for the target allocator) or the current value of the reference count for
- * the target allocator-reference manager combination + 1 in the case that the provided allocator
- * already had an association to this underlying memory.
- *
- *
- * @param srcBuffer source ArrowBuf
- * @param target The target allocator to create an association with.
- * @return A new ArrowBuf which shares the same underlying memory as the provided ArrowBuf.
- */
- @Override
- public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) {
-
- if (BaseAllocator.DEBUG) {
- ledger.logEvent("retain(%s)", target.getName());
- }
-
- // the call to associate will return the corresponding reference manager (buffer ledger) for
- // the target allocator. if the allocation manager didn't already have a mapping
- // for the target allocator, it will create one and return the new reference manager with a
- // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1.
- // alternatively, if there was already a mapping for in
- // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
- // and this will be true for all the existing buffers currently managed by targetrefmanager
- final BufferLedger targetRefManager = allocationManager.associate(target);
- // create a new ArrowBuf to associate with new allocator and target ref manager
- final long targetBufLength = srcBuffer.capacity();
- ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength);
- targetArrowBuf.readerIndex(srcBuffer.readerIndex());
- targetArrowBuf.writerIndex(srcBuffer.writerIndex());
- return targetArrowBuf;
- }
-
- /**
- * Transfer the memory accounting ownership of this ArrowBuf to another allocator.
- * This will generate a new ArrowBuf that carries an association with the underlying memory
- * of this ArrowBuf. If this ArrowBuf is connected to the owning BufferLedger of this memory,
- * that memory ownership/accounting will be transferred to the target allocator. If this
- * ArrowBuf does not currently own the memory underlying it (and is only associated with it),
- * this does not transfer any ownership to the newly created ArrowBuf.
- *
- * This operation has no impact on the reference count of this ArrowBuf. The newly created
- * ArrowBuf with either have a reference count of 1 (in the case that this is the first time
- * this memory is being associated with the new allocator) or the current value of the reference
- * count for the other AllocationManager/BufferLedger combination + 1 in the case that the provided
- * allocator already had an association to this underlying memory.
- *
- *
- * Transfers will always succeed, even if that puts the other allocator into an overlimit
- * situation. This is possible due to the fact that the original owning allocator may have
- * allocated this memory out of a local reservation whereas the target allocator may need to
- * allocate new memory from a parent or RootAllocator. This operation is done n a mostly-lockless
- * but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
- * to an actual overlimit==true condition. This is simply conservative behavior which means we may
- * return overlimit slightly sooner than is necessary.
- *
- *
- * @param target The allocator to transfer ownership to.
- * @return A new transfer result with the impact of the transfer (whether it was overlimit) as
- * well as the newly created ArrowBuf.
- */
- @Override
- public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAllocator target) {
- // the call to associate will return the corresponding reference manager (buffer ledger) for
- // the target allocator. if the allocation manager didn't already have a mapping
- // for the target allocator, it will create one and return the new reference manager with a
- // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1.
- // alternatively, if there was already a mapping for in
- // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
- // and this will be true for all the existing buffers currently managed by targetrefmanager
- final BufferLedger targetLedger = allocationManager.associate(target);
- // create a new ArrowBuf to associate with new allocator and target ref manager
- final long targetBufLength = srcBuffer.capacity();
- final ArrowBuf targetArrowBuf = targetLedger.deriveBuffer(srcBuffer, 0, targetBufLength);
- targetArrowBuf.readerIndex(srcBuffer.readerIndex());
- targetArrowBuf.writerIndex(srcBuffer.writerIndex());
- final boolean allocationFit = ledger.transferBalance(targetLedger);
- return new TransferResult(allocationFit, targetArrowBuf);
- }
-
- @Override
- public BufferAllocator getAllocator() {
- return ledger.getAllocator();
- }
-
- @Override
- public long getSize() {
- return ledger.getSize();
- }
-
- @Override
- public long getAccountedSize() {
- return ledger.getAccountedSize();
- }
-
- /**
- * The outcome of a Transfer.
- */
- public static class TransferResult implements OwnershipTransferResult {
-
- // Whether this transfer fit within the target allocator's capacity.
- final boolean allocationFit;
-
- // The newly created buffer associated with the target allocator
- public final ArrowBuf buffer;
-
- private TransferResult(boolean allocationFit, ArrowBuf buffer) {
- this.allocationFit = allocationFit;
- this.buffer = buffer;
- }
-
- @Override
- public ArrowBuf getTransferredBuffer() {
- return buffer;
- }
-
- @Override
- public boolean getAllocationFit() {
- return allocationFit;
- }
- }
-}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index f36f9662341c6..740e30573738c 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -52,6 +52,15 @@ public interface BufferAllocator extends AutoCloseable {
*/
ArrowBuf buffer(long size, BufferManager manager);
+ /**
+ * Allocate a new buffer using the given memory chunk. The output buffer size will equal
+ * to the chunk size by default.
+ *
+ * @param chunk A memory chunk.
+ * @return a new ArrowBuf
+ */
+ ArrowBuf buffer(MemoryChunk chunk);
+
/**
* Get the root allocator of this allocator. If this allocator is already a root, return
* this directly.
@@ -175,14 +184,6 @@ BufferAllocator newChildAllocator(
*/
Collection getChildAllocators();
-
- /**
- * Returns {@link BufferLedger.Factory} used by this allocator.
- *
- * @return the buffer ledger factory
- */
- BufferLedger.Factory getBufferLedgerFactory();
-
/**
* Create an allocation reservation. A reservation is a way of building up
* a request for a buffer whose size is not known in advance. See
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
index 42bd959c2554e..472897a64058e 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
@@ -18,6 +18,7 @@
package org.apache.arrow.memory;
import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.memory.util.CommonUtil;
@@ -25,29 +26,57 @@
import org.apache.arrow.util.Preconditions;
/**
- * The reference manager that binds an {@link AllocationManager} to
+ * The reference manager that binds an {@link MemoryChunkManager} to
* {@link BufferAllocator} and a set of {@link ArrowBuf}. The set of
* ArrowBufs managed by this reference manager share a common
* fate (same reference count).
*/
-public abstract class BufferLedger implements ValueWithKeyIncluded, ReferenceCountAware {
- private final IdentityHashMap buffers =
+public class BufferLedger implements ValueWithKeyIncluded, ReferenceManager {
+ private final IdentityHashMap buffers =
BaseAllocator.DEBUG ? new IdentityHashMap<>() : null;
private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
// unique ID assigned to each ledger
private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet();
+ private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can
// manage request for retain
// correctly
private final long lCreationTime = System.nanoTime();
private final BufferAllocator allocator;
- private final AllocationManager allocationManager;
+ private final MemoryChunkManager memoryChunkManager;
private final HistoricalLog historicalLog =
BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1) : null;
+ private volatile long lDestructionTime = 0;
- BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) {
+ BufferLedger(final BufferAllocator allocator, final MemoryChunkManager memoryChunkManager) {
this.allocator = allocator;
- this.allocationManager = allocationManager;
+ this.memoryChunkManager = memoryChunkManager;
+ }
+
+ boolean isOwningLedger() {
+ return this == memoryChunkManager.getOwningLedger();
+ }
+
+ public BufferAllocator getKey() {
+ return allocator;
+ }
+
+ /**
+ * Get the buffer allocator associated with this reference manager.
+ * @return buffer allocator
+ */
+ @Override
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ /**
+ * Get this ledger's reference count.
+ * @return reference count
+ */
+ @Override
+ public int getRefCount() {
+ return bufRefCnt.get();
}
/**
@@ -55,62 +84,132 @@ public abstract class BufferLedger implements ValueWithKeyIncluded= 1,
+ "ref count decrement should be greater than or equal to 1");
+ // decrement the ref count
+ final int refCnt = decrement(decrement);
+ if (BaseAllocator.DEBUG) {
+ historicalLog.recordEvent("release(%d). original value: %d",
+ decrement, refCnt + decrement);
+ }
+ // the new ref count should be >= 0
+ Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative");
+ return refCnt == 0;
+ }
/**
- * Used by an allocator to create a new ArrowBuf. This is provided
- * as a helper method for the allocator when it allocates a new memory chunk
- * using a new instance of allocation manager and creates a new reference manager
- * too.
+ * Decrement the ledger's reference count for the associated underlying
+ * memory chunk. If the reference count drops to 0, it implies that
+ * no ArrowBufs managed by this reference manager need access to the memory
+ * chunk. In that case, the ledger should inform the MemoryChunkManager
+ * about releasing its ownership for the chunk. Whether or not the memory
+ * chunk will be released is something that {@link MemoryChunkManager} will
+ * decide since tracks the usage of memory chunk across multiple reference
+ * managers and allocators.
*
- * @param length The length in bytes that this ArrowBuf will provide access to.
- * @param manager An optional BufferManager argument that can be used to manage expansion of
- * this ArrowBuf
- * @return A new ArrowBuf that shares references with all ArrowBufs associated
- * with this BufferLedger
+ * @param decrement amount to decrease the reference count by
+ * @return the new reference count
*/
- ArrowBuf newArrowBuf(final long length, final BufferManager manager) {
+ private int decrement(int decrement) {
allocator.assertOpen();
+ final int outcome;
+ synchronized (memoryChunkManager) {
+ outcome = bufRefCnt.addAndGet(-decrement);
+ if (outcome == 0) {
+ lDestructionTime = System.nanoTime();
+ // refcount of this reference manager has dropped to 0
+ // inform the MemoryChunkManager that this reference manager
+ // no longer holds references to underlying memory
+ memoryChunkManager.release(this);
+ }
+ }
+ return outcome;
+ }
- // the start virtual address of the ArrowBuf will be same as address of memory chunk
- final long startAddress = allocationManager.memoryAddress();
-
- // create ArrowBuf
- final ArrowBuf buf = new ArrowBuf(newReferenceManager(), manager, length, startAddress);
+ /**
+ * Increment the ledger's reference count for associated
+ * underlying memory chunk by 1.
+ */
+ @Override
+ public void retain() {
+ retain(1);
+ }
- // logging
+ /**
+ * Increment the ledger's reference count for associated
+ * underlying memory chunk by the given amount.
+ *
+ * @param increment amount to increase the reference count by
+ */
+ @Override
+ public void retain(int increment) {
+ Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment);
if (BaseAllocator.DEBUG) {
- logEvent(
- "ArrowBuf(BufferLedger, BufferAllocator[%s], " +
- "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d",
- allocator.getName(), System.identityHashCode(buf), buf.toString(),
- System.identityHashCode(this));
-
- synchronized (buffers) {
- buffers.put(buf.createLogger(), null);
- }
+ historicalLog.recordEvent("retain(%d)", increment);
}
+ final int originalReferenceCount = bufRefCnt.getAndAdd(increment);
+ Preconditions.checkArgument(originalReferenceCount > 0);
+ }
- return buf;
+ @Override
+ public boolean isOpen() {
+ return getRefCount() > 0 && !memoryChunkManager.isChunkDestroyed();
}
- ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
+ /**
+ * Derive a new ArrowBuf from a given source ArrowBuf. The new derived
+ * ArrowBuf will share the same reference count as rest of the ArrowBufs
+ * associated with this ledger. This operation is typically used for
+ * slicing -- creating new ArrowBufs from a compound ArrowBuf starting at
+ * a particular index in the underlying memory and having access to a
+ * particular length (in bytes) of data in memory chunk.
+ *
+ * This method is also used as a helper for transferring ownership and retain to target
+ * allocator.
+ *
+ * @param sourceBuffer source ArrowBuf
+ * @param index index (relative to source ArrowBuf) new ArrowBuf should be
+ * derived from
+ * @param length length (bytes) of data in underlying memory that derived buffer will
+ * have access to in underlying memory
+ * @return derived buffer
+ */
+ @Override
+ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
/*
* Usage type 1 for deriveBuffer():
* Used for slicing where index represents a relative index in the source ArrowBuf
@@ -132,7 +231,7 @@ ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
// create new ArrowBuf
final ArrowBuf derivedBuf = new ArrowBuf(
- newReferenceManager(),
+ this,
null,
length, // length (in bytes) in the underlying memory chunk for this new ArrowBuf
derivedBufferAddress // starting byte address in the underlying memory for this new ArrowBuf
@@ -140,7 +239,7 @@ ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
// logging
if (BaseAllocator.DEBUG) {
- logEvent(
+ historicalLog.recordEvent(
"ArrowBuf(BufferLedger, BufferAllocator[%s], " +
"UnsafeDirectLittleEndian[identityHashCode == " +
"%d](%s)) => ledger hc == %d",
@@ -148,40 +247,117 @@ ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
System.identityHashCode(this));
synchronized (buffers) {
- buffers.put(derivedBuf.createLogger(), null);
+ buffers.put(derivedBuf, null);
}
}
return derivedBuf;
}
+ /**
+ * Used by an allocator to create a new ArrowBuf. This is provided
+ * as a helper method for the allocator when it allocates a new memory chunk
+ * using a new instance of MemoryChunkManager and creates a new reference manager
+ * too.
+ *
+ * @param length The length in bytes that this ArrowBuf will provide access to.
+ * @param manager An optional BufferManager argument that can be used to manage expansion of
+ * this ArrowBuf
+ * @return A new ArrowBuf that shares references with all ArrowBufs associated
+ * with this BufferLedger
+ */
+ ArrowBuf newArrowBuf(final long length, final BufferManager manager) {
+ allocator.assertOpen();
+
+ // the start virtual address of the ArrowBuf will be same as address of memory chunk
+ final long startAddress = memoryChunkManager.memoryAddress();
+
+ // create ArrowBuf
+ final ArrowBuf buf = new ArrowBuf(this, manager, length, startAddress);
+
+ // logging
+ if (BaseAllocator.DEBUG) {
+ historicalLog.recordEvent(
+ "ArrowBuf(BufferLedger, BufferAllocator[%s], " +
+ "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d",
+ allocator.getName(), System.identityHashCode(buf), buf.toString(),
+ System.identityHashCode(this));
+
+ synchronized (buffers) {
+ buffers.put(buf, null);
+ }
+ }
+
+ return buf;
+ }
+
+ /**
+ * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of
+ * memory ownership and accounting. This has no impact on the reference counting for the current
+ * ArrowBuf except in the situation where the passed in Allocator is the same as the current buffer.
+ *
+ * This operation has no impact on the reference count of this ArrowBuf. The newly created
+ * ArrowBuf with either have a reference count of 1 (in the case that this is the first time this
+ * memory is being associated with the target allocator or in other words MemoryChunkManager currently
+ * doesn't hold a mapping for the target allocator) or the current value of the reference count for
+ * the target allocator-reference manager combination + 1 in the case that the provided allocator
+ * already had an association to this underlying memory.
+ *
+ *
+ * @param srcBuffer source ArrowBuf
+ * @param target The target allocator to create an association with.
+ * @return A new ArrowBuf which shares the same underlying memory as the provided ArrowBuf.
+ */
+ @Override
+ public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) {
+
+ if (BaseAllocator.DEBUG) {
+ historicalLog.recordEvent("retain(%s)", target.getName());
+ }
+
+ // the call to associate will return the corresponding reference manager (buffer ledger) for
+ // the target allocator. if the MemoryChunkManager didn't already have a mapping
+ // for the target allocator, it will create one and return the new reference manager with a
+ // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1.
+ // alternatively, if there was already a mapping for in
+ // MemoryChunkManager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
+ // and this will be true for all the existing buffers currently managed by targetrefmanager
+ final BufferLedger targetRefManager = memoryChunkManager.associate(target);
+ // create a new ArrowBuf to associate with new allocator and target ref manager
+ final long targetBufLength = srcBuffer.capacity();
+ ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength);
+ targetArrowBuf.readerIndex(srcBuffer.readerIndex());
+ targetArrowBuf.writerIndex(srcBuffer.writerIndex());
+ return targetArrowBuf;
+ }
+
/**
* Transfer any balance the current ledger has to the target ledger. In the case
* that the current ledger holds no memory, no transfer is made to the new ledger.
*
- * @param targetLedger The ledger to transfer ownership account to.
+ * @param targetReferenceManager The ledger to transfer ownership account to.
* @return Whether transfer fit within target ledgers limits.
*/
- boolean transferBalance(final BufferLedger targetLedger) {
- Preconditions.checkArgument(targetLedger != null,
+ boolean transferBalance(final ReferenceManager targetReferenceManager) {
+ Preconditions.checkArgument(targetReferenceManager != null,
"Expecting valid target reference manager");
- final BufferAllocator targetAllocator = targetLedger.getAllocator();
+ final BufferAllocator targetAllocator = targetReferenceManager.getAllocator();
Preconditions.checkArgument(allocator.getRoot() == targetAllocator.getRoot(),
"You can only transfer between two allocators that share the same root.");
allocator.assertOpen();
- targetLedger.getAllocator().assertOpen();
+ targetReferenceManager.getAllocator().assertOpen();
// if we're transferring to ourself, just return.
- if (targetLedger == this) {
+ if (targetReferenceManager == this) {
return true;
}
- // since two balance transfers out from the allocation manager could cause incorrect
+ // since two balance transfers out from the MemoryChunkManager could cause incorrect
// accounting, we need to ensure
- // that this won't happen by synchronizing on the allocation manager instance.
- synchronized (allocationManager) {
- if (allocationManager.getOwningLedger() != this) {
+ // that this won't happen by synchronizing on the MemoryChunkManager instance.
+ synchronized (memoryChunkManager) {
+ if (memoryChunkManager.getOwningLedger() != this) {
// since the calling reference manager is not the owning
// reference manager for the underlying memory, transfer is
// a NO-OP
@@ -189,26 +365,101 @@ boolean transferBalance(final BufferLedger targetLedger) {
}
if (BaseAllocator.DEBUG) {
- logEvent("transferBalance(%s)",
- targetLedger.getAllocator().getName());
+ this.historicalLog.recordEvent("transferBalance(%s)",
+ targetReferenceManager.getAllocator().getName());
}
- boolean overlimit = targetAllocator.forceAllocate(allocationManager.getSize());
- allocator.releaseBytes(allocationManager.getSize());
+ boolean overlimit = targetAllocator.forceAllocate(memoryChunkManager.getSize());
+ allocator.releaseBytes(memoryChunkManager.getSize());
// since the transfer can only happen from the owning reference manager,
// we need to set the target ref manager as the new owning ref manager
- // for the chunk of memory in allocation manager
- allocationManager.setOwningLedger(targetLedger);
+ // for the chunk of memory in MemoryChunkManager
+ memoryChunkManager.setOwningLedger((BufferLedger) targetReferenceManager);
return overlimit;
}
}
+ /**
+ * Transfer the memory accounting ownership of this ArrowBuf to another allocator.
+ * This will generate a new ArrowBuf that carries an association with the underlying memory
+ * of this ArrowBuf. If this ArrowBuf is connected to the owning BufferLedger of this memory,
+ * that memory ownership/accounting will be transferred to the target allocator. If this
+ * ArrowBuf does not currently own the memory underlying it (and is only associated with it),
+ * this does not transfer any ownership to the newly created ArrowBuf.
+ *
+ * This operation has no impact on the reference count of this ArrowBuf. The newly created
+ * ArrowBuf with either have a reference count of 1 (in the case that this is the first time
+ * this memory is being associated with the new allocator) or the current value of the reference
+ * count for the other MemoryChunkManager/BufferLedger combination + 1 in the case that the provided
+ * allocator already had an association to this underlying memory.
+ *
+ *
+ * Transfers will always succeed, even if that puts the other allocator into an overlimit
+ * situation. This is possible due to the fact that the original owning allocator may have
+ * allocated this memory out of a local reservation whereas the target allocator may need to
+ * allocate new memory from a parent or RootAllocator. This operation is done n a mostly-lockless
+ * but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
+ * to an actual overlimit==true condition. This is simply conservative behavior which means we may
+ * return overlimit slightly sooner than is necessary.
+ *
+ *
+ * @param target The allocator to transfer ownership to.
+ * @return A new transfer result with the impact of the transfer (whether it was overlimit) as
+ * well as the newly created ArrowBuf.
+ */
+ @Override
+ public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAllocator target) {
+ // the call to associate will return the corresponding reference manager (buffer ledger) for
+ // the target allocator. if the MemoryChunkManager didn't already have a mapping
+ // for the target allocator, it will create one and return the new reference manager with a
+ // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1.
+ // alternatively, if there was already a mapping for in
+ // MemoryChunkManager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
+ // and this will be true for all the existing buffers currently managed by targetrefmanager
+ final BufferLedger targetRefManager = memoryChunkManager.associate(target);
+ // create a new ArrowBuf to associate with new allocator and target ref manager
+ final long targetBufLength = srcBuffer.capacity();
+ final ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength);
+ targetArrowBuf.readerIndex(srcBuffer.readerIndex());
+ targetArrowBuf.writerIndex(srcBuffer.writerIndex());
+ final boolean allocationFit = transferBalance(targetRefManager);
+ return new TransferResult(allocationFit, targetArrowBuf);
+ }
+
+ /**
+ * The outcome of a Transfer.
+ */
+ public class TransferResult implements OwnershipTransferResult {
+
+ // Whether this transfer fit within the target allocator's capacity.
+ final boolean allocationFit;
+
+ // The newly created buffer associated with the target allocator
+ public final ArrowBuf buffer;
+
+ private TransferResult(boolean allocationFit, ArrowBuf buffer) {
+ this.allocationFit = allocationFit;
+ this.buffer = buffer;
+ }
+
+ @Override
+ public ArrowBuf getTransferredBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public boolean getAllocationFit() {
+ return allocationFit;
+ }
+ }
+
/**
* Total size (in bytes) of memory underlying this reference manager.
* @return Size (in bytes) of the memory chunk
*/
+ @Override
public long getSize() {
- return allocationManager.getSize();
+ return memoryChunkManager.getSize();
}
/**
@@ -217,10 +468,11 @@ public long getSize() {
* is not the owning ledger associated with this memory.
* @return Amount of accounted(owned) memory associated with this ledger.
*/
+ @Override
public long getAccountedSize() {
- synchronized (allocationManager) {
- if (allocationManager.getOwningLedger() == this) {
- return allocationManager.getSize();
+ synchronized (memoryChunkManager) {
+ if (memoryChunkManager.getOwningLedger() == this) {
+ return memoryChunkManager.getSize();
} else {
return 0;
}
@@ -243,11 +495,11 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
.append("), isOwning: ")
.append(", size: ")
.append(", references: ")
- .append(getRefCount())
+ .append(bufRefCnt.get())
.append(", life: ")
.append(lCreationTime)
.append("..")
- .append(getDestructionTime())
+ .append(lDestructionTime)
.append(", allocatorManager: [")
.append(", life: ");
@@ -258,8 +510,8 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
sb.append("] holds ")
.append(buffers.size())
.append(" buffers. \n");
- for (ArrowBuf.Logger bufLogger : buffers.keySet()) {
- bufLogger.print(sb, indent + 2, verbosity);
+ for (ArrowBuf buf : buffers.keySet()) {
+ buf.print(sb, indent + 2, verbosity);
sb.append('\n');
}
}
@@ -267,58 +519,12 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
}
/**
- * Get the {@link AllocationManager} used by this BufferLedger.
- *
- * @return The AllocationManager used by this BufferLedger.
- */
- AllocationManager getAllocationManager() {
- return allocationManager;
- }
-
- /**
- * Record a single log line into this ledger's historical log.
- */
- protected void logEvent(final String noteFormat, Object... args) {
- historicalLog.recordEvent(noteFormat, args);
- }
-
- /**
- * If this ledger is the owning ledger of the underlying allocation manager.
+ * Get the {@link MemoryChunkManager} used by this BufferLedger.
*
- * @return true if this ledger owns its allocation manager
- */
- boolean isOwningLedger() {
- return this == allocationManager.getOwningLedger();
- }
-
- /**
- * Get the buffer allocator associated with this reference manager.
- * @return buffer allocator
- */
- BufferAllocator getAllocator() {
- return allocator;
- }
-
- /**
- * Get allocator key. Used by {@link LowCostIdentityHashMap}.
+ * @return The MemoryChunkManager used by this BufferLedger.
*/
- public BufferAllocator getKey() {
- return allocator;
+ public MemoryChunkManager getMemoryChunkManager() {
+ return memoryChunkManager;
}
- /**
- * A factory interface for creating {@link BufferLedger}.
- */
- public interface Factory {
- /**
- * Create an instance of {@link BufferLedger}.
- *
- * @param allocator The allocator that will bind to the newly created {@link BufferLedger}.
- * @param allocationManager The {@link AllocationManager} that actually holds the underlying
- * memory block. Note that the newly created {@link BufferLedger} will
- * not be the one that actually owns this piece of memory by default.
- * @return The created {@link BufferLedger}.
- */
- BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager);
- }
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/CheckAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/CheckAllocator.java
index 79b825aa2e898..fff400e9ebde4 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/CheckAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/CheckAllocator.java
@@ -31,7 +31,7 @@
*/
final class CheckAllocator {
private static final Logger logger = LoggerFactory.getLogger(CheckAllocator.class);
- private static final String ALLOCATOR_PATH = "org/apache/arrow/memory/DefaultAllocationManagerFactory.class";
+ private static final String ALLOCATOR_PATH = "org/apache/arrow/memory/DefaultMemoryChunkAllocator.class";
private CheckAllocator() {
@@ -41,7 +41,7 @@ static String check() {
Set urls = scanClasspath();
URL rootAllocator = assertOnlyOne(urls);
reportResult(rootAllocator);
- return "org.apache.arrow.memory.DefaultAllocationManagerFactory";
+ return "org.apache.arrow.memory.DefaultMemoryChunkAllocator";
}
@@ -70,15 +70,15 @@ private static Set scanClasspath() {
private static void reportResult(URL rootAllocator) {
String path = rootAllocator.getPath();
String subPath = path.substring(path.indexOf("memory"));
- logger.info("Using DefaultAllocationManager at {}", subPath);
+ logger.info("Using DefaultMemoryChunkAllocator at {}", subPath);
}
private static URL assertOnlyOne(Set urls) {
if (urls.size() > 1) {
- logger.warn("More than one DefaultAllocationManager on classpath. Choosing first found");
+ logger.warn("More than one DefaultMemoryChunkAllocator on classpath. Choosing first found");
}
if (urls.isEmpty()) {
- throw new RuntimeException("No DefaultAllocationManager found on classpath. Can't allocate Arrow buffers." +
+ throw new RuntimeException("No DefaultMemoryChunkAllocator found on classpath. Can't allocate Arrow buffers." +
" Please consider adding arrow-memory-netty or arrow-memory-unsafe as a dependency.");
}
return urls.iterator().next();
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerOption.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerOption.java
deleted file mode 100644
index 15120c252fca3..0000000000000
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerOption.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import java.lang.reflect.Field;
-
-/**
- * A class for choosing the default allocation manager.
- */
-public class DefaultAllocationManagerOption {
-
- /**
- * The environmental variable to set the default allocation manager type.
- */
- public static final String ALLOCATION_MANAGER_TYPE_ENV_NAME = "ARROW_ALLOCATION_MANAGER_TYPE";
-
- /**
- * The system property to set the default allocation manager type.
- */
- public static final String ALLOCATION_MANAGER_TYPE_PROPERTY_NAME = "arrow.allocation.manager.type";
-
- static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DefaultAllocationManagerOption.class);
-
- /**
- * The default allocation manager factory.
- */
- private static AllocationManager.Factory DEFAULT_ALLOCATION_MANAGER_FACTORY = null;
-
- /**
- * The allocation manager type.
- */
- public enum AllocationManagerType {
- /**
- * Netty based allocation manager.
- */
- Netty,
-
- /**
- * Unsafe based allocation manager.
- */
- Unsafe,
-
- /**
- * Unknown type.
- */
- Unknown,
- }
-
- static AllocationManagerType getDefaultAllocationManagerType() {
- AllocationManagerType ret = AllocationManagerType.Unknown;
-
- try {
- String envValue = System.getenv(ALLOCATION_MANAGER_TYPE_ENV_NAME);
- ret = AllocationManagerType.valueOf(envValue);
- } catch (IllegalArgumentException | NullPointerException e) {
- // ignore the exception, and make the allocation manager type remain unchanged
- }
-
- // system property takes precedence
- try {
- String propValue = System.getProperty(ALLOCATION_MANAGER_TYPE_PROPERTY_NAME);
- ret = AllocationManagerType.valueOf(propValue);
- } catch (IllegalArgumentException | NullPointerException e) {
- // ignore the exception, and make the allocation manager type remain unchanged
- }
- return ret;
- }
-
- static AllocationManager.Factory getDefaultAllocationManagerFactory() {
- if (DEFAULT_ALLOCATION_MANAGER_FACTORY != null) {
- return DEFAULT_ALLOCATION_MANAGER_FACTORY;
- }
- AllocationManagerType type = getDefaultAllocationManagerType();
- switch (type) {
- case Netty:
- DEFAULT_ALLOCATION_MANAGER_FACTORY = getNettyFactory();
- break;
- case Unsafe:
- DEFAULT_ALLOCATION_MANAGER_FACTORY = getUnsafeFactory();
- break;
- case Unknown:
- LOGGER.info("allocation manager type not specified, using netty as the default type");
- DEFAULT_ALLOCATION_MANAGER_FACTORY = getFactory(CheckAllocator.check());
- break;
- default:
- throw new IllegalStateException("Unknown allocation manager type: " + type);
- }
- return DEFAULT_ALLOCATION_MANAGER_FACTORY;
- }
-
- private static AllocationManager.Factory getFactory(String clazzName) {
- try {
- Field field = Class.forName(clazzName).getDeclaredField("FACTORY");
- field.setAccessible(true);
- return (AllocationManager.Factory) field.get(null);
- } catch (Exception e) {
- throw new RuntimeException("Unable to instantiate Allocation Manager for " + clazzName, e);
- }
- }
-
- private static AllocationManager.Factory getUnsafeFactory() {
- try {
- return getFactory("org.apache.arrow.memory.UnsafeAllocationManager");
- } catch (RuntimeException e) {
- throw new RuntimeException("Please add arrow-memory-unsafe to your classpath," +
- " No DefaultAllocationManager found to instantiate an UnsafeAllocationManager", e);
- }
- }
-
- private static AllocationManager.Factory getNettyFactory() {
- try {
- return getFactory("org.apache.arrow.memory.NettyAllocationManager");
- } catch (RuntimeException e) {
- throw new RuntimeException("Please add arrow-memory-netty to your classpath," +
- " No DefaultAllocationManager found to instantiate an NettyAllocationManager", e);
- }
- }
-}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DefaultMemoryChunkAllocatorOption.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DefaultMemoryChunkAllocatorOption.java
new file mode 100644
index 0000000000000..9adc20344eb93
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DefaultMemoryChunkAllocatorOption.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import java.lang.reflect.Field;
+import java.util.Optional;
+
+/**
+ * A class for choosing the default memory chunk.
+ */
+public class DefaultMemoryChunkAllocatorOption {
+
+ /**
+ * The environmental variable to set the default chunk allocator type.
+ */
+ public static final String ALLOCATION_MANAGER_TYPE_ENV_NAME = "ARROW_CHUNK_ALLOCATOR_TYPE";
+
+ /**
+ * The system property to set the default chunk allocator type.
+ */
+ public static final String ALLOCATION_MANAGER_TYPE_PROPERTY_NAME = "arrow.chunk.allocator.type";
+
+ /**
+ * The environmental variable to set the default MemoryChunkManager type.
+ *
+ * @deprecated the value has been deprecated. Use ALLOCATION_MANAGER_TYPE_ENV_NAME instead.
+ */
+ @Deprecated
+ public static final String OBSOLETE_ALLOCATION_MANAGER_TYPE_ENV_NAME = "ARROW_ALLOCATION_MANAGER_TYPE";
+
+ /**
+ * The system property to set the default MemoryChunkManager type.
+ *
+ * @deprecated the value has been deprecated. Use ALLOCATION_MANAGER_TYPE_PROPERTY_NAME instead.
+ */
+ @Deprecated
+ public static final String OBSOLETE_ALLOCATION_MANAGER_TYPE_PROPERTY_NAME = "arrow.allocation.manager.type";
+
+ static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DefaultMemoryChunkAllocatorOption.class);
+
+ /**
+ * The default memory chunk allocator.
+ */
+ private static MemoryChunkAllocator DEFAULT_CHUNK_ALLOCATOR = null;
+
+ /**
+ * The chunk allocator type.
+ */
+ public enum MemoryChunkAllocatorType {
+ /**
+ * Netty based chunk allocator.
+ */
+ Netty,
+
+ /**
+ * Unsafe based chunk allocator.
+ */
+ Unsafe,
+
+ /**
+ * Unknown type.
+ */
+ Unknown,
+ }
+
+ private static String getEnvWithFallbackKey(String key, String fallbackKey) {
+ return Optional.of(System.getenv(key)).orElse(System.getenv(fallbackKey));
+ }
+
+ private static String getPropertyWithFallbackKey(String key, String fallbackKey) {
+ return Optional.of(System.getProperty(key)).orElse(System.getenv(fallbackKey));
+ }
+
+ static MemoryChunkAllocatorType getDefaultMemoryChunkAllocatorType() {
+ MemoryChunkAllocatorType ret = MemoryChunkAllocatorType.Unknown;
+
+ try {
+ String envValue = getEnvWithFallbackKey(ALLOCATION_MANAGER_TYPE_ENV_NAME,
+ OBSOLETE_ALLOCATION_MANAGER_TYPE_ENV_NAME);
+ ret = MemoryChunkAllocatorType.valueOf(envValue);
+ } catch (IllegalArgumentException | NullPointerException e) {
+ // ignore the exception, and make the chunk allocator type remain unchanged
+ }
+
+ // system property takes precedence
+ try {
+ String propValue = getPropertyWithFallbackKey(ALLOCATION_MANAGER_TYPE_PROPERTY_NAME,
+ OBSOLETE_ALLOCATION_MANAGER_TYPE_PROPERTY_NAME);
+ ret = MemoryChunkAllocatorType.valueOf(propValue);
+ } catch (IllegalArgumentException | NullPointerException e) {
+ // ignore the exception, and make the chunk allocator type remain unchanged
+ }
+ return ret;
+ }
+
+ static MemoryChunkAllocator getDefaultMemoryChunkAllocator() {
+ if (DEFAULT_CHUNK_ALLOCATOR != null) {
+ return DEFAULT_CHUNK_ALLOCATOR;
+ }
+ MemoryChunkAllocatorType type = getDefaultMemoryChunkAllocatorType();
+ switch (type) {
+ case Netty:
+ DEFAULT_CHUNK_ALLOCATOR = getNettyAllocator();
+ break;
+ case Unsafe:
+ DEFAULT_CHUNK_ALLOCATOR = getUnsafeAllocator();
+ break;
+ case Unknown:
+ LOGGER.info("memory chunk allocator type not specified, using netty as the default type");
+ DEFAULT_CHUNK_ALLOCATOR = getAllocator(CheckAllocator.check());
+ break;
+ default:
+ throw new IllegalStateException("Unknown memory chunk allocator type: " + type);
+ }
+ return DEFAULT_CHUNK_ALLOCATOR;
+ }
+
+ private static MemoryChunkAllocator getAllocator(String clazzName) {
+ try {
+ Field field = Class.forName(clazzName).getDeclaredField("ALLOCATOR");
+ field.setAccessible(true);
+ return (MemoryChunkAllocator) field.get(null);
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to instantiate MemoryChunkAllocator for " + clazzName, e);
+ }
+ }
+
+ private static MemoryChunkAllocator getUnsafeAllocator() {
+ try {
+ return getAllocator("org.apache.arrow.memory.UnsafeMemoryChunk");
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Please add arrow-memory-unsafe to your classpath," +
+ " No DefaultMemoryChunkAllocator found to instantiate an UnsafeMemoryChunk", e);
+ }
+ }
+
+ private static MemoryChunkAllocator getNettyAllocator() {
+ try {
+ return getAllocator("org.apache.arrow.memory.NettyMemoryChunk");
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Please add arrow-memory-netty to your classpath," +
+ " No DefaultMemoryChunkAllocator found to instantiate an NettyMemoryChunk", e);
+ }
+ }
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java
deleted file mode 100644
index 76e69f9257076..0000000000000
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.arrow.util.Preconditions;
-
-/**
- * Legacy implementation of {@link BufferLedger}. The reference count should be manually managed by
- * explicitly invoking methods {@link #retain()} and {@link #release()}, etc.
- */
-public class LegacyBufferLedger extends BufferLedger {
-
- public static final Factory FACTORY = new Factory() {
- @Override
- public BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager) {
- return new LegacyBufferLedger(allocator, allocationManager);
- }
- };
-
- private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can
- private volatile long lDestructionTime = 0;
-
- LegacyBufferLedger(BufferAllocator allocator, AllocationManager allocationManager) {
- super(allocator, allocationManager);
- }
-
- /**
- * Get this ledger's reference count.
- * @return reference count
- */
- @Override
- public int getRefCount() {
- return bufRefCnt.get();
- }
-
-
- @Override
- protected void increment() {
- bufRefCnt.incrementAndGet();
- }
-
- @Override
- protected long getDestructionTime() {
- return lDestructionTime;
- }
-
- @Override
- protected ReferenceManager newReferenceManager() {
- return new BaseReferenceManager(this);
- }
-
- /**
- * Decrement the ledger's reference count for the associated underlying
- * memory chunk. If the reference count drops to 0, it implies that
- * no ArrowBufs managed by this reference manager need access to the memory
- * chunk. In that case, the ledger should inform the allocation manager
- * about releasing its ownership for the chunk. Whether or not the memory
- * chunk will be released is something that {@link AllocationManager} will
- * decide since tracks the usage of memory chunk across multiple reference
- * managers and allocators.
- *
- * @param decrement amount to decrease the reference count by
- * @return the new reference count
- */
- private int decrement(int decrement) {
- getAllocator().assertOpen();
- final int outcome;
- synchronized (getAllocationManager()) {
- outcome = bufRefCnt.addAndGet(-decrement);
- if (outcome == 0) {
- lDestructionTime = System.nanoTime();
- // refcount of this reference manager has dropped to 0
- // inform the allocation manager that this reference manager
- // no longer holds references to underlying memory
- getAllocationManager().release(this);
- }
- }
- return outcome;
- }
-
- /**
- * Decrement the ledger's reference count by 1 for the associated underlying
- * memory chunk. If the reference count drops to 0, it implies that
- * no ArrowBufs managed by this reference manager need access to the memory
- * chunk. In that case, the ledger should inform the allocation manager
- * about releasing its ownership for the chunk. Whether or not the memory
- * chunk will be released is something that {@link AllocationManager} will
- * decide since tracks the usage of memory chunk across multiple reference
- * managers and allocators.
- * @return true if the new ref count has dropped to 0, false otherwise
- */
- @Override
- public boolean release() {
- return release(1);
- }
-
- /**
- * Decrement the ledger's reference count for the associated underlying
- * memory chunk. If the reference count drops to 0, it implies that
- * no ArrowBufs managed by this reference manager need access to the memory
- * chunk. In that case, the ledger should inform the allocation manager
- * about releasing its ownership for the chunk. Whether or not the memory
- * chunk will be released is something that {@link AllocationManager} will
- * decide since tracks the usage of memory chunk across multiple reference
- * managers and allocators.
- * @param decrement amount to decrease the reference count by
- * @return true if the new ref count has dropped to 0, false otherwise
- */
- @Override
- public boolean release(int decrement) {
- Preconditions.checkState(decrement >= 1,
- "ref count decrement should be greater than or equal to 1");
- // decrement the ref count
- final int refCnt = decrement(decrement);
- if (BaseAllocator.DEBUG) {
- logEvent("release(%d). original value: %d",
- decrement, refCnt + decrement);
- }
- // the new ref count should be >= 0
- Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative");
- return refCnt == 0;
- }
-
- /**
- * Increment the ledger's reference count for associated
- * underlying memory chunk by 1.
- */
- @Override
- public void retain() {
- retain(1);
- }
-
- /**
- * Increment the ledger's reference count for associated
- * underlying memory chunk by the given amount.
- *
- * @param increment amount to increase the reference count by
- */
- @Override
- public void retain(int increment) {
- Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment);
- if (BaseAllocator.DEBUG) {
- logEvent("retain(%d)", increment);
- }
- final int originalReferenceCount = bufRefCnt.getAndAdd(increment);
- Preconditions.checkArgument(originalReferenceCount > 0);
- }
-
-
-
-}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunk.java
similarity index 63%
rename from java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java
rename to java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunk.java
index 0a4bd46e773b2..f465d35a82ab6 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunk.java
@@ -17,22 +17,26 @@
package org.apache.arrow.memory;
-import org.apache.arrow.memory.util.MemoryUtil;
-
/**
- * {@link AllocationListener} implementation to reserve bytes on JVM direct memory.
+ * The underlying memory chunk of {@link MemoryChunkManager}.
*/
-public class DirectAllocationListener implements AllocationListener {
-
- public static final DirectAllocationListener INSTANCE = new DirectAllocationListener();
+public interface MemoryChunk {
+ /**
+ * The size (in bytes) of this chunk.
+ *
+ * @return size of this chunk (in bytes).
+ */
+ long size();
- @Override
- public void onPreAllocation(long size) {
- MemoryUtil.reserveDirectMemory(size);
- }
+ /**
+ * The native address indicating this chunk.
+ *
+ * @return the native address of this chunk.
+ */
+ long memoryAddress();
- @Override
- public void onRelease(long size) {
- MemoryUtil.unreserveDirectMemory(size);
- }
+ /**
+ * Destroy and reclaim all memory spaces of this chunk.
+ */
+ void destroy();
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunkAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunkAllocator.java
new file mode 100644
index 0000000000000..caaa198ce0185
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunkAllocator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * Factory interface of {@link MemoryChunk}.
+ */
+public interface MemoryChunkAllocator {
+
+ /**
+ * Allocate for new {@link MemoryChunk}.
+ *
+ * @param requestedSize the requested size of memory chunk. This could be different from the actual size of the
+ * allocated chunk which can be accessed within {@link MemoryChunk#size()}.
+ * @return the newly created {@link MemoryChunk}
+ */
+ MemoryChunk allocate(long requestedSize);
+
+ /**
+ * Return the empty {@link ArrowBuf} instance which internally holds a {@link MemoryChunk} within this allocator
+ * type.
+ *
+ * @return the empty {@link ArrowBuf} instance.
+ */
+ ArrowBuf empty();
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunkCleaner.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunkCleaner.java
new file mode 100644
index 0000000000000..1feeb72d089ad
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/MemoryChunkCleaner.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.util.VisibleForTesting;
+
+import sun.misc.JavaLangRefAccess;
+import sun.misc.SharedSecrets;
+
+/**
+ * A {@link MemoryChunkManager} implementation to clean up managed chunk automatically by leveraging JVM
+ * garbage collector. This is typically used for the scenario that buffers are often created with large size
+ * and being shared everywhere. For the case of extremely high TPS with small buffer sizes, completely
+ * relying on this facility is not recommended since the GC overhead will be comparatively high. For that
+ * case, a hybrid mode (manual + gc) is provided via {@link MemoryChunkCleaner.Mode#HYBRID}.
+ *
+ *
+ * Note when this implementation is used, an allocator-wise cleanup operation can be manually
+ * triggered via {@link BufferAllocator#close()}, which internally calls {@link ChunkWeakRefLifecycles#cleanAll()}.
+ *
+ *
+ *
+ * Also, the built-in {@link AllocationListener}, {@link GarbageCollectorTrigger} must be set to allocator when
+ * {@link MemoryChunkCleaner} is used. {@link GarbageCollectorTrigger} triggers GC automatically when the allocator
+ * is full. A static instance of {@link GarbageCollectorTrigger} can be retrieved via
+ * {@link MemoryChunkCleaner#gcTrigger()}. There is a detector against this so if {@link GarbageCollectorTrigger}
+ * is not set, an error will be thrown when user tries to allocate buffers.
+ *
+ */
+public class MemoryChunkCleaner extends MemoryChunkManager {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryChunkCleaner.class);
+
+ private final ChunkWeakRef chunkWeakRef;
+ private final Mode mode;
+
+ private MemoryChunkCleaner(SimpleChunkWrapper wrapper, ChunkWeakRefLifecycles lifecycles,
+ Mode mode) {
+ super(wrapper);
+ this.chunkWeakRef = ChunkWeakRef.create(wrapper, lifecycles, wrapper.getChunk(),
+ getOwnerManager(), mode);
+ this.mode = mode;
+ }
+
+ @Override
+ void associateLedger(BufferAllocator allocator, BufferLedger ledger) {
+ checkGCTriggerIsSet(allocator);
+ // Do not actually associate to prevent a strong reference being held by the target allocator
+ }
+
+ private void checkGCTriggerIsSet(BufferAllocator allocator) {
+ if (!(allocator.getListener() instanceof GarbageCollectorTrigger)) {
+ throw new IllegalStateException("GarbageCollectorTrigger should be set as the default " +
+ "AllocationListener of the parent BufferAllocator when MemoryChunkCleaner is used. " +
+ "Try using MemoryChunkCleaner#gcTrigger() to get a built-in instance.");
+ }
+ }
+
+ @Override
+ void dissociateLedger(BufferAllocator allocator, BufferLedger ledger) {
+ // Do not actually dissociate to prevent a strong being held by the target allocator
+ }
+
+ @Override
+ void doRelease() {
+ if (!mode.enableManualRelease) {
+ // Do nothing. Rely on GC to do release.
+ return;
+ }
+ chunkWeakRef.deallocate();
+ }
+
+ @Override
+ boolean isChunkDestroyed() {
+ return chunkWeakRef.destroyed.get();
+ }
+
+ /**
+ *
+ * Create new Factory of {@link MemoryChunkCleaner}. All manual calls to {@link ArrowBuf#close()} or
+ * {@link ReferenceManager#release()} will be ignored. JVM garbage collector will be responsible for
+ * all cleanup work of the managed Arrow buffers as well as the underlying memory chunks.
+ *
+ *
+ *
+ * This is a shortcut to {@code #newFactory(Mode.GC_ONLY)}.
+ *
+ *
+ * @return the factory instance.
+ */
+ public static MemoryChunkManager.Factory newFactory() {
+ return new Factory(Mode.GC_ONLY);
+ }
+
+ /**
+ * Create new Factory of {@link MemoryChunkCleaner}. A specific {@link MemoryChunkCleaner.Mode} can be
+ * specified to let Arrow decide how to handle managed chunks' lifecycles.
+ *
+ * @see Mode
+ * @return the factory instance.
+ */
+ public static MemoryChunkManager.Factory newFactory(Mode mode) {
+ return new Factory(mode);
+ }
+
+ /**
+ * Factory. Unlike {@link MemoryChunkManager.Factory}, each factory here has its own
+ * lifecycle to hold references to all managed chunks. Using {@link MemoryChunkCleaner#newFactory()}
+ * to get a new instance of this.
+ */
+ static class Factory implements MemoryChunkManager.Factory {
+
+ private static Map instances = new IdentityHashMap<>();
+
+ private final Mode mode;
+
+ private void add(Factory factory) {
+ instances.put(factory, null);
+ }
+
+ private static void remove(Factory factory) {
+ instances.remove(factory, null);
+ }
+
+ private final ChunkWeakRefLifecycles lifecycles = new ChunkWeakRefLifecycles();
+
+ private Factory(Mode mode) {
+ this.mode = mode;
+ // Retain strong reference to Factory instance until #close() is called.
+ // If we don't do this, when lifecycles gets collected by GC,
+ // The chunk weak refs will then get collected without being enqueued
+ // to reference queue.
+ add(this);
+ }
+
+ @Override
+ public MemoryChunkManager create(MemoryChunk chunk) {
+ Preconditions.checkState(!(chunk instanceof SimpleChunkWrapper), "Chunk is already " +
+ "managed by MemoryChunkCleaner");
+ final SimpleChunkWrapper wrapper = new SimpleChunkWrapper(chunk);
+ return new MemoryChunkCleaner(wrapper, lifecycles, mode);
+ }
+
+ @Override
+ public void close() {
+ cleanup();
+ remove(this);
+ }
+
+ /**
+ * Clean up all managed chunks in one shot.
+ */
+ public void cleanup() {
+ lifecycles.cleanAll();
+ }
+ }
+
+ private static class SimpleChunkWrapper implements MemoryChunk {
+ private final MemoryChunk chunk;
+
+ private SimpleChunkWrapper(MemoryChunk chunk) {
+ this.chunk = chunk;
+ }
+
+ private MemoryChunk getChunk() {
+ return chunk;
+ }
+
+ @Override
+ public long size() {
+ return chunk.size();
+ }
+
+ @Override
+ public long memoryAddress() {
+ return chunk.memoryAddress();
+ }
+
+ @Override
+ public void destroy() {
+ chunk.destroy();
+ }
+ }
+
+ private static final ReferenceQueue