Skip to content

Commit

Permalink
[WIP][fix][broker] Keep sorted list of cursors ordered by read positi…
Browse files Browse the repository at this point in the history
…on of active cursors when cacheEvictionByMarkDeletedPosition=false

Fixes apache#16054

- calculate the sorted list of when a read position gets updated
- this resolves apache#9958 in a proper way
  - apache#12045 broke the caching solution as explained in apache#16054
  • Loading branch information
lhotari committed Aug 23, 2022
1 parent 423ab75 commit c1220cb
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Function;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -50,28 +51,25 @@ private static class Item {
PositionImpl position;
int idx;

Item(ManagedCursor cursor, int idx) {
Item(ManagedCursor cursor, PositionImpl position, int idx) {
this.cursor = cursor;
this.position = (PositionImpl) cursor.getMarkDeletedPosition();
this.position = position;
this.idx = idx;
}
}

public enum CursorType {
DurableCursor,
NonDurableCursor,
ALL
public ManagedCursorContainer() {
this(cursor -> (PositionImpl) cursor.getMarkDeletedPosition());
}

public ManagedCursorContainer() {
cursorType = CursorType.DurableCursor;
private ManagedCursorContainer(Function<ManagedCursor, PositionImpl> positionFunction) {
this.positionFunction = positionFunction;
}

public ManagedCursorContainer(CursorType cursorType) {
this.cursorType = cursorType;
public static ManagedCursorContainer createWithReadPositionOrdering() {
return new ManagedCursorContainer(cursor -> (PositionImpl) cursor.getReadPosition());
}

private final CursorType cursorType;

// Used to keep track of slowest cursor. Contains all of all active cursors.
private final ArrayList<Item> heap = new ArrayList();
Expand All @@ -81,46 +79,26 @@ public ManagedCursorContainer(CursorType cursorType) {

private final StampedLock rwLock = new StampedLock();

private int durableCursorCount;

private final Function<ManagedCursor, PositionImpl> positionFunction;

public void add(ManagedCursor cursor) {
long stamp = rwLock.writeLock();
try {
// Append a new entry at the end of the list
Item item = new Item(cursor, heap.size());
Item item = new Item(cursor, positionFunction.apply(cursor), heap.size());
cursors.put(cursor.getName(), item);

if (shouldTrackInHeap(cursor)) {
heap.add(item);
siftUp(item);
heap.add(item);
siftUp(item);
if (cursor.isDurable()) {
durableCursorCount++;
}
} finally {
rwLock.unlockWrite(stamp);
}
}

private boolean shouldTrackInHeap(ManagedCursor cursor) {
return CursorType.ALL.equals(cursorType)
|| (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
|| (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
}

public PositionImpl getSlowestReadPositionForActiveCursors() {
long stamp = rwLock.readLock();
try {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
} finally {
rwLock.unlockRead(stamp);
}
}

public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
long stamp = rwLock.readLock();
try {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
} finally {
rwLock.unlockRead(stamp);
}
}

public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
Expand All @@ -135,13 +113,16 @@ public void removeCursor(String name) {
long stamp = rwLock.writeLock();
try {
Item item = cursors.remove(name);
if (item != null && shouldTrackInHeap(item.cursor)) {
if (item != null) {
// Move the item to the right end of the heap to be removed
Item lastItem = heap.get(heap.size() - 1);
swap(item, lastItem);
heap.remove(item.idx);
// Update the heap
siftDown(lastItem);
if (item.cursor.isDurable()) {
durableCursorCount--;
}
}
} finally {
rwLock.unlockWrite(stamp);
Expand All @@ -165,24 +146,20 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi
return null;
}

PositionImpl previousSlowestConsumer = heap.get(0).position;

if (shouldTrackInHeap(item.cursor)) {
PositionImpl previousSlowestConsumer = heap.get(0).position;
// When the cursor moves forward, we need to push it toward the
// bottom of the tree and push it up if a reset was done

// When the cursor moves forward, we need to push it toward the
// bottom of the tree and push it up if a reset was done

item.position = (PositionImpl) newPosition;
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
siftDown(item);
} else {
siftUp(item);
}

PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
item.position = (PositionImpl) newPosition;
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
siftDown(item);
} else {
siftUp(item);
}
return null;

PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
} finally {
rwLock.unlockWrite(stamp);
}
Expand Down Expand Up @@ -237,18 +214,18 @@ public boolean isEmpty() {
*/
public boolean hasDurableCursors() {
long stamp = rwLock.tryOptimisticRead();
boolean isEmpty = heap.isEmpty();
int count = durableCursorCount;
if (!rwLock.validate(stamp)) {
// Fallback to read lock
stamp = rwLock.readLock();
try {
isEmpty = heap.isEmpty();
count = durableCursorCount;
} finally {
rwLock.unlockRead(stamp);
}
}

return !isEmpty;
return count > 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
persistentMarkDeletePosition = position;
inProgressMarkDeletePersistPosition = null;
readPosition = ledger.getNextValidPosition(position);
ledger.updateReadPosition(this, readPosition);
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
// assign cursor-ledger so, it can be deleted when new ledger will be switched
this.cursorLedger = recoveredFromCursorLedger;
Expand Down Expand Up @@ -1215,6 +1216,7 @@ public void operationComplete() {
ledger.getName(), newPosition, oldReadPosition, name);
}
readPosition = newPosition;
ledger.updateReadPosition(ManagedCursorImpl.this, newPosition);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -1701,6 +1703,7 @@ boolean hasMoreEntries(PositionImpl position) {

void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
ledger.updateReadPosition(this, readPosition);
markDeletePosition = lastPositionCounter.getLeft();
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
persistentMarkDeletePosition = null;
Expand Down Expand Up @@ -1775,6 +1778,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}",
ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition);
}
ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition);
return newReadPosition;
} else {
return currentReadPosition;
Expand Down Expand Up @@ -2356,6 +2360,7 @@ public void rewind() {
log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);

readPosition = newReadPosition;
ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
Expand All @@ -2373,6 +2378,7 @@ public void seek(Position newReadPositionInt, boolean force) {
newReadPosition = ledger.getNextValidPosition(markDeletePosition);
}
readPosition = newReadPosition;
ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -2559,6 +2565,7 @@ void setReadPosition(Position newReadPositionInt) {
if (this.markDeletePosition == null
|| ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) {
this.readPosition = (PositionImpl) newReadPositionInt;
ledger.updateReadPosition(this, newReadPositionInt);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

private final ManagedCursorContainer cursors = new ManagedCursorContainer();
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
private final ManagedCursorContainer nonDurableActiveCursors =
new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);

private final ManagedCursorContainer activeCursorsSortedByReadPosition =
ManagedCursorContainer.createWithReadPositionOrdering();

// Ever increasing counter of entries added
@VisibleForTesting
Expand Down Expand Up @@ -2192,11 +2193,9 @@ void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition)
public PositionImpl getEvictionPosition(){
PositionImpl evictionPos;
if (config.isCacheEvictionByMarkDeletedPosition()) {
PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors();
evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null;
evictionPos = activeCursors.getSlowestReaderPosition();
} else {
// Always remove all entries already read by active cursors
evictionPos = getEarlierReadPositionForActiveCursors();
evictionPos = activeCursorsSortedByReadPosition.getSlowestReaderPosition();
}
return evictionPos;
}
Expand All @@ -2213,30 +2212,6 @@ void doCacheEviction(long maxTimestamp) {
entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
}

private PositionImpl getEarlierReadPositionForActiveCursors() {
PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
if (nonDurablePosition == null) {
return durablePosition;
}
if (durablePosition == null) {
return nonDurablePosition;
}
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}

private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
if (nonDurablePosition == null) {
return durablePosition;
}
if (durablePosition == null) {
return nonDurablePosition;
}
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}

void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
if (pair == null) {
Expand All @@ -2259,6 +2234,12 @@ void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
}
}

public void updateReadPosition(ManagedCursorImpl cursor, Position newReadPosition) {
if (!config.isCacheEvictionByMarkDeletedPosition()) {
activeCursorsSortedByReadPosition.cursorUpdated(cursor, newReadPosition);
}
}

PositionImpl startReadOperationOnLedger(PositionImpl position) {
Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
if (ledgerId != null && ledgerId != position.getLedgerId()) {
Expand Down Expand Up @@ -3512,16 +3493,19 @@ Pair<PositionImpl, Long> getFirstPositionAndCounter() {
public void activateCursor(ManagedCursor cursor) {
if (activeCursors.get(cursor.getName()) == null) {
activeCursors.add(cursor);
}
if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
nonDurableActiveCursors.add(cursor);
if (!config.isCacheEvictionByMarkDeletedPosition()) {
activeCursorsSortedByReadPosition.add(cursor);
}
}
}

public void deactivateCursor(ManagedCursor cursor) {
synchronized (activeCursors) {
if (activeCursors.get(cursor.getName()) != null) {
activeCursors.removeCursor(cursor.getName());
if (!config.isCacheEvictionByMarkDeletedPosition()) {
activeCursorsSortedByReadPosition.removeCursor(cursor.getName());
}
if (!activeCursors.hasDurableCursors()) {
// cleanup cache if there is no active subscription
entryCache.clear();
Expand All @@ -3532,9 +3516,6 @@ public void deactivateCursor(ManagedCursor cursor) {
getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
}
}
if (!cursor.isDurable()) {
nonDurableActiveCursors.removeCursor(cursor.getName());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,41 +402,40 @@ public boolean isClosed() {

@Test
public void testSlowestReadPositionForActiveCursors() throws Exception {
ManagedCursorContainer container =
new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
assertNull(container.getSlowestReadPositionForActiveCursors());
ManagedCursorContainer container = ManagedCursorContainer.createWithReadPositionOrdering();
assertNull(container.getSlowestReaderPosition());

// Add no durable cursor
PositionImpl position = PositionImpl.get(5,5);
ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position));
doReturn(false).when(cursor1).isDurable();
doReturn(position).when(cursor1).getReadPosition();
container.add(cursor1);
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));

// Add no durable cursor
position = PositionImpl.get(1,1);
ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position));
doReturn(false).when(cursor2).isDurable();
doReturn(position).when(cursor2).getReadPosition();
container.add(cursor2);
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(1, 1));

// Move forward cursor, cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5
position = PositionImpl.get(5,6);
container.cursorUpdated(cursor2, position);
doReturn(position).when(cursor2).getReadPosition();
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));

// Move forward cursor, cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6
position = PositionImpl.get(5,8);
doReturn(position).when(cursor1).getReadPosition();
container.cursorUpdated(cursor1, position);
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 6));

// Remove cursor, only cursor1 left, cursor1 = 5:8
container.removeCursor(cursor2.getName());
assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8));
assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 8));
}

@Test
Expand Down

0 comments on commit c1220cb

Please sign in to comment.