PIP-174: New managed ledger entry cache implementation
merlimat committed Jul 25, 2022
1 parent 93e947b commit 5544ed2
Showing 26 changed files with 1,784 additions and 80 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
@@ -1008,6 +1008,12 @@ managedLedgerCacheSizeMB=
# Whether we should make a copy of the entry payloads when inserting in cache
managedLedgerCacheCopyEntries=false

# The class name for the implementation of ManagedLedger cache manager component.
# Options are:
# - org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl
# - org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl
managedLedgerCacheManagerImplementationClass=org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl

# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

6 changes: 6 additions & 0 deletions conf/standalone.conf
@@ -680,6 +680,12 @@ managedLedgerCacheSizeMB=
# Whether we should make a copy of the entry payloads when inserting in cache
managedLedgerCacheCopyEntries=false

# The class name for the implementation of ManagedLedger cache manager component.
# Options are:
# - org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl
# - org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl
managedLedgerCacheManagerImplementationClass=org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl

# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

ManagedLedgerFactoryConfig.java
@@ -21,6 +21,7 @@
import lombok.Data;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;

/**
@@ -91,4 +92,15 @@ public class ManagedLedgerFactoryConfig {
* ManagedCursorInfo compression type. If the compression type is null or invalid, don't compress data.
*/
private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();

/**
* Class name for the implementation of {@link org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager}.
*
* Options are:
* <ul>
* <li>{@link org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl}</li>
* <li>{@link org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl}</li>
* </ul>
*/
private String entryCacheManagerClassName = SharedEntryCacheManagerImpl.class.getName();
}
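The new entryCacheManagerClassName field is the programmatic counterpart of the broker.conf setting above. A minimal illustrative sketch, assuming only the Lombok-generated setter that @Data produces for this field (not code from the commit):

// Select the previous range-based cache instead of the new shared cache.
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setEntryCacheManagerClassName(
        "org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl");
// The factory reads this class name once, at construction time, and instantiates the
// configured EntryCacheManager reflectively (see the ManagedLedgerFactoryImpl change below).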
ManagedLedgerFactoryImpl.java
@@ -69,7 +69,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -193,7 +192,11 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
config.getManagedCursorInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new RangeEntryCacheManagerImpl(this);

Class<EntryCacheManager> ecmClass =
(Class<EntryCacheManager>) Class.forName(config.getEntryCacheManagerClassName());
this.entryCacheManager = ecmClass.getDeclaredConstructor(ManagedLedgerFactoryImpl.class).newInstance(this);

this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
@@ -592,7 +595,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}
}));
entryCacheManager.clear();

entryCacheManager.close();
return FutureUtil.waitForAll(futures).thenAccept(__ -> {
//wait for tasks in scheduledExecutor executed.
scheduledExecutor.shutdown();
@@ -653,7 +657,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {

scheduledExecutor.shutdownNow();

entryCacheManager.clear();
entryCacheManager.close();
}

@Override
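The constructor change above drops the hard-coded RangeEntryCacheManagerImpl and instantiates whatever class name the factory config carries. A standalone sketch of that plug-in pattern with the checked exceptions made explicit; the helper name is hypothetical, and how the commit itself propagates or wraps these exceptions is not visible in this excerpt:

// The configured class must implement EntryCacheManager and expose a constructor
// that accepts the owning ManagedLedgerFactoryImpl instance.
static EntryCacheManager loadEntryCacheManager(String className, ManagedLedgerFactoryImpl factory)
        throws ReflectiveOperationException {
    Class<?> clazz = Class.forName(className);
    return (EntryCacheManager) clazz
            .getDeclaredConstructor(ManagedLedgerFactoryImpl.class)
            .newInstance(factory);
}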
@@ -87,7 +87,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
try {
for (LedgerEntry e : ledgerEntries) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
EntryImpl entry = EntryCacheManager.create(e, interceptor);
entries.add(entry);
totalSize += entry.getLength();
}
@@ -119,7 +119,7 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);

ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
EntryCacheManager.java
@@ -18,9 +18,15 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;

public interface EntryCacheManager {
public interface EntryCacheManager extends AutoCloseable {
EntryCache getEntryCache(ManagedLedgerImpl ml);

void removeEntryCache(String name);
@@ -36,4 +42,32 @@ public interface EntryCacheManager {
void updateCacheEvictionWatermark(double cacheEvictionWatermark);

double getCacheEvictionWatermark();

static Entry create(long ledgerId, long entryId, ByteBuf data) {
return EntryImpl.create(ledgerId, entryId, data);
}

static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
if (interceptor != null) {
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
processorHandle = interceptor
.processPayloadBeforeEntryCache(duplicateBuffer);
if (processorHandle != null) {
ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
ledgerEntry.getLength(), processorHandle.getProcessedPayload());
} else {
duplicateBuffer.release();
}
}
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
if (processorHandle != null) {
processorHandle.release();
ledgerEntry.close();
}
return returnEntry;
}

@Override
void close();
}
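Two changes land in this interface: the static create() helpers move here from RangeEntryCacheManagerImpl so the other call sites shown in this commit share them, and the interface now extends AutoCloseable so the factory can release implementation-specific resources on shutdown. A short usage sketch; the ledgerEntry, interceptor, and entryCacheManager variables are assumed to exist, and this is not code from the commit:

// Wrap a raw BookKeeper LedgerEntry; a non-null interceptor may rewrite the payload
// before it is cached, otherwise the entry buffer is used as-is.
EntryImpl entry = EntryCacheManager.create(ledgerEntry, interceptor);
try {
    // ... hand the entry to an EntryCache or to a read callback ...
} finally {
    entry.release();   // drop the local reference once ownership has been passed on
}

// On factory shutdown, close() supersedes the bare clear() call used previously,
// giving implementations a hook to free pooled buffers as well as per-ledger caches.
entryCacheManager.close();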
RangeEntryCacheImpl.java
@@ -50,7 +50,7 @@ public class RangeEntryCacheImpl implements EntryCache {

private final RangeEntryCacheManagerImpl manager;
private final ManagedLedgerImpl ml;
private ManagedLedgerInterceptor interceptor;
private final ManagedLedgerInterceptor interceptor;
private final RangeCache<PositionImpl, EntryImpl> entries;
private final boolean copyEntries;

@@ -221,7 +221,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);

manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
@@ -306,7 +306,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
long totalSize = 0;
final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
EntryImpl entry = EntryCacheManager.create(e, interceptor);
entriesToReturn.add(entry);
totalSize += entry.getLength();
if (shouldCacheEntry) {
RangeEntryCacheManagerImpl.java
@@ -21,19 +21,13 @@
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -168,29 +162,9 @@ public void clear() {
caches.values().forEach(EntryCache::clear);
}

public static Entry create(long ledgerId, long entryId, ByteBuf data) {
return EntryImpl.create(ledgerId, entryId, data);
}

public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
if (interceptor != null) {
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
processorHandle = interceptor
.processPayloadBeforeEntryCache(duplicateBuffer);
if (processorHandle != null) {
ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
ledgerEntry.getLength(), processorHandle.getProcessedPayload());
} else {
duplicateBuffer.release();
}
}
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
if (processorHandle != null) {
processorHandle.release();
ledgerEntry.close();
}
return returnEntry;
@Override
public void close() {
clear();
}

private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheManagerImpl.class);
SharedCacheSegment.java (new file)
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import io.netty.buffer.ByteBuf;

public interface SharedCacheSegment extends AutoCloseable {

boolean insert(long ledgerId, long entryId, ByteBuf entry);

ByteBuf get(long ledgerId, long entryId);

int getSize();

void clear();

@Override
void close();
}
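The SharedEntryCacheManagerImpl that consumes these segments is not rendered in this excerpt, but PIP-174 describes the intended layout: a fixed number of equally sized segments used as a ring, where inserts go to the current segment and, once it fills up, the ring rotates and the oldest segment is cleared. That gives cheap, coarse-grained FIFO eviction with no per-entry bookkeeping. A rough sketch of the idea (not the commit's code; it assumes it lives in the same package so it can reach the package-private segment classes):

import io.netty.buffer.ByteBuf;

// Sketch of a segment ring with rotation-based eviction.
class SegmentRingSketch {
    private final SharedCacheSegment[] segments;
    private int current = 0;

    SegmentRingSketch(int segmentCount, int segmentSize) {
        segments = new SharedCacheSegment[segmentCount];
        for (int i = 0; i < segmentCount; i++) {
            segments[i] = new SharedCacheSegmentBufferCopy(segmentSize);
        }
    }

    synchronized void insert(long ledgerId, long entryId, ByteBuf entry) {
        if (!segments[current].insert(ledgerId, entryId, entry)) {
            // The current segment is full: advance the ring and recycle the oldest segment.
            current = (current + 1) % segments.length;
            segments[current].clear();
            // An entry larger than a whole segment is simply not cached.
            segments[current].insert(ledgerId, entryId, entry);
        }
    }

    synchronized ByteBuf get(long ledgerId, long entryId) {
        // Check the most recently written segments first.
        for (int i = 0; i < segments.length; i++) {
            ByteBuf buf = segments[Math.floorMod(current - i, segments.length)].get(ledgerId, entryId);
            if (buf != null) {
                return buf;
            }
        }
        return null;
    }
}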
SharedCacheSegmentBufferCopy.java (new file)
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import io.netty.buffer.ByteBuf;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;

class SharedCacheSegmentBufferCopy implements AutoCloseable, SharedCacheSegment {

private final ByteBuf cacheBuffer;
private final AtomicInteger currentOffset = new AtomicInteger();
private final ConcurrentLongLongPairHashMap index;
private final int segmentSize;

private static final int ALIGN_64_MASK = ~(64 - 1);

SharedCacheSegmentBufferCopy(int segmentSize) {
this.segmentSize = segmentSize;
this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize, segmentSize);
this.cacheBuffer.writerIndex(segmentSize - 1);
this.index = ConcurrentLongLongPairHashMap.newBuilder()
// We are going to often clear() the map, with the expectation that it's going to get filled again
// immediately after. In these conditions it does not make sense to shrink it each time.
.autoShrink(false)
.concurrencyLevel(Runtime.getRuntime().availableProcessors() * 8)
.build();
}

@Override
public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
int entrySize = entry.readableBytes();
int alignedSize = align64(entrySize);
int offset = currentOffset.getAndAdd(alignedSize);

if (offset + entrySize > segmentSize) {
// The segment is full
return false;
} else {
// Copy entry into read cache segment
cacheBuffer.setBytes(offset, entry, entry.readerIndex(), entry.readableBytes());
long value = ((long) offset << 32) | entrySize;
index.put(ledgerId, entryId, value, 0);
return true;
}
}

@Override
public ByteBuf get(long ledgerId, long entryId) {
long value = index.getFirstValue(ledgerId, entryId);
if (value >= 0) {
int offset = (int) (value >> 32);
int entryLen = (int) value;

ByteBuf entry = PulsarByteBufAllocator.DEFAULT.buffer(entryLen, entryLen);
entry.writeBytes(cacheBuffer, offset, entryLen);
return entry;
} else {
return null;
}
}

@Override
public int getSize() {
return currentOffset.get();
}

@Override
public void close() {
cacheBuffer.release();
}

private static int align64(int size) {
return (size + 64 - 1) & ALIGN_64_MASK;
}

@Override
public void clear() {
index.clear();
currentOffset.set(0);
}
}
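The segment's index packs each entry's location into one long per (ledgerId, entryId) pair: the upper 32 bits hold the offset inside cacheBuffer and the lower 32 bits the entry length, while offsets advance in 64-byte-aligned steps so every entry starts on a 64-byte boundary. A small worked example of that encoding (note the widening cast before the shift; shifting a plain int by 32 bits is a no-op in Java):

int offset = 4096;      // where the entry starts inside the segment's cacheBuffer
int entrySize = 1500;   // readable bytes of the cached entry

long packed = ((long) offset << 32) | entrySize;   // high 32 bits: offset, low 32 bits: size
int recoveredOffset = (int) (packed >> 32);        // 4096
int recoveredSize = (int) packed;                  // 1500

int alignedSize = (entrySize + 64 - 1) & ~(64 - 1);   // align64(1500) == 1536, so the next
                                                       // entry begins 1536 bytes further on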
(The remaining changed files, including the SharedEntryCacheManagerImpl referenced by the new configuration default, are not rendered in this excerpt.)
