Skip to content

Commit 47c98e5

Browse files
eolivelliliangyepianzhou
authored andcommitted
InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (apache#18245)
* InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) Motivation: The broker can run out of memory when many reads are enqueued on the PersistentDispatcherMultipleConsumers dispatchMessagesThread (which is used when dispatcherDispatchMessagesInSubscriptionThread is set to true, the default value). The limit on the amount of memory retained by reads MUST also take into account the entries coming from the cache. When dispatcherDispatchMessagesInSubscriptionThread is false (the behaviour of Pulsar 2.10) there is a kind of natural (but still unpredictable!!) back-pressure mechanism, because the thread that receives the entries from BK or the cache immediately and synchronously dispatches the entries to the consumer and releases them. Modifications: - Add a new component (InflightReadsLimiter) that keeps track of the overall amount of memory retained due to inflight reads. - Add a new configuration entry managedLedgerMaxReadsInFlightSizeInMB - The feature is disabled by default - Add new metrics to track the values * Change error message * checkstyle * Fix license * remove duplicate method after cherry-pick * Rename onDeallocate (cherry picked from commit 6fec66b)
1 parent 84902ac commit 47c98e5

File tree

15 files changed

+515
-7
lines changed

15 files changed

+515
-7
lines changed

conf/broker.conf

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1064,7 +1064,15 @@ managedLedgerCursorRolloverTimeInSeconds=14400
10641064
# crashes.
10651065
managedLedgerMaxUnackedRangesToPersist=10000
10661066

1067-
# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
1067+
# Maximum amount of memory used to hold data read from storage (or from the cache).
1068+
# This mechanism prevents the broker from having too many concurrent
1069+
# reads from storage and falling into Out of Memory errors in case
1070+
# of multiple concurrent reads to multiple concurrent consumers.
1071+
# Set 0 in order to disable the feature.
1072+
#
1073+
managedLedgerMaxReadsInFlightSizeInMB=0
1074+
1075+
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
10681076
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
10691077
# zookeeper.
10701078
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public class ManagedLedgerFactoryConfig {
5757
*/
5858
private boolean copyEntriesInCache = false;
5959

60+
/**
61+
* Maximum (estimated) amount of data held in flight by reads from storage and the cache.
62+
*/
63+
private long managedLedgerMaxReadsInFlightSize = 0;
64+
6065
/**
6166
* Whether trace managed ledger task execution time.
6267
*/

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
4545
private long entryId;
4646
ByteBuf data;
4747

48+
private Runnable onDeallocate;
49+
4850
public static EntryImpl create(LedgerEntry ledgerEntry) {
4951
EntryImpl entry = RECYCLER.get();
5052
entry.timestamp = System.nanoTime();
@@ -103,6 +105,22 @@ private EntryImpl(Recycler.Handle<EntryImpl> recyclerHandle) {
103105
this.recyclerHandle = recyclerHandle;
104106
}
105107

108+
public void onDeallocate(Runnable r) {
109+
if (this.onDeallocate == null) {
110+
this.onDeallocate = r;
111+
} else {
112+
// this is not expected to happen
113+
Runnable previous = this.onDeallocate;
114+
this.onDeallocate = () -> {
115+
try {
116+
previous.run();
117+
} finally {
118+
r.run();
119+
}
120+
};
121+
}
122+
}
123+
106124
public long getTimestamp() {
107125
return timestamp;
108126
}
@@ -160,6 +178,13 @@ public ReferenceCounted touch(Object hint) {
160178
@Override
161179
protected void deallocate() {
162180
// This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it
181+
if (onDeallocate != null) {
182+
try {
183+
onDeallocate.run();
184+
} finally {
185+
onDeallocate = null;
186+
}
187+
}
163188
data.release();
164189
data = null;
165190
timestamp = -1;
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl.cache;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
import io.prometheus.client.Gauge;
23+
import lombok.AllArgsConstructor;
24+
import lombok.ToString;
25+
import lombok.extern.slf4j.Slf4j;
26+
27+
@Slf4j
28+
public class InflightReadsLimiter {
29+
30+
private static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge
31+
.build()
32+
.name("pulsar_ml_reads_inflight_bytes")
33+
.help("Estimated number of bytes retained by data read from storage or cache")
34+
.register();
35+
36+
private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
37+
.build()
38+
.name("pulsar_ml_reads_available_inflight_bytes")
39+
.help("Available space for inflight data read from storage or cache")
40+
.register();
41+
42+
private final long maxReadsInFlightSize;
43+
private long remainingBytes;
44+
45+
public InflightReadsLimiter(long maxReadsInFlightSize) {
46+
if (maxReadsInFlightSize <= 0) {
47+
// set it to -1 in order to show in the metrics that the metric is not available
48+
PULSAR_ML_READS_BUFFER_SIZE.set(-1);
49+
PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(-1);
50+
}
51+
this.maxReadsInFlightSize = maxReadsInFlightSize;
52+
this.remainingBytes = maxReadsInFlightSize;
53+
}
54+
55+
@VisibleForTesting
56+
public synchronized long getRemainingBytes() {
57+
return remainingBytes;
58+
}
59+
60+
@AllArgsConstructor
61+
@ToString
62+
static class Handle {
63+
final long acquiredPermits;
64+
final boolean success;
65+
final int trials;
66+
67+
final long creationTime;
68+
}
69+
70+
private static final Handle DISABLED = new Handle(0, true, 0, -1);
71+
72+
Handle acquire(long permits, Handle current) {
73+
if (maxReadsInFlightSize <= 0) {
74+
// feature is disabled
75+
return DISABLED;
76+
}
77+
synchronized (this) {
78+
try {
79+
if (current == null) {
80+
if (remainingBytes == 0) {
81+
return new Handle(0, false, 1, System.currentTimeMillis());
82+
}
83+
if (remainingBytes >= permits) {
84+
remainingBytes -= permits;
85+
return new Handle(permits, true, 1, System.currentTimeMillis());
86+
} else {
87+
long possible = remainingBytes;
88+
remainingBytes = 0;
89+
return new Handle(possible, false, 1, System.currentTimeMillis());
90+
}
91+
} else {
92+
if (current.trials >= 4 && current.acquiredPermits > 0) {
93+
remainingBytes += current.acquiredPermits;
94+
return new Handle(0, false, 1, current.creationTime);
95+
}
96+
if (remainingBytes == 0) {
97+
return new Handle(current.acquiredPermits, false, current.trials + 1,
98+
current.creationTime);
99+
}
100+
long needed = permits - current.acquiredPermits;
101+
if (remainingBytes >= needed) {
102+
remainingBytes -= needed;
103+
return new Handle(permits, true, current.trials + 1, current.creationTime);
104+
} else {
105+
long possible = remainingBytes;
106+
remainingBytes = 0;
107+
return new Handle(current.acquiredPermits + possible, false,
108+
current.trials + 1, current.creationTime);
109+
}
110+
}
111+
} finally {
112+
updateMetrics();
113+
}
114+
}
115+
}
116+
117+
void release(Handle handle) {
118+
if (handle == DISABLED) {
119+
return;
120+
}
121+
synchronized (this) {
122+
remainingBytes += handle.acquiredPermits;
123+
updateMetrics();
124+
}
125+
}
126+
127+
private synchronized void updateMetrics() {
128+
PULSAR_ML_READS_BUFFER_SIZE.set(maxReadsInFlightSize - remainingBytes);
129+
PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(remainingBytes);
130+
}
131+
132+
public boolean isDisabled() {
133+
return maxReadsInFlightSize <= 0;
134+
}
135+
136+
137+
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,24 @@
2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static com.google.common.base.Preconditions.checkNotNull;
2323
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
24+
import com.google.common.annotations.VisibleForTesting;
2425
import com.google.common.collect.Lists;
2526
import com.google.common.primitives.Longs;
2627
import io.netty.buffer.ByteBuf;
2728
import io.netty.buffer.PooledByteBufAllocator;
2829
import java.util.Collection;
2930
import java.util.Iterator;
3031
import java.util.List;
32+
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.atomic.AtomicInteger;
3134
import org.apache.bookkeeper.client.api.BKException;
3235
import org.apache.bookkeeper.client.api.LedgerEntry;
3336
import org.apache.bookkeeper.client.api.ReadHandle;
37+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
3438
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
3539
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
40+
import org.apache.bookkeeper.mledger.Entry;
41+
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
3642
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3743
import org.apache.bookkeeper.mledger.impl.EntryImpl;
3844
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -48,18 +54,27 @@
4854
*/
4955
public class RangeEntryCacheImpl implements EntryCache {
5056

57+
/**
58+
* Overhead per-entry to take into account the envelope.
59+
*/
60+
private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
61+
5162
private final RangeEntryCacheManagerImpl manager;
5263
private final ManagedLedgerImpl ml;
5364
private ManagedLedgerInterceptor interceptor;
5465
private final RangeCache<PositionImpl, EntryImpl> entries;
5566
private final boolean copyEntries;
67+
private volatile long estimatedEntrySize = 10 * 1024;
5668

69+
private final long readEntryTimeoutMillis;
5770
private static final double MB = 1024 * 1024;
5871

5972
public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
6073
this.manager = manager;
6174
this.ml = ml;
75+
this.pendingReadsManager = new PendingReadsManager(this);
6276
this.interceptor = ml.getManagedLedgerInterceptor();
77+
this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds();
6378
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
6479
this.copyEntries = copyEntries;
6580

@@ -68,11 +83,21 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl
6883
}
6984
}
7085

86+
@VisibleForTesting
87+
ManagedLedgerConfig getManagedLedgerConfig() {
88+
return ml.getConfig();
89+
}
90+
7191
@Override
7292
public String getName() {
7393
return ml.getName();
7494
}
7595

96+
@VisibleForTesting
97+
InflightReadsLimiter getPendingReadsLimiter() {
98+
return manager.getInflightReadsLimiter();
99+
}
100+
76101
public static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true, // preferDirect
77102
0, // nHeapArenas,
78103
PooledByteBufAllocator.defaultNumDirectArena(), // nDirectArena
@@ -256,6 +281,19 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
256281
@SuppressWarnings({ "unchecked", "rawtypes" })
257282
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
258283
final ReadEntriesCallback callback, Object ctx) {
284+
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);
285+
}
286+
287+
void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
288+
final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
289+
290+
final AsyncCallbacks.ReadEntriesCallback callback =
291+
handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
292+
originalCallback, ctx, handle);
293+
if (callback == null) {
294+
return;
295+
}
296+
259297
final long ledgerId = lh.getId();
260298
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
261299
final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry);
@@ -329,6 +367,75 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
329367
}
330368
}
331369

370+
371+
private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
372+
long firstEntry, long lastEntry,
373+
boolean shouldCacheEntry,
374+
AsyncCallbacks.ReadEntriesCallback originalCallback,
375+
Object ctx, InflightReadsLimiter.Handle handle) {
376+
InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
377+
if (pendingReadsLimiter.isDisabled()) {
378+
return originalCallback;
379+
}
380+
long estimatedReadSize = (1 + lastEntry - firstEntry)
381+
* (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
382+
final AsyncCallbacks.ReadEntriesCallback callback;
383+
InflightReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle);
384+
if (!newHandle.success) {
385+
long now = System.currentTimeMillis();
386+
if (now - newHandle.creationTime > readEntryTimeoutMillis) {
387+
String message = "Time-out elapsed while acquiring enough permits "
388+
+ "on the memory limiter to read from ledger "
389+
+ lh.getId()
390+
+ ", " + getName()
391+
+ ", estimated read size " + estimatedReadSize + " bytes"
392+
+ " for " + (1 + lastEntry - firstEntry)
393+
+ " entries (check managedLedgerMaxReadsInFlightSizeInMB)";
394+
log.error(message);
395+
pendingReadsLimiter.release(newHandle);
396+
originalCallback.readEntriesFailed(
397+
new ManagedLedgerException.TooManyRequestsException(message), ctx);
398+
return null;
399+
}
400+
ml.getExecutor().submitOrdered(lh.getId(), () -> {
401+
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
402+
originalCallback, ctx, newHandle);
403+
return null;
404+
});
405+
return null;
406+
} else {
407+
callback = new AsyncCallbacks.ReadEntriesCallback() {
408+
409+
@Override
410+
public void readEntriesComplete(List<Entry> entries, Object ctx) {
411+
if (!entries.isEmpty()) {
412+
long size = entries.get(0).getLength();
413+
estimatedEntrySize = size;
414+
415+
AtomicInteger remainingCount = new AtomicInteger(entries.size());
416+
for (Entry entry : entries) {
417+
((EntryImpl) entry).onDeallocate(() -> {
418+
if (remainingCount.decrementAndGet() <= 0) {
419+
pendingReadsLimiter.release(newHandle);
420+
}
421+
});
422+
}
423+
} else {
424+
pendingReadsLimiter.release(newHandle);
425+
}
426+
originalCallback.readEntriesComplete(entries, ctx);
427+
}
428+
429+
@Override
430+
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
431+
pendingReadsLimiter.release(newHandle);
432+
originalCallback.readEntriesFailed(exception, ctx);
433+
}
434+
};
435+
}
436+
return callback;
437+
}
438+
332439
@Override
333440
public void clear() {
334441
Pair<Integer, Long> removedPair = entries.clear();

0 commit comments

Comments
 (0)