Skip to content

Commit

Permalink
Extracted interface for EntryCacheManager (apache#15933)
Browse files Browse the repository at this point in the history
* Extracted interface for EntryCacheManager

* Fixed references

* added more methods to the interface

* Fixed mocked test

* Removed unused import

* Fixed wrong casting in reflection access
  • Loading branch information
merlimat authored Jun 6, 2022
1 parent 49ee8a6 commit c7faf62
Show file tree
Hide file tree
Showing 22 changed files with 353 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;

/**
* A factory to open/create managed ledgers and delete them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2832,7 +2832,7 @@ public void operationFailed(MetaStoreException e) {

boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
if (ledger.factory.isMetadataServiceAvailable()
if (ledger.getFactory().isMetadataServiceAvailable()
&& (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
Expand All @@ -90,10 +92,12 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookkeeperFactory;
private final boolean isBookkeeperManaged;
private final ManagedLedgerFactoryConfig config;
@Getter
protected final OrderedScheduler scheduledExecutor;

private final ExecutorService cacheEvictionExecutor;

@Getter
protected final ManagedLedgerFactoryMBeanImpl mbean;

protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -189,7 +193,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
config.getManagedCursorInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.entryCacheManager = new RangeEntryCacheManagerImpl(this);
this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand Down Expand Up @@ -116,6 +117,7 @@
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
Expand Down Expand Up @@ -262,9 +264,16 @@ public enum PositionBound {
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;

@Getter
private final OrderedScheduler scheduledExecutor;

@Getter
private final OrderedExecutor executor;
final ManagedLedgerFactoryImpl factory;

@Getter
private final ManagedLedgerFactoryImpl factory;

@Getter
protected final ManagedLedgerMBeanImpl mbean;
protected final Clock clock;

Expand Down Expand Up @@ -1865,7 +1874,7 @@ void invalidateReadHandle(long ledgerId) {
}
}

void invalidateLedgerHandle(ReadHandle ledgerHandle) {
public void invalidateLedgerHandle(ReadHandle ledgerHandle) {
long ledgerId = ledgerHandle.getId();
LedgerHandle currentLedger = this.currentLedger;

Expand Down Expand Up @@ -3564,14 +3573,6 @@ public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
return ledgers;
}

OrderedScheduler getScheduledExecutor() {
return scheduledExecutor;
}

OrderedExecutor getExecutor() {
return executor;
}

private ManagedLedgerInfo getManagedLedgerInfo() {
return buildManagedLedgerInfo(ledgers);
}
Expand Down Expand Up @@ -3679,10 +3680,6 @@ public State getState() {
return STATE_UPDATER.get(this);
}

public ManagedLedgerMBeanImpl getMBean() {
return mbean;
}

public long getCacheSize() {
return entryCache.getSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
package org.apache.bookkeeper.mledger.impl.cache;

import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
package org.apache.bookkeeper.mledger.impl.cache;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.reverseOrder;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void doEviction(List<EntryCache> caches, long sizeToFree) {
}

log.info("Completed cache eviction. Removed {} entries from {} caches. ({} Mb)", evictedEntries,
cachesToEvict.size(), evictedSize / EntryCacheManager.MB);
cachesToEvict.size(), evictedSize / RangeEntryCacheManagerImpl.MB);
}

private static final Logger log = LoggerFactory.getLogger(EntryCacheDefaultEvictionPolicy.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import java.util.Iterator;
import java.util.List;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.commons.lang3.tuple.Pair;

/**
* Implementation of cache that always read from BookKeeper.
*/
public class EntryCacheDisabled implements EntryCache {
private final ManagedLedgerImpl ml;
private final ManagedLedgerInterceptor interceptor;

public EntryCacheDisabled(ManagedLedgerImpl ml) {
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
}

@Override
public String getName() {
return ml.getName();
}

@Override
public boolean insert(EntryImpl entry) {
return false;
}

@Override
public void invalidateEntries(PositionImpl lastPosition) {
}

@Override
public void invalidateAllEntries(long ledgerId) {
}

@Override
public void clear() {
}

@Override
public Pair<Integer, Long> evictEntries(long sizeToFree) {
return Pair.of(0, (long) 0);
}

@Override
public void invalidateEntriesBeforeTimestamp(long timestamp) {
}

@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
List<Entry> entries = Lists.newArrayList();
long totalSize = 0;
try {
for (LedgerEntry e : ledgerEntries) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
entries.add(entry);
totalSize += entry.getLength();
}
} finally {
ledgerEntries.close();
}
ml.getFactory().getMbean().recordCacheMiss(entries.size(), totalSize);
ml.getMbean().addReadEntriesSample(entries.size(), totalSize);

callback.readEntriesComplete(entries, ctx);
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
}

@Override
public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return;
}

try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);

ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
callback.readEntryComplete(returnEntry, ctx);
} else {
callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
ctx);
}
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName()));
}

@Override
public long getSize() {
return 0;
}

@Override
public int compareTo(EntryCache other) {
return Longs.compare(getSize(), other.getSize());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
package org.apache.bookkeeper.mledger.impl.cache;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;

public interface EntryCacheManager {
EntryCache getEntryCache(ManagedLedgerImpl ml);

void removeEntryCache(String name);

long getSize();

long getMaxSize();

void clear();

void updateCacheSizeAndThreshold(long maxSize);

void updateCacheEvictionWatermark(double cacheEvictionWatermark);

double getCacheEvictionWatermark();
}
Loading

0 comments on commit c7faf62

Please sign in to comment.