From eb7f2e6e574637f8d1f0cedce39f93af57157a0e Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 18 Jan 2023 18:20:15 +0800 Subject: [PATCH] [GH-33743] [Java] Release outstanding buffers when BaseAllocator is being closed --- .../apache/arrow/memory/BaseAllocator.java | 76 +++++++++++++++++-- .../org/apache/arrow/memory/BufferLedger.java | 16 ++++ .../arrow/memory/TestBaseAllocator.java | 15 +++- 3 files changed, 100 insertions(+), 7 deletions(-) diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 8d21cef7aa382..1472398c24998 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -68,6 +68,12 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator { private final BaseAllocator parentAllocator; private final Map childAllocators; private final ArrowBuf empty; + + // linked list to hold references to alive ledgers + private BufferLedger tail = null; + + private final Object REF_LOCK = new Object(); + // members used purely for debugging private final IdentityHashMap childLedgers; private final IdentityHashMap reservations; @@ -177,13 +183,50 @@ public ArrowBuf getEmpty() { return empty; } + private void appendToRefList(BufferLedger ledger) { + synchronized (REF_LOCK) { + if (ledger.next != null || ledger.prev != null) { + throw new IllegalStateException("already linked"); + } + if (tail == null) { + tail = ledger; + return; + } + tail.next = ledger; + ledger.prev = tail; + tail = ledger; + } + } + + private void removeFromRefList(BufferLedger ledger) { + synchronized (REF_LOCK) { + if (ledger.next == ledger) { + return; + } + if (ledger.prev == ledger) { + throw new IllegalStateException(); + } + if (ledger == tail) { + tail = ledger.prev; + } + if (ledger.prev != null) { + ledger.prev.next = ledger.next; + } + if (ledger.next != null) { + ledger.next.prev = ledger.prev; + } + ledger.prev = ledger; + ledger.next = ledger; + } + } + /** - * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that - * we have a new ledger + * Allows an AllocationManager to tell the allocator that we have a new ledger * associated with this allocator. */ void associateLedger(BufferLedger ledger) { assertOpen(); + appendToRefList(ledger); if (DEBUG) { synchronized (DEBUG_LOCK) { childLedgers.put(ledger, null); @@ -192,12 +235,12 @@ void associateLedger(BufferLedger ledger) { } /** - * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that - * we are removing a + * Allows an AllocationManager to tell the allocator that we are removing a * ledger associated with this allocator */ void dissociateLedger(BufferLedger ledger) { assertOpen(); + removeFromRefList(ledger); if (DEBUG) { synchronized (DEBUG_LOCK) { if (!childLedgers.containsKey(ledger)) { @@ -376,8 +419,6 @@ public synchronized void close() { return; } - isClosed = true; - StringBuilder outstandingChildAllocators = new StringBuilder(); if (DEBUG) { synchronized (DEBUG_LOCK) { @@ -434,12 +475,35 @@ public synchronized void close() { String msg = String.format("Memory was leaked by query. Memory leaked: (%d)\n%s%s", allocated, outstandingChildAllocators.toString(), toString()); logger.error(msg); + } + + // Release outstanding buffers + synchronized (REF_LOCK) { + int releasedLedgerCount = 0; + while (tail != null) { + final BufferLedger tmp = tail.prev; + tail.destroy(); + tail = tmp; + releasedLedgerCount++; + } + if (releasedLedgerCount != 0) { + String msg = String.format("Released %d outstanding buffer ledgers.", releasedLedgerCount); + logger.warn(msg); + } + } + + final long remaining = getAllocatedMemory(); + if (remaining > 0) { + String msg = String.format("Memory is still leaked after final clean-up: (%d)\n%s%s", remaining, + outstandingChildAllocators.toString(), toString()); throw new IllegalStateException(msg); } // we need to release our memory to our parent before we tell it we've closed. super.close(); + isClosed = true; + // Inform our parent allocator that we've closed if (parentAllocator != null) { parentAllocator.childClosed(this); diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java index 48b3e183d5ae0..e7673a55ba547 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -46,6 +46,11 @@ public class BufferLedger implements ValueWithKeyIncluded, Refe private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "BufferLedger[%d]", 1) : null; + + BufferLedger prev = null; + + BufferLedger next = null; + private volatile long lDestructionTime = 0; BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) { @@ -131,6 +136,17 @@ public boolean release(int decrement) { return refCnt == 0; } + /** + * Forcibly release the buffer ledger by setting reference count + * to zero. Used by BaseAllocator to reclaim outstanding buffers + * when being closed. + */ + void destroy() { + synchronized (allocationManager) { + release(bufRefCnt.get()); + } + } + /** * Decrement the ledger's reference count for the associated underlying * memory chunk. If the reference count drops to 0, it implies that diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index ef49e41785fb6..01980bee469a2 100644 --- a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -87,7 +87,7 @@ public void test_privateMax() throws Exception { } } - @Test(expected = IllegalStateException.class) + @Test() public void testRootAllocator_closeWithOutstanding() throws Exception { try { try (final RootAllocator rootAllocator = @@ -110,6 +110,19 @@ public void testRootAllocator_closeWithOutstanding() throws Exception { } } + @Test() + public void testRootAllocator_closeChildWithOutstanding() throws Exception { + try (final RootAllocator parent = + new RootAllocator(MAX_ALLOCATION)) { + final ArrowBuf parentBuf = parent.buffer(512); + assertNotNull("allocation failed", parentBuf); + try (BufferAllocator child1 = parent.newChildAllocator("child1", 0L, MAX_ALLOCATION)) { + final ArrowBuf childBuf = child1.buffer(512); + assertNotNull("allocation failed", childBuf); + } + } + } + @Test public void testRootAllocator_getEmpty() throws Exception { try (final RootAllocator rootAllocator =