Skip to content

Commit

Permalink
[GH-33743] [Java] Release outstanding buffers when BaseAllocator is b…
Browse files Browse the repository at this point in the history
…eing closed
  • Loading branch information
zhztheplayer committed Jan 18, 2023
1 parent c8d6110 commit eb7f2e6
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator {
private final BaseAllocator parentAllocator;
private final Map<BaseAllocator, Object> childAllocators;
private final ArrowBuf empty;

// linked list to hold references to alive ledgers
private BufferLedger tail = null;

private final Object REF_LOCK = new Object();

// members used purely for debugging
private final IdentityHashMap<BufferLedger, Object> childLedgers;
private final IdentityHashMap<Reservation, Object> reservations;
Expand Down Expand Up @@ -177,13 +183,50 @@ public ArrowBuf getEmpty() {
return empty;
}

private void appendToRefList(BufferLedger ledger) {
synchronized (REF_LOCK) {
if (ledger.next != null || ledger.prev != null) {
throw new IllegalStateException("already linked");
}
if (tail == null) {
tail = ledger;
return;
}
tail.next = ledger;
ledger.prev = tail;
tail = ledger;
}
}

private void removeFromRefList(BufferLedger ledger) {
synchronized (REF_LOCK) {
if (ledger.next == ledger) {
return;
}
if (ledger.prev == ledger) {
throw new IllegalStateException();
}
if (ledger == tail) {
tail = ledger.prev;
}
if (ledger.prev != null) {
ledger.prev.next = ledger.next;
}
if (ledger.next != null) {
ledger.next.prev = ledger.prev;
}
ledger.prev = ledger;
ledger.next = ledger;
}
}

/**
* For debug/verification purposes only. Allows an AllocationManager to tell the allocator that
* we have a new ledger
* Allows an AllocationManager to tell the allocator that we have a new ledger
* associated with this allocator.
*/
void associateLedger(BufferLedger ledger) {
assertOpen();
appendToRefList(ledger);
if (DEBUG) {
synchronized (DEBUG_LOCK) {
childLedgers.put(ledger, null);
Expand All @@ -192,12 +235,12 @@ void associateLedger(BufferLedger ledger) {
}

/**
* For debug/verification purposes only. Allows an AllocationManager to tell the allocator that
* we are removing a
* Allows an AllocationManager to tell the allocator that we are removing a
* ledger associated with this allocator
*/
void dissociateLedger(BufferLedger ledger) {
assertOpen();
removeFromRefList(ledger);
if (DEBUG) {
synchronized (DEBUG_LOCK) {
if (!childLedgers.containsKey(ledger)) {
Expand Down Expand Up @@ -376,8 +419,6 @@ public synchronized void close() {
return;
}

isClosed = true;

StringBuilder outstandingChildAllocators = new StringBuilder();
if (DEBUG) {
synchronized (DEBUG_LOCK) {
Expand Down Expand Up @@ -434,12 +475,35 @@ public synchronized void close() {
String msg = String.format("Memory was leaked by query. Memory leaked: (%d)\n%s%s", allocated,
outstandingChildAllocators.toString(), toString());
logger.error(msg);
}

// Release outstanding buffers
synchronized (REF_LOCK) {
int releasedLedgerCount = 0;
while (tail != null) {
final BufferLedger tmp = tail.prev;
tail.destroy();
tail = tmp;
releasedLedgerCount++;
}
if (releasedLedgerCount != 0) {
String msg = String.format("Released %d outstanding buffer ledgers.", releasedLedgerCount);
logger.warn(msg);
}
}

final long remaining = getAllocatedMemory();
if (remaining > 0) {
String msg = String.format("Memory is still leaked after final clean-up: (%d)\n%s%s", remaining,
outstandingChildAllocators.toString(), toString());
throw new IllegalStateException(msg);
}

// we need to release our memory to our parent before we tell it we've closed.
super.close();

isClosed = true;

// Inform our parent allocator that we've closed
if (parentAllocator != null) {
parentAllocator.childClosed(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public class BufferLedger implements ValueWithKeyIncluded<BufferAllocator>, Refe
private final HistoricalLog historicalLog =
BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1) : null;

BufferLedger prev = null;

BufferLedger next = null;

private volatile long lDestructionTime = 0;

BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) {
Expand Down Expand Up @@ -131,6 +136,17 @@ public boolean release(int decrement) {
return refCnt == 0;
}

/**
* Forcibly release the buffer ledger by setting reference count
* to zero. Used by BaseAllocator to reclaim outstanding buffers
* when being closed.
*/
void destroy() {
synchronized (allocationManager) {
release(bufRefCnt.get());
}
}

/**
* Decrement the ledger's reference count for the associated underlying
* memory chunk. If the reference count drops to 0, it implies that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void test_privateMax() throws Exception {
}
}

@Test(expected = IllegalStateException.class)
@Test()
public void testRootAllocator_closeWithOutstanding() throws Exception {
try {
try (final RootAllocator rootAllocator =
Expand All @@ -110,6 +110,19 @@ public void testRootAllocator_closeWithOutstanding() throws Exception {
}
}

@Test()
public void testRootAllocator_closeChildWithOutstanding() throws Exception {
try (final RootAllocator parent =
new RootAllocator(MAX_ALLOCATION)) {
final ArrowBuf parentBuf = parent.buffer(512);
assertNotNull("allocation failed", parentBuf);
try (BufferAllocator child1 = parent.newChildAllocator("child1", 0L, MAX_ALLOCATION)) {
final ArrowBuf childBuf = child1.buffer(512);
assertNotNull("allocation failed", childBuf);
}
}
}

@Test
public void testRootAllocator_getEmpty() throws Exception {
try (final RootAllocator rootAllocator =
Expand Down

0 comments on commit eb7f2e6

Please sign in to comment.