Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extracted interface for EntryCacheManager #15933

Merged
merged 6 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;

/**
* A factory to open/create managed ledgers and delete them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2832,7 +2832,7 @@ public void operationFailed(MetaStoreException e) {

boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
if (ledger.factory.isMetadataServiceAvailable()
if (ledger.getFactory().isMetadataServiceAvailable()
&& (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
Expand All @@ -90,10 +92,12 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookkeeperFactory;
private final boolean isBookkeeperManaged;
private final ManagedLedgerFactoryConfig config;
@Getter
protected final OrderedScheduler scheduledExecutor;

private final ExecutorService cacheEvictionExecutor;

@Getter
protected final ManagedLedgerFactoryMBeanImpl mbean;

protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -189,7 +193,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
config.getManagedCursorInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.entryCacheManager = new RangeEntryCacheManagerImpl(this);
this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand Down Expand Up @@ -116,6 +117,7 @@
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
Expand Down Expand Up @@ -262,9 +264,16 @@ public enum PositionBound {
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;

@Getter
private final OrderedScheduler scheduledExecutor;

@Getter
private final OrderedExecutor executor;
final ManagedLedgerFactoryImpl factory;

@Getter
private final ManagedLedgerFactoryImpl factory;

@Getter
protected final ManagedLedgerMBeanImpl mbean;
protected final Clock clock;

Expand Down Expand Up @@ -1865,7 +1874,7 @@ void invalidateReadHandle(long ledgerId) {
}
}

void invalidateLedgerHandle(ReadHandle ledgerHandle) {
public void invalidateLedgerHandle(ReadHandle ledgerHandle) {
long ledgerId = ledgerHandle.getId();
LedgerHandle currentLedger = this.currentLedger;

Expand Down Expand Up @@ -3564,14 +3573,6 @@ public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
return ledgers;
}

OrderedScheduler getScheduledExecutor() {
return scheduledExecutor;
}

OrderedExecutor getExecutor() {
return executor;
}

private ManagedLedgerInfo getManagedLedgerInfo() {
return buildManagedLedgerInfo(ledgers);
}
Expand Down Expand Up @@ -3679,10 +3680,6 @@ public State getState() {
return STATE_UPDATER.get(this);
}

public ManagedLedgerMBeanImpl getMBean() {
return mbean;
}

public long getCacheSize() {
return entryCache.getSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
package org.apache.bookkeeper.mledger.impl.cache;

import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
package org.apache.bookkeeper.mledger.impl.cache;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.reverseOrder;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void doEviction(List<EntryCache> caches, long sizeToFree) {
}

log.info("Completed cache eviction. Removed {} entries from {} caches. ({} Mb)", evictedEntries,
cachesToEvict.size(), evictedSize / EntryCacheManager.MB);
cachesToEvict.size(), evictedSize / RangeEntryCacheManagerImpl.MB);
}

private static final Logger log = LoggerFactory.getLogger(EntryCacheDefaultEvictionPolicy.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import java.util.Iterator;
import java.util.List;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.commons.lang3.tuple.Pair;

/**
* Implementation of cache that always read from BookKeeper.
*/
public class EntryCacheDisabled implements EntryCache {
private final ManagedLedgerImpl ml;
private final ManagedLedgerInterceptor interceptor;

public EntryCacheDisabled(ManagedLedgerImpl ml) {
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
}

@Override
public String getName() {
return ml.getName();
}

@Override
public boolean insert(EntryImpl entry) {
return false;
}

@Override
public void invalidateEntries(PositionImpl lastPosition) {
}

@Override
public void invalidateAllEntries(long ledgerId) {
}

@Override
public void clear() {
}

@Override
public Pair<Integer, Long> evictEntries(long sizeToFree) {
return Pair.of(0, (long) 0);
}

@Override
public void invalidateEntriesBeforeTimestamp(long timestamp) {
}

@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
List<Entry> entries = Lists.newArrayList();
long totalSize = 0;
try {
for (LedgerEntry e : ledgerEntries) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
entries.add(entry);
totalSize += entry.getLength();
}
} finally {
ledgerEntries.close();
}
ml.getFactory().getMbean().recordCacheMiss(entries.size(), totalSize);
ml.getMbean().addReadEntriesSample(entries.size(), totalSize);

callback.readEntriesComplete(entries, ctx);
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
}

@Override
public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return;
}

try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);

ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
callback.readEntryComplete(returnEntry, ctx);
} else {
callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
ctx);
}
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName()));
}

@Override
public long getSize() {
return 0;
}

@Override
public int compareTo(EntryCache other) {
return Longs.compare(getSize(), other.getSize());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
package org.apache.bookkeeper.mledger.impl.cache;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;

public interface EntryCacheManager {
EntryCache getEntryCache(ManagedLedgerImpl ml);

void removeEntryCache(String name);

long getSize();

long getMaxSize();

void clear();

void updateCacheSizeAndThreshold(long maxSize);

void updateCacheEvictionWatermark(double cacheEvictionWatermark);

double getCacheEvictionWatermark();
}
Loading