diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index 5a96ee08de947..b5a5be733a136 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -30,17 +30,16 @@
import org.apache.commons.lang3.tuple.Pair;
/**
- * Contains all the cursors for a ManagedLedger.
+ * Contains cursors for a ManagedLedger.
*
*
The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
*
- * This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry
- * in the linked-list. The list is a sorted double linked-list of cursors.
+ * This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
+ * an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
+ * in a single array. More details about heap implementations:
+ * https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
*
- * When a cursor is markDeleted, this list is updated and the cursor is moved in its new position.
- *
- * To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only
- * care about ledgers to be deleted.
+ * The heap is updated and kept sorted when a cursor is updated.
*
*/
public class ManagedCursorContainer implements Iterable {
@@ -50,30 +49,18 @@ private static class Item {
PositionImpl position;
int idx;
- Item(ManagedCursor cursor, int idx) {
+ Item(ManagedCursor cursor, PositionImpl position, int idx) {
this.cursor = cursor;
- this.position = (PositionImpl) cursor.getMarkDeletedPosition();
+ this.position = position;
this.idx = idx;
}
}
- public enum CursorType {
- DurableCursor,
- NonDurableCursor,
- ALL
- }
-
public ManagedCursorContainer() {
- cursorType = CursorType.DurableCursor;
- }
- public ManagedCursorContainer(CursorType cursorType) {
- this.cursorType = cursorType;
}
- private final CursorType cursorType;
-
- // Used to keep track of slowest cursor. Contains all of all active cursors.
+ // Used to keep track of slowest cursor.
private final ArrayList- heap = new ArrayList();
// Maps a cursor to its position in the heap
@@ -81,46 +68,35 @@ public ManagedCursorContainer(CursorType cursorType) {
private final StampedLock rwLock = new StampedLock();
- public void add(ManagedCursor cursor) {
+ private int durableCursorCount;
+
+
+ /**
+ * Add a cursor to the container. The cursor will be optionally tracked for the slowest reader when
+ * a position is passed as the second argument. It is expected that the position is updated with
+ * {@link #cursorUpdated(ManagedCursor, Position)} method when the position changes.
+ *
+ * @param cursor cursor to add
+ * @param position position of the cursor to use for ordering, pass null if the cursor's position shouldn't be
+ * tracked for the slowest reader.
+ */
+ public void add(ManagedCursor cursor, Position position) {
long stamp = rwLock.writeLock();
try {
- // Append a new entry at the end of the list
- Item item = new Item(cursor, heap.size());
+ Item item = new Item(cursor, (PositionImpl) position, position != null ? heap.size() : -1);
cursors.put(cursor.getName(), item);
-
- if (shouldTrackInHeap(cursor)) {
+ if (position != null) {
heap.add(item);
siftUp(item);
}
+ if (cursor.isDurable()) {
+ durableCursorCount++;
+ }
} finally {
rwLock.unlockWrite(stamp);
}
}
- private boolean shouldTrackInHeap(ManagedCursor cursor) {
- return CursorType.ALL.equals(cursorType)
- || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
- || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
- }
-
- public PositionImpl getSlowestReadPositionForActiveCursors() {
- long stamp = rwLock.readLock();
- try {
- return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
-
- public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
- long stamp = rwLock.readLock();
- try {
- return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
-
public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
@@ -131,17 +107,25 @@ public ManagedCursor get(String name) {
}
}
- public void removeCursor(String name) {
+ public boolean removeCursor(String name) {
long stamp = rwLock.writeLock();
try {
Item item = cursors.remove(name);
- if (item != null && shouldTrackInHeap(item.cursor)) {
- // Move the item to the right end of the heap to be removed
- Item lastItem = heap.get(heap.size() - 1);
- swap(item, lastItem);
- heap.remove(item.idx);
- // Update the heap
- siftDown(lastItem);
+ if (item != null) {
+ if (item.idx >= 0) {
+ // Move the item to the right end of the heap to be removed
+ Item lastItem = heap.get(heap.size() - 1);
+ swap(item, lastItem);
+ heap.remove(item.idx);
+ // Update the heap
+ siftDown(lastItem);
+ }
+ if (item.cursor.isDurable()) {
+ durableCursorCount--;
+ }
+ return true;
+ } else {
+ return false;
}
} finally {
rwLock.unlockWrite(stamp);
@@ -149,10 +133,15 @@ public void removeCursor(String name) {
}
/**
- * Signal that a cursor position has been updated and that the container must re-order the cursor list.
+ * Signal that a cursor position has been updated and that the container must re-order the cursor heap
+ * tracking the slowest reader.
+ * Only those cursors are tracked and can be updated which were added to the container with the
+ * {@link #add(ManagedCursor, Position)} method that specified the initial position in the position
+ * parameter.
*
- * @param cursor
- * @return a pair of positions, representing the previous slowest consumer and the new slowest consumer (after the
+ * @param cursor the cursor to update the position for
+ * @param newPosition the updated position for the cursor
+ * @return a pair of positions, representing the previous slowest reader and the new slowest reader (after the
* update).
*/
public Pair cursorUpdated(ManagedCursor cursor, Position newPosition) {
@@ -161,35 +150,33 @@ public Pair cursorUpdated(ManagedCursor cursor, Posi
long stamp = rwLock.writeLock();
try {
Item item = cursors.get(cursor.getName());
- if (item == null) {
+ if (item == null || item.idx == -1) {
return null;
}
+ PositionImpl previousSlowestConsumer = heap.get(0).position;
- if (shouldTrackInHeap(item.cursor)) {
- PositionImpl previousSlowestConsumer = heap.get(0).position;
-
+ item.position = (PositionImpl) newPosition;
+ if (heap.size() > 1) {
// When the cursor moves forward, we need to push it toward the
// bottom of the tree and push it up if a reset was done
- item.position = (PositionImpl) newPosition;
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
siftDown(item);
} else {
siftUp(item);
}
-
- PositionImpl newSlowestConsumer = heap.get(0).position;
- return Pair.of(previousSlowestConsumer, newSlowestConsumer);
}
- return null;
+
+ PositionImpl newSlowestConsumer = heap.get(0).position;
+ return Pair.of(previousSlowestConsumer, newSlowestConsumer);
} finally {
rwLock.unlockWrite(stamp);
}
}
/**
- * Get the slowest reader position, meaning older acknowledged position between all the cursors.
+ * Get the slowest reader position for the cursors that are ordered.
*
* @return the slowest reader position
*/
@@ -237,18 +224,18 @@ public boolean isEmpty() {
*/
public boolean hasDurableCursors() {
long stamp = rwLock.tryOptimisticRead();
- boolean isEmpty = heap.isEmpty();
+ int count = durableCursorCount;
if (!rwLock.validate(stamp)) {
// Fallback to read lock
stamp = rwLock.readLock();
try {
- isEmpty = heap.isEmpty();
+ count = durableCursorCount;
} finally {
rwLock.unlockRead(stamp);
}
}
- return !isEmpty;
+ return count > 0;
}
@Override
@@ -291,7 +278,7 @@ public ManagedCursor next() {
@Override
public void remove() {
- throw new IllegalArgumentException("Cannot remove ManagedCursor form container");
+ throw new IllegalArgumentException("Cannot remove ManagedCursor from container");
}
};
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7d1f4e0fa2339..da855197df6b3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -635,6 +635,7 @@ private void recoveredCursor(PositionImpl position, Map properties
persistentMarkDeletePosition = position;
inProgressMarkDeletePersistPosition = null;
readPosition = ledger.getNextValidPosition(position);
+ ledger.onCursorReadPositionUpdated(this, readPosition);
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
// assign cursor-ledger so, it can be deleted when new ledger will be switched
this.cursorLedger = recoveredFromCursorLedger;
@@ -1215,6 +1216,7 @@ public void operationComplete() {
ledger.getName(), newPosition, oldReadPosition, name);
}
readPosition = newPosition;
+ ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
} finally {
lock.writeLock().unlock();
}
@@ -1708,6 +1710,7 @@ boolean hasMoreEntries(PositionImpl position) {
void initializeCursorPosition(Pair lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
+ ledger.onCursorReadPositionUpdated(this, readPosition);
markDeletePosition = lastPositionCounter.getLeft();
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
persistentMarkDeletePosition = null;
@@ -1782,6 +1785,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}",
ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition);
}
+ ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
return newReadPosition;
} else {
return currentReadPosition;
@@ -2021,7 +2025,7 @@ public void operationComplete() {
lock.writeLock().unlock();
}
- ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition);
+ ledger.onCursorMarkDeletePositionUpdated(ManagedCursorImpl.this, mdEntry.newPosition);
decrementPendingMarkDeleteCount();
@@ -2384,6 +2388,7 @@ public void rewind() {
log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
readPosition = newReadPosition;
+ ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
@@ -2401,6 +2406,7 @@ public void seek(Position newReadPositionInt, boolean force) {
newReadPosition = ledger.getNextValidPosition(markDeletePosition);
}
readPosition = newReadPosition;
+ ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
@@ -2601,6 +2607,7 @@ void setReadPosition(Position newReadPositionInt) {
if (this.markDeletePosition == null
|| ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) {
this.readPosition = (PositionImpl) newReadPositionInt;
+ ledger.onCursorReadPositionUpdated(this, newReadPositionInt);
}
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 230deb27d17d3..3c6af75dd0889 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -163,10 +163,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected final NavigableMap ledgers = new ConcurrentSkipListMap<>();
private volatile Stat ledgersStat;
+ // contains all cursors, where durable cursors are ordered by mark delete position
private final ManagedCursorContainer cursors = new ManagedCursorContainer();
+ // contains active cursors eligible for caching,
+ // ordered by read position (when cacheEvictionByMarkDeletedPosition=false) or by mark delete position
+ // (when cacheEvictionByMarkDeletedPosition=true)
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
- private final ManagedCursorContainer nonDurableActiveCursors =
- new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
+
// Ever increasing counter of entries added
@VisibleForTesting
@@ -555,7 +558,7 @@ public void operationComplete() {
log.info("[{}] Recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
cursor.setActive();
- cursors.add(cursor);
+ addCursor(cursor);
if (cursorCount.decrementAndGet() == 0) {
// The initialization is now completed, register the jmx mbean
@@ -589,7 +592,7 @@ public void operationComplete() {
cursorName, cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
cursor.setActive();
synchronized (ManagedLedgerImpl.this) {
- cursors.add(cursor);
+ addCursor(cursor);
uninitializedCursors.remove(cursor.getName()).complete(cursor);
}
}
@@ -616,6 +619,17 @@ public void operationFailed(MetaStoreException e) {
});
}
+ private void addCursor(ManagedCursorImpl cursor) {
+ Position positionForOrdering = null;
+ if (cursor.isDurable()) {
+ positionForOrdering = cursor.getMarkDeletedPosition();
+ if (positionForOrdering == null) {
+ positionForOrdering = PositionImpl.EARLIEST;
+ }
+ }
+ cursors.add(cursor, positionForOrdering);
+ }
+
@Override
public String getName() {
return name;
@@ -956,7 +970,7 @@ public void operationComplete() {
: getFirstPositionAndCounter());
synchronized (ManagedLedgerImpl.this) {
- cursors.add(cursor);
+ addCursor(cursor);
uninitializedCursors.remove(cursorName).complete(cursor);
}
callback.openCursorComplete(cursor, ctx);
@@ -984,6 +998,7 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele
return;
} else if (!cursor.isDurable()) {
cursors.removeCursor(consumerName);
+ deactivateCursorByName(consumerName);
callback.deleteCursorComplete(ctx);
return;
}
@@ -995,17 +1010,7 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele
public void operationComplete(Void result, Stat stat) {
cursor.asyncDeleteCursorLedger();
cursors.removeCursor(consumerName);
-
- // Redo invalidation of entries in cache
- PositionImpl slowestConsumerPosition = cursors.getSlowestReaderPosition();
- if (slowestConsumerPosition != null) {
- if (log.isDebugEnabled()) {
- log.debug("Doing cache invalidation up to {}", slowestConsumerPosition);
- }
- entryCache.invalidateEntries(slowestConsumerPosition);
- } else {
- entryCache.clear();
- }
+ deactivateCursorByName(consumerName);
trimConsumedLedgersInBackground();
@@ -1088,7 +1093,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
log.info("[{}] Opened new cursor: {}", name, cursor);
synchronized (this) {
- cursors.add(cursor);
+ addCursor(cursor);
}
return cursor;
@@ -2178,62 +2183,36 @@ public boolean hasMoreEntries(PositionImpl position) {
return result;
}
- void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) {
- Pair pair = activeCursors.cursorUpdated(cursor, newPosition);
- if (pair != null) {
- entryCache.invalidateEntries(pair.getRight());
+ void doCacheEviction(long maxTimestamp) {
+ if (entryCache.getSize() > 0) {
+ entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
}
}
- public PositionImpl getEvictionPosition(){
- PositionImpl evictionPos;
- if (config.isCacheEvictionByMarkDeletedPosition()) {
- PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors();
- evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null;
- } else {
- // Always remove all entries already read by active cursors
- evictionPos = getEarlierReadPositionForActiveCursors();
- }
- return evictionPos;
- }
- void doCacheEviction(long maxTimestamp) {
+ // slowest reader position is earliest mark delete position when cacheEvictionByMarkDeletedPosition=true
+ // it is the earliest read position when cacheEvictionByMarkDeletedPosition=false
+ private void invalidateEntriesUpToSlowestReaderPosition() {
if (entryCache.getSize() <= 0) {
return;
}
- PositionImpl evictionPos = getEvictionPosition();
- if (evictionPos != null) {
- entryCache.invalidateEntries(evictionPos);
- }
-
- // Remove entries older than the cutoff threshold
- entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
- }
-
- private PositionImpl getEarlierReadPositionForActiveCursors() {
- PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
- PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
- if (nonDurablePosition == null) {
- return durablePosition;
- }
- if (durablePosition == null) {
- return nonDurablePosition;
+ if (!activeCursors.isEmpty()) {
+ PositionImpl evictionPos = activeCursors.getSlowestReaderPosition();
+ if (evictionPos != null) {
+ entryCache.invalidateEntries(evictionPos);
+ }
+ } else {
+ entryCache.clear();
}
- return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}
- private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
- PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
- PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
- if (nonDurablePosition == null) {
- return durablePosition;
+ void onCursorMarkDeletePositionUpdated(ManagedCursorImpl cursor, PositionImpl newPosition) {
+ if (config.isCacheEvictionByMarkDeletedPosition()) {
+ updateActiveCursor(cursor, newPosition);
}
- if (durablePosition == null) {
- return nonDurablePosition;
+ if (!cursor.isDurable()) {
+ // non-durable cursors aren't tracked for trimming
+ return;
}
- return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
- }
-
- void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
Pair pair = cursors.cursorUpdated(cursor, newPosition);
if (pair == null) {
// Cursor has been removed in the meantime
@@ -2255,6 +2234,20 @@ void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
}
}
+ private void updateActiveCursor(ManagedCursorImpl cursor, Position newPosition) {
+ Pair slowestPositions = activeCursors.cursorUpdated(cursor, newPosition);
+ if (slowestPositions != null
+ && !slowestPositions.getLeft().equals(slowestPositions.getRight())) {
+ invalidateEntriesUpToSlowestReaderPosition();
+ }
+ }
+
+ public void onCursorReadPositionUpdated(ManagedCursorImpl cursor, Position newReadPosition) {
+ if (!config.isCacheEvictionByMarkDeletedPosition()) {
+ updateActiveCursor(cursor, newReadPosition);
+ }
+ }
+
PositionImpl startReadOperationOnLedger(PositionImpl position) {
Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
if (ledgerId != null && ledgerId != position.getLedgerId()) {
@@ -2315,7 +2308,7 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
if (!lastAckedPosition.equals((PositionImpl) cursor.getMarkDeletedPosition())) {
try {
log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
- updateCursor((ManagedCursorImpl) cursor, lastAckedPosition);
+ onCursorMarkDeletePositionUpdated((ManagedCursorImpl) cursor, lastAckedPosition);
} catch (Exception e) {
log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
cursor, cursor.getMarkDeletedPosition(), lastAckedPosition);
@@ -3499,34 +3492,32 @@ Pair getFirstPositionAndCounter() {
}
public void activateCursor(ManagedCursor cursor) {
- if (activeCursors.get(cursor.getName()) == null) {
- activeCursors.add(cursor);
- }
- if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
- nonDurableActiveCursors.add(cursor);
+ synchronized (activeCursors) {
+ if (activeCursors.get(cursor.getName()) == null) {
+ Position positionForOrdering = config.isCacheEvictionByMarkDeletedPosition()
+ ? cursor.getMarkDeletedPosition()
+ : cursor.getReadPosition();
+ if (positionForOrdering == null) {
+ positionForOrdering = PositionImpl.EARLIEST;
+ }
+ activeCursors.add(cursor, positionForOrdering);
+ }
}
}
public void deactivateCursor(ManagedCursor cursor) {
+ deactivateCursorByName(cursor.getName());
+ }
+
+ private void deactivateCursorByName(String cursorName) {
synchronized (activeCursors) {
- if (activeCursors.get(cursor.getName()) != null) {
- activeCursors.removeCursor(cursor.getName());
- if (!activeCursors.hasDurableCursors()) {
- // cleanup cache if there is no active subscription
- entryCache.clear();
- } else {
- // if removed subscription was the slowest subscription : update cursor and let it clear cache:
- // till new slowest-cursor's read-position
- discardEntriesFromCache((ManagedCursorImpl) activeCursors.getSlowestReader(),
- getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
- }
- }
- if (!cursor.isDurable()) {
- nonDurableActiveCursors.removeCursor(cursor.getName());
+ if (activeCursors.removeCursor(cursorName)) {
+ invalidateEntriesUpToSlowestReaderPosition();
}
}
}
+
public void removeWaitingCursor(ManagedCursor cursor) {
this.waitingCursors.remove(cursor);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index de64cb67362c8..918cc22978b25 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -104,7 +104,7 @@ protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map result = new CompletableFuture<>();
- ManagedLedgerConfig config = new ManagedLedgerConfig();
- config.setCacheEvictionByMarkDeletedPosition(true);
- factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
- .toNanos(30000));
- factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
- @Override
- public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
- ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
- @Override
- public void openCursorComplete(ManagedCursor cursor, Object ctx) {
- ManagedLedger ledger = (ManagedLedger) ctx;
- String message1 = "test";
- ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf entryData, Object ctx) {
- try {
- @SuppressWarnings("unchecked")
- Pair pair = (Pair) ctx;
- ManagedLedger ledger = pair.getLeft();
- ManagedCursor cursor = pair.getRight();
- if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
- result.complete(false);
- return;
- }
-
- ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
- ledgerImpl.getActiveCursors().removeCursor(cursor.getName());
- assertNull(ledgerImpl.getEvictionPosition());
- assertTrue(ledgerImpl.getCacheSize() == message1.getBytes(Encoding).length);
- ledgerImpl.doCacheEviction(System.nanoTime());
- assertTrue(ledgerImpl.getCacheSize() <= 0);
- result.complete(true);
- } catch (Throwable e) {
- result.completeExceptionally(e);
- }
- }
-
- @Override
- public void addFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, Pair.of(ledger, cursor));
- }
-
- @Override
- public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, ledger);
- }
+ public void shouldKeepEntriesInCacheByEarliestReadPosition() throws ManagedLedgerException, InterruptedException {
+ // This test case reproduces issue #16054
- @Override
- public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, null, null);
- assertTrue(result.get());
-
- log.info("Test completed");
- }
-
- @Test
- public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
- CompletableFuture result = new CompletableFuture<>();
ManagedLedgerConfig config = new ManagedLedgerConfig();
- config.setCacheEvictionByMarkDeletedPosition(true);
factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos(30000));
- factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
- @Override
- public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
- ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
- @Override
- public void openCursorComplete(ManagedCursor cursor, Object ctx) {
- ManagedLedger ledger = (ManagedLedger) ctx;
- String message1 = "test";
- ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf entryData, Object ctx) {
- @SuppressWarnings("unchecked")
- Pair pair = (Pair) ctx;
- ManagedLedger ledger = pair.getLeft();
- ManagedCursor cursor = pair.getRight();
- if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
- result.complete(false);
- return;
- }
- cursor.asyncReadEntries(1, new ReadEntriesCallback() {
- @Override
- public void readEntriesComplete(List entries, Object ctx) {
- ManagedCursor cursor = (ManagedCursor) ctx;
- assertEquals(entries.size(), 1);
- Entry entry = entries.get(0);
- final Position position = entry.getPosition();
- if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
- result.complete(false);
- return;
- }
- ((ManagedLedgerImpl) ledger).doCacheEviction(
- System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
- if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
- result.complete(false);
- return;
- }
- log.debug("Mark-Deleting to position {}", position);
- cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
- @Override
- public void markDeleteComplete(Object ctx) {
- log.debug("Mark delete complete");
- ManagedCursor cursor = (ManagedCursor) ctx;
- if (cursor.hasMoreEntries()) {
- result.complete(false);
- return;
- }
- ((ManagedLedgerImpl) ledger).doCacheEviction(
- System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
- result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
- }
+ // GIVEN an opened ledger with 10 opened cursors
- @Override
- public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
+ ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestReadPosition",
+ config);
+ List cursors = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ ManagedCursor cursor = ledger.openCursor("c" + i);
+ cursors.add(cursor);
+ }
- }, cursor);
- }
+ ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats();
+ int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount();
- @Override
- public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, cursor, PositionImpl.LATEST);
- }
+ // AND 100 added entries
- @Override
- public void addFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, Pair.of(ledger, cursor));
- }
+ for (int i = 0; i < 100; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
- @Override
- public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
+ int insertedEntriesCount =
+ (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore;
+ // EXPECT that 100 entries should have been inserted to the cache
+ assertEquals(insertedEntriesCount, 100);
- }, ledger);
- }
+ int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount();
- @Override
- public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, null, null);
+ // WHEN entries are read for the cursors so that the farthest cursor has most entries read
+ for (int i = 0; i < 10; i++) {
+ ManagedCursor cursor = cursors.get(i);
+ // read entries farther of the earliest cursor
+ List entries = cursor.readEntries(20 - i);
+ // mark delete the least for the earliest cursor
+ cursor.markDelete(entries.get(i).getPosition());
+ entries.forEach(Entry::release);
+ }
- assertTrue(result.get());
+ // THEN it is expected that the cache evicts entries to the earliest read position
+ Thread.sleep(2 * factory.getConfig().getCacheEvictionIntervalMs());
+ int evictedEntriesCount =
+ (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore;
+ assertEquals(evictedEntriesCount, 11,
+ "It is expected that the cache evicts entries to the earliest read position");
- log.info("Test completed");
+ ledger.close();
}
@Test
- public void testCacheEvictionByReadPosition() throws Throwable {
- CompletableFuture result = new CompletableFuture<>();
+ public void shouldKeepEntriesInCacheByEarliestMarkDeletePosition() throws ManagedLedgerException, InterruptedException {
+ // This test case reproduces issue #16054
+
ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setCacheEvictionByMarkDeletedPosition(true);
factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos(30000));
- factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
- @Override
- public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
- ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
- @Override
- public void openCursorComplete(ManagedCursor cursor, Object ctx) {
- ManagedLedger ledger = (ManagedLedger) ctx;
- String message1 = "test";
- ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf entryData, Object ctx) {
- @SuppressWarnings("unchecked")
- Pair pair = (Pair) ctx;
- ManagedLedger ledger = pair.getLeft();
- ManagedCursor cursor = pair.getRight();
- if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
- result.complete(false);
- return;
- }
- cursor.asyncReadEntries(1, new ReadEntriesCallback() {
- @Override
- public void readEntriesComplete(List entries, Object ctx) {
- ManagedCursor cursor = (ManagedCursor) ctx;
- assertEquals(entries.size(), 1);
- Entry entry = entries.get(0);
- final Position position = entry.getPosition();
- if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) {
- result.complete(false);
- return;
- }
- ((ManagedLedgerImpl) ledger).doCacheEviction(
- System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000));
- if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) {
- result.complete(false);
- return;
- }
+ // GIVEN an opened ledger with 10 opened cursors
- log.debug("Mark-Deleting to position {}", position);
- cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
- @Override
- public void markDeleteComplete(Object ctx) {
- log.debug("Mark delete complete");
- ManagedCursor cursor = (ManagedCursor) ctx;
- if (cursor.hasMoreEntries()) {
- result.complete(false);
- return;
- }
- result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
- }
+ ManagedLedger ledger = factory.open("test_ledger_for_shouldKeepEntriesInCacheByEarliestMarkDeletePosition",
+ config);
+ List cursors = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ ManagedCursor cursor = ledger.openCursor("c" + i);
+ cursors.add(cursor);
+ }
- @Override
- public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
+ ManagedLedgerFactoryMXBean cacheStats = factory.getCacheStats();
+ int insertedEntriesCountBefore = (int) cacheStats.getCacheInsertedEntriesCount();
- }, cursor);
- }
+ // AND 100 added entries
- @Override
- public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, cursor, PositionImpl.LATEST);
- }
+ for (int i = 0; i < 100; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
- @Override
- public void addFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, Pair.of(ledger, cursor));
- }
+ int insertedEntriesCount =
+ (int) cacheStats.getCacheInsertedEntriesCount() - insertedEntriesCountBefore;
+ // EXPECT that 100 entries should have been inserted to the cache
+ assertEquals(insertedEntriesCount, 100);
- @Override
- public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
+ int evictedEntriesCountBefore = (int) cacheStats.getCacheEvictedEntriesCount();
- }, ledger);
- }
+ // WHEN entries are read for the cursors so that the farthest cursor has most entries read
+ Position lastMarkDeletePos = null;
+ for (int i = 0; i < 10; i++) {
+ ManagedCursor cursor = cursors.get(i);
+ // read 50 (+ index) entries for each cursor
+ List entries = cursor.readEntries(50 + (5 * i));
+ // mark delete the most for the earliest cursor
+ lastMarkDeletePos = entries.get(20 - i).getPosition();
+ cursor.markDelete(lastMarkDeletePos);
+ entries.forEach(Entry::release);
+ }
- @Override
- public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
- result.completeExceptionally(exception);
- }
- }, null, null);
+ Thread.sleep(1000 + 2 * factory.getConfig().getCacheEvictionIntervalMs());
- assertTrue(result.get());
+ ManagedCursorContainer activeCursors = (ManagedCursorContainer) ledger.getActiveCursors();
+ assertEquals(activeCursors.getSlowestReaderPosition(), lastMarkDeletePos);
- log.info("Test completed");
+ // THEN it is expected that the cache evicts entries to the earliest read position
+ int evictedEntriesCount =
+ (int) cacheStats.getCacheEvictedEntriesCount() - evictedEntriesCountBefore;
+ assertEquals(evictedEntriesCount, 11,
+ "It is expected that the cache evicts entries to the earliest read position");
+
+ ledger.close();
}
@Test(timeOut = 20000)
@@ -1783,10 +1639,14 @@ public void testOpenRaceCondition() throws Exception {
@Test
public void invalidateConsumedEntriesFromCache() throws Exception {
- ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateConsumedEntriesFromCache",
+ config);
EntryCacheManager cacheManager = factory.getEntryCacheManager();
EntryCache entryCache = ledger.entryCache;
+ entryCache.clear();
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2");
@@ -1799,28 +1659,78 @@ public void invalidateConsumedEntriesFromCache() throws Exception {
assertEquals(entryCache.getSize(), 7 * 4);
assertEquals(cacheManager.getSize(), entryCache.getSize());
+
c2.setReadPosition(p3);
- ledger.discardEntriesFromCache(c2, p2);
assertEquals(entryCache.getSize(), 7 * 4);
assertEquals(cacheManager.getSize(), entryCache.getSize());
c1.setReadPosition(p2);
- ledger.discardEntriesFromCache(c1, p2);
assertEquals(entryCache.getSize(), 7 * 3);
assertEquals(cacheManager.getSize(), entryCache.getSize());
c1.setReadPosition(p3);
- ledger.discardEntriesFromCache(c1, p3);
- assertEquals(entryCache.getSize(), 7 * 3);
+ assertEquals(entryCache.getSize(), 7 * 2);
assertEquals(cacheManager.getSize(), entryCache.getSize());
ledger.deactivateCursor(c1);
- assertEquals(entryCache.getSize(), 7 * 3); // as c2.readPosition=p3 => Cache contains p3,p4
+ assertEquals(entryCache.getSize(), 7 * 2); // as c2.readPosition=p3 => Cache contains p3,p4
+ assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+ c2.setReadPosition(p4);
+ assertEquals(entryCache.getSize(), 7);
+ assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+ ledger.deactivateCursor(c2);
+ assertEquals(entryCache.getSize(), 0);
+ assertEquals(cacheManager.getSize(), entryCache.getSize());
+ }
+
+ @Test
+ public void invalidateEntriesFromCacheByMarkDeletePosition() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setCacheEvictionByMarkDeletedPosition(true);
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("my_test_ledger_for_invalidateEntriesFromCacheByMarkDeletePosition",
+ config);
+
+ EntryCacheManager cacheManager = factory.getEntryCacheManager();
+ EntryCache entryCache = ledger.entryCache;
+ entryCache.clear();
+
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+ ManagedCursorImpl c2 = (ManagedCursorImpl) ledger.openCursor("c2");
+
+ PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes());
+ PositionImpl p2 = (PositionImpl) ledger.addEntry("entry-2".getBytes());
+ PositionImpl p3 = (PositionImpl) ledger.addEntry("entry-3".getBytes());
+ PositionImpl p4 = (PositionImpl) ledger.addEntry("entry-4".getBytes());
+
+ assertEquals(entryCache.getSize(), 7 * 4);
assertEquals(cacheManager.getSize(), entryCache.getSize());
+
c2.setReadPosition(p4);
- ledger.discardEntriesFromCache(c2, p4);
+ c2.markDelete(p3);
+
+ assertEquals(entryCache.getSize(), 7 * 4);
+ assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+ c1.setReadPosition(p3);
+ c1.markDelete(p2);
+ assertEquals(entryCache.getSize(), 7 * 3);
+ assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+ c1.setReadPosition(p4);
+ c1.markDelete(p3);
+ assertEquals(entryCache.getSize(), 7 * 2);
+ assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+ ledger.deactivateCursor(c1);
+ assertEquals(entryCache.getSize(), 7 * 2);
+ assertEquals(cacheManager.getSize(), entryCache.getSize());
+
+ c2.markDelete(p4);
assertEquals(entryCache.getSize(), 7);
assertEquals(cacheManager.getSize(), entryCache.getSize());
@@ -2104,10 +2014,10 @@ public void testMaximumRolloverTime() throws Exception {
ledger.addEntry("data".getBytes());
ledger.addEntry("data".getBytes());
-
+
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.getLedgersInfoAsList().size(), 2);
- });
+ });
}
@Test
@@ -2729,8 +2639,8 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti
}
// (3) Validate: cache should remove all entries read by both active cursors
- log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize());
- assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+ log.info("expected, found : {}, {}", 5 * (totalInsertedEntries - readEntries), entryCache.getSize());
+ assertEquals(entryCache.getSize(), 5 * (totalInsertedEntries - readEntries));
final int remainingEntries = totalInsertedEntries - readEntries;
entries1 = cursor1.readEntries(remainingEntries);
@@ -2744,7 +2654,7 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti
// (4) Validate: cursor2 is active cursor and has not read these entries yet: so, cache should not remove these
// entries
- assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+ assertEquals(entryCache.getSize(), 5 * (totalInsertedEntries - readEntries));
ledger.deactivateCursor(cursor1);
ledger.deactivateCursor(cursor2);
@@ -2769,7 +2679,7 @@ public void testActiveDeactiveCursor() throws Exception {
}
// (1) Validate: cache not stores entries as no active cursor
- assertEquals(0, entryCache.getSize());
+ assertEquals(entryCache.getSize(), 0);
// Open Cursor also adds cursor into activeCursor-container
ManagedCursor cursor1 = ledger.openCursor("c1");
@@ -2782,7 +2692,7 @@ public void testActiveDeactiveCursor() throws Exception {
}
// (2) Validate: cache stores entries as active cursor has not read message
- assertEquals((5 * totalInsertedEntries), entryCache.getSize());
+ assertEquals(entryCache.getSize(), 5 * totalInsertedEntries);
// read 20 entries
List entries1 = cursor1.readEntries(totalInsertedEntries);
@@ -2793,7 +2703,7 @@ public void testActiveDeactiveCursor() throws Exception {
// (3) Validate: cache discards all entries after all cursors are deactivated
ledger.deactivateCursor(cursor1);
- assertEquals(0, entryCache.getSize());
+ assertEquals(entryCache.getSize(), 0);
ledger.close();
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 0fd8902f8253e..1e960c32bf678 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -26,6 +26,7 @@
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -79,6 +80,9 @@ public final void setUp(Method method) throws Exception {
throw e;
}
+ ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
+ // increase default cache eviction interval so that caching could be tested with less flakyness
+ managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200);
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
setUpTestCase();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 7a3eabdb31821..307244a64478c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -581,7 +581,7 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
field.setAccessible(true);
ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger());
managedCursors.removeCursor("transaction-buffer-sub");
- managedCursors.add(managedCursor);
+ managedCursors.add(managedCursor, managedCursor.getMarkDeletedPosition());
doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
@@ -600,7 +600,7 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
- managedCursors.add(managedCursor);
+ managedCursors.add(managedCursor, managedCursor.getMarkDeletedPosition());
TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer3.getStats(false).state, "Ready"));