diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index a83994d1cd328..27ee3d4cb441f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -26,7 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback; -import org.apache.bookkeeper.mledger.impl.EntryCacheManager; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; /** * A factory to open/create managed ledgers and delete them. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 51b6ebaababa5..ab97c10b68791 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2766,8 +2766,8 @@ public void operationFailed(MetaStoreException e) { boolean shouldCloseLedger(LedgerHandle lh) { long now = clock.millis(); - if (ledger.factory.isMetadataServiceAvailable() && - (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger() + if (ledger.getFactory().isMetadataServiceAvailable() + && (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger() || lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000)) && (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) { // It's safe to modify the timestamp since this method will be only called from a callback, implying that diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index c3e391ffab242..dce1789cd6c49 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -67,6 +67,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; @@ -89,10 +91,12 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookkeeperFactory; private final boolean isBookkeeperManaged; private final ManagedLedgerFactoryConfig config; + @Getter protected final OrderedScheduler scheduledExecutor; private final ExecutorService cacheEvictionExecutor; + @Getter protected final ManagedLedgerFactoryMBeanImpl mbean; protected final ConcurrentHashMap> ledgers = new ConcurrentHashMap<>(); @@ -188,7 +192,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, config.getManagedCursorInfoCompressionType()); this.config = config; this.mbean = new ManagedLedgerFactoryMBeanImpl(this); - this.entryCacheManager = new EntryCacheManager(this); + this.entryCacheManager = new RangeEntryCacheManagerImpl(this); this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats), 0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS); this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index cd47cf31ba3b2..345393384aaea 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -66,6 +66,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import lombok.Getter; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -114,6 +115,7 @@ import org.apache.bookkeeper.mledger.WaitingEntryCallBack; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.offload.OffloadUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats; @@ -255,9 +257,16 @@ public enum PositionBound { .newUpdater(ManagedLedgerImpl.class, State.class, "state"); protected volatile State state = null; + @Getter private final OrderedScheduler scheduledExecutor; + + @Getter private final OrderedExecutor executor; - final ManagedLedgerFactoryImpl factory; + + @Getter + private final ManagedLedgerFactoryImpl factory; + + @Getter protected final ManagedLedgerMBeanImpl mbean; protected final Clock clock; @@ -1823,7 +1832,7 @@ void invalidateReadHandle(long ledgerId) { } } - void invalidateLedgerHandle(ReadHandle ledgerHandle) { + public void invalidateLedgerHandle(ReadHandle ledgerHandle) { long ledgerId = ledgerHandle.getId(); LedgerHandle currentLedger = this.currentLedger; @@ -3540,14 +3549,6 @@ public NavigableMap getLedgersInfo() { return ledgers; } - OrderedScheduler getScheduledExecutor() { - return scheduledExecutor; - } - - OrderedExecutor getExecutor() { - return executor; - } - private ManagedLedgerInfo getManagedLedgerInfo() { return buildManagedLedgerInfo(ledgers); } @@ -3665,10 +3666,6 @@ public State getState() { return STATE_UPDATER.get(this); } - public ManagedLedgerMBeanImpl getMBean() { - return mbean; - } - public long getCacheSize() { return entryCache.getSize(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java similarity index 96% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java index 0e20a0facffb2..8f5b3e9b19e12 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java @@ -16,11 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.bookkeeper.mledger.impl; +package org.apache.bookkeeper.mledger.impl.cache; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java similarity index 96% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java index df70188800fe6..8401fe32245ab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.bookkeeper.mledger.impl; +package org.apache.bookkeeper.mledger.impl.cache; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Collections.reverseOrder; @@ -90,7 +90,7 @@ public void doEviction(List caches, long sizeToFree) { } log.info("Completed cache eviction. Removed {} entries from {} caches. ({} Mb)", evictedEntries, - cachesToEvict.size(), evictedSize / EntryCacheManager.MB); + cachesToEvict.size(), evictedSize / RangeEntryCacheManagerImpl.MB); } private static final Logger log = LoggerFactory.getLogger(EntryCacheDefaultEvictionPolicy.class); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java new file mode 100644 index 0000000000000..a09b8ba27fc24 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; +import java.util.Iterator; +import java.util.List; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; +import org.apache.commons.lang3.tuple.Pair; + +/** + * Implementation of cache that always read from BookKeeper. + */ +public class EntryCacheDisabled implements EntryCache { + private final ManagedLedgerImpl ml; + private final ManagedLedgerInterceptor interceptor; + + public EntryCacheDisabled(ManagedLedgerImpl ml) { + this.ml = ml; + this.interceptor = ml.getManagedLedgerInterceptor(); + } + + @Override + public String getName() { + return ml.getName(); + } + + @Override + public boolean insert(EntryImpl entry) { + return false; + } + + @Override + public void invalidateEntries(PositionImpl lastPosition) { + } + + @Override + public void invalidateAllEntries(long ledgerId) { + } + + @Override + public void clear() { + } + + @Override + public Pair evictEntries(long sizeToFree) { + return Pair.of(0, (long) 0); + } + + @Override + public void invalidateEntriesBeforeTimestamp(long timestamp) { + } + + @Override + public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { + lh.readAsync(firstEntry, lastEntry).thenAcceptAsync( + ledgerEntries -> { + List entries = Lists.newArrayList(); + long totalSize = 0; + try { + for (LedgerEntry e : ledgerEntries) { + // Insert the entries at the end of the list (they will be unsorted for now) + EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor); + entries.add(entry); + totalSize += entry.getLength(); + } + } finally { + ledgerEntries.close(); + } + ml.getFactory().getMbean().recordCacheMiss(entries.size(), totalSize); + ml.getMbean().addReadEntriesSample(entries.size(), totalSize); + + callback.readEntriesComplete(entries, ctx); + }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> { + callback.readEntriesFailed(createManagedLedgerException(exception), ctx); + return null; + }); + } + + @Override + public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, + Object ctx) { + lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync( + (ledgerEntries, exception) -> { + if (exception != null) { + ml.invalidateLedgerHandle(lh); + callback.readEntryFailed(createManagedLedgerException(exception), ctx); + return; + } + + try { + Iterator iterator = ledgerEntries.iterator(); + if (iterator.hasNext()) { + LedgerEntry ledgerEntry = iterator.next(); + EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor); + + ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength()); + ml.getMbean().addReadEntriesSample(1, returnEntry.getLength()); + callback.readEntryComplete(returnEntry, ctx); + } else { + callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), + ctx); + } + } finally { + ledgerEntries.close(); + } + }, ml.getExecutor().chooseThread(ml.getName())); + } + + @Override + public long getSize() { + return 0; + } + + @Override + public int compareTo(EntryCache other) { + return Longs.compare(getSize(), other.getSize()); + } + +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java similarity index 96% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java index 341c5c328ade8..8c55ce7cf7f41 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.bookkeeper.mledger.impl; +package org.apache.bookkeeper.mledger.impl.cache; import java.util.List; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java new file mode 100644 index 0000000000000..12cbb023f8691 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; + +public interface EntryCacheManager { + EntryCache getEntryCache(ManagedLedgerImpl ml); + + void removeEntryCache(String name); + + long getSize(); + + long getMaxSize(); + + void clear(); + + void updateCacheSizeAndThreshold(long maxSize); + + void updateCacheEvictionWatermark(double cacheEvictionWatermark); + + double getCacheEvictionWatermark(); +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java similarity index 94% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 0851d942bd1cb..5a99ce9ea4eb2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.bookkeeper.mledger.impl; +package org.apache.bookkeeper.mledger.impl.cache; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -38,6 +38,9 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.util.RangeCache; import org.apache.commons.lang3.tuple.Pair; @@ -47,9 +50,9 @@ /** * Cache data payload for entries of all ledgers. */ -public class EntryCacheImpl implements EntryCache { +public class RangeEntryCacheImpl implements EntryCache { - private final EntryCacheManager manager; + private final RangeEntryCacheManagerImpl manager; private final ManagedLedgerImpl ml; private ManagedLedgerInterceptor interceptor; private final RangeCache entries; @@ -57,7 +60,7 @@ public class EntryCacheImpl implements EntryCache { private static final double MB = 1024 * 1024; - public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml, boolean copyEntries) { + public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) { this.manager = manager; this.ml = ml; this.interceptor = ml.getManagedLedgerInterceptor(); @@ -218,10 +221,10 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt Iterator iterator = ledgerEntries.iterator(); if (iterator.hasNext()) { LedgerEntry ledgerEntry = iterator.next(); - EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor); + EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor); manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength()); - ml.mbean.addReadEntriesSample(1, returnEntry.getLength()); + ml.getMbean().addReadEntriesSample(1, returnEntry.getLength()); callback.readEntryComplete(returnEntry, ctx); } else { // got an empty sequence @@ -304,14 +307,14 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo final List entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); for (LedgerEntry e : ledgerEntries) { - EntryImpl entry = EntryCacheManager.create(e, interceptor); + EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor); entriesToReturn.add(entry); totalSize += entry.getLength(); } manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); - ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize); + ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize); callback.readEntriesComplete((List) entriesToReturn, ctx); } finally { @@ -369,5 +372,5 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) { manager.entriesRemoved(evictedSize); } - private static final Logger log = LoggerFactory.getLogger(EntryCacheImpl.class); + private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java similarity index 56% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java index 0165fc5b1970d..3c12c4605b2ec 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java @@ -16,35 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.bookkeeper.mledger.impl; +package org.apache.bookkeeper.mledger.impl.cache; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.primitives.Longs; import io.netty.buffer.ByteBuf; -import java.util.Iterator; -import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.api.LedgerEntry; -import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; -import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; -import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings("checkstyle:javadoctype") -public class EntryCacheManager { +public class RangeEntryCacheManagerImpl implements EntryCacheManager { private volatile long maxSize; private volatile long evictionTriggerThreshold; @@ -63,13 +58,13 @@ public class EntryCacheManager { private static final double evictionTriggerThresholdPercent = 0.98; - public EntryCacheManager(ManagedLedgerFactoryImpl factory) { + public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) { this.maxSize = factory.getConfig().getMaxCacheSize(); this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent); this.cacheEvictionWatermark = factory.getConfig().getCacheEvictionWatermark(); this.evictionPolicy = new EntryCacheDefaultEvictionPolicy(); this.mlFactory = factory; - this.mlFactoryMBean = factory.mbean; + this.mlFactoryMBean = factory.getMbean(); log.info("Initialized managed-ledger entry cache of {} Mb", maxSize / MB); } @@ -80,7 +75,7 @@ public EntryCache getEntryCache(ManagedLedgerImpl ml) { return new EntryCacheDisabled(ml); } - EntryCache newEntryCache = new EntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache()); + EntryCache newEntryCache = new RangeEntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache()); EntryCache currentEntryCache = caches.putIfAbsent(ml.getName(), newEntryCache); if (currentEntryCache != null) { return currentEntryCache; @@ -89,16 +84,19 @@ public EntryCache getEntryCache(ManagedLedgerImpl ml) { } } + @Override public void updateCacheSizeAndThreshold(long maxSize) { this.maxSize = maxSize; this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent); } + @Override public void updateCacheEvictionWatermark(double cacheEvictionWatermark) { this.cacheEvictionWatermark = cacheEvictionWatermark; } - void removeEntryCache(String name) { + @Override + public void removeEntryCache(String name) { EntryCache entryCache = caches.remove(name); if (entryCache == null) { return; @@ -117,7 +115,7 @@ boolean hasSpaceInCache() { // Trigger a single eviction in background. While the eviction is running we stop inserting entries in the cache if (currentSize > evictionTriggerThreshold && evictionInProgress.compareAndSet(false, true)) { - mlFactory.scheduledExecutor.execute(safeRun(() -> { + mlFactory.getScheduledExecutor().execute(safeRun(() -> { // Trigger a new cache eviction cycle to bring the used memory below the cacheEvictionWatermark // percentage limit long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermark); @@ -151,130 +149,26 @@ void entriesRemoved(long size) { currentSize.addAndGet(-size); } + @Override public long getSize() { return currentSize.get(); } + @Override public long getMaxSize() { return maxSize; } + @Override public double getCacheEvictionWatermark() { return cacheEvictionWatermark; } + @Override public void clear() { caches.values().forEach(EntryCache::clear); } - protected class EntryCacheDisabled implements EntryCache { - private final ManagedLedgerImpl ml; - private final ManagedLedgerInterceptor interceptor; - - public EntryCacheDisabled(ManagedLedgerImpl ml) { - this.ml = ml; - this.interceptor = ml.getManagedLedgerInterceptor(); - } - - @Override - public String getName() { - return ml.getName(); - } - - @Override - public boolean insert(EntryImpl entry) { - return false; - } - - @Override - public void invalidateEntries(PositionImpl lastPosition) { - } - - @Override - public void invalidateAllEntries(long ledgerId) { - } - - @Override - public void clear() { - } - - @Override - public Pair evictEntries(long sizeToFree) { - return Pair.of(0, (long) 0); - } - - @Override - public void invalidateEntriesBeforeTimestamp(long timestamp) { - } - - @Override - public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, - final ReadEntriesCallback callback, Object ctx) { - lh.readAsync(firstEntry, lastEntry).thenAcceptAsync( - ledgerEntries -> { - List entries = Lists.newArrayList(); - long totalSize = 0; - try { - for (LedgerEntry e : ledgerEntries) { - // Insert the entries at the end of the list (they will be unsorted for now) - EntryImpl entry = create(e, interceptor); - entries.add(entry); - totalSize += entry.getLength(); - } - } finally { - ledgerEntries.close(); - } - mlFactoryMBean.recordCacheMiss(entries.size(), totalSize); - ml.mbean.addReadEntriesSample(entries.size(), totalSize); - - callback.readEntriesComplete(entries, ctx); - }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> { - callback.readEntriesFailed(createManagedLedgerException(exception), ctx); - return null; - }); - } - - @Override - public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, - Object ctx) { - lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync( - (ledgerEntries, exception) -> { - if (exception != null) { - ml.invalidateLedgerHandle(lh); - callback.readEntryFailed(createManagedLedgerException(exception), ctx); - return; - } - - try { - Iterator iterator = ledgerEntries.iterator(); - if (iterator.hasNext()) { - LedgerEntry ledgerEntry = iterator.next(); - EntryImpl returnEntry = create(ledgerEntry, interceptor); - - mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength()); - ml.getMBean().addReadEntriesSample(1, returnEntry.getLength()); - callback.readEntryComplete(returnEntry, ctx); - } else { - callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), ctx); - } - } finally { - ledgerEntries.close(); - } - }, ml.getExecutor().chooseThread(ml.getName())); - } - - @Override - public long getSize() { - return 0; - } - - @Override - public int compareTo(EntryCache other) { - return Longs.compare(getSize(), other.getSize()); - } - - } - public static Entry create(long ledgerId, long entryId, ByteBuf data) { return EntryImpl.create(ledgerId, entryId, data); } @@ -300,5 +194,5 @@ public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor return returnEntry; } - private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class); + private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheManagerImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java new file mode 100644 index 0000000000000..898d250aa7f54 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; \ No newline at end of file diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 315eafa5a3989..ae4820b2323a0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -40,6 +40,9 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.impl.cache.EntryCache; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.testng.Assert; import org.testng.annotations.Test; @@ -56,8 +59,9 @@ protected void setUpTestCase() throws Exception { ml1 = mock(ManagedLedgerImpl.class); when(ml1.getScheduledExecutor()).thenReturn(executor); when(ml1.getName()).thenReturn("cache1"); - when(ml1.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml1)); + when(ml1.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml1)); when(ml1.getExecutor()).thenReturn(super.executor); + when(ml1.getFactory()).thenReturn(factory); ml2 = mock(ManagedLedgerImpl.class); when(ml2.getScheduledExecutor()).thenReturn(executor); @@ -83,13 +87,13 @@ public void simple() throws Exception { assertEquals(cache1.getSize(), 7); assertEquals(cacheManager.getSize(), 7); - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 10); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 7); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); + assertEquals(factory2.getMbean().getCacheMaxSize(), 10); + assertEquals(factory2.getMbean().getCacheUsedSize(), 7); + assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0); cache2.insert(EntryImpl.create(2, 0, new byte[1])); cache2.insert(EntryImpl.create(2, 1, new byte[1])); @@ -117,14 +121,14 @@ public void simple() throws Exception { assertEquals(cacheManager.getSize(), 2); assertEquals(cache2.getSize(), 2); - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 10); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 2); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 1); + assertEquals(factory2.getMbean().getCacheMaxSize(), 10); + assertEquals(factory2.getMbean().getCacheUsedSize(), 2); + assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 1); } @Test @@ -197,8 +201,8 @@ public void cacheDisabled() throws Exception { EntryCache cache1 = cacheManager.getEntryCache(ml1); EntryCache cache2 = cacheManager.getEntryCache(ml2); - assertTrue(cache1 instanceof EntryCacheManager.EntryCacheDisabled); - assertTrue(cache2 instanceof EntryCacheManager.EntryCacheDisabled); + assertTrue(cache1 instanceof EntryCacheDisabled); + assertTrue(cache2 instanceof EntryCacheDisabled); cache1.insert(EntryImpl.create(1, 1, new byte[4])); cache1.insert(EntryImpl.create(1, 0, new byte[3])); @@ -206,13 +210,13 @@ public void cacheDisabled() throws Exception { assertEquals(cache1.getSize(), 0); assertEquals(cacheManager.getSize(), 0); - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 0); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); + assertEquals(factory2.getMbean().getCacheMaxSize(), 0); + assertEquals(factory2.getMbean().getCacheUsedSize(), 0); + assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0); cache2.insert(EntryImpl.create(2, 0, new byte[1])); cache2.insert(EntryImpl.create(2, 1, new byte[1])); @@ -242,13 +246,13 @@ public void verifyNoCacheIfNoConsumer() throws Exception { assertEquals(cache1.getSize(), 0); assertEquals(cacheManager.getSize(), 0); - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 7 * 10); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); + assertEquals(factory2.getMbean().getCacheMaxSize(), 7 * 10); + assertEquals(factory2.getMbean().getCacheUsedSize(), 0); + assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0); } @Test @@ -271,54 +275,54 @@ public void verifyHitsMisses() throws Exception { ledger.addEntry(("entry-" + i).getBytes()); } - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); + assertEquals(factory2.getMbean().getCacheUsedSize(), 70); + assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0); List entries = c1.readEntries(10); assertEquals(entries.size(), 10); entries.forEach(e -> e.release()); - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 10.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 70.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); + assertEquals(factory2.getMbean().getCacheUsedSize(), 70); + assertEquals(factory2.getMbean().getCacheHitsRate(), 10.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 70.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0); ledger.deactivateCursor(c1); - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); + assertEquals(factory2.getMbean().getCacheUsedSize(), 70); + assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0); entries = c2.readEntries(10); assertEquals(entries.size(), 10); - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 10.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 70.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); + assertEquals(factory2.getMbean().getCacheUsedSize(), 70); + assertEquals(factory2.getMbean().getCacheHitsRate(), 10.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 70.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0); PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition(); c2.setReadPosition(pos); ledger.discardEntriesFromCache(c2, pos); entries.forEach(e -> e.release()); - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); - assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 7); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + factory2.getMbean().refreshStats(1, TimeUnit.SECONDS); + assertEquals(factory2.getMbean().getCacheUsedSize(), 7); + assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0); + assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0); + assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0); + assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index d64ef31e49a58..888cbcdbbd75f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -42,6 +42,8 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.cache.EntryCache; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.mockito.Mockito; import org.testng.Assert; @@ -57,7 +59,7 @@ protected void setUpTestCase() throws Exception { ml = mock(ManagedLedgerImpl.class); when(ml.getName()).thenReturn("name"); when(ml.getExecutor()).thenReturn(executor); - when(ml.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml)); + when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml)); } @Test(timeOut = 5000) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index fe990468a7c4b..c1fee27b796ba 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.testng.annotations.Test; @@ -216,12 +217,12 @@ public void verifyConcurrentUsage() throws Exception { future.get(); } - cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS); + factory.getMbean().refreshStats(1, TimeUnit.SECONDS); - assertTrue(cacheManager.mlFactoryMBean.getCacheHitsRate() > 0.0); - assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0); - assertTrue(cacheManager.mlFactoryMBean.getCacheHitsThroughput() > 0.0); - assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0); + assertTrue(factory.getMbean().getCacheHitsRate() > 0.0); + assertEquals(factory.getMbean().getCacheMissesRate(), 0.0); + assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0); + assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 348ee81d4e2bc..9faf26e5997aa 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -110,23 +110,26 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.impl.cache.EntryCache; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; -import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; -import org.apache.pulsar.metadata.api.extended.SessionEvent; -import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.api.extended.SessionEvent; +import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.awaitility.Awaitility; import org.mockito.Mockito; +import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -2591,9 +2594,7 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti Set activeCursors = Sets.newHashSet(); activeCursors.add(cursor1); activeCursors.add(cursor2); - Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache"); - cacheField.setAccessible(true); - EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger); + EntryCache entryCache = Whitebox.getInternalState(ledger, "entryCache"); Iterator activeCursor = ledger.getActiveCursors().iterator(); @@ -2665,10 +2666,7 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti @Test public void testActiveDeactiveCursor() throws Exception { ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_eviction_ledger"); - - Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache"); - cacheField.setAccessible(true); - EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger); + EntryCache entryCache = Whitebox.getInternalState(ledger, "entryCache"); final int totalInsertedEntries = 20; for (int i = 0; i < totalInsertedEntries; i++) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index f032e2415e1d6..1fab473c4d0b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.service.nonpersistent; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create; +import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create; import com.carrotsearch.hppc.ObjectObjectHashMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java index 43dbd642f8056..5dd4c0b0402c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java @@ -25,7 +25,7 @@ import io.netty.buffer.PoolSubpageMetric; import io.netty.buffer.PooledByteBufAllocator; import java.util.stream.Collectors; -import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; import org.apache.pulsar.common.stats.AllocatorStats; import org.apache.pulsar.common.stats.AllocatorStats.PoolArenaStats; import org.apache.pulsar.common.stats.AllocatorStats.PoolChunkListStats; @@ -38,7 +38,7 @@ public static AllocatorStats generate(String allocatorName) { if ("default".equals(allocatorName)) { allocator = PooledByteBufAllocator.DEFAULT; } else if ("ml-cache".equals(allocatorName)) { - allocator = EntryCacheImpl.ALLOCATOR; + allocator = RangeEntryCacheImpl.ALLOCATOR; } else { throw new IllegalArgumentException("Invalid allocator name : " + allocatorName); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java index 2474ce2b8ded5..8e46b4cf254af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java @@ -25,7 +25,7 @@ import io.netty.buffer.PooledByteBufAllocator; import java.util.List; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; -import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.stats.Metrics; @@ -54,7 +54,7 @@ public synchronized List generate() { m.put("brk_ml_cache_hits_throughput", mlCacheStats.getCacheHitsThroughput()); m.put("brk_ml_cache_misses_throughput", mlCacheStats.getCacheMissesThroughput()); - PooledByteBufAllocator allocator = EntryCacheImpl.ALLOCATOR; + PooledByteBufAllocator allocator = RangeEntryCacheImpl.ALLOCATOR; long activeAllocations = 0; long activeAllocationsSmall = 0; long activeAllocationsNormal = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 9de664f98ec9d..bef9c9d772d4d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -44,9 +44,11 @@ import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.cache.EntryCache; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -310,7 +312,7 @@ public void testActiveSubscriptionWithCache() throws Exception { ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache"); cacheField.setAccessible(true); - EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger); + EntryCache entryCache = (EntryCache) cacheField.get(ledger); /************* Validation on non-empty active-cursor **************/ // (4) Get ActiveCursor : which is list of active subscription diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 555191ae59988..8a5b87d2c337d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -77,8 +77,8 @@ import lombok.EqualsAndHashCode; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.common.concurrent.FutureUtils; -import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -106,6 +106,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; +import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -1068,13 +1069,9 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); - Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache"); - cacheField.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(cacheField, cacheField.getModifiers() & ~Modifier.FINAL); - EntryCacheImpl entryCache = spy((EntryCacheImpl) cacheField.get(ledger)); - cacheField.set(ledger, entryCache); + EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache")); + Whitebox.setInternalState(ledger, "entryCache", entryCache); + Message msg; // 2. Produce messages diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index e4cb941c650ca..6b5f09f69c505 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -31,8 +31,6 @@ import com.google.common.collect.Sets; import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; @@ -52,8 +50,8 @@ import java.util.stream.Collectors; import lombok.Cleanup; -import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -76,6 +74,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; +import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -651,13 +650,8 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); - Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache"); - cacheField.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(cacheField, cacheField.getModifiers() & ~Modifier.FINAL); - EntryCacheImpl entryCache = spy((EntryCacheImpl) cacheField.get(ledger)); - cacheField.set(ledger, entryCache); + EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache")); + Whitebox.setInternalState(ledger, "entryCache", entryCache); Messagemsg = null; // 2. Produce messages