Skip to content

Commit

Permalink
[fix][broker] Fix broker cache eviction of entries read by active cur…
Browse files Browse the repository at this point in the history
…sors (#17273)

* [fix][broker] Fix broken build caused by conflict between #17195 and #16605

- #17195 changed the method signature that #16605 depended upon

* [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false

Fixes #16054

- calculate the sorted list of when a read position gets updated
- this resolves #9958 in a proper way
  - #12045 broke the caching solution as explained in #16054
- remove invalid tests
- fix tests
- add more tests to handle corner cases

* Address review comment

* Handle durable & non-durable in the correct way

* Fix cache tests since now entries get evicted reactively

* Address review comment about method names

* Change signature for add method so that position must be passed

- this is more consistent with cursorUpdated method where the position is passed

* Update javadoc for ManagedCursorContainer

* Address review comment

* Simplify ManagedCursorContainer

* Clarify javadoc

* Ensure that cursors are tracked by making sure that initial position isn't null unintentionally

* Prevent race in updating activeCursors

(cherry picked from commit 856ef15)
  • Loading branch information
lhotari authored and congbobo184 committed Dec 7, 2022
1 parent 06b4d58 commit aa1f660
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,16 @@
import org.apache.commons.lang3.tuple.Pair;

/**
* Contains all the cursors for a ManagedLedger.
* Contains cursors for a ManagedLedger.
*
* <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
*
* <p/>This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry
* in the linked-list. The list is a sorted double linked-list of cursors.
* <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
* in a single array. More details about heap implementations:
* https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
*
* <p/>When a cursor is markDeleted, this list is updated and the cursor is moved in its new position.
*
* <p/>To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only
* care about ledgers to be deleted.
* <p/>The heap is updated and kept sorted when a cursor is updated.
*
*/
public class ManagedCursorContainer implements Iterable<ManagedCursor> {
Expand All @@ -51,67 +50,54 @@ private static class Item {
PositionImpl position;
int idx;

Item(ManagedCursor cursor, int idx) {
Item(ManagedCursor cursor, PositionImpl position, int idx) {
this.cursor = cursor;
this.position = (PositionImpl) cursor.getMarkDeletedPosition();
this.position = position;
this.idx = idx;
}
}

public enum CursorType {
DurableCursor,
NonDurableCursor,
ALL
}

public ManagedCursorContainer() {
cursorType = CursorType.DurableCursor;
}

public ManagedCursorContainer(CursorType cursorType) {
this.cursorType = cursorType;
}

private final CursorType cursorType;

// Used to keep track of slowest cursor. Contains all of all active cursors.
private final ArrayList<Item> heap = Lists.newArrayList();
// Used to keep track of slowest cursor.
private final ArrayList<Item> heap = new ArrayList();

// Maps a cursor to its position in the heap
private final ConcurrentMap<String, Item> cursors = new ConcurrentSkipListMap<>();

private final StampedLock rwLock = new StampedLock();

public void add(ManagedCursor cursor) {
private int durableCursorCount;


/**
* Add a cursor to the container. The cursor will be optionally tracked for the slowest reader when
* a position is passed as the second argument. It is expected that the position is updated with
* {@link #cursorUpdated(ManagedCursor, Position)} method when the position changes.
*
* @param cursor cursor to add
* @param position position of the cursor to use for ordering, pass null if the cursor's position shouldn't be
* tracked for the slowest reader.
*/
public void add(ManagedCursor cursor, Position position) {
long stamp = rwLock.writeLock();
try {
// Append a new entry at the end of the list
Item item = new Item(cursor, heap.size());
Item item = new Item(cursor, (PositionImpl) position, position != null ? heap.size() : -1);
cursors.put(cursor.getName(), item);

if (shouldTrackInHeap(cursor)) {
if (position != null) {
heap.add(item);
siftUp(item);
}
if (cursor.isDurable()) {
durableCursorCount++;
}
} finally {
rwLock.unlockWrite(stamp);
}
}

private boolean shouldTrackInHeap(ManagedCursor cursor) {
return CursorType.ALL.equals(cursorType)
|| (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
|| (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
}

public PositionImpl getSlowestReadPositionForActiveCursors() {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
}

public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
}

public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
Expand All @@ -122,28 +108,41 @@ public ManagedCursor get(String name) {
}
}

public void removeCursor(String name) {
public boolean removeCursor(String name) {
long stamp = rwLock.writeLock();
try {
Item item = cursors.remove(name);
if (item != null && shouldTrackInHeap(item.cursor)) {
// Move the item to the right end of the heap to be removed
Item lastItem = heap.get(heap.size() - 1);
swap(item, lastItem);
heap.remove(item.idx);
// Update the heap
siftDown(lastItem);
if (item != null) {
if (item.idx >= 0) {
// Move the item to the right end of the heap to be removed
Item lastItem = heap.get(heap.size() - 1);
swap(item, lastItem);
heap.remove(item.idx);
// Update the heap
siftDown(lastItem);
}
if (item.cursor.isDurable()) {
durableCursorCount--;
}
return true;
} else {
return false;
}
} finally {
rwLock.unlockWrite(stamp);
}
}

/**
* Signal that a cursor position has been updated and that the container must re-order the cursor list.
* Signal that a cursor position has been updated and that the container must re-order the cursor heap
* tracking the slowest reader.
* Only those cursors are tracked and can be updated which were added to the container with the
* {@link #add(ManagedCursor, Position)} method that specified the initial position in the position
* parameter.
*
* @param cursor
* @return a pair of positions, representing the previous slowest consumer and the new slowest consumer (after the
* @param cursor the cursor to update the position for
* @param newPosition the updated position for the cursor
* @return a pair of positions, representing the previous slowest reader and the new slowest reader (after the
* update).
*/
public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
Expand All @@ -152,35 +151,33 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi
long stamp = rwLock.writeLock();
try {
Item item = cursors.get(cursor.getName());
if (item == null) {
if (item == null || item.idx == -1) {
return null;
}

PositionImpl previousSlowestConsumer = heap.get(0).position;

if (shouldTrackInHeap(item.cursor)) {
PositionImpl previousSlowestConsumer = heap.get(0).position;

item.position = (PositionImpl) newPosition;
if (heap.size() > 1) {
// When the cursor moves forward, we need to push it toward the
// bottom of the tree and push it up if a reset was done

item.position = (PositionImpl) newPosition;
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
siftDown(item);
} else {
siftUp(item);
}

PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
}
return null;

PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
} finally {
rwLock.unlockWrite(stamp);
}
}

/**
* Get the slowest reader position, meaning older acknowledged position between all the cursors.
* Get the slowest reader position for the cursors that are ordered.
*
* @return the slowest reader position
*/
Expand Down Expand Up @@ -228,18 +225,18 @@ public boolean isEmpty() {
*/
public boolean hasDurableCursors() {
long stamp = rwLock.tryOptimisticRead();
boolean isEmpty = heap.isEmpty();
int count = durableCursorCount;
if (!rwLock.validate(stamp)) {
// Fallback to read lock
stamp = rwLock.readLock();
try {
isEmpty = heap.isEmpty();
count = durableCursorCount;
} finally {
rwLock.unlockRead(stamp);
}
}

return !isEmpty;
return count > 0;
}

@Override
Expand Down Expand Up @@ -282,7 +279,7 @@ public ManagedCursor next() {

@Override
public void remove() {
throw new IllegalArgumentException("Cannot remove ManagedCursor form container");
throw new IllegalArgumentException("Cannot remove ManagedCursor from container");
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
persistentMarkDeletePosition = position;
inProgressMarkDeletePersistPosition = null;
readPosition = ledger.getNextValidPosition(position);
ledger.onCursorReadPositionUpdated(this, readPosition);
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
// assign cursor-ledger so, it can be deleted when new ledger will be switched
this.cursorLedger = recoveredFromCursorLedger;
Expand Down Expand Up @@ -1129,6 +1130,7 @@ public void operationComplete() {
ledger.getName(), newPosition, oldReadPosition, name);
}
readPosition = newPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -1613,6 +1615,7 @@ boolean hasMoreEntries(PositionImpl position) {

void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
ledger.onCursorReadPositionUpdated(this, readPosition);
markDeletePosition = lastPositionCounter.getLeft();
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
persistentMarkDeletePosition = null;
Expand Down Expand Up @@ -1687,6 +1690,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", ledger.getName(),
currentReadPosition, newReadPosition, markDeletePosition);
}
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
return newReadPosition;
} else {
return currentReadPosition;
Expand Down Expand Up @@ -1905,7 +1909,7 @@ public void operationComplete() {
lock.writeLock().unlock();
}

ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition);
ledger.onCursorMarkDeletePositionUpdated(ManagedCursorImpl.this, mdEntry.newPosition);

decrementPendingMarkDeleteCount();

Expand Down Expand Up @@ -2269,6 +2273,7 @@ public void rewind() {
log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);

readPosition = newReadPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
Expand All @@ -2286,6 +2291,7 @@ public void seek(Position newReadPositionInt, boolean force) {
newReadPosition = ledger.getNextValidPosition(markDeletePosition);
}
readPosition = newReadPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -2485,6 +2491,7 @@ void setReadPosition(Position newReadPositionInt) {
if (this.markDeletePosition == null
|| ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) {
this.readPosition = (PositionImpl) newReadPositionInt;
ledger.onCursorReadPositionUpdated(this, newReadPositionInt);
}
}

Expand Down
Loading

0 comments on commit aa1f660

Please sign in to comment.