Skip to content

Commit 8402e07

Browse files
committed
[fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false
Fixes apache#16054 - calculate the sorted list of when a read position gets updated - this resolves apache#9958 in a proper way - apache#12045 broke the caching solution as explained in apache#16054 - remove invalid tests - fix tests - add more tests to handle corner cases
1 parent 6b8c6a3 commit 8402e07

File tree

7 files changed

+238
-353
lines changed

7 files changed

+238
-353
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java

+36-59
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.ConcurrentMap;
2626
import java.util.concurrent.ConcurrentSkipListMap;
2727
import java.util.concurrent.locks.StampedLock;
28+
import java.util.function.Function;
2829
import org.apache.bookkeeper.mledger.ManagedCursor;
2930
import org.apache.bookkeeper.mledger.Position;
3031
import org.apache.commons.lang3.tuple.Pair;
@@ -50,28 +51,25 @@ private static class Item {
5051
PositionImpl position;
5152
int idx;
5253

53-
Item(ManagedCursor cursor, int idx) {
54+
Item(ManagedCursor cursor, PositionImpl position, int idx) {
5455
this.cursor = cursor;
55-
this.position = (PositionImpl) cursor.getMarkDeletedPosition();
56+
this.position = position;
5657
this.idx = idx;
5758
}
5859
}
5960

60-
public enum CursorType {
61-
DurableCursor,
62-
NonDurableCursor,
63-
ALL
61+
public ManagedCursorContainer() {
62+
this(cursor -> (PositionImpl) cursor.getMarkDeletedPosition());
6463
}
6564

66-
public ManagedCursorContainer() {
67-
cursorType = CursorType.DurableCursor;
65+
private ManagedCursorContainer(Function<ManagedCursor, PositionImpl> positionFunction) {
66+
this.positionFunction = positionFunction;
6867
}
6968

70-
public ManagedCursorContainer(CursorType cursorType) {
71-
this.cursorType = cursorType;
69+
public static ManagedCursorContainer createWithReadPositionOrdering() {
70+
return new ManagedCursorContainer(cursor -> (PositionImpl) cursor.getReadPosition());
7271
}
7372

74-
private final CursorType cursorType;
7573

7674
// Used to keep track of slowest cursor. Contains all of all active cursors.
7775
private final ArrayList<Item> heap = new ArrayList();
@@ -81,46 +79,26 @@ public ManagedCursorContainer(CursorType cursorType) {
8179

8280
private final StampedLock rwLock = new StampedLock();
8381

82+
private int durableCursorCount;
83+
84+
private final Function<ManagedCursor, PositionImpl> positionFunction;
85+
8486
public void add(ManagedCursor cursor) {
8587
long stamp = rwLock.writeLock();
8688
try {
8789
// Append a new entry at the end of the list
88-
Item item = new Item(cursor, heap.size());
90+
Item item = new Item(cursor, positionFunction.apply(cursor), heap.size());
8991
cursors.put(cursor.getName(), item);
90-
91-
if (shouldTrackInHeap(cursor)) {
92-
heap.add(item);
93-
siftUp(item);
92+
heap.add(item);
93+
siftUp(item);
94+
if (cursor.isDurable()) {
95+
durableCursorCount++;
9496
}
9597
} finally {
9698
rwLock.unlockWrite(stamp);
9799
}
98100
}
99101

100-
private boolean shouldTrackInHeap(ManagedCursor cursor) {
101-
return CursorType.ALL.equals(cursorType)
102-
|| (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
103-
|| (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
104-
}
105-
106-
public PositionImpl getSlowestReadPositionForActiveCursors() {
107-
long stamp = rwLock.readLock();
108-
try {
109-
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
110-
} finally {
111-
rwLock.unlockRead(stamp);
112-
}
113-
}
114-
115-
public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
116-
long stamp = rwLock.readLock();
117-
try {
118-
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
119-
} finally {
120-
rwLock.unlockRead(stamp);
121-
}
122-
}
123-
124102
public ManagedCursor get(String name) {
125103
long stamp = rwLock.readLock();
126104
try {
@@ -135,13 +113,16 @@ public void removeCursor(String name) {
135113
long stamp = rwLock.writeLock();
136114
try {
137115
Item item = cursors.remove(name);
138-
if (item != null && shouldTrackInHeap(item.cursor)) {
116+
if (item != null) {
139117
// Move the item to the right end of the heap to be removed
140118
Item lastItem = heap.get(heap.size() - 1);
141119
swap(item, lastItem);
142120
heap.remove(item.idx);
143121
// Update the heap
144122
siftDown(lastItem);
123+
if (item.cursor.isDurable()) {
124+
durableCursorCount--;
125+
}
145126
}
146127
} finally {
147128
rwLock.unlockWrite(stamp);
@@ -165,24 +146,20 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi
165146
return null;
166147
}
167148

149+
PositionImpl previousSlowestConsumer = heap.get(0).position;
168150

169-
if (shouldTrackInHeap(item.cursor)) {
170-
PositionImpl previousSlowestConsumer = heap.get(0).position;
151+
// When the cursor moves forward, we need to push it toward the
152+
// bottom of the tree and push it up if a reset was done
171153

172-
// When the cursor moves forward, we need to push it toward the
173-
// bottom of the tree and push it up if a reset was done
174-
175-
item.position = (PositionImpl) newPosition;
176-
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
177-
siftDown(item);
178-
} else {
179-
siftUp(item);
180-
}
181-
182-
PositionImpl newSlowestConsumer = heap.get(0).position;
183-
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
154+
item.position = (PositionImpl) newPosition;
155+
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
156+
siftDown(item);
157+
} else {
158+
siftUp(item);
184159
}
185-
return null;
160+
161+
PositionImpl newSlowestConsumer = heap.get(0).position;
162+
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
186163
} finally {
187164
rwLock.unlockWrite(stamp);
188165
}
@@ -237,18 +214,18 @@ public boolean isEmpty() {
237214
*/
238215
public boolean hasDurableCursors() {
239216
long stamp = rwLock.tryOptimisticRead();
240-
boolean isEmpty = heap.isEmpty();
217+
int count = durableCursorCount;
241218
if (!rwLock.validate(stamp)) {
242219
// Fallback to read lock
243220
stamp = rwLock.readLock();
244221
try {
245-
isEmpty = heap.isEmpty();
222+
count = durableCursorCount;
246223
} finally {
247224
rwLock.unlockRead(stamp);
248225
}
249226
}
250227

251-
return !isEmpty;
228+
return count > 0;
252229
}
253230

254231
@Override

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

+7
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
635635
persistentMarkDeletePosition = position;
636636
inProgressMarkDeletePersistPosition = null;
637637
readPosition = ledger.getNextValidPosition(position);
638+
ledger.updateReadPosition(this, readPosition);
638639
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
639640
// assign cursor-ledger so, it can be deleted when new ledger will be switched
640641
this.cursorLedger = recoveredFromCursorLedger;
@@ -1215,6 +1216,7 @@ public void operationComplete() {
12151216
ledger.getName(), newPosition, oldReadPosition, name);
12161217
}
12171218
readPosition = newPosition;
1219+
ledger.updateReadPosition(ManagedCursorImpl.this, newPosition);
12181220
} finally {
12191221
lock.writeLock().unlock();
12201222
}
@@ -1701,6 +1703,7 @@ boolean hasMoreEntries(PositionImpl position) {
17011703

17021704
void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
17031705
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
1706+
ledger.updateReadPosition(this, readPosition);
17041707
markDeletePosition = lastPositionCounter.getLeft();
17051708
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
17061709
persistentMarkDeletePosition = null;
@@ -1775,6 +1778,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
17751778
log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}",
17761779
ledger.getName(), currentReadPosition, newReadPosition, markDeletePosition);
17771780
}
1781+
ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition);
17781782
return newReadPosition;
17791783
} else {
17801784
return currentReadPosition;
@@ -2356,6 +2360,7 @@ public void rewind() {
23562360
log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
23572361

23582362
readPosition = newReadPosition;
2363+
ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition);
23592364
} finally {
23602365
lock.writeLock().unlock();
23612366
}
@@ -2373,6 +2378,7 @@ public void seek(Position newReadPositionInt, boolean force) {
23732378
newReadPosition = ledger.getNextValidPosition(markDeletePosition);
23742379
}
23752380
readPosition = newReadPosition;
2381+
ledger.updateReadPosition(ManagedCursorImpl.this, newReadPosition);
23762382
} finally {
23772383
lock.writeLock().unlock();
23782384
}
@@ -2573,6 +2579,7 @@ void setReadPosition(Position newReadPositionInt) {
25732579
if (this.markDeletePosition == null
25742580
|| ((PositionImpl) newReadPositionInt).compareTo(this.markDeletePosition) > 0) {
25752581
this.readPosition = (PositionImpl) newReadPositionInt;
2582+
ledger.updateReadPosition(this, newReadPositionInt);
25762583
}
25772584
}
25782585

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+18-48
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
166166

167167
private final ManagedCursorContainer cursors = new ManagedCursorContainer();
168168
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
169-
private final ManagedCursorContainer nonDurableActiveCursors =
170-
new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
169+
170+
private final ManagedCursorContainer activeCursorsSortedByReadPosition =
171+
ManagedCursorContainer.createWithReadPositionOrdering();
171172

172173
// Ever increasing counter of entries added
173174
@VisibleForTesting
@@ -2182,21 +2183,12 @@ public boolean hasMoreEntries(PositionImpl position) {
21822183
return result;
21832184
}
21842185

2185-
void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) {
2186-
Pair<PositionImpl, PositionImpl> pair = activeCursors.cursorUpdated(cursor, newPosition);
2187-
if (pair != null) {
2188-
entryCache.invalidateEntries(pair.getRight());
2189-
}
2190-
}
2191-
21922186
public PositionImpl getEvictionPosition(){
21932187
PositionImpl evictionPos;
21942188
if (config.isCacheEvictionByMarkDeletedPosition()) {
2195-
PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors();
2196-
evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null;
2189+
evictionPos = activeCursors.getSlowestReaderPosition();
21972190
} else {
2198-
// Always remove all entries already read by active cursors
2199-
evictionPos = getEarlierReadPositionForActiveCursors();
2191+
evictionPos = activeCursorsSortedByReadPosition.getSlowestReaderPosition();
22002192
}
22012193
return evictionPos;
22022194
}
@@ -2213,32 +2205,9 @@ void doCacheEviction(long maxTimestamp) {
22132205
entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
22142206
}
22152207

2216-
private PositionImpl getEarlierReadPositionForActiveCursors() {
2217-
PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
2218-
PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
2219-
if (nonDurablePosition == null) {
2220-
return durablePosition;
2221-
}
2222-
if (durablePosition == null) {
2223-
return nonDurablePosition;
2224-
}
2225-
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
2226-
}
2227-
2228-
private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
2229-
PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
2230-
PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
2231-
if (nonDurablePosition == null) {
2232-
return durablePosition;
2233-
}
2234-
if (durablePosition == null) {
2235-
return nonDurablePosition;
2236-
}
2237-
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
2238-
}
2239-
22402208
void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
22412209
Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
2210+
activeCursors.cursorUpdated(cursor, newPosition);
22422211
if (pair == null) {
22432212
// Cursor has been removed in the meantime
22442213
trimConsumedLedgersInBackground();
@@ -2259,6 +2228,12 @@ void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
22592228
}
22602229
}
22612230

2231+
public void updateReadPosition(ManagedCursorImpl cursor, Position newReadPosition) {
2232+
if (!config.isCacheEvictionByMarkDeletedPosition()) {
2233+
activeCursorsSortedByReadPosition.cursorUpdated(cursor, newReadPosition);
2234+
}
2235+
}
2236+
22622237
PositionImpl startReadOperationOnLedger(PositionImpl position) {
22632238
Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
22642239
if (ledgerId != null && ledgerId != position.getLedgerId()) {
@@ -3512,29 +3487,24 @@ Pair<PositionImpl, Long> getFirstPositionAndCounter() {
35123487
public void activateCursor(ManagedCursor cursor) {
35133488
if (activeCursors.get(cursor.getName()) == null) {
35143489
activeCursors.add(cursor);
3515-
}
3516-
if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
3517-
nonDurableActiveCursors.add(cursor);
3490+
if (!config.isCacheEvictionByMarkDeletedPosition()) {
3491+
activeCursorsSortedByReadPosition.add(cursor);
3492+
}
35183493
}
35193494
}
35203495

35213496
public void deactivateCursor(ManagedCursor cursor) {
35223497
synchronized (activeCursors) {
35233498
if (activeCursors.get(cursor.getName()) != null) {
35243499
activeCursors.removeCursor(cursor.getName());
3500+
if (!config.isCacheEvictionByMarkDeletedPosition()) {
3501+
activeCursorsSortedByReadPosition.removeCursor(cursor.getName());
3502+
}
35253503
if (!activeCursors.hasDurableCursors()) {
35263504
// cleanup cache if there is no active subscription
35273505
entryCache.clear();
3528-
} else {
3529-
// if removed subscription was the slowest subscription : update cursor and let it clear cache:
3530-
// till new slowest-cursor's read-position
3531-
discardEntriesFromCache((ManagedCursorImpl) activeCursors.getSlowestReader(),
3532-
getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
35333506
}
35343507
}
3535-
if (!cursor.isDurable()) {
3536-
nonDurableActiveCursors.removeCursor(cursor.getName());
3537-
}
35383508
}
35393509
}
35403510

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ public void verifyHitsMisses() throws Exception {
332332

333333
PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition();
334334
c2.setReadPosition(pos);
335-
ledger.discardEntriesFromCache(c2, pos);
335+
ledger.doCacheEviction(0);
336336
entries.forEach(Entry::release);
337337

338338
factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)