diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index 5a96ee08de947b..5ceadc4934085f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.StampedLock; +import java.util.function.Function; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.tuple.Pair; @@ -50,28 +51,25 @@ private static class Item { PositionImpl position; int idx; - Item(ManagedCursor cursor, int idx) { + Item(ManagedCursor cursor, PositionImpl position, int idx) { this.cursor = cursor; - this.position = (PositionImpl) cursor.getMarkDeletedPosition(); + this.position = position; this.idx = idx; } } - public enum CursorType { - DurableCursor, - NonDurableCursor, - ALL + public ManagedCursorContainer() { + this(cursor -> (PositionImpl) cursor.getMarkDeletedPosition()); } - public ManagedCursorContainer() { - cursorType = CursorType.DurableCursor; + private ManagedCursorContainer(Function positionFunction) { + this.positionFunction = positionFunction; } - public ManagedCursorContainer(CursorType cursorType) { - this.cursorType = cursorType; + public static ManagedCursorContainer createWithReadPositionOrdering() { + return new ManagedCursorContainer(cursor -> (PositionImpl) cursor.getReadPosition()); } - private final CursorType cursorType; // Used to keep track of slowest cursor. Contains all of all active cursors. private final ArrayList heap = new ArrayList(); @@ -81,46 +79,26 @@ public ManagedCursorContainer(CursorType cursorType) { private final StampedLock rwLock = new StampedLock(); + private int durableCursorCount; + + private final Function positionFunction; + public void add(ManagedCursor cursor) { long stamp = rwLock.writeLock(); try { // Append a new entry at the end of the list - Item item = new Item(cursor, heap.size()); + Item item = new Item(cursor, positionFunction.apply(cursor), heap.size()); cursors.put(cursor.getName(), item); - - if (shouldTrackInHeap(cursor)) { - heap.add(item); - siftUp(item); + heap.add(item); + siftUp(item); + if (cursor.isDurable()) { + durableCursorCount++; } } finally { rwLock.unlockWrite(stamp); } } - private boolean shouldTrackInHeap(ManagedCursor cursor) { - return CursorType.ALL.equals(cursorType) - || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType)) - || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType)); - } - - public PositionImpl getSlowestReadPositionForActiveCursors() { - long stamp = rwLock.readLock(); - try { - return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition(); - } finally { - rwLock.unlockRead(stamp); - } - } - - public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() { - long stamp = rwLock.readLock(); - try { - return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition(); - } finally { - rwLock.unlockRead(stamp); - } - } - public ManagedCursor get(String name) { long stamp = rwLock.readLock(); try { @@ -135,13 +113,16 @@ public void removeCursor(String name) { long stamp = rwLock.writeLock(); try { Item item = cursors.remove(name); - if (item != null && shouldTrackInHeap(item.cursor)) { + if (item != null) { // Move the item to the right end of the heap to be removed Item lastItem = heap.get(heap.size() - 1); swap(item, lastItem); heap.remove(item.idx); // Update the heap siftDown(lastItem); + if (item.cursor.isDurable()) { + durableCursorCount--; + } } } finally { rwLock.unlockWrite(stamp); @@ -165,24 +146,20 @@ public Pair cursorUpdated(ManagedCursor cursor, Posi return null; } + PositionImpl previousSlowestConsumer = heap.get(0).position; - if (shouldTrackInHeap(item.cursor)) { - PositionImpl previousSlowestConsumer = heap.get(0).position; + // When the cursor moves forward, we need to push it toward the + // bottom of the tree and push it up if a reset was done - // When the cursor moves forward, we need to push it toward the - // bottom of the tree and push it up if a reset was done - - item.position = (PositionImpl) newPosition; - if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) { - siftDown(item); - } else { - siftUp(item); - } - - PositionImpl newSlowestConsumer = heap.get(0).position; - return Pair.of(previousSlowestConsumer, newSlowestConsumer); + item.position = (PositionImpl) newPosition; + if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) { + siftDown(item); + } else { + siftUp(item); } - return null; + + PositionImpl newSlowestConsumer = heap.get(0).position; + return Pair.of(previousSlowestConsumer, newSlowestConsumer); } finally { rwLock.unlockWrite(stamp); } @@ -237,18 +214,18 @@ public boolean isEmpty() { */ public boolean hasDurableCursors() { long stamp = rwLock.tryOptimisticRead(); - boolean isEmpty = heap.isEmpty(); + int count = durableCursorCount; if (!rwLock.validate(stamp)) { // Fallback to read lock stamp = rwLock.readLock(); try { - isEmpty = heap.isEmpty(); + count = durableCursorCount; } finally { rwLock.unlockRead(stamp); } } - return !isEmpty; + return count > 0; } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 72b8dc5a8e2ff1..3943750fdf4e75 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -635,6 +635,7 @@ private void recoveredCursor(PositionImpl position, Map properties persistentMarkDeletePosition = position; inProgressMarkDeletePersistPosition = null; readPosition = ledger.getNextValidPosition(position); + ledger.updateReadPosition(this, readPosition); lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null); // assign cursor-ledger so, it can be deleted when new ledger will be switched this.cursorLedger = recoveredFromCursorLedger; @@ -1215,6 +1216,7 @@ public void operationComplete() { ledger.getName(), newPosition, oldReadPosition, name); } readPosition = newPosition; + ledger.updateReadPosition(ManagedCursorImpl.this, newPosition); } finally { lock.writeLock().unlock(); } @@ -1701,6 +1703,7 @@ boolean hasMoreEntries(PositionImpl position) { void initializeCursorPosition(Pair lastPositionCounter) { readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft()); + ledger.updateReadPosition(this, readPosition); markDeletePosition = lastPositionCounter.getLeft(); lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null); persistentMarkDeletePosition = null; @@ -1775,6 +1778,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition); } + ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition); return newReadPosition; } else { return currentReadPosition; @@ -2356,6 +2360,7 @@ public void rewind() { log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition); readPosition = newReadPosition; + ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); } @@ -2373,6 +2378,7 @@ public void seek(Position newReadPositionInt, boolean force) { newReadPosition = ledger.getNextValidPosition(markDeletePosition); } readPosition = newReadPosition; + ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); } @@ -2559,6 +2565,7 @@ void setReadPosition(Position newReadPositionInt) { if (this.markDeletePosition == null || ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) { this.readPosition = (PositionImpl) newReadPositionInt; + ledger.updateReadPosition(this, newReadPositionInt); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index aa7c19b32bd020..68a38fb5138678 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -166,8 +166,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final ManagedCursorContainer cursors = new ManagedCursorContainer(); private final ManagedCursorContainer activeCursors = new ManagedCursorContainer(); - private final ManagedCursorContainer nonDurableActiveCursors = - new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor); + + private final ManagedCursorContainer activeCursorsSortedByReadPosition = + ManagedCursorContainer.createWithReadPositionOrdering(); // Ever increasing counter of entries added @VisibleForTesting @@ -2192,11 +2193,9 @@ void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) public PositionImpl getEvictionPosition(){ PositionImpl evictionPos; if (config.isCacheEvictionByMarkDeletedPosition()) { - PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors(); - evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null; + evictionPos = activeCursors.getSlowestReaderPosition(); } else { - // Always remove all entries already read by active cursors - evictionPos = getEarlierReadPositionForActiveCursors(); + evictionPos = activeCursorsSortedByReadPosition.getSlowestReaderPosition(); } return evictionPos; } @@ -2213,30 +2212,6 @@ void doCacheEviction(long maxTimestamp) { entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp); } - private PositionImpl getEarlierReadPositionForActiveCursors() { - PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors(); - PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors(); - if (nonDurablePosition == null) { - return durablePosition; - } - if (durablePosition == null) { - return nonDurablePosition; - } - return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; - } - - private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() { - PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors(); - PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors(); - if (nonDurablePosition == null) { - return durablePosition; - } - if (durablePosition == null) { - return nonDurablePosition; - } - return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; - } - void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { Pair pair = cursors.cursorUpdated(cursor, newPosition); if (pair == null) { @@ -2259,6 +2234,12 @@ void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { } } + public void updateReadPosition(ManagedCursorImpl cursor, Position newReadPosition) { + if (!config.isCacheEvictionByMarkDeletedPosition()) { + activeCursorsSortedByReadPosition.cursorUpdated(cursor, newReadPosition); + } + } + PositionImpl startReadOperationOnLedger(PositionImpl position) { Long ledgerId = ledgers.ceilingKey(position.getLedgerId()); if (ledgerId != null && ledgerId != position.getLedgerId()) { @@ -3512,9 +3493,9 @@ Pair getFirstPositionAndCounter() { public void activateCursor(ManagedCursor cursor) { if (activeCursors.get(cursor.getName()) == null) { activeCursors.add(cursor); - } - if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) { - nonDurableActiveCursors.add(cursor); + if (!config.isCacheEvictionByMarkDeletedPosition()) { + activeCursorsSortedByReadPosition.add(cursor); + } } } @@ -3522,6 +3503,9 @@ public void deactivateCursor(ManagedCursor cursor) { synchronized (activeCursors) { if (activeCursors.get(cursor.getName()) != null) { activeCursors.removeCursor(cursor.getName()); + if (!config.isCacheEvictionByMarkDeletedPosition()) { + activeCursorsSortedByReadPosition.removeCursor(cursor.getName()); + } if (!activeCursors.hasDurableCursors()) { // cleanup cache if there is no active subscription entryCache.clear(); @@ -3532,9 +3516,6 @@ public void deactivateCursor(ManagedCursor cursor) { getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition())); } } - if (!cursor.isDurable()) { - nonDurableActiveCursors.removeCursor(cursor.getName()); - } } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 1d9315ee2967d9..d63238ea8cb55f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -402,9 +402,8 @@ public boolean isClosed() { @Test public void testSlowestReadPositionForActiveCursors() throws Exception { - ManagedCursorContainer container = - new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor); - assertNull(container.getSlowestReadPositionForActiveCursors()); + ManagedCursorContainer container = ManagedCursorContainer.createWithReadPositionOrdering(); + assertNull(container.getSlowestReaderPosition()); // Add no durable cursor PositionImpl position = PositionImpl.get(5,5); @@ -412,7 +411,7 @@ public void testSlowestReadPositionForActiveCursors() throws Exception { doReturn(false).when(cursor1).isDurable(); doReturn(position).when(cursor1).getReadPosition(); container.add(cursor1); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); // Add no durable cursor position = PositionImpl.get(1,1); @@ -420,23 +419,23 @@ public void testSlowestReadPositionForActiveCursors() throws Exception { doReturn(false).when(cursor2).isDurable(); doReturn(position).when(cursor2).getReadPosition(); container.add(cursor2); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1)); // Move forward cursor, cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5 position = PositionImpl.get(5,6); container.cursorUpdated(cursor2, position); doReturn(position).when(cursor2).getReadPosition(); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5)); // Move forward cursor, cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6 position = PositionImpl.get(5,8); doReturn(position).when(cursor1).getReadPosition(); container.cursorUpdated(cursor1, position); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6)); // Remove cursor, only cursor1 left, cursor1 = 5:8 container.removeCursor(cursor2.getName()); - assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8)); + assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 8)); } @Test