Skip to content

Commit

Permalink
[cherry-pick][branch-2.10]Fix broker cache eviction of entries read b…
Browse files Browse the repository at this point in the history
…y active cursor (#18980)

Co-authored-by: LinChen <1572139390@qq.com>
Co-authored-by: AloysZhang <lofterzhang@gmail.com>
Co-authored-by: Matteo Merli <mmerli@apache.org>
Co-authored-by: Michael Marshall <mmarshall@apache.org>
Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
  • Loading branch information
6 people authored Dec 21, 2022
1 parent af0bc20 commit 070c356
Show file tree
Hide file tree
Showing 42 changed files with 1,008 additions and 505 deletions.
9 changes: 6 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -996,8 +996,8 @@ managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0
# Configure the cache eviction interval in milliseconds for the managed ledger cache
managedLedgerCacheEvictionIntervalMs=10

# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
Expand Down Expand Up @@ -1481,4 +1481,7 @@ tlsEnabled=false

# Enable Key_Shared subscription (default is enabled)
# @deprecated since 2.8.0 subscriptionTypesEnabled is preferred over subscriptionKeySharedEnable.
subscriptionKeySharedEnable=true
subscriptionKeySharedEnable=true

# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
managedLedgerCacheEvictionFrequency=0
13 changes: 11 additions & 2 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -775,8 +775,8 @@ managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0
# Configure the cache eviction interval in milliseconds for the managed ledger cache
managedLedgerCacheEvictionIntervalMs=10

# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
Expand Down Expand Up @@ -1134,6 +1134,15 @@ replicationTlsEnabled=false
# Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds
brokerServicePurgeInactiveFrequencyInSeconds=60

# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# zookeeper.
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
managedLedgerCacheEvictionFrequency=0

### --- Transaction config variables --- ###

# Enable transaction coordinator in broker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -75,6 +77,9 @@ public class ManagedLedgerConfig {
private ManagedLedgerInterceptor managedLedgerInterceptor;
private Map<String, String> properties;
private int inactiveLedgerRollOverTimeMs = 0;
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;

/**
* A factory to open/create managed ledgers and delete them.
Expand Down Expand Up @@ -179,4 +180,21 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);

/**
* @return return EntryCacheManager.
*/
EntryCacheManager getEntryCacheManager();

/**
* update cache evictionTimeThreshold.
*
* @param cacheEvictionTimeThresholdNanos time threshold for eviction.
*/
void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos);

/**
* @return time threshold for eviction.
* */
long getCacheEvictionTimeThreshold();

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public class ManagedLedgerFactoryConfig {
private int numManagedLedgerSchedulerThreads = Runtime.getRuntime().availableProcessors();

/**
* Frequency of cache eviction triggering. Default is 100 times per second.
* Interval of cache eviction triggering. Default is 10 ms times.
*/
private double cacheEvictionFrequency = 100;
private long cacheEvictionIntervalMs = 10;

/**
* All entries that have stayed in cache for more than the configured time, will be evicted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,19 @@ public interface ManagedLedgerFactoryMXBean {
* Get the number of cache evictions during the last minute.
*/
long getNumberOfCacheEvictions();

/**
* Cumulative number of entries inserted into the cache.
*/
long getCacheInsertedEntriesCount();

/**
* Cumulative number of entries evicted from the cache.
*/
long getCacheEvictedEntriesCount();

/**
* Current number of entries in the cache.
*/
long getCacheEntriesCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.impl;

import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -31,17 +30,16 @@
import org.apache.commons.lang3.tuple.Pair;

/**
* Contains all the cursors for a ManagedLedger.
* Contains cursors for a ManagedLedger.
*
* <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
*
* <p/>This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry
* in the linked-list. The list is a sorted double linked-list of cursors.
* <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
* in a single array. More details about heap implementations:
* https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
*
* <p/>When a cursor is markDeleted, this list is updated and the cursor is moved in its new position.
*
* <p/>To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only
* care about ledgers to be deleted.
* <p/>The heap is updated and kept sorted when a cursor is updated.
*
*/
public class ManagedCursorContainer implements Iterable<ManagedCursor> {
Expand All @@ -51,77 +49,54 @@ private static class Item {
PositionImpl position;
int idx;

Item(ManagedCursor cursor, int idx) {
Item(ManagedCursor cursor, PositionImpl position, int idx) {
this.cursor = cursor;
this.position = (PositionImpl) cursor.getMarkDeletedPosition();
this.position = position;
this.idx = idx;
}
}

public enum CursorType {
DurableCursor,
NonDurableCursor,
ALL
}

public ManagedCursorContainer() {
cursorType = CursorType.DurableCursor;
}

public ManagedCursorContainer(CursorType cursorType) {
this.cursorType = cursorType;
}

private final CursorType cursorType;

// Used to keep track of slowest cursor. Contains all of all active cursors.
private final ArrayList<Item> heap = Lists.newArrayList();
// Used to keep track of slowest cursor.
private final ArrayList<Item> heap = new ArrayList();

// Maps a cursor to its position in the heap
private final ConcurrentMap<String, Item> cursors = new ConcurrentSkipListMap<>();

private final StampedLock rwLock = new StampedLock();

public void add(ManagedCursor cursor) {
private int durableCursorCount;


/**
* Add a cursor to the container. The cursor will be optionally tracked for the slowest reader when
* a position is passed as the second argument. It is expected that the position is updated with
* {@link #cursorUpdated(ManagedCursor, Position)} method when the position changes.
*
* @param cursor cursor to add
* @param position position of the cursor to use for ordering, pass null if the cursor's position shouldn't be
* tracked for the slowest reader.
*/
public void add(ManagedCursor cursor, Position position) {
long stamp = rwLock.writeLock();
try {
// Append a new entry at the end of the list
Item item = new Item(cursor, heap.size());
Item item = new Item(cursor, (PositionImpl) position, position != null ? heap.size() : -1);
cursors.put(cursor.getName(), item);

if (shouldTrackInHeap(cursor)) {
if (position != null) {
heap.add(item);
siftUp(item);
}
if (cursor.isDurable()) {
durableCursorCount++;
}
} finally {
rwLock.unlockWrite(stamp);
}
}

private boolean shouldTrackInHeap(ManagedCursor cursor) {
return CursorType.ALL.equals(cursorType)
|| (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
|| (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
}

public PositionImpl getSlowestReadPositionForActiveCursors() {
long stamp = rwLock.readLock();
try {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
} finally {
rwLock.unlockRead(stamp);
}
}

public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
long stamp = rwLock.readLock();
try {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
} finally {
rwLock.unlockRead(stamp);
}
}

public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
Expand All @@ -132,28 +107,41 @@ public ManagedCursor get(String name) {
}
}

public void removeCursor(String name) {
public boolean removeCursor(String name) {
long stamp = rwLock.writeLock();
try {
Item item = cursors.remove(name);
if (item != null && shouldTrackInHeap(item.cursor)) {
// Move the item to the right end of the heap to be removed
Item lastItem = heap.get(heap.size() - 1);
swap(item, lastItem);
heap.remove(item.idx);
// Update the heap
siftDown(lastItem);
if (item != null) {
if (item.idx >= 0) {
// Move the item to the right end of the heap to be removed
Item lastItem = heap.get(heap.size() - 1);
swap(item, lastItem);
heap.remove(item.idx);
// Update the heap
siftDown(lastItem);
}
if (item.cursor.isDurable()) {
durableCursorCount--;
}
return true;
} else {
return false;
}
} finally {
rwLock.unlockWrite(stamp);
}
}

/**
* Signal that a cursor position has been updated and that the container must re-order the cursor list.
* Signal that a cursor position has been updated and that the container must re-order the cursor heap
* tracking the slowest reader.
* Only those cursors are tracked and can be updated which were added to the container with the
* {@link #add(ManagedCursor, Position)} method that specified the initial position in the position
* parameter.
*
* @param cursor
* @return a pair of positions, representing the previous slowest consumer and the new slowest consumer (after the
* @param cursor the cursor to update the position for
* @param newPosition the updated position for the cursor
* @return a pair of positions, representing the previous slowest reader and the new slowest reader (after the
* update).
*/
public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
Expand All @@ -162,35 +150,33 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi
long stamp = rwLock.writeLock();
try {
Item item = cursors.get(cursor.getName());
if (item == null) {
if (item == null || item.idx == -1) {
return null;
}

PositionImpl previousSlowestConsumer = heap.get(0).position;

if (shouldTrackInHeap(item.cursor)) {
PositionImpl previousSlowestConsumer = heap.get(0).position;

item.position = (PositionImpl) newPosition;
if (heap.size() > 1) {
// When the cursor moves forward, we need to push it toward the
// bottom of the tree and push it up if a reset was done

item.position = (PositionImpl) newPosition;
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
siftDown(item);
} else {
siftUp(item);
}

PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
}
return null;

PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
} finally {
rwLock.unlockWrite(stamp);
}
}

/**
* Get the slowest reader position, meaning older acknowledged position between all the cursors.
* Get the slowest reader position for the cursors that are ordered.
*
* @return the slowest reader position
*/
Expand Down Expand Up @@ -238,18 +224,18 @@ public boolean isEmpty() {
*/
public boolean hasDurableCursors() {
long stamp = rwLock.tryOptimisticRead();
boolean isEmpty = heap.isEmpty();
int count = durableCursorCount;
if (!rwLock.validate(stamp)) {
// Fallback to read lock
stamp = rwLock.readLock();
try {
isEmpty = heap.isEmpty();
count = durableCursorCount;
} finally {
rwLock.unlockRead(stamp);
}
}

return !isEmpty;
return count > 0;
}

@Override
Expand Down Expand Up @@ -292,7 +278,7 @@ public ManagedCursor next() {

@Override
public void remove() {
throw new IllegalArgumentException("Cannot remove ManagedCursor form container");
throw new IllegalArgumentException("Cannot remove ManagedCursor from container");
}
};
}
Expand Down
Loading

0 comments on commit 070c356

Please sign in to comment.