diff --git a/conf/broker.conf b/conf/broker.conf index 7f07ae5444993..41809f8711685 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1014,8 +1014,8 @@ managedLedgerCacheCopyEntries=false # Threshold to which bring down the cache level when eviction is triggered managedLedgerCacheEvictionWatermark=0.9 -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 +# Configure the cache eviction interval in milliseconds for the managed ledger cache +managedLedgerCacheEvictionIntervalMs=10 # All entries that have stayed in cache for more than the configured time, will be evicted managedLedgerCacheEvictionTimeThresholdMillis=1000 @@ -1572,4 +1572,7 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1 # If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in # multiple entries. -persistentUnackedRangesWithMultipleEntriesEnabled=false \ No newline at end of file +persistentUnackedRangesWithMultipleEntriesEnabled=false + +# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead +managedLedgerCacheEvictionFrequency=0 \ No newline at end of file diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index fb4456f0b1dc6..1ed98cd4e02c8 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -772,8 +772,8 @@ managedLedgerCacheCopyEntries=false # Threshold to which bring down the cache level when eviction is triggered managedLedgerCacheEvictionWatermark=0.9 -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 +# Configure the cache eviction interval in milliseconds for the managed ledger cache +managedLedgerCacheEvictionIntervalMs=10 # All entries that have stayed in cache for more than the configured time, will be evicted managedLedgerCacheEvictionTimeThresholdMillis=1000 @@ -1137,6 +1137,9 @@ brokerServicePurgeInactiveFrequencyInSeconds=60 # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1 +# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead +managedLedgerCacheEvictionFrequency=0 + ### --- Transaction config variables --- ### # Enable transaction coordinator in broker diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index 25fcb377e3e11..78314be45c390 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -43,9 +43,9 @@ public class ManagedLedgerFactoryConfig { private int numManagedLedgerSchedulerThreads = Runtime.getRuntime().availableProcessors(); /** - * Frequency of cache eviction triggering. Default is 100 times per second. + * Interval of cache eviction triggering. Default is 10 ms times. */ - private double cacheEvictionFrequency = 100; + private long cacheEvictionIntervalMs = 10; /** * All entries that have stayed in cache for more than the configured time, will be evicted. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 8e3271a03934e..629e96ba3e3ee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -34,8 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -80,6 +80,7 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.Runnables; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -94,8 +95,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final ManagedLedgerFactoryConfig config; @Getter protected final OrderedScheduler scheduledExecutor; - - private final ExecutorService cacheEvictionExecutor; + private final ScheduledExecutorService cacheEvictionExecutor; @Getter protected final ManagedLedgerFactoryMBeanImpl mbean; @@ -184,7 +184,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, .name("bookkeeper-ml-scheduler") .build(); cacheEvictionExecutor = Executors - .newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction")); + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction")); this.metadataServiceAvailable = true; this.bookkeeperFactory = bookKeeperGroupFactory; this.isBookkeeperManaged = isBookkeeperManaged; @@ -203,8 +203,9 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS .toNanos(config.getCacheEvictionTimeThresholdMillis()); - - cacheEvictionExecutor.execute(this::cacheEvictionTask); + long evictionTaskInterval = config.getCacheEvictionIntervalMs(); + cacheEvictionExecutor.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(this::doCacheEviction), + evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS); closed = false; metadataStore.registerSessionListener(this::handleMetadataStoreNotification); @@ -258,24 +259,6 @@ private synchronized void refreshStats() { lastStatTimestamp = now; } - private void cacheEvictionTask() { - double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001); - long waitTimeMillis = (long) (1000 / evictionFrequency); - - while (!closed) { - try { - doCacheEviction(); - - Thread.sleep(waitTimeMillis); - } catch (InterruptedException e) { - // Factory is shutting down - return; - } catch (Throwable t) { - log.warn("Exception while performing cache eviction: {}", t.getMessage(), t); - } - } - } - private synchronized void doCacheEviction() { long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 55f58ecd11c08..a1dcb4ea02ffe 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -260,7 +260,7 @@ public void verifyHitsMisses() throws Exception { ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); config.setMaxCacheSize(7 * 10); config.setCacheEvictionWatermark(0.8); - config.setCacheEvictionFrequency(1); + config.setCacheEvictionIntervalMs(1000); @Cleanup("shutdown") ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc, config); @@ -329,7 +329,7 @@ public void verifyHitsMisses() throws Exception { public void verifyTimeBasedEviction() throws Exception { ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); config.setMaxCacheSize(1000); - config.setCacheEvictionFrequency(100); + config.setCacheEvictionIntervalMs(10); config.setCacheEvictionTimeThresholdMillis(100); @Cleanup("shutdown") diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index c60644301fa95..dff1176c86dd6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2668,7 +2668,7 @@ public void testGetNextValidPosition() throws Exception { @Test public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception { ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig(); - conf.setCacheEvictionFrequency(0.1); + conf.setCacheEvictionIntervalMs(10000); @Cleanup("shutdown") ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2dfa122d83dcf..178e047a32d38 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -105,6 +105,10 @@ public class ServiceConfiguration implements PulsarConfiguration { @Category private static final String CATEGORY_PLUGIN = "Broker Plugin"; + private static final double MIN_ML_CACHE_EVICTION_FREQUENCY = 0.001; + private static final double MAX_ML_CACHE_EVICTION_FREQUENCY = 1000.0; + private static final long MAX_ML_CACHE_EVICTION_INTERVAL_MS = 1000000L; + /***** --- pulsar configuration. --- ****/ @FieldContext( category = CATEGORY_SERVER, @@ -1807,8 +1811,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private double managedLedgerCacheEvictionWatermark = 0.9; @FieldContext(category = CATEGORY_STORAGE_ML, - doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s") - private double managedLedgerCacheEvictionFrequency = 100.0; + doc = "Configure the cache eviction frequency for the managed ledger cache.") + @Deprecated + private double managedLedgerCacheEvictionFrequency = 0; + + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Configure the cache eviction interval in milliseconds for the managed ledger cache, default is 10ms") + private long managedLedgerCacheEvictionIntervalMs = 10; + @FieldContext(category = CATEGORY_STORAGE_ML, dynamic = true, doc = "All entries that have stayed in cache for more than the configured time, will be evicted") @@ -3018,4 +3028,12 @@ public int getMetadataStoreOperationTimeoutSeconds() { public int getMetadataStoreCacheExpirySeconds() { return zooKeeperCacheExpirySeconds > 0 ? zooKeeperCacheExpirySeconds : metadataStoreCacheExpirySeconds; } + + public long getManagedLedgerCacheEvictionIntervalMs() { + return managedLedgerCacheEvictionFrequency > 0 + ? (long) (1000 / Math.max( + Math.min(managedLedgerCacheEvictionFrequency, MAX_ML_CACHE_EVICTION_FREQUENCY), + MIN_ML_CACHE_EVICTION_FREQUENCY)) + : Math.min(MAX_ML_CACHE_EVICTION_INTERVAL_MS, managedLedgerCacheEvictionIntervalMs); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index bb7cb6ffd8d7e..234e11bee643e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -59,7 +59,7 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L); managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark()); managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads()); - managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency()); + managedLedgerFactoryConfig.setCacheEvictionIntervalMs(conf.getManagedLedgerCacheEvictionIntervalMs()); managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis( conf.getManagedLedgerCacheEvictionTimeThresholdMillis()); managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 9b154690d7e0d..95e343acfefcb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -53,7 +53,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase { @BeforeClass @Override protected void setup() throws Exception { - conf.setManagedLedgerCacheEvictionFrequency(0.1); + conf.setManagedLedgerCacheEvictionIntervalMs(10000); super.internalSetup(); super.producerBaseSetup(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java index da9ece0b4ecf0..13f11c0261296 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java @@ -46,7 +46,7 @@ public Object[][] topicDomainProvider() { @BeforeClass @Override protected void setup() throws Exception { - conf.setManagedLedgerCacheEvictionFrequency(0.1); + conf.setManagedLedgerCacheEvictionIntervalMs(10000); super.internalSetup(); super.producerBaseSetup(); }