diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java new file mode 100644 index 0000000000000..4c374d8ace606 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import io.prometheus.client.Counter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.AllArgsConstructor; +import lombok.Value; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.EntryImpl; + +/** + * PendingReadsManager tries to prevent sending duplicate reads to BK. + */ +public class PendingReadsManager { + + private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter + .build() + .name("pulsar_ml_cache_pendingreads_entries_read") + .help("Total number of entries read from BK") + .register(); + + private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter + .build() + .name("pulsar_ml_cache_pendingreads_entries_notread") + .help("Total number of entries not read from BK") + .register(); + + private static final Counter COUNT_PENDING_READS_MATCHED = Counter + .build() + .name("pulsar_ml_cache_pendingreads_matched") + .help("Pending reads reused with perfect range match") + .register(); + private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter + .build() + .name("pulsar_ml_cache_pendingreads_matched_included") + .help("Pending reads reused by attaching to a read with a larger range") + .register(); + private static final Counter COUNT_PENDING_READS_MISSED = Counter + .build() + .name("pulsar_ml_cache_pendingreads_missed") + .help("Pending reads that didn't find a match") + .register(); + + private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter + .build() + .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left") + .help("Pending reads that didn't find a match but they partially overlap with another read") + .register(); + + private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter + .build() + .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right") + .help("Pending reads that didn't find a match but they partially overlap with another read") + .register(); + + private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = Counter + .build() + .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both") + .help("Pending reads that didn't find a match but they partially overlap with another read") + .register(); + + private final RangeEntryCacheImpl rangeEntryCache; + private final ConcurrentHashMap> cachedPendingReads = + new ConcurrentHashMap<>(); + + public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) { + this.rangeEntryCache = rangeEntryCache; + } + + @Value + private static class PendingReadKey { + private final long startEntry; + private final long endEntry; + long size() { + return endEntry - startEntry + 1; + } + + + boolean includes(PendingReadKey other) { + return startEntry <= other.startEntry && other.endEntry <= endEntry; + } + + boolean overlaps(PendingReadKey other) { + return (other.startEntry <= startEntry && startEntry <= other.endEntry) + || (other.startEntry <= endEntry && endEntry <= other.endEntry); + } + + PendingReadKey reminderOnLeft(PendingReadKey other) { + // S******-----E + // S----------E + if (other.startEntry <= endEntry + && other.startEntry > startEntry) { + return new PendingReadKey(startEntry, other.startEntry - 1); + } + return null; + } + + PendingReadKey reminderOnRight(PendingReadKey other) { + // S-----*******E + // S-----------E + if (startEntry <= other.endEntry + && other.endEntry < endEntry) { + return new PendingReadKey(other.endEntry + 1, endEntry); + } + return null; + } + + } + + @AllArgsConstructor + private static final class ReadEntriesCallbackWithContext { + final AsyncCallbacks.ReadEntriesCallback callback; + final Object ctx; + final long startEntry; + final long endEntry; + } + + @AllArgsConstructor + private static final class FindPendingReadOutcome { + final PendingRead pendingRead; + final PendingReadKey missingOnLeft; + final PendingReadKey missingOnRight; + boolean needsAdditionalReads() { + return missingOnLeft != null || missingOnRight != null; + } + } + + private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map ledgerCache, AtomicBoolean created) { + synchronized (ledgerCache) { + PendingRead existing = ledgerCache.get(key); + if (existing != null) { + COUNT_PENDING_READS_MATCHED.inc(key.size()); + COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size()); + return new FindPendingReadOutcome(existing, null, null); + } + FindPendingReadOutcome foundButMissingSomethingOnLeft = null; + FindPendingReadOutcome foundButMissingSomethingOnRight = null; + FindPendingReadOutcome foundButMissingSomethingOnBoth = null; + + for (Map.Entry entry : ledgerCache.entrySet()) { + PendingReadKey entryKey = entry.getKey(); + if (entryKey.includes(key)) { + COUNT_PENDING_READS_MATCHED_INCLUDED.inc(key.size()); + COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size()); + return new FindPendingReadOutcome(entry.getValue(), null, null); + } + if (entryKey.overlaps(key)) { + PendingReadKey reminderOnLeft = key.reminderOnLeft(entryKey); + PendingReadKey reminderOnRight = key.reminderOnRight(entryKey); + if (reminderOnLeft != null && reminderOnRight != null) { + foundButMissingSomethingOnBoth = new FindPendingReadOutcome(entry.getValue(), + reminderOnLeft, reminderOnRight); + } else if (reminderOnRight != null && reminderOnLeft == null) { + foundButMissingSomethingOnRight = new FindPendingReadOutcome(entry.getValue(), + null, reminderOnRight); + } else if (reminderOnLeft != null && reminderOnRight == null) { + foundButMissingSomethingOnLeft = new FindPendingReadOutcome(entry.getValue(), + reminderOnLeft, null); + } + } + } + + if (foundButMissingSomethingOnRight != null) { + long delta = key.size() + - foundButMissingSomethingOnRight.missingOnRight.size(); + COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT.inc(delta); + COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta); + return foundButMissingSomethingOnRight; + } else if (foundButMissingSomethingOnLeft != null) { + long delta = key.size() + - foundButMissingSomethingOnLeft.missingOnLeft.size(); + COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT.inc(delta); + COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta); + return foundButMissingSomethingOnLeft; + } else if (foundButMissingSomethingOnBoth != null) { + long delta = key.size() + - foundButMissingSomethingOnBoth.missingOnRight.size() + - foundButMissingSomethingOnBoth.missingOnLeft.size(); + COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta); + COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH.inc(delta); + return foundButMissingSomethingOnBoth; + } + + created.set(true); + PendingRead newRead = new PendingRead(key, ledgerCache); + ledgerCache.put(key, newRead); + long delta = key.size(); + COUNT_PENDING_READS_MISSED.inc(delta); + COUNT_ENTRIES_READ_FROM_BK.inc(delta); + return new FindPendingReadOutcome(newRead, null, null); + } + } + + private class PendingRead { + final PendingReadKey key; + final Map ledgerCache; + final List callbacks = new ArrayList<>(1); + boolean completed = false; + + public PendingRead(PendingReadKey key, + Map ledgerCache) { + this.key = key; + this.ledgerCache = ledgerCache; + } + + private List keepEntries(List list, long startEntry, long endEntry) { + List result = new ArrayList<>((int) (endEntry - startEntry)); + for (EntryImpl entry : list) { + long entryId = entry.getEntryId(); + if (startEntry <= entryId && entryId <= endEntry) { + result.add(entry); + } else { + entry.release(); + } + } + return result; + } + + public void attach(CompletableFuture> handle) { + // when the future is done remove this from the map + // new reads will go to a new instance + // this is required because we are going to do refcount management + // on the results of the callback + handle.whenComplete((___, error) -> { + synchronized (PendingRead.this) { + completed = true; + synchronized (ledgerCache) { + ledgerCache.remove(key, this); + } + } + }); + + handle.thenAcceptAsync(entriesToReturn -> { + synchronized (PendingRead.this) { + if (callbacks.size() == 1) { + ReadEntriesCallbackWithContext first = callbacks.get(0); + if (first.startEntry == key.startEntry + && first.endEntry == key.endEntry) { + // perfect match, no copy, this is the most common case + first.callback.readEntriesComplete((List) entriesToReturn, + first.ctx); + } else { + first.callback.readEntriesComplete( + (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry), + first.ctx); + } + } else { + for (ReadEntriesCallbackWithContext callback : callbacks) { + long callbackStartEntry = callback.startEntry; + long callbackEndEntry = callback.endEntry; + List copy = new ArrayList<>((int) (callbackEndEntry - callbackStartEntry + 1)); + for (EntryImpl entry : entriesToReturn) { + long entryId = entry.getEntryId(); + if (callbackStartEntry <= entryId && entryId <= callbackEndEntry) { + EntryImpl entryCopy = EntryImpl.create(entry); + copy.add(entryCopy); + } + } + callback.callback.readEntriesComplete((List) copy, callback.ctx); + } + for (EntryImpl entry : entriesToReturn) { + entry.release(); + } + } + } + }, rangeEntryCache.getManagedLedger().getExecutor() + .chooseThread(rangeEntryCache.getManagedLedger().getName())).exceptionally(exception -> { + synchronized (PendingRead.this) { + for (ReadEntriesCallbackWithContext callback : callbacks) { + ManagedLedgerException mlException = createManagedLedgerException(exception); + callback.callback.readEntriesFailed(mlException, callback.ctx); + } + } + return null; + }); + } + + synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback, + Object ctx, long startEntry, long endEntry) { + if (completed) { + return false; + } + callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry)); + return true; + } + } + + + void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { + + + final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry); + + Map pendingReadsForLedger = + cachedPendingReads.computeIfAbsent(lh.getId(), (l) -> new ConcurrentHashMap<>()); + + boolean listenerAdded = false; + while (!listenerAdded) { + AtomicBoolean createdByThisThread = new AtomicBoolean(); + FindPendingReadOutcome findBestCandidateOutcome = findPendingRead(key, + pendingReadsForLedger, createdByThisThread); + PendingRead pendingRead = findBestCandidateOutcome.pendingRead; + if (findBestCandidateOutcome.needsAdditionalReads()) { + AsyncCallbacks.ReadEntriesCallback wrappedCallback = new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + PendingReadKey missingOnLeft = findBestCandidateOutcome.missingOnLeft; + PendingReadKey missingOnRight = findBestCandidateOutcome.missingOnRight; + if (missingOnRight != null && missingOnLeft != null) { + AsyncCallbacks.ReadEntriesCallback readFromLeftCallback = + new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entriesFromLeft, Object dummyCtx1) { + AsyncCallbacks.ReadEntriesCallback readFromRightCallback = + new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entriesFromRight, + Object dummyCtx2) { + List finalResult = + new ArrayList<>(entriesFromLeft.size() + + entries.size() + entriesFromRight.size()); + finalResult.addAll(entriesFromLeft); + finalResult.addAll(entries); + finalResult.addAll(entriesFromRight); + callback.readEntriesComplete(finalResult, ctx); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, + Object dummyCtx3) { + callback.readEntriesFailed(exception, ctx); + } + }; + rangeEntryCache.asyncReadEntry0(lh, + missingOnRight.startEntry, missingOnRight.endEntry, + shouldCacheEntry, readFromRightCallback, null); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4) { + callback.readEntriesFailed(exception, ctx); + } + }; + rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry, + shouldCacheEntry, readFromLeftCallback, null); + } else if (missingOnLeft != null) { + AsyncCallbacks.ReadEntriesCallback readFromLeftCallback = + new AsyncCallbacks.ReadEntriesCallback() { + + @Override + public void readEntriesComplete(List entriesFromLeft, + Object dummyCtx5) { + List finalResult = + new ArrayList<>(entriesFromLeft.size() + entries.size()); + finalResult.addAll(entriesFromLeft); + finalResult.addAll(entries); + callback.readEntriesComplete(finalResult, ctx); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, + Object dummyCtx6) { + callback.readEntriesFailed(exception, ctx); + } + }; + rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry, + shouldCacheEntry, readFromLeftCallback, null); + } else if (missingOnRight != null) { + AsyncCallbacks.ReadEntriesCallback readFromRightCallback = + new AsyncCallbacks.ReadEntriesCallback() { + + @Override + public void readEntriesComplete(List entriesFromRight, + Object dummyCtx7) { + List finalResult = + new ArrayList<>(entriesFromRight.size() + entries.size()); + finalResult.addAll(entries); + finalResult.addAll(entriesFromRight); + callback.readEntriesComplete(finalResult, ctx); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, + Object dummyCtx8) { + callback.readEntriesFailed(exception, ctx); + } + }; + rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry, + shouldCacheEntry, readFromRightCallback, null); + } + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + callback.readEntriesFailed(exception, ctx); + } + }; + listenerAdded = pendingRead.addListener(wrappedCallback, ctx, key.startEntry, key.endEntry); + } else { + listenerAdded = pendingRead.addListener(callback, ctx, key.startEntry, key.endEntry); + } + + + if (createdByThisThread.get()) { + CompletableFuture> readResult = rangeEntryCache.readFromStorage(lh, firstEntry, + lastEntry, shouldCacheEntry); + pendingRead.attach(readResult); + } + } + } + + void clear() { + cachedPendingReads.clear(); + } + + void invalidateLedger(long id) { + cachedPendingReads.remove(id); + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index f8f5c328cd9e2..163728e0f0429 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -21,12 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -48,15 +50,18 @@ public class RangeEntryCacheImpl implements EntryCache { private final RangeEntryCacheManagerImpl manager; - private final ManagedLedgerImpl ml; + final ManagedLedgerImpl ml; private ManagedLedgerInterceptor interceptor; private final RangeCache entries; private final boolean copyEntries; + private final PendingReadsManager pendingReadsManager; + private static final double MB = 1024 * 1024; public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) { this.manager = manager; + this.pendingReadsManager = new PendingReadsManager(this); this.ml = ml; this.interceptor = ml.getManagedLedgerInterceptor(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); @@ -67,6 +72,11 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl } } + @VisibleForTesting + ManagedLedgerImpl getManagedLedger() { + return ml; + } + @Override public String getName() { return ml.getName(); @@ -185,6 +195,7 @@ public void invalidateAllEntries(long ledgerId) { } manager.entriesRemoved(sizeRemoved, entriesRemoved); + pendingReadsManager.invalidateLedger(ledgerId); } @Override @@ -235,6 +246,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt } }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> { ml.invalidateLedgerHandle(lh); + pendingReadsManager.invalidateLedger(lh.getId()); callback.readEntryFailed(createManagedLedgerException(exception), ctx); return null; }); @@ -257,7 +269,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole } @SuppressWarnings({ "unchecked", "rawtypes" }) - private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, final ReadEntriesCallback callback, Object ctx) { final long ledgerId = lh.getId(); final int entriesToRead = (int) (lastEntry - firstEntry) + 1; @@ -295,51 +307,71 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo } // Read all the entries from bookkeeper - lh.readAsync(firstEntry, lastEntry).thenAcceptAsync( - ledgerEntries -> { - requireNonNull(ml.getName()); - requireNonNull(ml.getExecutor()); + pendingReadsManager.readEntries(lh, firstEntry, lastEntry, + shouldCacheEntry, callback, ctx); - try { - // We got the entries, we need to transform them to a List<> type - long totalSize = 0; - final List entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); - for (LedgerEntry e : ledgerEntries) { - EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor); - entriesToReturn.add(entry); - totalSize += entry.getLength(); - if (shouldCacheEntry) { - EntryImpl cacheEntry = EntryImpl.create(entry); - insert(cacheEntry); - cacheEntry.release(); + } + } + + /** + * Reads the entries from Storage. + * @param lh the handle + * @param firstEntry the first entry + * @param lastEntry the last entry + * @param shouldCacheEntry if we should put the entry into the cache + * @return a handle to the operation + */ + CompletableFuture> readFromStorage(ReadHandle lh, + long firstEntry, long lastEntry, boolean shouldCacheEntry) { + final int entriesToRead = (int) (lastEntry - firstEntry) + 1; + CompletableFuture> readResult = lh.readAsync(firstEntry, lastEntry) + .thenApply( + ledgerEntries -> { + requireNonNull(ml.getName()); + requireNonNull(ml.getExecutor()); + + try { + // We got the entries, we need to transform them to a List<> type + long totalSize = 0; + final List entriesToReturn = + Lists.newArrayListWithExpectedSize(entriesToRead); + for (LedgerEntry e : ledgerEntries) { + EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor); + entriesToReturn.add(entry); + totalSize += entry.getLength(); + if (shouldCacheEntry) { + EntryImpl cacheEntry = EntryImpl.create(entry); + insert(cacheEntry); + cacheEntry.release(); + } } - } - manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); - ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize); + manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); + ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize); - callback.readEntriesComplete((List) entriesToReturn, ctx); - } finally { - ledgerEntries.close(); - } - }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> { - if (exception instanceof BKException - && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) { - callback.readEntriesFailed(createManagedLedgerException(exception), ctx); - } else { - ml.invalidateLedgerHandle(lh); - ManagedLedgerException mlException = createManagedLedgerException(exception); - callback.readEntriesFailed(mlException, ctx); - } - return null; - }); - } + return entriesToReturn; + } finally { + ledgerEntries.close(); + } + }); + // handle LH invalidation + readResult.exceptionally(exception -> { + if (exception instanceof BKException + && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) { + } else { + ml.invalidateLedgerHandle(lh); + pendingReadsManager.invalidateLedger(lh.getId()); + } + return null; + }); + return readResult; } @Override public void clear() { Pair removedPair = entries.clear(); manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft()); + pendingReadsManager.clear(); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java new file mode 100644 index 0000000000000..f6d3ac881568e --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.AssertJUnit.assertNotSame; +import static org.testng.AssertJUnit.assertSame; + +@Slf4j +public class PendingReadsManagerTest { + + static final Object CTX = "foo"; + static final Object CTX2 = "far"; + static final long ledgerId = 123414L; + OrderedExecutor orderedExecutor; + + PendingReadsManagerTest() { + } + + @BeforeClass(alwaysRun = true) + void before() { + orderedExecutor = OrderedExecutor.newBuilder().build(); + } + + @AfterClass(alwaysRun = true) + void after() { + if (orderedExecutor != null) { + orderedExecutor.shutdown(); + orderedExecutor = null; + } + } + + + RangeEntryCacheImpl rangeEntryCache; + PendingReadsManager pendingReadsManager; + ReadHandle lh; + ManagedLedgerImpl ml; + + @BeforeMethod(alwaysRun = true) + void setupMocks() { + rangeEntryCache = mock(RangeEntryCacheImpl.class); + pendingReadsManager = new PendingReadsManager(rangeEntryCache); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + log.info("rangeEntryCache asyncReadEntry0 {}", invocationOnMock); + ReadHandle rh = invocationOnMock.getArgument(0); + long startEntry = invocationOnMock.getArgument(1); + long endEntry = invocationOnMock.getArgument(2); + boolean shouldCacheEntry = invocationOnMock.getArgument(3); + AsyncCallbacks.ReadEntriesCallback callback = invocationOnMock.getArgument(4); + Object ctx = invocationOnMock.getArgument(5); + pendingReadsManager.readEntries(lh, startEntry, endEntry, shouldCacheEntry, callback, ctx); + return null; + } + }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(), + anyBoolean(), any(), any()); + + lh = mock(ReadHandle.class); + ml = mock(ManagedLedgerImpl.class); + when(ml.getExecutor()).thenReturn(orderedExecutor); + when(rangeEntryCache.getManagedLedger()).thenReturn(ml); + } + + + @Data + private static class CapturingReadEntriesCallback extends CompletableFuture + implements AsyncCallbacks.ReadEntriesCallback { + List entries; + Object ctx; + Throwable error; + + @Override + public synchronized void readEntriesComplete(List entries, Object ctx) { + this.entries = entries.stream().map(Entry::getPosition).collect(Collectors.toList()); + this.ctx = ctx; + this.error = null; + this.complete(null); + } + + @Override + public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + this.entries = null; + this.ctx = ctx; + this.error = exception; + this.completeExceptionally(exception); + } + + } + + private static List buildList(long start, long end) { + List result = new ArrayList<>(); + for (long i = start; i <= end; i++) { + long entryId = i; + EntryImpl entry = EntryImpl.create(ledgerId, entryId, "data".getBytes(StandardCharsets.UTF_8)); + result.add(entry); + } + return result; + } + + + private void verifyRange(List entries, long firstEntry, long endEntry) { + int pos = 0; + log.info("verifyRange numEntries {}", entries.size()); + for (long entry = firstEntry; entry <= endEntry; entry++) { + assertEquals(entries.get(pos++).getEntryId(), entry); + } + } + + private static class PreparedReadFromStorage extends CompletableFuture> { + final long firstEntry; + final long endEntry; + final boolean shouldCacheEntry; + + public PreparedReadFromStorage(long firstEntry, long endEntry, boolean shouldCacheEntry) { + this.firstEntry = firstEntry; + this.endEntry = endEntry; + this.shouldCacheEntry = shouldCacheEntry; + } + + @Override + public String toString() { + return "PreparedReadFromStorage("+firstEntry+","+endEntry+","+shouldCacheEntry+")"; + } + + public void storageReadCompleted() { + this.complete(buildList(firstEntry, endEntry)); + } + } + + private PreparedReadFromStorage prepareReadFromStorage(ReadHandle lh, RangeEntryCacheImpl rangeEntryCache, + long firstEntry, long endEntry, boolean shouldCacheEntry) { + PreparedReadFromStorage read = new PreparedReadFromStorage(firstEntry, endEntry, shouldCacheEntry); + log.info("prepareReadFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry); + when(rangeEntryCache.readFromStorage(eq(lh), eq(firstEntry), eq(endEntry), eq(shouldCacheEntry))).thenAnswer( + (invocationOnMock -> { + log.info("readFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry); + return read; + }) + ); + return read; + } + + @Test + public void simpleRead() throws Exception { + + long firstEntry = 100; + long endEntry = 199; + boolean shouldCacheEntry = false; + + PreparedReadFromStorage read1 + = prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + + CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + + // complete the read + read1.storageReadCompleted(); + + // wait for the callback to complete + callback.get(); + assertSame(callback.getCtx(), CTX); + + // verify + verifyRange(callback.entries, firstEntry, endEntry); + } + + + @Test + public void simpleConcurrentReadPerfectMatch() throws Exception { + + long firstEntry = 100; + long endEntry = 199; + boolean shouldCacheEntry = false; + + PreparedReadFromStorage read1 = prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + + PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); + CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + + CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback2, CTX2); + + // complete the read from BK + // only one read completes 2 callbacks + read1.storageReadCompleted(); + + callback.get(); + callback2.get(); + + assertSame(callback.getCtx(), CTX); + assertSame(callback2.getCtx(), CTX2); + + verifyRange(callback.entries, firstEntry, endEntry); + verifyRange(callback2.entries, firstEntry, endEntry); + + int pos = 0; + for (long entry = firstEntry; entry <= endEntry; entry++) {; + assertNotSame(callback.entries.get(pos), callback2.entries.get(pos)); + assertEquals(callback.entries.get(pos).getEntryId(), callback2.entries.get(pos).getEntryId()); + pos++; + } + + } + + @Test + public void simpleConcurrentReadIncluding() throws Exception { + + long firstEntry = 100; + long endEntry = 199; + + long firstEntrySecondRead = firstEntry + 10; + long endEntrySecondRead = endEntry - 10; + + boolean shouldCacheEntry = false; + + PreparedReadFromStorage read1 = prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + + PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); + CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + + + CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + + // complete the read from BK + // only one read completes 2 callbacks + read1.storageReadCompleted(); + + callback.get(); + callback2.get(); + + assertSame(callback.getCtx(), CTX); + assertSame(callback2.getCtx(), CTX2); + + verifyRange(callback.entries, firstEntry, endEntry); + verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + + int pos = 0; + for (long entry = firstEntry; entry <= endEntry; entry++) {; + if (entry >= firstEntrySecondRead && entry <= endEntrySecondRead) { + int posInSecondList = (int) (pos - (firstEntrySecondRead - firstEntry)); + assertNotSame(callback.entries.get(pos), callback2.entries.get(posInSecondList)); + assertEquals(callback.entries.get(pos).getEntryId(), callback2.entries.get(posInSecondList).getEntryId()); + } + pos++; + } + + } + + @Test + public void simpleConcurrentReadMissingLeft() throws Exception { + + long firstEntry = 100; + long endEntry = 199; + + long firstEntrySecondRead = firstEntry - 10; + long endEntrySecondRead = endEntry; + + boolean shouldCacheEntry = false; + + PreparedReadFromStorage read1 = + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + + PreparedReadFromStorage readForLeft = + prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, shouldCacheEntry); + + PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); + CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + + CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + + // complete the read from BK + read1.storageReadCompleted(); + // the first read can move forward + callback.get(); + + readForLeft.storageReadCompleted(); + callback2.get(); + + assertSame(callback.getCtx(), CTX); + assertSame(callback2.getCtx(), CTX2); + + verifyRange(callback.entries, firstEntry, endEntry); + verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + + } + + @Test + public void simpleConcurrentReadMissingRight() throws Exception { + + long firstEntry = 100; + long endEntry = 199; + + long firstEntrySecondRead = firstEntry; + long endEntrySecondRead = endEntry + 10; + + boolean shouldCacheEntry = false; + + PreparedReadFromStorage read1 = + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + + PreparedReadFromStorage readForRight = + prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, shouldCacheEntry); + + PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); + CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + + CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + + // complete the read from BK + read1.storageReadCompleted(); + // the first read can move forward + callback.get(); + + readForRight.storageReadCompleted(); + callback2.get(); + + assertSame(callback.getCtx(), CTX); + assertSame(callback2.getCtx(), CTX2); + + verifyRange(callback.entries, firstEntry, endEntry); + verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + + } + + @Test + public void simpleConcurrentReadMissingBoth() throws Exception { + + long firstEntry = 100; + long endEntry = 199; + + long firstEntrySecondRead = firstEntry - 10; + long endEntrySecondRead = endEntry + 10; + + boolean shouldCacheEntry = false; + + PreparedReadFromStorage read1 = + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + + PreparedReadFromStorage readForLeft = + prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, shouldCacheEntry); + + PreparedReadFromStorage readForRight = + prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, shouldCacheEntry); + + PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); + CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + + CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + + // complete the read from BK + read1.storageReadCompleted(); + // the first read can move forward + callback.get(); + + readForLeft.storageReadCompleted(); + readForRight.storageReadCompleted(); + callback2.get(); + + assertSame(callback.getCtx(), CTX); + assertSame(callback2.getCtx(), CTX2); + + verifyRange(callback.entries, firstEntry, endEntry); + verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + + } + + + @Test + public void simpleConcurrentReadNoMatch() throws Exception { + long firstEntry = 100; + long endEntry = 199; + + long firstEntrySecondRead = 1000; + long endEntrySecondRead = 1099; + + boolean shouldCacheEntry = false; + + PreparedReadFromStorage read1 = + prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry); + + PreparedReadFromStorage read2 = + prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry); + + PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache); + CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX); + + CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback(); + pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2); + + read1.storageReadCompleted(); + callback.get(); + + read2.storageReadCompleted(); + callback2.get(); + + assertSame(callback.getCtx(), CTX); + assertSame(callback2.getCtx(), CTX2); + + verifyRange(callback.entries, firstEntry, endEntry); + verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead); + + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index 9ef28842dfe1b..1d0b1be365934 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; @@ -1236,6 +1237,13 @@ public void testBacklogConsumerCacheReads() throws Exception { RangeEntryCacheImpl entryCache = spy((RangeEntryCacheImpl) cacheField.get(ledger)); cacheField.set(ledger, entryCache); + Field pendingReadsManagerField = RangeEntryCacheImpl.class.getDeclaredField("pendingReadsManager"); + pendingReadsManagerField.setAccessible(true); + PendingReadsManager pendingReadsManager = (PendingReadsManager) pendingReadsManagerField.get(entryCache); + Field cacheFieldInManager = PendingReadsManager.class.getDeclaredField("rangeEntryCache"); + cacheFieldInManager.setAccessible(true); + cacheFieldInManager.set(pendingReadsManager, entryCache); + // 2. Produce messages for (int i = 0; i < totalMessages; i++) { String message = "my-message-" + i;