[managed-ledger] Do not send duplicate reads to BK/offloaders #17241
Conversation
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
LGTM, Good work @eolivelli
+1
I understand the problem that this PR wants to resolve, and I think the read deduplication is a good improvement.
I have some questions:
- We are using the PendingReadKey as the key to determine whether a new read is a duplicate or not. It looks like this can only cover the case where all the subscriptions have the same read position. It would be great if we could have a more general solution for duplicated reads, one able to handle cases like [0, 20] and [2, 22].
- After this change, even if the subscriptions have different read positions, e.g. sub-a 1:10 and sub-b 3:20, we will always introduce new heap memory overhead.
I haven't thought of a specific approach yet, but I think it's very necessary to discuss and look for a general solution for duplicated reads.
I think we can start the discussion on the mailing list, so that many contributors can share their ideas.
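To make the [0, 20] / [2, 22] case concrete, here is a minimal sketch (hypothetical names, not the PR's actual classes) of the two relations involved: an in-flight read whose range fully *includes* a new request can serve it directly, while mere *overlap* only covers part of it:

```java
// Simplified stand-in for the PR's PendingReadKey: an entry range
// [firstEntry, lastEntry] within one ledger.
final class PendingReadKey {
    final long firstEntry;
    final long lastEntry;

    PendingReadKey(long firstEntry, long lastEntry) {
        this.firstEntry = firstEntry;
        this.lastEntry = lastEntry;
    }

    // An in-flight read can fully serve "other" if its range contains it.
    boolean includes(PendingReadKey other) {
        return firstEntry <= other.firstEntry && other.lastEntry <= lastEntry;
    }

    // The weaker relation behind the [0, 20] / [2, 22] example: the ranges
    // share some entries, but neither contains the other.
    boolean overlaps(PendingReadKey other) {
        return firstEntry <= other.lastEntry && other.firstEntry <= lastEntry;
    }
}
```

With exact key equality, [0, 20] and [2, 22] never match; an overlap-aware scheme would have to split the new request into a reused part and a residual read.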
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
Great work!
LGTM
@zymap thank you for your idea.
@codelipenghui @Jason918 @zymap @lhotari @nicoloboschi We did more testing, and actually, trying to find pending reads with ranges that "include" the requested range allows us to have many more hits. In the picture, the rate of "partial matches" is usually higher than or equal to the rate of "misses", both during tailing reads and catch-up reads.
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
AtomicBoolean createdByThisThread = new AtomicBoolean();
CachedPendingRead cachedPendingRead = findBestCandidate(key,
I think we can make findBestCandidate() return null if there is no candidate, and then create a new one to continue the read operation. That way we don't need AtomicBoolean createdByThisThread = new AtomicBoolean(), and the code is easier to read.
findBestCandidate is kind of a computeIfAbsent method (when I was not looking for "includes", it was actually computeIfAbsent on a ConcurrentHashMap), so I have to create the object and put it into the map inside the "lock".
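The shape being discussed can be sketched as follows (a hypothetical simplification, with a String key and an empty CachedPendingRead standing in for the real types): lookup and creation happen atomically under the per-ledger lock, and the flag tells the caller whether it is the one that must actually issue the read:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a computeIfAbsent-style findBestCandidate.
final class PendingReadsSketch {
    static final class CachedPendingRead {}

    static CachedPendingRead findBestCandidate(String key,
                                               Map<String, CachedPendingRead> ledgerCache,
                                               AtomicBoolean created) {
        // Lock only the per-ledger map, not the whole cache.
        synchronized (ledgerCache) {
            CachedPendingRead existing = ledgerCache.get(key);
            if (existing != null) {
                return existing;  // piggyback on the in-flight read
            }
            CachedPendingRead fresh = new CachedPendingRead();
            ledgerCache.put(key, fresh);
            created.set(true);    // this caller must issue the read to BK
            return fresh;
        }
    }
}
```

Returning null instead would move the creation outside this method, so a second caller arriving between the lookup and the insert could create a duplicate unless the caller re-acquires the same lock.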
Yes, I just want to avoid creating the AtomicBoolean here. Can we use the callback size of the cachedPendingRead, or a boolean inside CachedPendingRead?
return result;
}

public void attach(CompletableFuture<List<EntryImpl>> handle) {
Do we need to remove the CachedPendingRead from the cachedPendingReads?
    }
}

@Override
public void clear() {
    Pair<Integer, Long> removedPair = entries.clear();
    manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
    cachedPendingReads.clear();
I noticed the elements of cachedPendingReads are only removed here, and this method is only called when all the cached data should be cleaned up. It would be better to add a unit test for this to make sure we don't introduce a heap memory leak.
The elements here are one entry per ledger. The per-ledger maps are evicted here:
https://github.com/apache/pulsar/pull/17241/files#diff-c55509e3ab1389d89a58fd564f2e318dbb95f50121ab33c729a7ca4a21d02ef1R340
We could remove the entry for a ledger in case of rollover, but I thought it would require more coordination. I will try to improve this.
> We could remove the entry for a ledger in case of rollover, but I thought it would require more coordination. I will try to improve this.

Yes, we are on the same page.
private CachedPendingRead findBestCandidate(PendingReadKey key, Map<PendingReadKey, CachedPendingRead> ledgerCache,
                                            AtomicBoolean created) {
    synchronized (ledgerCache) {
We can move the synchronized to the method.
I want to synchronize only on the Map that is for the specific "ledger", not on the whole RangeEntryCacheImpl; this way the lock is more fine-grained.
Oh, sorry, I read it wrong; I thought it was equivalent to synchronized(this).
List<EntryImpl> copy = new ArrayList<>(entriesToReturn.size());
long callbackStartEntry = callback.startEntry;
long callbackEndEntry = callback.endEntry;
Looks like we can change it to:
long callbackStartEntry = callback.startEntry;
long callbackEndEntry = callback.endEntry;
List<EntryImpl> copy = new ArrayList<>(callbackEndEntry - callbackStartEntry + 1);
good point
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
    synchronized (CachedPendingRead.this) {
        for (ReadEntriesCallbackWithContext callback : callbacks) {
            if (exception instanceof BKException
We should unwrap the exception
And it looks like we can just use createManagedLedgerException directly, without the if check here.
public static ManagedLedgerException createManagedLedgerException(Throwable t) {
    if (t instanceof org.apache.bookkeeper.client.api.BKException) {
        return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode());
    } else if (t instanceof CompletionException
            && !(t.getCause() instanceof CompletionException) /* check to avoid stackoverflow */) {
        return createManagedLedgerException(t.getCause());
    } else {
        log.error("Unknown exception for ManagedLedgerException.", t);
        return new ManagedLedgerException("Other exception", t);
    }
}
It already handles the CompletionException and the TooManyRequestsException.
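The unwrapping pattern in that helper can be isolated as a small sketch (a hypothetical standalone method, not Pulsar code): peel one CompletionException layer to reach the real cause, with the same guard against chained CompletionExceptions that the Pulsar code uses to avoid unbounded recursion:

```java
import java.util.concurrent.CompletionException;

// Hypothetical helper mirroring the unwrap step inside
// createManagedLedgerException.
final class ExceptionUnwrapSketch {
    static Throwable unwrap(Throwable t) {
        if (t instanceof CompletionException
                && t.getCause() != null
                && !(t.getCause() instanceof CompletionException)) {
            return t.getCause();  // report the real failure, not the wrapper
        }
        return t;                 // already unwrapped, or a guarded chain
    }
}
```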
Sure, thanks. The previous code had an "invalidateLedger" that has been moved.
5d07b94 to 64b8101
@zymap @codelipenghui @Jason918 @lhotari
I tried to do something more sophisticated, like reusing overlapping pending reads only if the amount of overlapping entries is big enough, but in my testing it seems it is always a good idea to reuse pending reads.
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@Jason918 thanks for your suggestions. I have applied them.
Nice improvement! Looks good to me.
Do we need to add a unit test for the PendingReadsManager?
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
I haven't pushed this part of my patch. I will update this PR soon. Thanks for the reminder.
@zymap tests added
Motivation
When you have many subscriptions on the same topic and they are catching up (for instance, after some downtime of the consumer application), Pulsar will start to perform many reads from BK for the same entries.
Modifications
Prevent concurrent reads of the same entries.
Verifying this change
This change is already covered by existing tests.
I also tested manually that this fix greatly reduces the pressure on the bookies. This is something we cannot cover in an integration test, as it would use too many resources on CI.
In tests with only 64 subscriptions, we saw a reduction from 100K reads/s from BK to 250 reads/s.
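The core idea behind that reduction can be sketched as follows (a hedged, hypothetical simplification, not the PR's actual PendingReadsManager: a String range key and a String result stand in for the real types, and the backend supplier stands in for the BookKeeper read). Subscriptions requesting the same range share one in-flight future instead of each issuing a read:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: deduplicate concurrent reads of the same entry range.
final class SharedReadsSketch {
    final Map<String, CompletableFuture<String>> inFlight = new ConcurrentHashMap<>();
    final AtomicInteger backendReads = new AtomicInteger();

    // "backend" stands in for the real BK/offloader read. It is expected to
    // complete asynchronously; an already-completed future would fire the
    // removal callback inside computeIfAbsent, which ConcurrentHashMap forbids.
    CompletableFuture<String> read(String range, Supplier<CompletableFuture<String>> backend) {
        return inFlight.computeIfAbsent(range, r -> {
            backendReads.incrementAndGet();                      // one real read per range
            return backend.get()
                    .whenComplete((v, e) -> inFlight.remove(r)); // allow later re-reads
        });
    }
}
```

With 64 catching-up subscriptions asking for the same range, only the first call reaches the backend; the rest attach to the pending future, which matches the order-of-magnitude drop in BK reads reported above.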