PIP-174: New managed ledger entry cache implementation #15955
base: master
Conversation
@merlimat: Thanks for your contribution. For this PR, do we need to update docs?
int offset = (int) (value >> 32);
int entryLen = (int) value;

ByteBuf entry = PulsarByteBufAllocator.DEFAULT.buffer(entryLen, entryLen);
Since it's a draft PR, I'm writing here an idea that crossed my mind.
On each get, we pay the penalty of creating a ByteBuf, both the heap object and the direct memory allocation, plus the copy.
What if we returned a ByteBuf that is a linked ByteBuf (a view) into the original ByteBuf?
It stays valid as long as we don't call clear().
Perhaps we could maintain an ever-increasing version number that we bump on each clear.
We could return a CachedByteBuf, which holds a link to the cache and the version it was cut from. If the cache's version has grown past it, the view has been invalidated and can't be used anymore.
CachedByteBuf instances could also be pooled if needed, since they are just a long and a ByteBuf.
Just an idea
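The versioning idea above can be sketched roughly as follows. All names here are hypothetical, and a plain offset/length pair stands in for Netty's ByteBuf so the sketch stays self-contained:

```java
// Hypothetical sketch of the version-stamped cache view idea.
// A real implementation would wrap a Netty ByteBuf instead of offsets.
final class SegmentCache {
    private long version = 0;          // bumped on every clear()

    long version() { return version; }

    void clear() { version++; }        // invalidates all outstanding views

    CachedBuf slice(int offset, int length) {
        // stamp the view with the version it was cut from
        return new CachedBuf(this, version, offset, length);
    }
}

// A view into the cache; usable only while the cache has not been cleared.
final class CachedBuf {
    private final SegmentCache cache;
    private final long version;
    final int offset;
    final int length;

    CachedBuf(SegmentCache cache, long version, int offset, int length) {
        this.cache = cache;
        this.version = version;
        this.offset = offset;
        this.length = length;
    }

    // If the cache's version has grown past ours, this view is invalid.
    boolean isValid() { return cache.version() == version; }
}
```

A view handed out before clear() reports itself invalid afterwards, so a reader can fall back to re-reading instead of touching recycled memory.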
Yes, we could return a "retained slice" (a ByteBuf that increments the ref-count and points to a portion of the original buffer) and avoid the copy on the read path.
The problem is that this buffer could stay alive for an indefinite amount of time if some consumer connections are slow. We would then be retaining a whole 1 GB buffer even though only a small message is pending on a TCP connection, and we cannot simply overwrite the old cache segment when rotating because the reader could still be holding it.
Since we already have a flag to control the copy/not-copy of the cache, another approach I was thinking of was to keep maps of the original ByteBuf (so that we also eliminate the copy on insertion in the cache).
We still do the rotation based on rotating the segments, where each segment has its own hash map.
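The pinning concern above can be illustrated with a minimal ref-count sketch (hypothetical names; Netty's own refCnt()/retain()/release() machinery would be used in practice): one outstanding retained slice is enough to keep the whole segment from being rotated.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of why a retained slice pins the whole segment: the segment's
// memory can only be reused once every reader has released its slice.
final class Segment {
    // 1 = owned by the cache itself; each retained slice adds one.
    private final AtomicInteger refCnt = new AtomicInteger(1);

    void retainSlice() { refCnt.incrementAndGet(); }

    void releaseSlice() { refCnt.decrementAndGet(); }

    // Rotation may only overwrite the segment when no slices are outstanding.
    boolean canRotate() { return refCnt.get() == 1; }
}
```

Even a single slow consumer holding one small slice keeps canRotate() false for the entire (potentially 1 GB) segment, which is the cost the comment above describes.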
Since we already have a flag to control the copy/not-copy of the cache, another approach I was thinking of was to keep maps of the original ByteBuf (so that we also eliminate the copy on insertion in the cache).
We still do the rotation based on rotating the segments, where each segment has its own hash map.
I didn't understand that part. What do you mean by "maps of the original ByteBuf"?
Yes, just incrementing the ref-count. It's similar to what we are currently doing, though without the overly-complex logic for cache eviction.
The PR had no activity for 30 days; marking with the Stale label.
Force-pushed from ea6f491 to 5544ed2.
LGTM, just left some minor comments.
} else {
    break;
}
Is it possible that subsequent entries are still cached in this segment? After we get null from this segment, we will move to the next segment.
Is it possible that subsequent entries are still cached in this segment? After we get null from this segment, we will move to the next segment.
+1
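The situation the reviewers are asking about could be handled by probing every segment for each entry id before treating it as a miss. This is a simplified sketch with hypothetical names, using plain maps in place of the segments' concurrent hash maps:

```java
import java.util.List;
import java.util.Map;

// Sketch: count the longest contiguous run of cached entries, checking
// all segments for each entry id instead of giving up on the first
// segment that misses.
final class SegmentScan {
    static int contiguousHits(List<Map<Long, byte[]>> segments, long first, long last) {
        int hits = 0;
        for (long id = first; id <= last; id++) {
            byte[] found = null;
            for (Map<Long, byte[]> seg : segments) {   // probe every segment
                found = seg.get(id);
                if (found != null) {
                    break;
                }
            }
            if (found == null) {
                break;                                 // stop only on a true miss
            }
            hits++;
        }
        return hits;
    }
}
```

With this shape, entries spread across segments (for example because a rotation happened mid-range) still count as cache hits.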
if (cachedEntries.size() == entriesToRead) {
    // All entries found in cache
    entryCacheManager.getFactoryMBean().recordCacheHits(entriesToRead, totalCachedSize);
    if (log.isDebugEnabled()) {
        log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", ml.getName(), ledgerId, firstEntry,
                lastEntry);
    }

    callback.readEntriesComplete(cachedEntries, ctx);

} else {
    if (!cachedEntries.isEmpty()) {
        cachedEntries.forEach(entry -> entry.release());
    }
It looks like we could safely return the partial data from the cache? I'm not sure if I missed something, but it seems a little wasteful to discard a partial cache hit. The old implementation follows the same approach, so we could also improve this part in a separate PR if possible.
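The partial-hit idea could look roughly like this (hypothetical names, not the PR's actual code): when the cache holds a contiguous prefix of the requested range, serve it and read only the tail from BookKeeper.

```java
// Sketch of planning a partial cache hit: given the requested range
// [firstEntry, lastEntry] and the number of contiguous cached entries,
// decide which entries (if any) still need a BookKeeper read.
final class PartialHit {
    final long readFrom;   // first entry still missing, or -1 if fully cached

    private PartialHit(long readFrom) { this.readFrom = readFrom; }

    static PartialHit plan(long firstEntry, long lastEntry, int cachedCount) {
        long entriesToRead = lastEntry - firstEntry + 1;
        if (cachedCount == entriesToRead) {
            return new PartialHit(-1);                    // full hit, nothing to read
        }
        return new PartialHit(firstEntry + cachedCount);  // read only the tail
    }
}
```

The cached prefix would then be returned immediately and merged with the asynchronous read of [readFrom, lastEntry], instead of releasing the cached entries and re-reading the whole range.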
@Override
public long getSize() {
    return 0;
It's better to add a comment here: we return 0 to avoid triggering cache eviction, and topic-level cache size metrics will not be exposed, since this implementation shares the cache across all topics.
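A possible shape for the comment the reviewer asks for (the enclosing class name here is hypothetical):

```java
// Hypothetical sketch of the commented method the reviewer suggests.
final class SharedEntryCacheSketch {
    // Intentionally returns 0: this cache is shared across all topics, so
    // there is no meaningful per-topic size to report. Returning 0 also
    // prevents the per-topic eviction logic from acting on this cache and
    // means topic-level cache size metrics are not exposed.
    public long getSize() {
        return 0;
    }
}
```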
Checkstyle failed @merlimat
The test
/pulsarbot run-failure-checks
@@ -204,6 +204,12 @@ public LongPair get(long key1, long key2) {
    return getSection(h).get(key1, key2, (int) h);
}

public long getFirstValue(long key1, long key2) {
Better to add some documentation here; the method name is a bit confusing.
Suggested change:
/**
 * @return get(key1, key2).first;
 */
public long getFirstValue(long key1, long key2) {
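For context on what the returned long carries: it packs two ints, matching the `offset`/`entryLen` unpacking shown near the top of this conversation. A small self-contained sketch of that encoding (hypothetical helper class, not part of the PR):

```java
// Pack an (offset, entryLen) pair into one long, and decode it exactly
// the way the review snippet does: offset in the high 32 bits, length
// in the low 32 bits.
final class PackedValue {
    static long pack(int offset, int entryLen) {
        // mask the low half to avoid sign-extension of entryLen
        return ((long) offset << 32) | (entryLen & 0xFFFFFFFFL);
    }

    static int offset(long value) { return (int) (value >> 32); }

    static int entryLen(long value) { return (int) value; }
}
```

Returning the packed long directly (rather than a LongPair object) avoids an allocation per lookup, which is presumably why getFirstValue exists.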
@Override
public void updateCacheSizeAndThreshold(long maxSize) {
This method should be supported when the user updates managedLedgerCacheSizeMB. We can add an error log here to let the user know about this limitation.
} else {
    break;
}
Is it possible that subsequent entries are still cached in this segment? After we get null from this segment, we will move to the next segment.
+1
Resolved threads:
...src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java
...src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java
...ger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferCopy.java
Force-pushed from d620513 to 2cc107d.
LGTM
I would commit this to 2.11 only if we keep the old cache strategy as the default, because nobody has had enough time to test this new cache implementation, and it is very dangerous to make it the default now that we are close to the release.
import org.testng.annotations.Test;

public class EntryCacheManagerTest extends MockedBookKeeperTestCase {

    ManagedLedgerImpl ml1;
    ManagedLedgerImpl ml2;

    @DataProvider(name = "EntryCacheManagerClass")
    public static Object[][] primeNumbers() {
nit: primeNumbers? The data provider name doesn't match what it provides.
The PR had no activity for 30 days; marking with the Stale label.
@merlimat it seems the review has already been done, but a few files have conflicts. Could you rebase the patch onto master so that we can proceed with the PR?
The PR had no activity for 30 days; marking with the Stale label.
Please rebase and adapt PendingReadsManager so that it can be used in SharedEntryCacheImpl besides RangeEntryCacheImpl. PendingReadsManager was introduced by @eolivelli in PR #17241 and it resulted in huge improvements.
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
                           AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
    final long ledgerId = lh.getId();
    final int entriesToRead = (int) (lastEntry - firstEntry) + 1;

    if (log.isDebugEnabled()) {
        log.debug("[{}] Reading entries range ledger {}: {} to {}", ml.getName(), ledgerId, firstEntry, lastEntry);
    }

    List<Entry> cachedEntries = new ArrayList<>(entriesToRead);
    long totalCachedSize = entryCacheManager.getRange(ledgerId, firstEntry, lastEntry, cachedEntries);

    if (cachedEntries.size() == entriesToRead) {
        // All entries found in cache
        final List<Entry> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
        for (Entry entry : cachedEntries) {
            entriesToReturn.add(EntryImpl.create((EntryImpl) entry));
            entry.release();
        }
        entryCacheManager.getFactoryMBean().recordCacheHits(entriesToReturn.size(), totalCachedSize);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", ml.getName(), ledgerId, firstEntry,
                    lastEntry);
        }
        callback.readEntriesComplete(entriesToReturn, ctx);

    } else {
        if (!cachedEntries.isEmpty()) {
            cachedEntries.forEach(entry -> entry.release());
        }

        // Read all the entries from bookkeeper
        lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
                ledgerEntries -> {
                    requireNonNull(ml.getName());
                    requireNonNull(ml.getExecutor());
I guess PendingReadsManager should be adapted and used here? It was introduced by #17241 and resulted in huge improvements.
Motivation

PIP-174: #15954

Provide a new SharedEntryCacheManagerImpl implementation.

doc-complete