Skip to content

Commit

Permalink
[improve][broker] refactor ManagedLedger cacheEvictionTask implement (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang authored Jul 29, 2022
1 parent 57b008a commit 3619edc
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 39 deletions.
9 changes: 6 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1014,8 +1014,8 @@ managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0
# Configure the cache eviction interval in milliseconds for the managed ledger cache
managedLedgerCacheEvictionIntervalMs=10

# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
Expand Down Expand Up @@ -1572,4 +1572,7 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in
# multiple entries.
persistentUnackedRangesWithMultipleEntriesEnabled=false
persistentUnackedRangesWithMultipleEntriesEnabled=false

# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
managedLedgerCacheEvictionFrequency=0
7 changes: 5 additions & 2 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -772,8 +772,8 @@ managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0
# Configure the cache eviction interval in milliseconds for the managed ledger cache
managedLedgerCacheEvictionIntervalMs=10

# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
Expand Down Expand Up @@ -1137,6 +1137,9 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
managedLedgerCacheEvictionFrequency=0

### --- Transaction config variables --- ###

# Enable transaction coordinator in broker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public class ManagedLedgerFactoryConfig {
private int numManagedLedgerSchedulerThreads = Runtime.getRuntime().availableProcessors();

/**
* Frequency of cache eviction triggering. Default is 100 times per second.
* Interval of cache eviction triggering. Default is 10 ms times.
*/
private double cacheEvictionFrequency = 100;
private long cacheEvictionIntervalMs = 10;

/**
* All entries that have stayed in cache for more than the configured time, will be evicted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand All @@ -94,8 +95,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ManagedLedgerFactoryConfig config;
@Getter
protected final OrderedScheduler scheduledExecutor;

private final ExecutorService cacheEvictionExecutor;
private final ScheduledExecutorService cacheEvictionExecutor;

@Getter
protected final ManagedLedgerFactoryMBeanImpl mbean;
Expand Down Expand Up @@ -184,7 +184,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
.name("bookkeeper-ml-scheduler")
.build();
cacheEvictionExecutor = Executors
.newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
this.metadataServiceAvailable = true;
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
Expand All @@ -203,8 +203,9 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
.toNanos(config.getCacheEvictionTimeThresholdMillis());


cacheEvictionExecutor.execute(this::cacheEvictionTask);
long evictionTaskInterval = config.getCacheEvictionIntervalMs();
cacheEvictionExecutor.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(this::doCacheEviction),
evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS);
closed = false;

metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
Expand Down Expand Up @@ -258,24 +259,6 @@ private synchronized void refreshStats() {
lastStatTimestamp = now;
}

private void cacheEvictionTask() {
double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
long waitTimeMillis = (long) (1000 / evictionFrequency);

while (!closed) {
try {
doCacheEviction();

Thread.sleep(waitTimeMillis);
} catch (InterruptedException e) {
// Factory is shutting down
return;
} catch (Throwable t) {
log.warn("Exception while performing cache eviction: {}", t.getMessage(), t);
}
}
}

private synchronized void doCacheEviction() {
long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void verifyHitsMisses() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(7 * 10);
config.setCacheEvictionWatermark(0.8);
config.setCacheEvictionFrequency(1);
config.setCacheEvictionIntervalMs(1000);

@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
Expand Down Expand Up @@ -329,7 +329,7 @@ public void verifyHitsMisses() throws Exception {
public void verifyTimeBasedEviction() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(1000);
config.setCacheEvictionFrequency(100);
config.setCacheEvictionIntervalMs(10);
config.setCacheEvictionTimeThresholdMillis(100);

@Cleanup("shutdown")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2668,7 +2668,7 @@ public void testGetNextValidPosition() throws Exception {
@Test
public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception {
ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
conf.setCacheEvictionFrequency(0.1);
conf.setCacheEvictionIntervalMs(10000);

@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
@Category
private static final String CATEGORY_PLUGIN = "Broker Plugin";

private static final double MIN_ML_CACHE_EVICTION_FREQUENCY = 0.001;
private static final double MAX_ML_CACHE_EVICTION_FREQUENCY = 1000.0;
private static final long MAX_ML_CACHE_EVICTION_INTERVAL_MS = 1000000L;

/***** --- pulsar configuration. --- ****/
@FieldContext(
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -1807,8 +1811,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private double managedLedgerCacheEvictionWatermark = 0.9;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
doc = "Configure the cache eviction frequency for the managed ledger cache.")
@Deprecated
private double managedLedgerCacheEvictionFrequency = 0;

@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction interval in milliseconds for the managed ledger cache, default is 10ms")
private long managedLedgerCacheEvictionIntervalMs = 10;

@FieldContext(category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
Expand Down Expand Up @@ -3018,4 +3028,12 @@ public int getMetadataStoreOperationTimeoutSeconds() {
public int getMetadataStoreCacheExpirySeconds() {
return zooKeeperCacheExpirySeconds > 0 ? zooKeeperCacheExpirySeconds : metadataStoreCacheExpirySeconds;
}

public long getManagedLedgerCacheEvictionIntervalMs() {
return managedLedgerCacheEvictionFrequency > 0
? (long) (1000 / Math.max(
Math.min(managedLedgerCacheEvictionFrequency, MAX_ML_CACHE_EVICTION_FREQUENCY),
MIN_ML_CACHE_EVICTION_FREQUENCY))
: Math.min(MAX_ML_CACHE_EVICTION_INTERVAL_MS, managedLedgerCacheEvictionIntervalMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
managedLedgerFactoryConfig.setCacheEvictionIntervalMs(conf.getManagedLedgerCacheEvictionIntervalMs());
managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(
conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setManagedLedgerCacheEvictionFrequency(0.1);
conf.setManagedLedgerCacheEvictionIntervalMs(10000);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Object[][] topicDomainProvider() {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setManagedLedgerCacheEvictionFrequency(0.1);
conf.setManagedLedgerCacheEvictionIntervalMs(10000);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down

0 comments on commit 3619edc

Please sign in to comment.