cacheManagers = ImmutableMap.of(
+ "RangeEntryCacheManager", RangeEntryCacheManagerImpl.class.getName(),
+ "SharedEntryCacheManager", SharedEntryCacheManagerImpl.class.getName());
+
+ public enum CopyMode {
+ Copy,
+ RefCount,
+ }
+
+ @State(Scope.Benchmark)
+ public static class TestState {
+ @Param({
+// "RangeEntryCacheManager",
+ "SharedEntryCacheManager",
+ })
+ private String entryCacheManagerName;
+
+ @Param({
+ "Copy",
+ "RefCount",
+ })
+ private CopyMode copyMode;
+
+ @Param({
+// "100",
+// "1024",
+ "65536",
+ })
+ private int entrySize;
+
+ private OrderedExecutor executor;
+ private MetadataStoreExtended metadataStore;
+ private ManagedLedgerFactoryImpl mlf;
+ private EntryCache entryCache;
+
+ private ByteBuf buffer;
+
+ @Setup(Level.Trial)
+ public void setup() throws Exception {
+ executor = OrderedExecutor.newBuilder().build();
+ metadataStore = MetadataStoreExtended.create("memory:local", MetadataStoreConfig.builder().build());
+
+ ManagedLedgerFactoryConfig mlfc = new ManagedLedgerFactoryConfig();
+ mlfc.setEntryCacheManagerClassName(cacheManagers.get(entryCacheManagerName));
+ mlfc.setCopyEntriesInCache(copyMode == CopyMode.Copy);
+ mlfc.setMaxCacheSize(1 * 1024 * 1024 * 1024);
+ PulsarMockBookKeeper bkc = new PulsarMockBookKeeper(executor);
+ mlf = new ManagedLedgerFactoryImpl(metadataStore, bkc, mlfc);
+
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) mlf.open("test-managed-ledger");
+
+ entryCache = mlf.getEntryCacheManager().getEntryCache(ml);
+
+ buffer = PooledByteBufAllocator.DEFAULT.directBuffer();
+ buffer.writeBytes(new byte[entrySize]);
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() throws Exception {
+ mlf.shutdown();
+ metadataStore.close();
+
+ System.out.println("REF-COUNT: " + buffer.refCnt());
+ buffer.release();
+ executor.shutdownNow();
+ }
+ }
+
+ private static final AtomicLong ledgerIdSeq = new AtomicLong();
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ private long ledgerId;
+ private long entryId;
+
+ @Setup(Level.Iteration)
+ public void setup() throws Exception {
+ ledgerId = ledgerIdSeq.incrementAndGet();
+ entryId = 0;
+ }
+ }
+
+ @Benchmark
+ public void insertIntoCache(TestState s, ThreadState ts) {
+ EntryImpl entry = EntryImpl.create(ts.ledgerId, ts.entryId, s.buffer.duplicate());
+ s.entryCache.insert(entry);
+ ts.entryId++;
+ entry.release();
+ }
+}
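As a side note, a JMH benchmark like the one above is normally launched through the JMH `Runner` API (or by packaging the new `microbenchmarks` module). The following is a minimal, hypothetical launcher sketch; the benchmark class name `EntryCacheBenchmark`, the fork count and the iteration counts are illustrative assumptions, not part of this patch:

```java
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class EntryCacheBenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                // Assumed benchmark class name; match it to the actual class in the microbenchmarks module
                .include("EntryCacheBenchmark")
                .forks(1)
                .warmupIterations(3)
                .measurementIterations(5)
                .build();
        new Runner(opt).run();
    }
}
```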
diff --git a/pom.xml b/pom.xml
index 34e302ffc4893..1c5202a105537 100644
--- a/pom.xml
+++ b/pom.xml
@@ -231,6 +231,7 @@ flexible messaging model and an intuitive client API.
1.17.2
2.2
+    <jmh.version>1.35</jmh.version>
3.2.13
@@ -1290,6 +1291,18 @@ flexible messaging model and an intuitive client API.
        <artifactId>netty-reactive-streams</artifactId>
        <version>${netty-reactive-streams.version}</version>
+
+      <dependency>
+        <groupId>org.openjdk.jmh</groupId>
+        <artifactId>jmh-core</artifactId>
+        <version>${jmh.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.openjdk.jmh</groupId>
+        <artifactId>jmh-generator-annprocess</artifactId>
+        <version>${jmh.version}</version>
+      </dependency>
+
@@ -2117,6 +2130,7 @@ flexible messaging model and an intuitive client API.
    <module>distribution</module>
    <module>docker</module>
    <module>tests</module>
+   <module>microbenchmarks</module>
@@ -2145,6 +2159,7 @@ flexible messaging model and an intuitive client API.
    <module>pulsar-broker-auth-sasl</module>
    <module>pulsar-client-auth-sasl</module>
    <module>pulsar-config-validation</module>
+   <module>microbenchmarks</module>
    <module>pulsar-transaction</module>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ad4188d288275..05d25526bc692 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1786,6 +1786,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when "
+ "inserting in cache")
private boolean managedLedgerCacheCopyEntries = false;
+
+ @FieldContext(category = CATEGORY_STORAGE_ML,
+ doc = "The class name for the implementation of ManagedLedger cache manager component.\n"
+ + "Options are:\n"
+ + " - org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl\n"
+ + " - org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl")
+ private String managedLedgerCacheManagerImplementationClass =
+ "org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl";
+
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index bb7cb6ffd8d7e..c4f8ad4407218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -63,6 +63,7 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(
conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
+ managedLedgerFactoryConfig.setEntryCacheManagerClassName(conf.getManagedLedgerCacheManagerImplementationClass());
managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(
conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 7473fdaf78632..2fbbeeccbca3f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service.nonpersistent;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create;
import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.carrotsearch.hppc.ObjectObjectHashMap;
@@ -40,6 +39,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -183,7 +183,7 @@ public void publishMessage(ByteBuf data, PublishContext callback) {
subscriptions.forEach((name, subscription) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
- Entry entry = create(0L, 0L, duplicateBuffer);
+ Entry entry = EntryCacheManager.create(0L, 0L, duplicateBuffer);
// entry internally retains data, so duplicateBuffer should be released here
duplicateBuffer.release();
if (subscription.getDispatcher() != null) {
@@ -198,7 +198,7 @@ public void publishMessage(ByteBuf data, PublishContext callback) {
if (!replicators.isEmpty()) {
replicators.forEach((name, replicator) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
- Entry entry = create(0L, 0L, duplicateBuffer);
+ Entry entry = EntryCacheManager.create(0L, 0L, duplicateBuffer);
// entry internally retains data, so duplicateBuffer should be released here
duplicateBuffer.release();
replicator.sendMessage(entry);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
index eac7268ba672d..d7ef217bd2bae 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
@@ -204,6 +204,12 @@ public LongPair get(long key1, long key2) {
return getSection(h).get(key1, key2, (int) h);
}
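+    /**
+     * Variant of {@link #get(long, long)} that returns only the first long of the stored value pair,
+     * avoiding the LongPair allocation; returns the ValueNotFound sentinel if the key is absent.
+     */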
+ public long getFirstValue(long key1, long key2) {
+ checkBiggerEqualZero(key1);
+ long h = hash(key1, key2);
+ return getSection(h).getFirstValue(key1, key2, (int) h);
+ }
+
public boolean containsKey(long key1, long key2) {
return get(key1, key2) != null;
}
@@ -370,6 +376,55 @@ LongPair get(long key1, long key2, int keyHash) {
}
}
+ long getFirstValue(long key1, long key2, int keyHash) {
+ long stamp = tryOptimisticRead();
+ boolean acquiredLock = false;
+ int bucket = signSafeMod(keyHash, capacity);
+
+ try {
+ while (true) {
+ // First try optimistic locking
+ long storedKey1 = table[bucket];
+ long storedKey2 = table[bucket + 1];
+ long storedValue1 = table[bucket + 2];
+
+ if (!acquiredLock && validate(stamp)) {
+ // The values we have read are consistent
+ if (key1 == storedKey1 && key2 == storedKey2) {
+ return storedValue1;
+ } else if (storedKey1 == EmptyKey) {
+ // Not found
+ return ValueNotFound;
+ }
+ } else {
+ // Fallback to acquiring read lock
+ if (!acquiredLock) {
+ stamp = readLock();
+ acquiredLock = true;
+
+ bucket = signSafeMod(keyHash, capacity);
+ storedKey1 = table[bucket];
+ storedKey2 = table[bucket + 1];
+ storedValue1 = table[bucket + 2];
+ }
+
+ if (key1 == storedKey1 && key2 == storedKey2) {
+ return storedValue1;
+ } else if (storedKey1 == EmptyKey) {
+ // Not found
+ return ValueNotFound;
+ }
+ }
+
+ bucket = (bucket + 4) & (table.length - 1);
+ }
+ } finally {
+ if (acquiredLock) {
+ unlockRead(stamp);
+ }
+ }
+ }
+
boolean put(long key1, long key2, long value1, long value2, int keyHash, boolean onlyIfAbsent) {
long stamp = writeLock();
int bucket = signSafeMod(keyHash, capacity);
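To illustrate the intent of the new accessor: callers that only need the first long of the stored value pair can stay allocation-free on the read path. A brief sketch with made-up keys and values, assuming the builder-style construction used by the other pulsar-common maps:

```java
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;

public class GetFirstValueExample {
    public static void main(String[] args) {
        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
                .expectedItems(256)
                .concurrencyLevel(16)
                .build();

        map.put(1L, 2L, 100L, 200L);

        // get() allocates a LongPair holder carrying both values
        LongPair pair = map.get(1L, 2L);

        // getFirstValue() returns only the first value as a primitive, with no allocation;
        // the ValueNotFound sentinel is returned when the key is absent
        long first = map.getFirstValue(1L, 2L);

        System.out.println(pair + " / " + first);
    }
}
```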
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairObjectHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairObjectHashMap.java
new file mode 100644
index 0000000000000..d3ccb61e3f40e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairObjectHashMap.java
@@ -0,0 +1,647 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.StampedLock;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Concurrent hash map from a pair of longs to an Object.
+ *
+ * <p>Provides similar methods as a {@code ConcurrentMap<Pair<Long, Long>, Object>} with 2 differences:
+ * <ol>
+ * <li>No boxing/unboxing from (long, long) -> Object</li>
+ * <li>Open hash map with linear probing, no node allocations to store the values</li>
+ * </ol>
+ *
+ * @param <V> type of the value
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentLongPairObjectHashMap<V> {
+
+ private static final Object EmptyValue = null;
+ private static final Object DeletedValue = new Object();
+
+ private static final int DefaultExpectedItems = 256;
+ private static final int DefaultConcurrencyLevel = 16;
+
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
+ public interface LongLongFunction<R> {
+ R apply(long key1, long key2);
+ }
+
+ public static <V> Builder<V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * Builder of ConcurrentLongPairObjectHashMap.
+ */
+ public static class Builder<V> {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder<V> expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder<V> concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder<V> mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder<V> mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder<V> expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder<V> shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder<V> autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentLongPairObjectHashMap<V> build() {
+ return new ConcurrentLongPairObjectHashMap<>(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+ }
+ }
+
+ private final Section<V>[] sections;
+
+ @Deprecated
+ public ConcurrentLongPairObjectHashMap() {
+ this(DefaultExpectedItems);
+ }
+
+ @Deprecated
+ public ConcurrentLongPairObjectHashMap(int expectedItems) {
+ this(expectedItems, DefaultConcurrencyLevel);
+ }
+
+ @Deprecated
+ public ConcurrentLongPairObjectHashMap(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentLongPairObjectHashMap(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float shrinkFactor) {
+ checkArgument(expectedItems > 0);
+ checkArgument(concurrencyLevel > 0);
+ checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
+
+ int numSections = concurrencyLevel;
+ int perSectionExpectedItems = expectedItems / numSections;
+ int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
+ this.sections = (Section<V>[]) new Section[numSections];
+
+ for (int i = 0; i < numSections; i++) {
+ sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
+ }
+ }
+
+ public long size() {
+ long size = 0;
+ for (Section<V> s : sections) {
+ size += s.size;
+ }
+ return size;
+ }
+
+ long getUsedBucketCount() {
+ long usedBucketCount = 0;
+ for (Section<V> s : sections) {
+ usedBucketCount += s.usedBuckets;
+ }
+ return usedBucketCount;
+ }
+
+ public long capacity() {
+ long capacity = 0;
+ for (Section<V> s : sections) {
+ capacity += s.capacity;
+ }
+ return capacity;
+ }
+
+ public boolean isEmpty() {
+ for (Section<V> s : sections) {
+ if (s.size != 0) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public V get(long key1, long key2) {
+ long h = hash(key1, key2);
+ return getSection(h).get(key1, key2, (int) h);
+ }
+
+ public boolean containsKey(long key1, long key2) {
+ return get(key1, key2) != null;
+ }
+
+ public V put(long key1, long key2, V value) {
+ requireNonNull(value);
+ long h = hash(key1, key2);
+ return getSection(h).put(key1, key2, value, (int) h, false, null);
+ }
+
+ public V putIfAbsent(long key1, long key2, V value) {
+ requireNonNull(value);
+ long h = hash(key1, key2);
+ return getSection(h).put(key1, key2, value, (int) h, true, null);
+ }
+
+ public V computeIfAbsent(long key1, long key2, LongLongFunction<V> provider) {
+ requireNonNull(provider);
+ long h = hash(key1, key2);
+ return getSection(h).put(key1, key2, null, (int) h, true, provider);
+ }
+
+ public V remove(long key1, long key2) {
+ long h = hash(key1, key2);
+ return getSection(h).remove(key1, key2, null, (int) h);
+ }
+
+ public boolean remove(long key1, long key2, Object value) {
+ requireNonNull(value);
+ long h = hash(key1, key2);
+ return getSection(h).remove(key1, key2, value, (int) h) != null;
+ }
+
+ private Section<V> getSection(long hash) {
+ // Use 32 msb out of long to get the section
+ final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+ return sections[sectionIdx];
+ }
+
+ public void clear() {
+ for (int i = 0; i < sections.length; i++) {
+ sections[i].clear();
+ }
+ }
+
+ public void forEach(EntryProcessor<V> processor) {
+ for (int i = 0; i < sections.length; i++) {
+ sections[i].forEach(processor);
+ }
+ }
+
+ /**
+ * @return a new list of all keys (makes a copy)
+ */
+ public List<Pair<Long, Long>> keys() {
+ List<Pair<Long, Long>> keys = Lists.newArrayListWithExpectedSize((int) size());
+ forEach((key1, key2, value) -> keys.add(Pair.of(key1, key2)));
+ return keys;
+ }
+
+ public List<V> values() {
+ List<V> values = Lists.newArrayListWithExpectedSize((int) size());
+ forEach((key1, key2, value) -> values.add(value));
+ return values;
+ }
+
+ /**
+ * Processor for one key-value entry, where the key is a pair of {@code long}s.
+ *
+ * @param <V> type of the value.
+ */
+ public interface EntryProcessor<V> {
+ void accept(long key1, long key2, V value);
+ }
+
+ // A section is a portion of the hash map that is covered by a single StampedLock
+ @SuppressWarnings("serial")
+ private static final class Section<V> extends StampedLock {
+ private volatile long[] keys1;
+ private volatile long[] keys2;
+ private volatile V[] values;
+
+ private volatile int capacity;
+ private final int initCapacity;
+ private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
+
+ private volatile int size;
+ private int usedBuckets;
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
+ this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
+ this.keys1 = new long[this.capacity];
+ this.keys2 = new long[this.capacity];
+ this.values = (V[]) new Object[this.capacity];
+ this.size = 0;
+ this.usedBuckets = 0;
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
+ }
+
+ V get(long key1, long key2, int keyHash) {
+ int bucket = keyHash;
+
+ long stamp = tryOptimisticRead();
+ boolean acquiredLock = false;
+
+ try {
+ while (true) {
+ int capacity = this.capacity;
+ bucket = signSafeMod(bucket, capacity);
+
+ // First try optimistic locking
+ long storedKey1 = keys1[bucket];
+ long storedKey2 = keys2[bucket];
+ V storedValue = values[bucket];
+
+ if (!acquiredLock && validate(stamp)) {
+ // The values we have read are consistent
+ if (storedKey1 == key1 && storedKey2 == key2) {
+ return storedValue != DeletedValue ? storedValue : null;
+ } else if (storedValue == EmptyValue) {
+ // Not found
+ return null;
+ }
+ } else {
+ // Fallback to acquiring read lock
+ if (!acquiredLock) {
+ stamp = readLock();
+ acquiredLock = true;
+ storedKey1 = keys1[bucket];
+ storedKey2 = keys2[bucket];
+ storedValue = values[bucket];
+ }
+
+ if (capacity != this.capacity) {
+ // There has been a rehashing. We need to restart the search
+ bucket = keyHash;
+ continue;
+ }
+
+ if (storedKey1 == key1 && storedKey2 == key2) {
+ return storedValue != DeletedValue ? storedValue : null;
+ } else if (storedValue == EmptyValue) {
+ // Not found
+ return null;
+ }
+ }
+
+ ++bucket;
+ }
+ } finally {
+ if (acquiredLock) {
+ unlockRead(stamp);
+ }
+ }
+ }
+
+ V put(long key1, long key2, V value, int keyHash, boolean onlyIfAbsent, LongLongFunction<V> valueProvider) {
+ int bucket = keyHash;
+
+ long stamp = writeLock();
+ int capacity = this.capacity;
+
+ // Remember where we find the first available spot
+ int firstDeletedKey = -1;
+
+ try {
+ while (true) {
+ bucket = signSafeMod(bucket, capacity);
+
+ long storedKey1 = keys1[bucket];
+ long storedKey2 = keys2[bucket];
+ V storedValue = values[bucket];
+
+ if (storedKey1 == key1 && storedKey2 == key2) {
+ if (storedValue == EmptyValue) {
+ values[bucket] = value != null ? value : valueProvider.apply(key1, key2);
+ SIZE_UPDATER.incrementAndGet(this);
+ ++usedBuckets;
+ return valueProvider != null ? values[bucket] : null;
+ } else if (storedValue == DeletedValue) {
+ values[bucket] = value != null ? value : valueProvider.apply(key1, key2);
+ SIZE_UPDATER.incrementAndGet(this);
+ return valueProvider != null ? values[bucket] : null;
+ } else if (!onlyIfAbsent) {
+ // Overwrite the old value for the same key
+ values[bucket] = value;
+ return storedValue;
+ } else {
+ return storedValue;
+ }
+ } else if (storedValue == EmptyValue) {
+ // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+ // key, we should write at that position
+ if (firstDeletedKey != -1) {
+ bucket = firstDeletedKey;
+ } else {
+ ++usedBuckets;
+ }
+
+ keys1[bucket] = key1;
+ keys2[bucket] = key2;
+ values[bucket] = value != null ? value : valueProvider.apply(key1, key2);
+ SIZE_UPDATER.incrementAndGet(this);
+ return valueProvider != null ? values[bucket] : null;
+ } else if (storedValue == DeletedValue) {
+ // The bucket contained a different deleted key
+ if (firstDeletedKey == -1) {
+ firstDeletedKey = bucket;
+ }
+ }
+
+ ++bucket;
+ }
+ } finally {
+ if (usedBuckets > resizeThresholdUp) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+ rehash(newCapacity);
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
+ }
+ }
+
+ private V remove(long key1, long key2, Object value, int keyHash) {
+ int bucket = keyHash;
+ long stamp = writeLock();
+
+ try {
+ while (true) {
+ int capacity = this.capacity;
+ bucket = signSafeMod(bucket, capacity);
+
+ long storedKey1 = keys1[bucket];
+ long storedKey2 = keys2[bucket];
+ V storedValue = values[bucket];
+ if (storedKey1 == key1 && storedKey2 == key2) {
+ if (value == null || value.equals(storedValue)) {
+ if (storedValue == EmptyValue || storedValue == DeletedValue) {
+ return null;
+ }
+
+ SIZE_UPDATER.decrementAndGet(this);
+ V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
+ if (nextValueInArray == EmptyValue) {
+ values[bucket] = (V) EmptyValue;
+ --usedBuckets;
+
+ // Cleanup all the buckets that were in `DeletedValue` state,
+ // so that we can reduce unnecessary expansions
+ int lastBucket = signSafeMod(bucket - 1, capacity);
+ while (values[lastBucket] == DeletedValue) {
+ values[lastBucket] = (V) EmptyValue;
+ --usedBuckets;
+
+ lastBucket = signSafeMod(lastBucket - 1, capacity);
+ }
+ } else {
+ values[bucket] = (V) DeletedValue;
+ }
+
+ return storedValue;
+ } else {
+ return null;
+ }
+ } else if (storedValue == EmptyValue) {
+ // Key wasn't found
+ return null;
+ }
+
+ ++bucket;
+ }
+
+ } finally {
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp > size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
+ }
+ }
+
+ void clear() {
+ long stamp = writeLock();
+
+ try {
+ Arrays.fill(keys1, 0);
+ Arrays.fill(keys2, 0);
+ Arrays.fill(values, EmptyValue);
+ this.size = 0;
+ this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ }
+
+ public void forEach(EntryProcessor<V> processor) {
+ long stamp = tryOptimisticRead();
+
+ // We need to make sure that we read these variables in a consistent way
+ int capacity = this.capacity;
+ long[] keys1 = this.keys1;
+ long[] keys2 = this.keys2;
+ V[] values = this.values;
+
+ // Validate no rehashing
+ if (!validate(stamp)) {
+ // Fallback to read lock
+ stamp = readLock();
+
+ capacity = this.capacity;
+ keys1 = this.keys1;
+ keys2 = this.keys2;
+ values = this.values;
+ unlockRead(stamp);
+ }
+
+ // Go through all the buckets for this section. We try to renew the stamp only after a validation
+ // error, otherwise we keep going with the same.
+ for (int bucket = 0; bucket < capacity; bucket++) {
+ if (stamp == 0) {
+ stamp = tryOptimisticRead();
+ }
+
+ long storedKey1 = keys1[bucket];
+ long storedKey2 = keys2[bucket];
+ V storedValue = values[bucket];
+
+ if (!validate(stamp)) {
+ // Fallback to acquiring read lock
+ stamp = readLock();
+
+ try {
+ storedKey1 = keys1[bucket];
+ storedKey2 = keys2[bucket];
+ storedValue = values[bucket];
+ } finally {
+ unlockRead(stamp);
+ }
+
+ stamp = 0;
+ }
+
+ if (storedValue != DeletedValue && storedValue != EmptyValue) {
+ processor.accept(storedKey1, storedKey2, storedValue);
+ }
+ }
+ }
+
+ private void rehash(int newCapacity) {
+ // Expand the hashmap
+ long[] newKeys1 = new long[newCapacity];
+ long[] newKeys2 = new long[newCapacity];
+ V[] newValues = (V[]) new Object[newCapacity];
+
+ // Re-hash table
+ for (int i = 0; i < keys1.length; i++) {
+ long storedKey1 = keys1[i];
+ long storedKey2 = keys2[i];
+ V storedValue = values[i];
+ if (storedValue != EmptyValue && storedValue != DeletedValue) {
+ insertKeyValueNoLock(newKeys1, newKeys2, newValues, storedKey1, storedKey2, storedValue);
+ }
+ }
+
+ keys1 = newKeys1;
+ keys2 = newKeys2;
+ values = newValues;
+ capacity = newCapacity;
+ usedBuckets = size;
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
+ }
+
+ private static <V> void insertKeyValueNoLock(long[] keys1, long[] keys2, V[] values, long key1, long key2,
+ V value) {
+ int bucket = (int) hash(key1, key2);
+
+ while (true) {
+ bucket = signSafeMod(bucket, keys1.length);
+
+ V storedValue = values[bucket];
+
+ if (storedValue == EmptyValue) {
+ // The bucket is empty, so we can use it
+ keys1[bucket] = key1;
+ keys2[bucket] = key2;
+ values[bucket] = value;
+ return;
+ }
+
+ ++bucket;
+ }
+ }
+ }
+
+ private static final long HashMixer = 0xc6a4a7935bd1e995L;
+ private static final int R = 47;
+
+ static final long hash(long key1, long key2) {
+ long hash = key1 * HashMixer;
+ hash ^= hash >>> R;
+ hash *= HashMixer;
+ hash += 31 + (key2 * HashMixer);
+ hash ^= hash >>> R;
+ hash *= HashMixer;
+ return hash;
+ }
+
+
+ static final int signSafeMod(long n, int max) {
+ return (int) n & (max - 1);
+ }
+
+ private static int alignToPowerOfTwo(int n) {
+ return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+ }
+}
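For orientation, here is a brief usage sketch of the new map; keys and values are made up for illustration:

```java
import org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;

public class LongPairMapExample {
    public static void main(String[] args) {
        ConcurrentLongPairObjectHashMap<String> map =
                ConcurrentLongPairObjectHashMap.<String>newBuilder()
                        .expectedItems(1024)
                        .concurrencyLevel(16)
                        .build();

        // Key is a (ledgerId, entryId)-style pair of longs; no boxing involved
        map.put(7L, 0L, "entry-7-0");
        map.computeIfAbsent(7L, 1L, (k1, k2) -> "entry-" + k1 + "-" + k2);

        String value = map.get(7L, 0L);
        map.forEach((k1, k2, v) -> System.out.println(k1 + ":" + k2 + " -> " + v));
        map.remove(7L, 0L);
        System.out.println(value + " size=" + map.size());
    }
}
```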
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index e42ace8acf226..9fc578024da6a 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -297,6 +297,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
|managedLedgerDefaultAckQuorum| Number of guaranteed copies (acks to wait before write is complete) |2|
|managedLedgerCacheSizeMB| Amount of memory to use for caching data payload in managed ledger. This memory is allocated from JVM direct memory and it’s shared across all the topics running in the same broker. By default, uses 1/5th of available direct memory ||
|managedLedgerCacheCopyEntries| Whether we should make a copy of the entry payloads when inserting in cache| false|
+|managedLedgerCacheManagerImplementationClass| The class name for the implementation of ManagedLedger cache manager component. Options are: org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl, org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl| org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl|
|managedLedgerCacheEvictionWatermark| Threshold to which bring down the cache level when eviction is triggered |0.9|
|managedLedgerCacheEvictionFrequency| Configure the cache eviction frequency for the managed ledger cache (evictions/sec) | 100.0 |
|managedLedgerCacheEvictionTimeThresholdMillis| All entries that have stayed in cache for more than the configured time, will be evicted | 1000 |
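For reference, a broker.conf fragment that selects the new shared cache implementation could look like the sketch below; it only covers the options documented above, with everything else left at its default:

```properties
# Entry cache manager implementation: the new shared cache, or the pre-existing per-ledger range cache
managedLedgerCacheManagerImplementationClass=org.apache.bookkeeper.mledger.impl.cache.SharedEntryCacheManagerImpl
# Whether entry payloads are copied when inserted into the cache
managedLedgerCacheCopyEntries=false
```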