Skip to content

Commit eeb80e1

Browse files
[fix][broker][branch-2.10] limit the memory used by reads end-to-end
1 parent 47c98e5 commit eeb80e1

File tree

5 files changed

+14
-20
lines changed

5 files changed

+14
-20
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/**
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Collection;
3030
import java.util.Iterator;
3131
import java.util.List;
32-
import java.util.concurrent.CompletableFuture;
3332
import java.util.concurrent.atomic.AtomicInteger;
3433
import org.apache.bookkeeper.client.api.BKException;
3534
import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -72,7 +71,6 @@ public class RangeEntryCacheImpl implements EntryCache {
7271
public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
7372
this.manager = manager;
7473
this.ml = ml;
75-
this.pendingReadsManager = new PendingReadsManager(this);
7674
this.interceptor = ml.getManagedLedgerInterceptor();
7775
this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds();
7876
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
@@ -281,14 +279,14 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
281279
@SuppressWarnings({ "unchecked", "rawtypes" })
282280
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
283281
final ReadEntriesCallback callback, Object ctx) {
284-
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);
282+
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx, null);
285283
}
286284

287-
void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
285+
void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
288286
final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
289287

290288
final AsyncCallbacks.ReadEntriesCallback callback =
291-
handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
289+
handlePendingReadsLimits(lh, firstEntry, lastEntry, isSlowestReader,
292290
originalCallback, ctx, handle);
293291
if (callback == null) {
294292
return;
@@ -371,8 +369,10 @@ void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, b
371369
private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
372370
long firstEntry, long lastEntry,
373371
boolean shouldCacheEntry,
374-
AsyncCallbacks.ReadEntriesCallback originalCallback,
375-
Object ctx, InflightReadsLimiter.Handle handle) {
372+
AsyncCallbacks.ReadEntriesCallback
373+
originalCallback,
374+
Object ctx,
375+
InflightReadsLimiter.Handle handle) {
376376
InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
377377
if (pendingReadsLimiter.isDisabled()) {
378378
return originalCallback;

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/**
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -527,12 +527,9 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
527527
}
528528
long size = entries.stream().mapToLong(Entry::getLength).sum();
529529
updatePendingBytesToDispatch(size);
530-
if (sendMessagesToConsumers(readType, entries)) {
531-
updatePendingBytesToDispatch(-size);
532-
readMoreEntriesAsync();
533-
} else {
534-
updatePendingBytesToDispatch(-size);
535-
} }
530+
sendMessagesToConsumers(readType, entries);
531+
updatePendingBytesToDispatch(-size);
532+
}
536533

537534
protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
538535
sendInProgress = true;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,8 @@ public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest
9393
.getNextValidPosition((PositionImpl) entry.getPosition()));
9494
long size = entry.getLength();
9595
updatePendingBytesToDispatch(size);
96-
if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
97-
readMoreEntriesAsync();
98-
} else {
99-
updatePendingBytesToDispatch(-size);
100-
}
96+
sendMessagesToConsumers(readType, Lists.newArrayList(entry));
97+
updatePendingBytesToDispatch(-size);
10198
ctx.recycle();
10299
}
103100

0 commit comments

Comments
 (0)