PIP-174: New managed ledger entry cache implementation #15955

Open · wants to merge 12 commits into master
6 changes: 6 additions & 0 deletions conf/broker.conf
@@ -1056,6 +1056,12 @@ managedLedgerCacheSizeMB=
# Whether we should make a copy of the entry payloads when inserting in cache
managedLedgerCacheCopyEntries=false

# The class name for the implementation of ManagedLedger cache manager component.
# Options are:
# - org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl
# - org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl
managedLedgerCacheManagerImplementationClass=org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl

# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

6 changes: 6 additions & 0 deletions conf/standalone.conf
@@ -714,6 +714,12 @@ managedLedgerCacheSizeMB=
# Whether we should make a copy of the entry payloads when inserting in cache
managedLedgerCacheCopyEntries=false

# The class name for the implementation of ManagedLedger cache manager component.
# Options are:
# - org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl
# - org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl
managedLedgerCacheManagerImplementationClass=org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl

# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

ManagedLedgerFactoryConfig.java
@@ -21,6 +21,7 @@
import lombok.Data;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;

/**
@@ -91,4 +92,15 @@ public class ManagedLedgerFactoryConfig {
* ManagedCursorInfo compression type. If the compression type is null or invalid, don't compress data.
*/
private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();

/**
* Class name for the implementation of {@link org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager}.
*
* Options are:
* <ul>
* <li>{@link org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl}</li>
* <li>{@link org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl}</li>
* </ul>
*/
private String entryCacheManagerClassName = SharedEntryCacheManagerImpl.class.getName();
}
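
Since ManagedLedgerFactoryConfig is a Lombok @Data class, a setter is generated for the new field. A minimal sketch of switching a factory to the range-based cache manager; the setMaxCacheSize call refers to a field assumed to already exist and is shown only for context:

import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;

class CacheManagerSelectionExample {
    static ManagedLedgerFactoryConfig rangeCacheConfig() {
        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
        // Select the pre-existing range-based implementation instead of the new default.
        config.setEntryCacheManagerClassName(
                "org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl");
        // Assumed pre-existing sizing knob, included only to show where this setting fits.
        config.setMaxCacheSize(256 * 1024 * 1024);
        return config;
    }
}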
ManagedLedgerFactoryImpl.java
@@ -70,7 +70,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -194,7 +193,11 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
config.getManagedCursorInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new RangeEntryCacheManagerImpl(this);

Class<EntryCacheManager> ecmClass =
(Class<EntryCacheManager>) Class.forName(config.getEntryCacheManagerClassName());
this.entryCacheManager = ecmClass.getDeclaredConstructor(ManagedLedgerFactoryImpl.class).newInstance(this);

this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
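
The hunk above instantiates the cache manager reflectively from the configured class name, which is expected to expose a constructor taking the ManagedLedgerFactoryImpl. For illustration, a minimal sketch of that loading pattern with the checked reflection exceptions handled explicitly; the helper name and the wrapping exception are assumptions, not part of this PR:

// Illustrative sketch only: reflective plugin loading with explicit error handling.
static EntryCacheManager loadEntryCacheManager(String className, ManagedLedgerFactoryImpl factory) {
    try {
        Class<?> clazz = Class.forName(className);
        return (EntryCacheManager) clazz
                .getDeclaredConstructor(ManagedLedgerFactoryImpl.class)
                .newInstance(factory);
    } catch (ReflectiveOperationException e) {
        // Class.forName, getDeclaredConstructor and newInstance all throw checked exceptions.
        throw new IllegalArgumentException("Cannot instantiate entry cache manager: " + className, e);
    }
}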
@@ -576,7 +579,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}
}));
entryCacheManager.clear();

entryCacheManager.close();
return FutureUtil.waitForAll(futures).thenAccept(__ -> {
//wait for tasks in scheduledExecutor executed.
scheduledExecutor.shutdown();
@@ -637,7 +641,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {

scheduledExecutor.shutdownNow();

entryCacheManager.clear();
entryCacheManager.close();
}

@Override
EntryCacheDisabled.java
@@ -86,7 +86,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
try {
for (LedgerEntry e : ledgerEntries) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
EntryImpl entry = EntryCacheManager.create(e, interceptor);
entries.add(entry);
totalSize += entry.getLength();
}
Expand Down Expand Up @@ -118,7 +118,7 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);

ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
EntryCacheManager.java
@@ -18,9 +18,15 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;

public interface EntryCacheManager {
public interface EntryCacheManager extends AutoCloseable {
EntryCache getEntryCache(ManagedLedgerImpl ml);

void removeEntryCache(String name);
@@ -36,4 +42,32 @@ public interface EntryCacheManager {
void updateCacheEvictionWatermark(double cacheEvictionWatermark);

double getCacheEvictionWatermark();

static Entry create(long ledgerId, long entryId, ByteBuf data) {
return EntryImpl.create(ledgerId, entryId, data);
}

static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
if (interceptor != null) {
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
processorHandle = interceptor
.processPayloadBeforeEntryCache(duplicateBuffer);
if (processorHandle != null) {
ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
ledgerEntry.getLength(), processorHandle.getProcessedPayload());
} else {
duplicateBuffer.release();
}
}
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
if (processorHandle != null) {
processorHandle.release();
ledgerEntry.close();
}
return returnEntry;
}

@Override
void close();
}
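
The payload-processing factory that previously lived on RangeEntryCacheManagerImpl is now a static method on the interface, so both cache implementations can share it. A small usage sketch of the simpler overload, assuming the usual reference counting of EntryImpl; identifiers and payload are illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;

class EntryCacheManagerCreateExample {
    static void example() {
        ByteBuf payload = Unpooled.wrappedBuffer("hello".getBytes(StandardCharsets.UTF_8));
        // Wrap a raw payload into an Entry through the new static factory.
        Entry entry = EntryCacheManager.create(5L /* ledgerId */, 0L /* entryId */, payload);
        try {
            byte[] data = entry.getData(); // consumers read through the Entry API
            System.out.println("entry bytes: " + data.length);
        } finally {
            entry.release();   // entries are reference counted and must be released
            payload.release(); // EntryImpl.create() retains the buffer; the caller keeps its own reference
        }
    }
}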
RangeEntryCacheImpl.java
@@ -49,7 +49,7 @@ public class RangeEntryCacheImpl implements EntryCache {

private final RangeEntryCacheManagerImpl manager;
private final ManagedLedgerImpl ml;
private ManagedLedgerInterceptor interceptor;
private final ManagedLedgerInterceptor interceptor;
private final RangeCache<PositionImpl, EntryImpl> entries;
private final boolean copyEntries;

@@ -220,7 +220,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);

manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
@@ -305,7 +305,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
long totalSize = 0;
final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
EntryImpl entry = EntryCacheManager.create(e, interceptor);
entriesToReturn.add(entry);
totalSize += entry.getLength();
if (shouldCacheEntry) {
RangeEntryCacheManagerImpl.java
@@ -20,20 +20,14 @@

import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -168,29 +162,9 @@ public void clear() {
caches.values().forEach(EntryCache::clear);
}

public static Entry create(long ledgerId, long entryId, ByteBuf data) {
return EntryImpl.create(ledgerId, entryId, data);
}

public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
if (interceptor != null) {
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
processorHandle = interceptor
.processPayloadBeforeEntryCache(duplicateBuffer);
if (processorHandle != null) {
ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
ledgerEntry.getLength(), processorHandle.getProcessedPayload());
} else {
duplicateBuffer.release();
}
}
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
if (processorHandle != null) {
processorHandle.release();
ledgerEntry.close();
}
return returnEntry;
@Override
public void close() {
clear();
}

private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheManagerImpl.class);
SharedCacheSegment.java
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import io.netty.buffer.ByteBuf;

public interface SharedCacheSegment extends AutoCloseable {

boolean insert(long ledgerId, long entryId, ByteBuf entry);

ByteBuf get(long ledgerId, long entryId);

int getSize();

void clear();

@Override
void close();
}
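
For orientation, a simplified sketch of how a pool of segments could be rotated by a cache built on this interface: write into the current segment and, when it reports full, advance to the next one and clear it before reuse. The rotation policy shown here is an assumption for illustration, not the PR's SharedEntryCacheManagerImpl code:

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.mledger.impl.cache.SharedCacheSegment;

// Illustrative sketch only: a minimal round-robin wrapper over SharedCacheSegment instances.
class SegmentRing {
    private final SharedCacheSegment[] segments;
    private int current = 0;

    SegmentRing(SharedCacheSegment[] segments) {
        this.segments = segments;
    }

    synchronized boolean put(long ledgerId, long entryId, ByteBuf entry) {
        if (segments[current].insert(ledgerId, entryId, entry)) {
            return true;
        }
        // Current segment is full: rotate to the next one and recycle it.
        current = (current + 1) % segments.length;
        segments[current].clear();
        return segments[current].insert(ledgerId, entryId, entry);
    }

    ByteBuf get(long ledgerId, long entryId) {
        // An entry may live in any segment that has not been recycled yet.
        for (SharedCacheSegment segment : segments) {
            ByteBuf buf = segment.get(ledgerId, entryId);
            if (buf != null) {
                return buf;
            }
        }
        return null;
    }
}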
SharedCacheSegmentBufferCopy.java
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import io.netty.buffer.ByteBuf;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;

class SharedCacheSegmentBufferCopy implements AutoCloseable, SharedCacheSegment {

private final ByteBuf cacheBuffer;
private final AtomicInteger currentOffset = new AtomicInteger();
private final ConcurrentLongLongPairHashMap index;
private final int segmentSize;

private static final int ALIGN_64_MASK = ~(64 - 1);

SharedCacheSegmentBufferCopy(int segmentSize) {
this.segmentSize = segmentSize;
this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize, segmentSize);
this.cacheBuffer.writerIndex(segmentSize - 1);
this.index = ConcurrentLongLongPairHashMap.newBuilder()
// We are going to often clear() the map, with the expectation that it's going to get filled again
// immediately after. In these conditions it does not make sense to shrink it each time.
.autoShrink(false)
.concurrencyLevel(Runtime.getRuntime().availableProcessors() * 8)
.build();
}

@Override
public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
int entrySize = entry.readableBytes();
int alignedSize = align64(entrySize);
int offset = currentOffset.getAndAdd(alignedSize);

if (offset + entrySize > segmentSize) {
// The segment is full
currentOffset.getAndAdd(-alignedSize);
return false;
} else {
// Copy entry into read cache segment
cacheBuffer.setBytes(offset, entry, entry.readerIndex(), entry.readableBytes());
long value = ((long) offset << 32) | entrySize;
index.put(ledgerId, entryId, value, 0);
return true;
}
}

@Override
public ByteBuf get(long ledgerId, long entryId) {
long value = index.getFirstValue(ledgerId, entryId);
if (value >= 0) {
int offset = (int) (value >> 32);
int entryLen = (int) value;

ByteBuf entry = PulsarByteBufAllocator.DEFAULT.buffer(entryLen, entryLen);
entry.writeBytes(cacheBuffer, offset, entryLen);
return entry;
} else {
return null;
}
}

@Override
public int getSize() {
return currentOffset.get();
}

@Override
public void close() {
cacheBuffer.release();
}

private static int align64(int size) {
return (size + 64 - 1) & ALIGN_64_MASK;
}

@Override
public void clear() {
index.clear();
currentOffset.set(0);
}
}
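
The segment's index packs each entry's location into a single long: the write offset in the upper 32 bits and the entry length in the lower 32 bits, with sizes rounded up to multiples of 64 bytes so that every entry starts on a 64-byte boundary. A standalone arithmetic demo of that packing, not code from the PR:

// Standalone demo of the offset/length packing and 64-byte size alignment used by the segment.
public class PackingDemo {
    private static final int ALIGN_64_MASK = ~(64 - 1);

    // Round a size up to the next multiple of 64 (same arithmetic as align64 above).
    static int align64(int size) {
        return (size + 64 - 1) & ALIGN_64_MASK;
    }

    public static void main(String[] args) {
        int entrySize = 100;
        int alignedSize = align64(entrySize);          // 128: next multiple of 64
        int offset = 4096;                             // where the entry was written in the segment
        long packed = ((long) offset << 32) | entrySize;
        int unpackedOffset = (int) (packed >> 32);     // 4096
        int unpackedLength = (int) packed;             // 100
        System.out.printf("aligned=%d offset=%d length=%d%n",
                alignedSize, unpackedOffset, unpackedLength);
    }
}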