From a1a2fd12a9eae89b40b6e6980ef8faa1142eb78f Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Tue, 10 Aug 2021 19:31:40 +0800
Subject: [PATCH 1/9] Add AutoBufferLedger
---
.../jni/DirectReservationListener.java | 42 +-
.../jni/UnsafeRecordBatchSerializer.java | 9 +-
.../arrow/memory/NativeUnderlyingMemory.java | 4 +-
.../arrow/memory/AllocationManager.java | 2 +-
.../org/apache/arrow/memory/ArrowBuf.java | 48 +-
.../apache/arrow/memory/AutoBufferLedger.java | 136 ++++++
.../apache/arrow/memory/BaseAllocator.java | 16 +
.../arrow/memory/BaseReferenceManager.java | 211 +++++++++
.../apache/arrow/memory/BufferAllocator.java | 8 +
.../org/apache/arrow/memory/BufferLedger.java | 409 +++++-------------
.../arrow/memory/LegacyBufferLedger.java | 168 +++++++
.../arrow/memory/ReferenceCountAware.java | 57 +++
.../apache/arrow/memory/ReferenceManager.java | 38 +-
.../apache/arrow/memory/util/MemoryUtil.java | 94 ++++
.../arrow/memory/TestAutoBufferLedger.java | 141 ++++++
15 files changed, 988 insertions(+), 395 deletions(-)
create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java
create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java
create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceCountAware.java
create mode 100644 java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java
index 72a1cadcf69b9..57ea5f19a7e02 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java
@@ -21,6 +21,7 @@
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.VisibleForTesting;
/**
@@ -29,20 +30,9 @@
* "-XX:MaxDirectMemorySize".
*/
public class DirectReservationListener implements ReservationListener {
- private final Method methodReserve;
- private final Method methodUnreserve;
-
private DirectReservationListener() {
- try {
- final Class<?> classBits = Class.forName("java.nio.Bits");
- methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class);
- methodReserve.setAccessible(true);
- methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class);
- methodUnreserve.setAccessible(true);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ // Intentionally private: the only instance is INSTANCE, handed out by instance().
}
private static final DirectReservationListener INSTANCE = new DirectReservationListener();
@@ -55,14 +42,7 @@ public static DirectReservationListener instance() {
*/
@Override
public void reserve(long size) {
- try {
- if (size > Integer.MAX_VALUE) {
- throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
- }
- methodReserve.invoke(null, (int) size, (int) size);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ MemoryUtil.reserveDirectMemory(size);
}
/**
@@ -70,14 +50,7 @@ public void reserve(long size) {
*/
@Override
public void unreserve(long size) {
- try {
- if (size > Integer.MAX_VALUE) {
- throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
- }
- methodUnreserve.invoke(null, (int) size, (int) size);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ MemoryUtil.unreserveDirectMemory(size);
}
/**
@@ -85,13 +58,6 @@ public void unreserve(long size) {
*/
@VisibleForTesting
public long getCurrentDirectMemReservation() {
- try {
- final Class> classBits = Class.forName("java.nio.Bits");
- final Field f = classBits.getDeclaredField("reservedMemory");
- f.setAccessible(true);
- return ((AtomicLong) f.get(null)).get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ return MemoryUtil.getCurrentDirectMemReservation();
}
}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java
index f7ad378f3dd6a..57aeb141be2f2 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java
@@ -35,10 +35,7 @@
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.RecordBatch;
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.BufferLedger;
-import org.apache.arrow.memory.NativeUnderlyingMemory;
+import org.apache.arrow.memory.*;
import org.apache.arrow.memory.util.LargeMemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.compression.NoCompressionCodec;
@@ -131,8 +128,8 @@ public static ArrowRecordBatch deserializeUnsafe(
final int size = LargeMemoryUtil.checkedCastToInt(bufferMeta.length());
final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
size, nativeBufferRef, bufferMeta.offset());
- BufferLedger ledger = am.associate(allocator);
- ArrowBuf buf = new ArrowBuf(ledger, null, size, bufferMeta.offset());
+ ReferenceManager rm = am.createReferenceManager(allocator);
+ ArrowBuf buf = new ArrowBuf(rm, null, size, bufferMeta.offset());
buffers.add(buf);
}
diff --git a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
index f8abbed1f13d7..14c38e1c9a4fa 100644
--- a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
+++ b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
@@ -60,8 +60,8 @@ public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int
return new NativeUnderlyingMemory(bufferAllocator, size, nativeBufferId, address);
}
- public BufferLedger associate(BufferAllocator allocator) {
- return super.associate(allocator);
+ public ReferenceManager createReferenceManager(BufferAllocator allocator) {
+ return super.associate(allocator).newReferenceManager();
}
@Override
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
index 5f8ab12446ad4..4bba7947ff8bd 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java
@@ -106,7 +106,7 @@ private BufferLedger associate(final BufferAllocator allocator, final boolean re
return ledger;
}
- ledger = new BufferLedger(allocator, this);
+ ledger = allocator.getBufferLedgerFactory().create(allocator, this);
if (retain) {
// the new reference manager will have a ref count of 1
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java
index ea5e29f72536f..02f77fec7caa2 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ArrowBuf.java
@@ -1094,6 +1094,15 @@ public long getId() {
return id;
}
+ /**
+ * Create a logger of this {@link ArrowBuf}.
+ *
+ * @return the newly created logger
+ */
+ Logger createLogger() {
+ return new Logger(id, memoryAddress(), length, historicalLog);
+ }
+
/**
* Prints information of this buffer into <code>sb</code> at the given
* indentation and verbosity level.
@@ -1103,12 +1112,7 @@ public long getId() {
*
*/
public void print(StringBuilder sb, int indent, Verbosity verbosity) {
- CommonUtil.indent(sb, indent).append(toString());
-
- if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) {
- sb.append("\n");
- historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
- }
+ createLogger().print(sb, indent, verbosity);
}
/**
@@ -1242,4 +1246,36 @@ public ArrowBuf setIndex(int readerIndex, int writerIndex) {
}
}
+ /**
+ * A lightweight logger for an {@link ArrowBuf}. It is used for debugging and historical logging
+ * in {@link BufferLedger} so that the ledger does not hold a strong reference to the {@link ArrowBuf},
+ * which lets the GC take part in the automatic clean-up logic of {@link AutoBufferLedger}.
+ */
+ static class Logger {
+ private final long id;
+ private final long addr;
+ private final long length;
+ private final HistoricalLog historicalLog;
+
+ public Logger(long id, long addr, long length, HistoricalLog historicalLog) {
+ this.id = id;
+ this.addr = addr;
+ this.length = length;
+ this.historicalLog = historicalLog;
+ }
+
+ public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+ CommonUtil.indent(sb, indent).append(toString());
+
+ if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) {
+ sb.append("\n");
+ historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ArrowBuf.Logger[%d], address:%d, length:%d", id, addr, length);
+ }
+ }
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
new file mode 100644
index 0000000000000..88e567f080487
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.arrow.memory.util.MemoryUtil;
+
+import sun.misc.Cleaner;
+
+/**
+ * An alternative implementation of {@link BufferLedger} whose lifetime is managed by the JVM garbage
+ * collector rather than by explicit reference counting as in {@link LegacyBufferLedger}. Explicit calls
+ * to reference management methods such as {@link #retain()} and {@link #release()} are ignored.
+ *
+ * <p>
+ * Note that when using this implementation, the exact release time of the underlying
+ * {@link AllocationManager} becomes unpredictable because clean-up relies on GC. As a result, it is
+ * recommended to specify a very large allocation limit (e.g. {@link Integer#MAX_VALUE}) on the
+ * corresponding {@link BufferAllocator} to avoid unexpected allocation failures.
+ * </p>
+ *
+ * <p>
+ * Also, in contrast to {@link LegacyBufferLedger}, every {@link AutoBufferLedger} adds its size to the
+ * JVM direct memory counter, which can be limited with the JVM option "-XX:MaxDirectMemorySize".
+ * The JVM is expected to trigger garbage collection once the total reservation reaches that limit.
+ * </p>
+ */
+public class AutoBufferLedger extends BufferLedger {
+
+  /** Factory that creates {@link AutoBufferLedger} instances. */
+  public static final Factory FACTORY = new Factory() {
+    @Override
+    public BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager) {
+      return new AutoBufferLedger(allocator, allocationManager);
+    }
+  };
+
+  // Set in release0(). Declared as a volatile scalar: a volatile array reference would not give
+  // volatile semantics to writes of its elements, so readers could miss the update.
+  private volatile long lDestructionTime = 0;
+  // Number of reference managers created by newReferenceManager() that were not cleaned yet.
+  private final AtomicInteger refCount = new AtomicInteger(0);
+
+  AutoBufferLedger(BufferAllocator allocator, AllocationManager allocationManager) {
+    super(allocator, allocationManager);
+  }
+
+  @Override
+  protected void increment() {
+    // no-op: reference counting is not driven by explicit calls in this implementation
+  }
+
+  @Override
+  protected long getDestructionTime() {
+    return lDestructionTime;
+  }
+
+  /**
+   * Reserves direct memory via {@code reserve0()}, creates a {@link BaseReferenceManager} backed by
+   * this ledger and registers a cleaner on it whose hook calls {@code release0()}.
+   *
+   * @return the newly created reference manager
+   */
+  @Override
+  protected ReferenceManager newReferenceManager() {
+    reserve0();
+    final ReferenceManager rm = new BaseReferenceManager(this);
+    Cleaner.create(rm, new LedgerDeallocator());
+    return rm;
+  }
+
+  /**
+   * Reference counts are not exposed by this implementation.
+   *
+   * @return always -1
+   */
+  @Override
+  public int getRefCount() {
+    return -1;
+  }
+
+  /**
+   * Ignored; this ledger is released by the cleaner hook, not by explicit calls.
+   *
+   * @return always false
+   */
+  @Override
+  public boolean release() {
+    return false;
+  }
+
+  /**
+   * Ignored; this ledger is released by the cleaner hook, not by explicit calls.
+   *
+   * @param decrement unused
+   * @return always false
+   */
+  @Override
+  public boolean release(int decrement) {
+    return false;
+  }
+
+  /** Ignored; explicit retains have no effect on this ledger. */
+  @Override
+  public void retain() {
+    // no-op
+  }
+
+  /**
+   * Ignored; explicit retains have no effect on this ledger.
+   *
+   * @param increment unused
+   */
+  @Override
+  public void retain(int increment) {
+    // no-op
+  }
+
+  /** Reserves JVM direct memory for the underlying chunk when the internal count goes from 0 to 1. */
+  private void reserve0() {
+    if (refCount.getAndIncrement() == 0) {
+      MemoryUtil.reserveDirectMemory(getAllocationManager().getSize());
+    }
+  }
+
+  /**
+   * Drops one internal reference. When the count reaches 0, records the destruction time, releases
+   * this ledger from its allocation manager and gives back the reserved direct memory.
+   */
+  private void release0() {
+    if (refCount.decrementAndGet() == 0) {
+      final AllocationManager am = getAllocationManager();
+      synchronized (am) {
+        lDestructionTime = System.nanoTime();
+        am.release(this);
+        MemoryUtil.unreserveDirectMemory(am.getSize());
+      }
+    }
+  }
+
+  /**
+   * Release hook that will be invoked by the JVM cleaner.
+   *
+   * @see #newReferenceManager()
+   */
+  private class LedgerDeallocator implements Runnable {
+
+    private LedgerDeallocator() {
+    }
+
+    @Override
+    public void run() {
+      release0();
+    }
+  }
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 246b2212e2654..d48c87501360a 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -74,6 +74,7 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator {
private final HistoricalLog historicalLog;
private final RoundingPolicy roundingPolicy;
private final AllocationManager.Factory allocationManagerFactory;
+ private final BufferLedger.Factory bufferLedgerFactory;
private volatile boolean isClosed = false; // the allocator has been closed
@@ -94,6 +95,7 @@ protected BaseAllocator(
this.listener = config.getListener();
this.allocationManagerFactory = config.getAllocationManagerFactory();
+ this.bufferLedgerFactory = config.getBufferLedgerFactory();
if (parentAllocator != null) {
this.root = parentAllocator.root;
@@ -141,6 +143,11 @@ public Collection getChildAllocators() {
}
}
+ @Override
+ public BufferLedger.Factory getBufferLedgerFactory() {
+ return bufferLedgerFactory;
+ }
+
private static String createErrorMsg(final BufferAllocator allocator, final long rounded, final long requested) {
if (rounded != requested) {
return String.format(
@@ -343,6 +350,7 @@ public BufferAllocator newChildAllocator(
.maxAllocation(maxAllocation)
.roundingPolicy(roundingPolicy)
.allocationManagerFactory(allocationManagerFactory)
+ .bufferLedgerFactory(bufferLedgerFactory)
.build());
if (DEBUG) {
@@ -728,6 +736,14 @@ AllocationManager.Factory getAllocationManagerFactory() {
return DefaultAllocationManagerOption.getDefaultAllocationManagerFactory();
}
+ /**
+ * Factory for creating {@link BufferLedger} instances.
+ */
+ @Value.Default
+ BufferLedger.Factory getBufferLedgerFactory() {
+ return LegacyBufferLedger.FACTORY;
+ }
+
/**
* Listener callback. Must be non-null.
*/
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java
new file mode 100644
index 0000000000000..be72223ce8610
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseReferenceManager.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * Standard implementation of {@link ReferenceManager} backed by a
+ * {@link BufferLedger}.
+ */
+public class BaseReferenceManager implements ReferenceManager {
+  private final BufferLedger ledger;
+  private final AllocationManager allocationManager;
+
+  /**
+   * Creates a reference manager that delegates to the given ledger.
+   *
+   * @param ledger the buffer ledger backing this reference manager
+   */
+  public BaseReferenceManager(BufferLedger ledger) {
+    this.ledger = ledger;
+    this.allocationManager = ledger.getAllocationManager();
+  }
+
+  @Override
+  public int getRefCount() {
+    return ledger.getRefCount();
+  }
+
+  @Override
+  public boolean release() {
+    return ledger.release();
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    return ledger.release(decrement);
+  }
+
+  @Override
+  public void retain() {
+    ledger.retain();
+  }
+
+  @Override
+  public void retain(int increment) {
+    ledger.retain(increment);
+  }
+
+  /**
+   * Derive a new ArrowBuf from a given source ArrowBuf. The new derived
+   * ArrowBuf will share the same reference count as rest of the ArrowBufs
+   * associated with this ledger. This operation is typically used for
+   * slicing -- creating new ArrowBufs from a compound ArrowBuf starting at
+   * a particular index in the underlying memory and having access to a
+   * particular length (in bytes) of data in memory chunk.
+   * <p>
+   * This method is also used as a helper for transferring ownership and retain to target
+   * allocator.
+   * </p>
+   * @param sourceBuffer source ArrowBuf
+   * @param index index (relative to source ArrowBuf) new ArrowBuf should be
+   *              derived from
+   * @param length length (bytes) of data in underlying memory that derived buffer will
+   *               have access to in underlying memory
+   * @return derived buffer
+   */
+  @Override
+  public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
+    return ledger.deriveBuffer(sourceBuffer, index, length);
+  }
+
+  /**
+   * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of
+   * memory ownership and accounting. This has no impact on the reference counting for the current
+   * ArrowBuf except in the situation where the passed in Allocator is the same as the current buffer.
+   * <p>
+   * This operation has no impact on the reference count of this ArrowBuf. The newly created
+   * ArrowBuf will either have a reference count of 1 (in the case that this is the first time this
+   * memory is being associated with the target allocator or in other words allocation manager currently
+   * doesn't hold a mapping for the target allocator) or the current value of the reference count for
+   * the target allocator-reference manager combination + 1 in the case that the provided allocator
+   * already had an association to this underlying memory.
+   * </p>
+   *
+   * @param srcBuffer source ArrowBuf
+   * @param target The target allocator to create an association with.
+   * @return A new ArrowBuf which shares the same underlying memory as the provided ArrowBuf.
+   */
+  @Override
+  public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) {
+    if (BaseAllocator.DEBUG) {
+      ledger.logEvent("retain(%s)", target.getName());
+    }
+
+    // the call to associate will return the corresponding reference manager (buffer ledger) for
+    // the target allocator. if the allocation manager didn't already have a mapping
+    // for the target allocator, it will create one and return the new reference manager with a
+    // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1.
+    // alternatively, if there was already a mapping for <buffer allocator, ref manager> in
+    // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
+    // and this will be true for all the existing buffers currently managed by targetrefmanager
+    final BufferLedger targetLedger = allocationManager.associate(target);
+    return deriveWholeBuffer(srcBuffer, targetLedger);
+  }
+
+  /**
+   * Transfer the memory accounting ownership of this ArrowBuf to another allocator.
+   * This will generate a new ArrowBuf that carries an association with the underlying memory
+   * of this ArrowBuf. If this ArrowBuf is connected to the owning BufferLedger of this memory,
+   * that memory ownership/accounting will be transferred to the target allocator. If this
+   * ArrowBuf does not currently own the memory underlying it (and is only associated with it),
+   * this does not transfer any ownership to the newly created ArrowBuf.
+   * <p>
+   * This operation has no impact on the reference count of this ArrowBuf. The newly created
+   * ArrowBuf will either have a reference count of 1 (in the case that this is the first time
+   * this memory is being associated with the new allocator) or the current value of the reference
+   * count for the other AllocationManager/BufferLedger combination + 1 in the case that the provided
+   * allocator already had an association to this underlying memory.
+   * </p>
+   * <p>
+   * Transfers will always succeed, even if that puts the other allocator into an overlimit
+   * situation. This is possible due to the fact that the original owning allocator may have
+   * allocated this memory out of a local reservation whereas the target allocator may need to
+   * allocate new memory from a parent or RootAllocator. This operation is done in a mostly-lockless
+   * but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
+   * to an actual overlimit==true condition. This is simply conservative behavior which means we may
+   * return overlimit slightly sooner than is necessary.
+   * </p>
+   *
+   * @param srcBuffer source ArrowBuf
+   * @param target The allocator to transfer ownership to.
+   * @return A new transfer result with the impact of the transfer (whether it was overlimit) as
+   *         well as the newly created ArrowBuf.
+   */
+  @Override
+  public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAllocator target) {
+    // associate() returns the ledger mapped to the target allocator, creating the mapping if the
+    // allocation manager does not have one yet
+    final BufferLedger targetLedger = allocationManager.associate(target);
+    final ArrowBuf targetArrowBuf = deriveWholeBuffer(srcBuffer, targetLedger);
+    final boolean allocationFit = ledger.transferBalance(targetLedger);
+    return new TransferResult(allocationFit, targetArrowBuf);
+  }
+
+  /**
+   * Derives a buffer from {@code targetLedger} that covers the full capacity of
+   * {@code srcBuffer} and carries over its reader and writer indices.
+   */
+  private static ArrowBuf deriveWholeBuffer(final ArrowBuf srcBuffer, final BufferLedger targetLedger) {
+    final ArrowBuf targetArrowBuf = targetLedger.deriveBuffer(srcBuffer, 0, srcBuffer.capacity());
+    targetArrowBuf.readerIndex(srcBuffer.readerIndex());
+    targetArrowBuf.writerIndex(srcBuffer.writerIndex());
+    return targetArrowBuf;
+  }
+
+  @Override
+  public BufferAllocator getAllocator() {
+    return ledger.getAllocator();
+  }
+
+  @Override
+  public long getSize() {
+    return ledger.getSize();
+  }
+
+  @Override
+  public long getAccountedSize() {
+    return ledger.getAccountedSize();
+  }
+
+  /**
+   * The outcome of a Transfer.
+   */
+  public static class TransferResult implements OwnershipTransferResult {
+
+    // Whether this transfer fit within the target allocator's capacity.
+    final boolean allocationFit;
+
+    // The newly created buffer associated with the target allocator
+    public final ArrowBuf buffer;
+
+    private TransferResult(boolean allocationFit, ArrowBuf buffer) {
+      this.allocationFit = allocationFit;
+      this.buffer = buffer;
+    }
+
+    @Override
+    public ArrowBuf getTransferredBuffer() {
+      return buffer;
+    }
+
+    @Override
+    public boolean getAllocationFit() {
+      return allocationFit;
+    }
+  }
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index 8fbf6f7b073c3..c9fafb6a4a503 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -172,6 +172,14 @@ BufferAllocator newChildAllocator(
*/
Collection getChildAllocators();
+
+ /**
+ * Returns {@link BufferLedger.Factory} used by this allocator.
+ *
+ * @return the buffer ledger factory
+ */
+ BufferLedger.Factory getBufferLedgerFactory();
+
/**
* Create an allocation reservation. A reservation is a way of building up
* a request for a buffer whose size is not known in advance. See
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
index 48b3e183d5ae0..7583e8f0035cc 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
@@ -18,7 +18,6 @@
package org.apache.arrow.memory;
import java.util.IdentityHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.memory.util.CommonUtil;
@@ -31,13 +30,12 @@
* ArrowBufs managed by this reference manager share a common
* fate (same reference count).
*/
-public class BufferLedger implements ValueWithKeyIncluded, ReferenceManager {
- private final IdentityHashMap buffers =
+public abstract class BufferLedger implements ValueWithKeyIncluded, ReferenceCountAware {
+ private final IdentityHashMap buffers =
BaseAllocator.DEBUG ? new IdentityHashMap<>() : null;
private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
// unique ID assigned to each ledger
private final long ledgerId = LEDGER_ID_GENERATOR.incrementAndGet();
- private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start at zero so we can
// manage request for retain
// correctly
private final long lCreationTime = System.nanoTime();
@@ -46,165 +44,73 @@ public class BufferLedger implements ValueWithKeyIncluded, Refe
private final HistoricalLog historicalLog =
BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1) : null;
- private volatile long lDestructionTime = 0;
BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) {
this.allocator = allocator;
this.allocationManager = allocationManager;
}
- boolean isOwningLedger() {
- return this == allocationManager.getOwningLedger();
- }
-
- public BufferAllocator getKey() {
- return allocator;
- }
-
- /**
- * Get the buffer allocator associated with this reference manager.
- * @return buffer allocator
- */
- @Override
- public BufferAllocator getAllocator() {
- return allocator;
- }
-
- /**
- * Get this ledger's reference count.
- * @return reference count
- */
- @Override
- public int getRefCount() {
- return bufRefCnt.get();
- }
-
/**
* Increment the ledger's reference count for the associated
* underlying memory chunk. All ArrowBufs managed by this ledger
* will share the ref count.
*/
- void increment() {
- bufRefCnt.incrementAndGet();
- }
+ protected abstract void increment();
+
/**
- * Decrement the ledger's reference count by 1 for the associated underlying
- * memory chunk. If the reference count drops to 0, it implies that
- * no ArrowBufs managed by this reference manager need access to the memory
- * chunk. In that case, the ledger should inform the allocation manager
- * about releasing its ownership for the chunk. Whether or not the memory
- * chunk will be released is something that {@link AllocationManager} will
- * decide since tracks the usage of memory chunk across multiple reference
- * managers and allocators.
- * @return true if the new ref count has dropped to 0, false otherwise
+ * Get destruction time of this buffer ledger.
+ *
+ * @return destruction time in nano, 0 if the ledger is not destructed yet
*/
- @Override
- public boolean release() {
- return release(1);
- }
+ protected abstract long getDestructionTime();
+
/**
- * Decrement the ledger's reference count for the associated underlying
- * memory chunk. If the reference count drops to 0, it implies that
- * no ArrowBufs managed by this reference manager need access to the memory
- * chunk. In that case, the ledger should inform the allocation manager
- * about releasing its ownership for the chunk. Whether or not the memory
- * chunk will be released is something that {@link AllocationManager} will
- * decide since tracks the usage of memory chunk across multiple reference
- * managers and allocators.
- * @param decrement amount to decrease the reference count by
- * @return true if the new ref count has dropped to 0, false otherwise
+ * Create new instance of {@link ReferenceManager} using this ledger.
+ *
+ * @return the newly created instance of {@link ReferenceManager}
*/
- @Override
- public boolean release(int decrement) {
- Preconditions.checkState(decrement >= 1,
- "ref count decrement should be greater than or equal to 1");
- // decrement the ref count
- final int refCnt = decrement(decrement);
- if (BaseAllocator.DEBUG) {
- historicalLog.recordEvent("release(%d). original value: %d",
- decrement, refCnt + decrement);
- }
- // the new ref count should be >= 0
- Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative");
- return refCnt == 0;
- }
+ protected abstract ReferenceManager newReferenceManager();
/**
- * Decrement the ledger's reference count for the associated underlying
- * memory chunk. If the reference count drops to 0, it implies that
- * no ArrowBufs managed by this reference manager need access to the memory
- * chunk. In that case, the ledger should inform the allocation manager
- * about releasing its ownership for the chunk. Whether or not the memory
- * chunk will be released is something that {@link AllocationManager} will
- * decide since tracks the usage of memory chunk across multiple reference
- * managers and allocators.
+ * Used by an allocator to create a new ArrowBuf. This is provided
+ * as a helper method for the allocator when it allocates a new memory chunk
+ * using a new instance of allocation manager and creates a new reference manager
+ * too.
*
- * @param decrement amount to decrease the reference count by
- * @return the new reference count
+ * @param length The length in bytes that this ArrowBuf will provide access to.
+ * @param manager An optional BufferManager argument that can be used to manage expansion of
+ * this ArrowBuf
+ * @return A new ArrowBuf that shares references with all ArrowBufs associated
+ * with this BufferLedger
*/
- private int decrement(int decrement) {
+ ArrowBuf newArrowBuf(final long length, final BufferManager manager) {
allocator.assertOpen();
- final int outcome;
- synchronized (allocationManager) {
- outcome = bufRefCnt.addAndGet(-decrement);
- if (outcome == 0) {
- lDestructionTime = System.nanoTime();
- // refcount of this reference manager has dropped to 0
- // inform the allocation manager that this reference manager
- // no longer holds references to underlying memory
- allocationManager.release(this);
- }
- }
- return outcome;
- }
- /**
- * Increment the ledger's reference count for associated
- * underlying memory chunk by 1.
- */
- @Override
- public void retain() {
- retain(1);
- }
+ // the start virtual address of the ArrowBuf will be same as address of memory chunk
+ final long startAddress = allocationManager.memoryAddress();
- /**
- * Increment the ledger's reference count for associated
- * underlying memory chunk by the given amount.
- *
- * @param increment amount to increase the reference count by
- */
- @Override
- public void retain(int increment) {
- Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment);
+ // create ArrowBuf
+ final ArrowBuf buf = new ArrowBuf(newReferenceManager(), manager, length, startAddress);
+
+ // logging
if (BaseAllocator.DEBUG) {
- historicalLog.recordEvent("retain(%d)", increment);
+ logEvent(
+ "ArrowBuf(BufferLedger, BufferAllocator[%s], " +
+ "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d",
+ allocator.getName(), System.identityHashCode(buf), buf.toString(),
+ System.identityHashCode(this));
+
+ synchronized (buffers) {
+ buffers.put(buf.createLogger(), null);
+ }
}
- final int originalReferenceCount = bufRefCnt.getAndAdd(increment);
- Preconditions.checkArgument(originalReferenceCount > 0);
+
+ return buf;
}
- /**
- * Derive a new ArrowBuf from a given source ArrowBuf. The new derived
- * ArrowBuf will share the same reference count as rest of the ArrowBufs
- * associated with this ledger. This operation is typically used for
- * slicing -- creating new ArrowBufs from a compound ArrowBuf starting at
- * a particular index in the underlying memory and having access to a
- * particular length (in bytes) of data in memory chunk.
- *
- * This method is also used as a helper for transferring ownership and retain to target
- * allocator.
- *
- * @param sourceBuffer source ArrowBuf
- * @param index index (relative to source ArrowBuf) new ArrowBuf should be
- * derived from
- * @param length length (bytes) of data in underlying memory that derived buffer will
- * have access to in underlying memory
- * @return derived buffer
- */
- @Override
- public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
+ ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long length) {
/*
* Usage type 1 for deriveBuffer():
* Used for slicing where index represents a relative index in the source ArrowBuf
@@ -226,7 +132,7 @@ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long lengt
// create new ArrowBuf
final ArrowBuf derivedBuf = new ArrowBuf(
- this,
+ newReferenceManager(),
null,
length, // length (in bytes) in the underlying memory chunk for this new ArrowBuf
derivedBufferAddress // starting byte address in the underlying memory for this new ArrowBuf
@@ -234,7 +140,7 @@ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long lengt
// logging
if (BaseAllocator.DEBUG) {
- historicalLog.recordEvent(
+ logEvent(
"ArrowBuf(BufferLedger, BufferAllocator[%s], " +
"UnsafeDirectLittleEndian[identityHashCode == " +
"%d](%s)) => ledger hc == %d",
@@ -242,109 +148,32 @@ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long lengt
System.identityHashCode(this));
synchronized (buffers) {
- buffers.put(derivedBuf, null);
+ buffers.put(derivedBuf.createLogger(), null);
}
}
return derivedBuf;
}
- /**
- * Used by an allocator to create a new ArrowBuf. This is provided
- * as a helper method for the allocator when it allocates a new memory chunk
- * using a new instance of allocation manager and creates a new reference manager
- * too.
- *
- * @param length The length in bytes that this ArrowBuf will provide access to.
- * @param manager An optional BufferManager argument that can be used to manage expansion of
- * this ArrowBuf
- * @return A new ArrowBuf that shares references with all ArrowBufs associated
- * with this BufferLedger
- */
- ArrowBuf newArrowBuf(final long length, final BufferManager manager) {
- allocator.assertOpen();
-
- // the start virtual address of the ArrowBuf will be same as address of memory chunk
- final long startAddress = allocationManager.memoryAddress();
-
- // create ArrowBuf
- final ArrowBuf buf = new ArrowBuf(this, manager, length, startAddress);
-
- // logging
- if (BaseAllocator.DEBUG) {
- historicalLog.recordEvent(
- "ArrowBuf(BufferLedger, BufferAllocator[%s], " +
- "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d",
- allocator.getName(), System.identityHashCode(buf), buf.toString(),
- System.identityHashCode(this));
-
- synchronized (buffers) {
- buffers.put(buf, null);
- }
- }
-
- return buf;
- }
-
- /**
- * Create a new ArrowBuf that is associated with an alternative allocator for the purposes of
- * memory ownership and accounting. This has no impact on the reference counting for the current
- * ArrowBuf except in the situation where the passed in Allocator is the same as the current buffer.
- *
- * This operation has no impact on the reference count of this ArrowBuf. The newly created
- * ArrowBuf with either have a reference count of 1 (in the case that this is the first time this
- * memory is being associated with the target allocator or in other words allocation manager currently
- * doesn't hold a mapping for the target allocator) or the current value of the reference count for
- * the target allocator-reference manager combination + 1 in the case that the provided allocator
- * already had an association to this underlying memory.
- *
- *
- * @param srcBuffer source ArrowBuf
- * @param target The target allocator to create an association with.
- * @return A new ArrowBuf which shares the same underlying memory as the provided ArrowBuf.
- */
- @Override
- public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) {
-
- if (BaseAllocator.DEBUG) {
- historicalLog.recordEvent("retain(%s)", target.getName());
- }
-
- // the call to associate will return the corresponding reference manager (buffer ledger) for
- // the target allocator. if the allocation manager didn't already have a mapping
- // for the target allocator, it will create one and return the new reference manager with a
- // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1.
- // alternatively, if there was already a mapping for in
- // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
- // and this will be true for all the existing buffers currently managed by targetrefmanager
- final BufferLedger targetRefManager = allocationManager.associate(target);
- // create a new ArrowBuf to associate with new allocator and target ref manager
- final long targetBufLength = srcBuffer.capacity();
- ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength);
- targetArrowBuf.readerIndex(srcBuffer.readerIndex());
- targetArrowBuf.writerIndex(srcBuffer.writerIndex());
- return targetArrowBuf;
- }
-
/**
* Transfer any balance the current ledger has to the target ledger. In the case
* that the current ledger holds no memory, no transfer is made to the new ledger.
*
- * @param targetReferenceManager The ledger to transfer ownership account to.
+ * @param targetLedger The ledger to transfer ownership account to.
* @return Whether transfer fit within target ledgers limits.
*/
- boolean transferBalance(final ReferenceManager targetReferenceManager) {
- Preconditions.checkArgument(targetReferenceManager != null,
+ boolean transferBalance(final BufferLedger targetLedger) {
+ Preconditions.checkArgument(targetLedger != null,
"Expecting valid target reference manager");
- final BufferAllocator targetAllocator = targetReferenceManager.getAllocator();
+ final BufferAllocator targetAllocator = targetLedger.getAllocator();
Preconditions.checkArgument(allocator.getRoot() == targetAllocator.getRoot(),
"You can only transfer between two allocators that share the same root.");
allocator.assertOpen();
- targetReferenceManager.getAllocator().assertOpen();
+ targetLedger.getAllocator().assertOpen();
// if we're transferring to ourself, just return.
- if (targetReferenceManager == this) {
+ if (targetLedger == this) {
return true;
}
@@ -360,8 +189,8 @@ boolean transferBalance(final ReferenceManager targetReferenceManager) {
}
if (BaseAllocator.DEBUG) {
- this.historicalLog.recordEvent("transferBalance(%s)",
- targetReferenceManager.getAllocator().getName());
+ logEvent("transferBalance(%s)",
+ targetLedger.getAllocator().getName());
}
boolean overlimit = targetAllocator.forceAllocate(allocationManager.getSize());
@@ -369,90 +198,15 @@ boolean transferBalance(final ReferenceManager targetReferenceManager) {
// since the transfer can only happen from the owning reference manager,
// we need to set the target ref manager as the new owning ref manager
// for the chunk of memory in allocation manager
- allocationManager.setOwningLedger((BufferLedger) targetReferenceManager);
+ allocationManager.setOwningLedger(targetLedger);
return overlimit;
}
}
- /**
- * Transfer the memory accounting ownership of this ArrowBuf to another allocator.
- * This will generate a new ArrowBuf that carries an association with the underlying memory
- * of this ArrowBuf. If this ArrowBuf is connected to the owning BufferLedger of this memory,
- * that memory ownership/accounting will be transferred to the target allocator. If this
- * ArrowBuf does not currently own the memory underlying it (and is only associated with it),
- * this does not transfer any ownership to the newly created ArrowBuf.
- *
- * This operation has no impact on the reference count of this ArrowBuf. The newly created
- * ArrowBuf with either have a reference count of 1 (in the case that this is the first time
- * this memory is being associated with the new allocator) or the current value of the reference
- * count for the other AllocationManager/BufferLedger combination + 1 in the case that the provided
- * allocator already had an association to this underlying memory.
- *
- *
- * Transfers will always succeed, even if that puts the other allocator into an overlimit
- * situation. This is possible due to the fact that the original owning allocator may have
- * allocated this memory out of a local reservation whereas the target allocator may need to
- * allocate new memory from a parent or RootAllocator. This operation is done n a mostly-lockless
- * but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
- * to an actual overlimit==true condition. This is simply conservative behavior which means we may
- * return overlimit slightly sooner than is necessary.
- *
- *
- * @param target The allocator to transfer ownership to.
- * @return A new transfer result with the impact of the transfer (whether it was overlimit) as
- * well as the newly created ArrowBuf.
- */
- @Override
- public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAllocator target) {
- // the call to associate will return the corresponding reference manager (buffer ledger) for
- // the target allocator. if the allocation manager didn't already have a mapping
- // for the target allocator, it will create one and return the new reference manager with a
- // reference count of 1. Thus the newly created buffer in this case will have a ref count of 1.
- // alternatively, if there was already a mapping for in
- // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1
- // and this will be true for all the existing buffers currently managed by targetrefmanager
- final BufferLedger targetRefManager = allocationManager.associate(target);
- // create a new ArrowBuf to associate with new allocator and target ref manager
- final long targetBufLength = srcBuffer.capacity();
- final ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength);
- targetArrowBuf.readerIndex(srcBuffer.readerIndex());
- targetArrowBuf.writerIndex(srcBuffer.writerIndex());
- final boolean allocationFit = transferBalance(targetRefManager);
- return new TransferResult(allocationFit, targetArrowBuf);
- }
-
- /**
- * The outcome of a Transfer.
- */
- public class TransferResult implements OwnershipTransferResult {
-
- // Whether this transfer fit within the target allocator's capacity.
- final boolean allocationFit;
-
- // The newly created buffer associated with the target allocator
- public final ArrowBuf buffer;
-
- private TransferResult(boolean allocationFit, ArrowBuf buffer) {
- this.allocationFit = allocationFit;
- this.buffer = buffer;
- }
-
- @Override
- public ArrowBuf getTransferredBuffer() {
- return buffer;
- }
-
- @Override
- public boolean getAllocationFit() {
- return allocationFit;
- }
- }
-
/**
* Total size (in bytes) of memory underlying this reference manager.
* @return Size (in bytes) of the memory chunk
*/
- @Override
public long getSize() {
return allocationManager.getSize();
}
@@ -463,7 +217,6 @@ public long getSize() {
* is not the owning ledger associated with this memory.
* @return Amount of accounted(owned) memory associated with this ledger.
*/
- @Override
public long getAccountedSize() {
synchronized (allocationManager) {
if (allocationManager.getOwningLedger() == this) {
@@ -490,11 +243,11 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
.append("), isOwning: ")
.append(", size: ")
.append(", references: ")
- .append(bufRefCnt.get())
+ .append(getRefCount())
.append(", life: ")
.append(lCreationTime)
.append("..")
- .append(lDestructionTime)
+ .append(getDestructionTime())
.append(", allocatorManager: [")
.append(", life: ");
@@ -505,8 +258,8 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
sb.append("] holds ")
.append(buffers.size())
.append(" buffers. \n");
- for (ArrowBuf buf : buffers.keySet()) {
- buf.print(sb, indent + 2, verbosity);
+ for (ArrowBuf.Logger bufLogger : buffers.keySet()) {
+ bufLogger.print(sb, indent + 2, verbosity);
sb.append('\n');
}
}
@@ -518,8 +271,54 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) {
*
* @return The AllocationManager used by this BufferLedger.
*/
- public AllocationManager getAllocationManager() {
+ AllocationManager getAllocationManager() {
return allocationManager;
}
+ /**
+ * Record a single log line into this ledger's historical log.
+ */
+ protected void logEvent(final String noteFormat, Object... args) {
+ historicalLog.recordEvent(noteFormat, args);
+ }
+
+ /**
+ * If this ledger is the owning ledger of the underlying allocation manager.
+ *
+ * @return true if this ledger owns its allocation manager
+ */
+ boolean isOwningLedger() {
+ return this == allocationManager.getOwningLedger();
+ }
+
+ /**
+ * Get the buffer allocator associated with this reference manager.
+ * @return buffer allocator
+ */
+ BufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ /**
+ * Get allocator key. Used by {@link LowCostIdentityHashMap}.
+ */
+ public BufferAllocator getKey() {
+ return allocator;
+ }
+
+ /**
+ * A factory interface for creating {@link BufferLedger}.
+ */
+ interface Factory {
+ /**
+ * Create an instance of {@link BufferLedger}.
+ *
+ * @param allocator The allocator that will bind to the newly created {@link BufferLedger}.
+ * @param allocationManager The {@link AllocationManager} that actually holds the underlying
+ * memory block. Note that the newly created {@link BufferLedger} will
+ * not be the one that actually owns this piece of memory by default.
+ * @return The created {@link BufferLedger}.
+ */
+ BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager);
+ }
}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java
new file mode 100644
index 0000000000000..76e69f9257076
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/LegacyBufferLedger.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.arrow.util.Preconditions;
+
+/**
+ * Legacy implementation of {@link BufferLedger}. The reference count should be manually managed by
+ * explicitly invoking methods {@link #retain()} and {@link #release()}, etc.
+ */
+public class LegacyBufferLedger extends BufferLedger {
+
+ public static final Factory FACTORY = new Factory() {
+ @Override
+ public BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager) {
+ return new LegacyBufferLedger(allocator, allocationManager);
+ }
+ };
+
+ private final AtomicInteger bufRefCnt = new AtomicInteger(0); // starts at zero until increment() is called
+ private volatile long lDestructionTime = 0;
+
+ LegacyBufferLedger(BufferAllocator allocator, AllocationManager allocationManager) {
+ super(allocator, allocationManager);
+ }
+
+ /**
+ * Get this ledger's reference count.
+ * @return reference count
+ */
+ @Override
+ public int getRefCount() {
+ return bufRefCnt.get();
+ }
+
+
+ @Override
+ protected void increment() {
+ bufRefCnt.incrementAndGet();
+ }
+
+ @Override
+ protected long getDestructionTime() {
+ return lDestructionTime;
+ }
+
+ @Override
+ protected ReferenceManager newReferenceManager() {
+ return new BaseReferenceManager(this);
+ }
+
+ /**
+ * Decrement the ledger's reference count for the associated underlying
+ * memory chunk. If the reference count drops to 0, it implies that
+ * no ArrowBufs managed by this reference manager need access to the memory
+ * chunk. In that case, the ledger should inform the allocation manager
+ * about releasing its ownership for the chunk. Whether or not the memory
+ * chunk will be released is something that {@link AllocationManager} will
+ * decide since it tracks the usage of memory chunk across multiple reference
+ * managers and allocators.
+ *
+ * @param decrement amount to decrease the reference count by
+ * @return the new reference count
+ */
+ private int decrement(int decrement) {
+ getAllocator().assertOpen();
+ final int outcome;
+ synchronized (getAllocationManager()) {
+ outcome = bufRefCnt.addAndGet(-decrement);
+ if (outcome == 0) {
+ lDestructionTime = System.nanoTime();
+ // refcount of this reference manager has dropped to 0
+ // inform the allocation manager that this reference manager
+ // no longer holds references to underlying memory
+ getAllocationManager().release(this);
+ }
+ }
+ return outcome;
+ }
+
+ /**
+ * Decrement the ledger's reference count by 1 for the associated underlying
+ * memory chunk. If the reference count drops to 0, it implies that
+ * no ArrowBufs managed by this reference manager need access to the memory
+ * chunk. In that case, the ledger should inform the allocation manager
+ * about releasing its ownership for the chunk. Whether or not the memory
+ * chunk will be released is something that {@link AllocationManager} will
+ * decide since it tracks the usage of memory chunk across multiple reference
+ * managers and allocators.
+ * @return true if the new ref count has dropped to 0, false otherwise
+ */
+ @Override
+ public boolean release() {
+ return release(1);
+ }
+
+ /**
+ * Decrement the ledger's reference count for the associated underlying
+ * memory chunk. If the reference count drops to 0, it implies that
+ * no ArrowBufs managed by this reference manager need access to the memory
+ * chunk. In that case, the ledger should inform the allocation manager
+ * about releasing its ownership for the chunk. Whether or not the memory
+ * chunk will be released is something that {@link AllocationManager} will
+ * decide since it tracks the usage of memory chunk across multiple reference
+ * managers and allocators.
+ * @param decrement amount to decrease the reference count by
+ * @return true if the new ref count has dropped to 0, false otherwise
+ */
+ @Override
+ public boolean release(int decrement) {
+ Preconditions.checkState(decrement >= 1,
+ "ref count decrement should be greater than or equal to 1");
+ // decrement the ref count
+ final int refCnt = decrement(decrement);
+ if (BaseAllocator.DEBUG) {
+ logEvent("release(%d). original value: %d",
+ decrement, refCnt + decrement);
+ }
+ // the new ref count should be >= 0
+ Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative");
+ return refCnt == 0;
+ }
+
+ /**
+ * Increment the ledger's reference count for associated
+ * underlying memory chunk by 1.
+ */
+ @Override
+ public void retain() {
+ retain(1);
+ }
+
+ /**
+ * Increment the ledger's reference count for associated
+ * underlying memory chunk by the given amount.
+ *
+ * @param increment amount to increase the reference count by
+ */
+ @Override
+ public void retain(int increment) {
+ Preconditions.checkArgument(increment > 0, "retain(%s) argument is not positive", increment);
+ if (BaseAllocator.DEBUG) {
+ logEvent("retain(%d)", increment);
+ }
+ final int originalReferenceCount = bufRefCnt.getAndAdd(increment);
+ Preconditions.checkArgument(originalReferenceCount > 0);
+ }
+
+
+
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceCountAware.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceCountAware.java
new file mode 100644
index 0000000000000..67aec2a8444c4
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceCountAware.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * Base interface for reference counted facilities.
+ */
+public interface ReferenceCountAware {
+ /**
+ * Get current reference count.
+ *
+ * @return current reference count
+ */
+ int getRefCount();
+
+ /**
+ * Decrement reference count by 1.
+ *
+ * @return true if reference count has dropped to 0
+ */
+ boolean release();
+
+ /**
+ * Decrement reference count by specific amount of decrement.
+ *
+ * @param decrement the count to decrease the reference count by
+ * @return true if reference count has dropped to 0
+ */
+ boolean release(int decrement);
+
+ /**
+ * Increment reference count by 1.
+ */
+ void retain();
+
+ /**
+ * Increment reference count by specific amount of increment.
+ *
+ * @param increment the count to increase the reference count by
+ */
+ void retain(int increment);
+}
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
index 00ae274b744d7..812ef76bfb049 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/ReferenceManager.java
@@ -21,43 +21,7 @@
* Reference Manager manages one or more ArrowBufs that share the
* reference count for the underlying memory chunk.
*/
-public interface ReferenceManager {
-
- /**
- * Return the reference count.
- * @return reference count
- */
- int getRefCount();
-
- /**
- * Decrement this reference manager's reference count by 1 for the associated underlying
- * memory. If the reference count drops to 0, it implies that ArrowBufs managed by this
- * reference manager no longer need access to the underlying memory
- * @return true if ref count has dropped to 0, false otherwise
- */
- boolean release();
-
- /**
- * Decrement this reference manager's reference count for the associated underlying
- * memory. If the reference count drops to 0, it implies that ArrowBufs managed by this
- * reference manager no longer need access to the underlying memory
- * @param decrement the count to decrease the reference count by
- * @return the new reference count
- */
- boolean release(int decrement);
-
- /**
- * Increment this reference manager's reference count by 1 for the associated underlying
- * memory.
- */
- void retain();
-
- /**
- * Increment this reference manager's reference count by a given amount for the
- * associated underlying memory.
- * @param increment the count to increase the reference count by
- */
- void retain(int increment);
+public interface ReferenceManager extends ReferenceCountAware {
/**
* Create a new ArrowBuf that is associated with an alternative allocator for the purposes of
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java
index 16ef39702ca3e..d87b116d653ee 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java
@@ -20,10 +20,14 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.arrow.util.VisibleForTesting;
import sun.misc.Unsafe;
@@ -34,6 +38,10 @@ public class MemoryUtil {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryUtil.class);
private static final Constructor<?> DIRECT_BUFFER_CONSTRUCTOR;
+ private static final Method DIRECT_MEMORY_RESERVE;
+ private static final Method DIRECT_MEMORY_UNRESERVE;
+ private static final Field DIRECT_MEMORY_COUNTER;
+
/**
* The unsafe object from which to access the off-heap memory.
*/
@@ -132,6 +140,48 @@ public Object run() {
}
}
DIRECT_BUFFER_CONSTRUCTOR = directBufferConstructor;
+      // Look up java.nio.Bits#reserveMemory(long, int) reflectively and make it accessible.
+      DIRECT_MEMORY_RESERVE = AccessController.doPrivileged(new PrivilegedAction<Method>() {
+        @Override
+        public Method run() {
+          try {
+            final Class<?> classBits = Class.forName("java.nio.Bits");
+            Method methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class);
+            methodReserve.setAccessible(true);
+            return methodReserve;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      // Look up java.nio.Bits#unreserveMemory(long, int) reflectively and make it accessible.
+      DIRECT_MEMORY_UNRESERVE = AccessController.doPrivileged(new PrivilegedAction<Method>() {
+        @Override
+        public Method run() {
+          try {
+            final Class<?> classBits = Class.forName("java.nio.Bits");
+            Method methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class);
+            methodUnreserve.setAccessible(true);
+            return methodUnreserve;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      // Look up the java.nio.Bits "reservedMemory" field reflectively and make it accessible.
+      DIRECT_MEMORY_COUNTER = AccessController.doPrivileged(new PrivilegedAction<Field>() {
+        @Override
+        public Field run() {
+          try {
+            final Class<?> classBits = Class.forName("java.nio.Bits");
+            final Field f = classBits.getDeclaredField("reservedMemory");
+            f.setAccessible(true);
+            return f;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize MemoryUtil.", e);
}
@@ -167,4 +217,48 @@ public static ByteBuffer directBuffer(long address, int capacity) {
throw new UnsupportedOperationException(
"sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available");
}
+
+  /**
+   * Reserve bytes from JVM direct memory. Garbage collection will be triggered once
+   * the total reserved amount reaches the limit specified via JVM option "-XX:MaxDirectMemorySize".
+   *
+   * @param size size in bytes to reserve; must not exceed {@link Integer#MAX_VALUE}
+   */
+  public static void reserveDirectMemory(long size) {
+    if (size > Integer.MAX_VALUE) { // checked before the try so this exception is not re-wrapped
+      throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
+    }
+    try {
+      DIRECT_MEMORY_RESERVE.invoke(null, size, (int) size);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Unreserve bytes from JVM direct memory.
+   *
+   * @param size size in bytes to unreserve; must not exceed {@link Integer#MAX_VALUE}
+   */
+  public static void unreserveDirectMemory(long size) {
+    if (size > Integer.MAX_VALUE) { // checked before the try so this exception is not re-wrapped
+      throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
+    }
+    try {
+      DIRECT_MEMORY_UNRESERVE.invoke(null, size, (int) size);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+ /**
+ * Get current reservation of JVM direct memory. Visible for testing.
+ */
+ public static long getCurrentDirectMemReservation() {
+ try {
+ return ((AtomicLong) DIRECT_MEMORY_COUNTER.get(null)).get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java
new file mode 100644
index 0000000000000..98d8968a1f2f3
--- /dev/null
+++ b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import sun.misc.JavaLangRefAccess;
+import sun.misc.SharedSecrets;
+
+public class TestAutoBufferLedger {
+
+ private static final int MAX_ALLOCATION = Integer.MAX_VALUE;
+ private static RootAllocator root;
+
+ @BeforeClass
+ public static void beforeClass() {
+ root = new RootAllocator(
+ BaseAllocator.configBuilder()
+ .maxAllocation(MAX_ALLOCATION)
+ .bufferLedgerFactory(AutoBufferLedger.FACTORY)
+ .build());
+ }
+
+ @Test
+ public void testBufferAllocation() {
+ final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION);
+ ArrowBuf buf = allocator.buffer(2L);
+ assertEquals(2L, buf.capacity());
+ assertEquals(2L, allocator.getAllocatedMemory());
+ }
+
+ @Test
+ public void testBufferDerivation() {
+ BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION);
+ ArrowBuf buf = allocator.buffer(2);
+ assertEquals(2, buf.capacity());
+ assertEquals(1, buf.slice(1, 1).capacity());
+ assertEquals(2, buf.slice(0, 2).capacity());
+ assertEquals(2L, allocator.getAllocatedMemory());
+ }
+
+ @Test
+ public void testBufferDeallocation() {
+ final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION);
+ ArrowBuf buf = allocator.buffer(2L);
+ assertEquals(2L, buf.capacity());
+ assertEquals(2L, allocator.getAllocatedMemory());
+
+ // AutoBufferLedger ignores all release operations here.
+ buf.getReferenceManager().release();
+ assertEquals(2L, buf.capacity());
+ }
+
+ @Test
+ public void testDirectMemoryReservation() {
+ final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION);
+ long prevAlloc = MemoryUtil.getCurrentDirectMemReservation();
+ allocator.buffer(2L);
+ long alloc = MemoryUtil.getCurrentDirectMemReservation();
+ assertEquals(2L, alloc - prevAlloc);
+ }
+
+ @Test
+ public void testManualGC() {
+ final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION);
+ ArrowBuf buf = allocator.buffer(2L);
+ assertEquals(2L, allocator.getAllocatedMemory());
+ buf = null; // make the buffer able to be discovered by garbage collector
+ cleanUpJvmReferences();
+ assertEquals(0L, allocator.getAllocatedMemory());
+ }
+
+ @Test
+ public void testManualGCOnSharing() {
+ final BufferAllocator allocator = root.newChildAllocator("TEST-CHILD", 0, MAX_ALLOCATION);
+ ArrowBuf buf = allocator.buffer(2L);
+ ArrowBuf sliced1 = buf.slice(1, 1);
+ ArrowBuf sliced2 = buf.slice(0, 2);
+ assertEquals(2L, allocator.getAllocatedMemory());
+ buf = null;
+ cleanUpJvmReferences();
+ assertEquals(2L, allocator.getAllocatedMemory());
+ sliced1 = null;
+ cleanUpJvmReferences();
+ assertEquals(2L, allocator.getAllocatedMemory());
+ sliced2 = null;
+ cleanUpJvmReferences();
+ assertEquals(0L, allocator.getAllocatedMemory());
+ }
+
+ @Test
+ public void testManualGCOnCrossAllocatorSharing() {
+ final BufferAllocator allocator1 = root.newChildAllocator("TEST-CHILD-1", 0, MAX_ALLOCATION);
+ final BufferAllocator allocator2 = root.newChildAllocator("TEST-CHILD-2", 0, MAX_ALLOCATION);
+ ArrowBuf buf = allocator1.buffer(2L);
+ ArrowBuf other = buf.getReferenceManager().retain(buf, allocator2);
+ assertEquals(2L, allocator1.getAllocatedMemory());
+ assertEquals(0L, allocator2.getAllocatedMemory());
+ buf = null;
+ cleanUpJvmReferences();
+ assertEquals(0L, allocator1.getAllocatedMemory());
+ assertEquals(2L, allocator2.getAllocatedMemory());
+ other = null;
+ cleanUpJvmReferences();
+ assertEquals(0L, allocator1.getAllocatedMemory());
+ assertEquals(0L, allocator2.getAllocatedMemory());
+ }
+
+ private static void cleanUpJvmReferences() {
+ final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();
+ System.gc();
+ long prev = System.nanoTime();
+ while (jlra.tryHandlePendingReference()) {
+ long elapsed = System.nanoTime() - prev;
+ if (TimeUnit.NANOSECONDS.toMillis(elapsed) > 1000L) {
+ break;
+ }
+ }
+ }
+}
From e855e035f9ef9c6b6d4480576335478eda371a05 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 18 Aug 2021 15:04:37 +0800
Subject: [PATCH 2/9] reserved fix 1
---
.../java/org/apache/arrow/memory/AutoBufferLedger.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
index 88e567f080487..af2d7b7e86d3e 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
@@ -103,7 +103,10 @@ public void retain(int increment) {
private void reserve0() {
if (refCount.getAndAdd(1) == 0) {
- MemoryUtil.reserveDirectMemory(getAllocationManager().getSize());
+ long accountedSize = getAccountedSize();
+ if (accountedSize != 0) {
+ MemoryUtil.reserveDirectMemory(accountedSize);
+ }
}
}
@@ -113,7 +116,10 @@ private void release0() {
final AllocationManager am = getAllocationManager();
lDestructionTime[0] = System.nanoTime();
am.release(this);
- MemoryUtil.unreserveDirectMemory(am.getSize());
+ long accountedSize = getAccountedSize();
+ if (accountedSize != 0) {
+ MemoryUtil.unreserveDirectMemory(am.getSize());
+ }
}
}
}
From ffe64709452975c07b8ff672265f9d6a1483f9a8 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 18 Aug 2021 16:04:07 +0800
Subject: [PATCH 3/9] Return the actual reference count from AutoBufferLedger#getRefCount
---
.../org/apache/arrow/memory/AutoBufferLedger.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
index af2d7b7e86d3e..2d65dae54ce72 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
@@ -58,11 +58,6 @@ public BufferLedger create(BufferAllocator allocator, AllocationManager allocati
super(allocator, allocationManager);
}
- @Override
- protected void increment() {
- // no-op
- }
-
@Override
protected long getDestructionTime() {
return lDestructionTime[0];
@@ -78,7 +73,12 @@ protected ReferenceManager newReferenceManager() {
@Override
public int getRefCount() {
- return -1;
+ return refCount.get();
+ }
+
+ @Override
+ protected void increment() {
+ // no-op
}
@Override
From 86b6e20119418fe2445b8f9760133c906d42d91b Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 18 Aug 2021 23:11:35 +0800
Subject: [PATCH 4/9] Track ledgers in AutoBufferLedger.Factory and add DirectAllocationListener
---
.../apache/arrow/memory/AutoBufferLedger.java | 110 ++++++++++++++----
.../memory/DirectAllocationListener.java | 38 ++++++
.../arrow/memory/TestAutoBufferLedger.java | 54 ++++++++-
3 files changed, 174 insertions(+), 28 deletions(-)
create mode 100644 java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
index 2d65dae54ce72..cd525a86d4545 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
@@ -17,13 +17,11 @@
package org.apache.arrow.memory;
+import sun.misc.Cleaner;
+
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.arrow.memory.util.MemoryUtil;
-
-import sun.misc.Cleaner;
-
/**
* An alternative implementation of {@link BufferLedger}. The reference is auto managed by JVM garbage collector
* comparing to {@link LegacyBufferLedger}. Explicit calls to reference management methods such as
@@ -37,30 +35,91 @@
*
*
*
- * Also, comparing to {@link LegacyBufferLedger}, all instances of {@link AutoBufferLedger} will add its
- * size to JVM direct memory counter which can be limited by specifying JVM option "-XX:MaxDirectMemorySize".
+ * Also, to make the GC aware of these allocations when off-heap-based
+ * {@link AllocationManager}s are used, the allocated sizes must also be added to the JVM direct
+ * memory counter (which can be limited with the JVM option "-XX:MaxDirectMemorySize"). To
+ * achieve this, simply set the allocator's {@link AllocationListener} to
+ * {@link DirectAllocationListener}.
* JVM should ensure that garbage collection will be performed once total reservation reached the limit.
*
*/
public class AutoBufferLedger extends BufferLedger {
- public static final Factory FACTORY = new Factory() {
+ public static class Factory implements BufferLedger.Factory, AutoCloseable {
+ private AutoBufferLedger tail = null;
+
@Override
public BufferLedger create(BufferAllocator allocator, AllocationManager allocationManager) {
- return new AutoBufferLedger(allocator, allocationManager);
+ return new AutoBufferLedger(allocator, allocationManager, this);
}
- };
- private volatile long[] lDestructionTime = new long[] {0};
+ private void link(AutoBufferLedger ledger) {
+ synchronized (this) {
+ if (ledger.next != null || ledger.prev != null) {
+ throw new IllegalStateException("already linked");
+ }
+ if (tail == null) {
+ tail = ledger;
+ return;
+ }
+ tail.next = ledger;
+ ledger.prev = tail;
+ tail = ledger;
+ System.out.println();
+ }
+ }
+
+ private void unlink(AutoBufferLedger ledger) {
+ synchronized (this) {
+ if (ledger.next == ledger) {
+ return;
+ }
+ if (ledger.prev == ledger) {
+ throw new IllegalStateException();
+ }
+ if (ledger.prev != null) {
+ ledger.prev.next = ledger.next;
+ }
+ if (ledger.next != null) {
+ ledger.next.prev = ledger.prev;
+ }
+ ledger.prev = ledger;
+ ledger.next = ledger;
+ }
+ }
+
+ @Override
+ public void close() {
+ while (tail != null) {
+ final AutoBufferLedger tmp = tail.prev;
+ tail.destruct();
+ tail = tmp;
+ }
+ }
+ }
+
+ public static Factory newFactory() {
+ return new Factory();
+ }
+
+ private volatile long lDestructionTime = 0;
private final AtomicInteger refCount = new AtomicInteger(0);
+ private final AtomicBoolean destructed = new AtomicBoolean(false);
+ private final Factory factory;
- AutoBufferLedger(BufferAllocator allocator, AllocationManager allocationManager) {
+ private AutoBufferLedger prev = null;
+ private AutoBufferLedger next = null;
+
+ AutoBufferLedger(BufferAllocator allocator, AllocationManager allocationManager,
+ Factory factory) {
super(allocator, allocationManager);
+ this.factory = factory;
+ factory.link(this);
}
@Override
protected long getDestructionTime() {
- return lDestructionTime[0];
+ return lDestructionTime;
}
@Override
@@ -103,25 +162,26 @@ public void retain(int increment) {
private void reserve0() {
if (refCount.getAndAdd(1) == 0) {
- long accountedSize = getAccountedSize();
- if (accountedSize != 0) {
- MemoryUtil.reserveDirectMemory(accountedSize);
- }
+ // no-op
}
}
private void release0() {
if (refCount.addAndGet(-1) == 0) {
- synchronized (getAllocationManager()) {
- final AllocationManager am = getAllocationManager();
- lDestructionTime[0] = System.nanoTime();
- am.release(this);
- long accountedSize = getAccountedSize();
- if (accountedSize != 0) {
- MemoryUtil.unreserveDirectMemory(am.getSize());
- }
- }
+ destruct();
+ }
+ }
+
+ private void destruct() {
+ if (!destructed.compareAndSet(false, true)) {
+ return;
+ }
+ synchronized (getAllocationManager()) {
+ final AllocationManager am = getAllocationManager();
+ lDestructionTime = System.nanoTime();
+ am.release(this);
}
+ factory.unlink(this);
}
/**
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java
new file mode 100644
index 0000000000000..0a4bd46e773b2
--- /dev/null
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/DirectAllocationListener.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import org.apache.arrow.memory.util.MemoryUtil;
+
+/**
+ * An {@link AllocationListener} that reserves and unreserves bytes on the JVM direct memory counter.
+ */
+public class DirectAllocationListener implements AllocationListener {
+
+ public static final DirectAllocationListener INSTANCE = new DirectAllocationListener();
+
+ @Override
+ public void onPreAllocation(long size) {
+ MemoryUtil.reserveDirectMemory(size);
+ }
+
+ @Override
+ public void onRelease(long size) {
+ MemoryUtil.unreserveDirectMemory(size);
+ }
+}
diff --git a/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java
index 98d8968a1f2f3..619fec5029c7e 100644
--- a/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java
+++ b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAutoBufferLedger.java
@@ -38,8 +38,10 @@ public static void beforeClass() {
root = new RootAllocator(
BaseAllocator.configBuilder()
.maxAllocation(MAX_ALLOCATION)
- .bufferLedgerFactory(AutoBufferLedger.FACTORY)
+ .bufferLedgerFactory(AutoBufferLedger.newFactory())
+ .listener(DirectAllocationListener.INSTANCE)
.build());
+ cleanUpJvmReferences();
}
@Test
@@ -127,15 +129,61 @@ public void testManualGCOnCrossAllocatorSharing() {
assertEquals(0L, allocator2.getAllocatedMemory());
}
+ @Test
+ public void testManualGCWithinDirectMemoryReservation() {
+ final BufferAllocator allocator1 = root.newChildAllocator("TEST-CHILD-1", 0, MAX_ALLOCATION);
+ final BufferAllocator allocator2 = root.newChildAllocator("TEST-CHILD-2", 0, MAX_ALLOCATION);
+ long prevAlloc = MemoryUtil.getCurrentDirectMemReservation();
+ ArrowBuf buffer1 = allocator1.buffer(2L);
+ ArrowBuf buffer2 = buffer1.getReferenceManager().retain(buffer1, allocator2);
+ long alloc1 = MemoryUtil.getCurrentDirectMemReservation();
+ assertEquals(2L, alloc1 - prevAlloc);
+ buffer1 = null;
+ cleanUpJvmReferences();
+ long alloc2 = MemoryUtil.getCurrentDirectMemReservation();
+ assertEquals(2L, alloc2 - prevAlloc);
+ buffer2 = null;
+ cleanUpJvmReferences();
+ long alloc3 = MemoryUtil.getCurrentDirectMemReservation();
+ assertEquals(prevAlloc, alloc3);
+ }
+
+ @Test
+ public void testFactoryClose() {
+ final AutoBufferLedger.Factory factory = AutoBufferLedger.newFactory();
+ final BufferAllocator alloc = new RootAllocator(
+ BaseAllocator.configBuilder()
+ .maxAllocation(MAX_ALLOCATION)
+ .bufferLedgerFactory(factory)
+ .build());
+ ArrowBuf buf = alloc.buffer(2);
+ assertEquals(2, buf.capacity());
+ assertEquals(1, buf.slice(1, 1).capacity());
+ assertEquals(2, buf.slice(0, 2).capacity());
+ assertEquals(2L, alloc.getAllocatedMemory());
+ factory.close();
+ assertEquals(0L, alloc.getAllocatedMemory());
+ }
+
private static void cleanUpJvmReferences() {
final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();
System.gc();
long prev = System.nanoTime();
- while (jlra.tryHandlePendingReference()) {
+ long sleep = 1L;
+ while (true) {
long elapsed = System.nanoTime() - prev;
- if (TimeUnit.NANOSECONDS.toMillis(elapsed) > 1000L) {
+ if (TimeUnit.NANOSECONDS.toMillis(elapsed) > 500L) {
break;
}
+ if (!jlra.tryHandlePendingReference()) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ sleep = sleep << 1;
+ }
}
}
}
From 682c16604473d44e381f905245fd78a852c4e53d Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 18 Aug 2021 23:37:19 +0800
Subject: [PATCH 5/9] Make BaseAllocator.Config public
---
.../src/main/java/org/apache/arrow/memory/BaseAllocator.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index d48c87501360a..28e3cbe4343cb 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -727,7 +727,7 @@ public static ImmutableConfig.Builder configBuilder() {
* Config class of {@link BaseAllocator}.
*/
@Value.Immutable
- abstract static class Config {
+ public abstract static class Config {
/**
* Factory for creating {@link AllocationManager} instances.
*/
From 2e59793afe19e314b5da1544c3c97f39eaed23cf Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 18 Aug 2021 23:51:17 +0800
Subject: [PATCH 6/9] Remove leftover debug println from AutoBufferLedger.Factory#link
---
.../src/main/java/org/apache/arrow/memory/AutoBufferLedger.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
index cd525a86d4545..33a157ca304ff 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
@@ -65,7 +65,6 @@ private void link(AutoBufferLedger ledger) {
tail.next = ledger;
ledger.prev = tail;
tail = ledger;
- System.out.println();
}
}
From 6f6126fe81694f43de3ecaf7ab49d3ffbfc264f0 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Thu, 19 Aug 2021 00:34:07 +0800
Subject: [PATCH 7/9] Update tail on unlink and synchronize AutoBufferLedger.Factory#close
---
.../org/apache/arrow/memory/AutoBufferLedger.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
index 33a157ca304ff..cc61cb90cbe80 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AutoBufferLedger.java
@@ -76,6 +76,9 @@ private void unlink(AutoBufferLedger ledger) {
if (ledger.prev == ledger) {
throw new IllegalStateException();
}
+ if (ledger == tail) {
+ tail = ledger.prev;
+ }
if (ledger.prev != null) {
ledger.prev.next = ledger.next;
}
@@ -89,10 +92,12 @@ private void unlink(AutoBufferLedger ledger) {
@Override
public void close() {
- while (tail != null) {
- final AutoBufferLedger tmp = tail.prev;
- tail.destruct();
- tail = tmp;
+ synchronized (this) {
+ while (tail != null) {
+ final AutoBufferLedger tmp = tail.prev;
+ tail.destruct();
+ tail = tmp;
+ }
}
}
}
From baa8d63215ff66149accf31ef950a2f97bda0daf Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 23 Aug 2021 17:32:05 +0800
Subject: [PATCH 8/9] Report reserved bytes from ReservationListenableMemoryPool statistics
---
cpp/src/arrow/jniutil/jni_util.cc | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/jniutil/jni_util.cc b/cpp/src/arrow/jniutil/jni_util.cc
index acc11bb236098..7f17f70ee45fe 100644
--- a/cpp/src/arrow/jniutil/jni_util.cc
+++ b/cpp/src/arrow/jniutil/jni_util.cc
@@ -111,12 +111,15 @@ class ReservationListenableMemoryPool::Impl {
}
int64_t bytes_granted = (new_block_count - blocks_reserved_) * block_size_;
blocks_reserved_ = new_block_count;
+ if (bytes_reserved_ > max_bytes_reserved_) {
+ max_bytes_reserved_ = bytes_reserved_;
+ }
return bytes_granted;
}
- int64_t bytes_allocated() { return pool_->bytes_allocated(); }
+ int64_t bytes_allocated() const { return bytes_reserved_; }
- int64_t max_memory() { return pool_->max_memory(); }
+ int64_t max_memory() const { return max_bytes_reserved_; }
std::string backend_name() { return pool_->backend_name(); }
@@ -128,6 +131,7 @@ class ReservationListenableMemoryPool::Impl {
int64_t block_size_;
int64_t blocks_reserved_;
int64_t bytes_reserved_;
+ int64_t max_bytes_reserved_ = 0L;
std::mutex mutex_;
};
From 1f00e572b127108555a05204344f5483e1df8d72 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 23 Aug 2021 17:59:33 +0800
Subject: [PATCH 9/9] Make BufferLedger.Factory public
---
.../src/main/java/org/apache/arrow/memory/BufferLedger.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
index 7583e8f0035cc..42bd959c2554e 100644
--- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
+++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java
@@ -309,7 +309,7 @@ public BufferAllocator getKey() {
/**
* A factory interface for creating {@link BufferLedger}.
*/
- interface Factory {
+ public interface Factory {
/**
* Create an instance of {@link BufferLedger}.
*