diff --git a/conf/broker.conf b/conf/broker.conf index bbfd03f95ca833..fe41c8bdacc530 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -960,8 +960,8 @@ managedLedgerCacheCopyEntries=false # Threshold to which bring down the cache level when eviction is triggered managedLedgerCacheEvictionWatermark=0.9 -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 +# Configure the cache eviction interval for the managed ledger cache +managedLedgerCacheEvictionIntervalMs=10 # All entries that have stayed in cache for more than the configured time, will be evicted managedLedgerCacheEvictionTimeThresholdMillis=1000 @@ -1429,4 +1429,6 @@ tlsEnabled=false # Enable Key_Shared subscription (default is enabled) # @deprecated since 2.8.0 subscriptionTypesEnabled is preferred over subscriptionKeySharedEnable. -subscriptionKeySharedEnable=true \ No newline at end of file +subscriptionKeySharedEnable=true +# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead +managedLedgerCacheEvictionFrequency=100.0 \ No newline at end of file diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index e0957fd66d2f2e..0794c9447c9f08 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -775,8 +775,8 @@ managedLedgerCacheCopyEntries=false # Threshold to which bring down the cache level when eviction is triggered managedLedgerCacheEvictionWatermark=0.9 -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 +# Configure the cache eviction interval for the managed ledger cache +managedLedgerCacheEvictionIntervalMs=10 # All entries that have stayed in cache for more than the configured time, will be evicted managedLedgerCacheEvictionTimeThresholdMillis=1000 @@ -1134,6 +1134,9 @@ replicationTlsEnabled=false # Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds brokerServicePurgeInactiveFrequencyInSeconds=60 +# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead +managedLedgerCacheEvictionFrequency=100.0 + ### --- Transaction config variables --- ### # Enable transaction coordinator in broker diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index a00c1616410830..f1cfc3576b002e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -44,9 +44,16 @@ public class ManagedLedgerFactoryConfig { /** * Frequency of cache eviction triggering. Default is 100 times per second. + * @Deprecated Use {@link #cacheEvictionIntervalMs} instead. */ + @Deprecated private double cacheEvictionFrequency = 100; + /** + * Interval of cache eviction triggering. Default is 10 ms times. + */ + private long cacheEvictionIntervalMs = 10; + /** * All entries that have stayed in cache for more than the configured time, will be evicted. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index f183d1ce8414a2..3b3c3e16c4670a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -33,8 +33,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -90,8 +90,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final boolean isBookkeeperManaged; private final ManagedLedgerFactoryConfig config; protected final OrderedScheduler scheduledExecutor; - - private final ExecutorService cacheEvictionExecutor; + private final ScheduledExecutorService cacheEvictionExecutor; protected final ManagedLedgerFactoryMBeanImpl mbean; @@ -179,7 +178,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, .name("bookkeeper-ml-scheduler") .build(); cacheEvictionExecutor = Executors - .newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction")); + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction")); this.metadataServiceAvailable = true; this.bookkeeperFactory = bookKeeperGroupFactory; this.isBookkeeperManaged = isBookkeeperManaged; @@ -197,8 +196,9 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS .toNanos(config.getCacheEvictionTimeThresholdMillis()); - - cacheEvictionExecutor.execute(this::cacheEvictionTask); + long evictionTaskInterval = Math.min(config.getCacheEvictionIntervalMs(), 10); + cacheEvictionExecutor.scheduleWithFixedDelay(cacheEvictionTask, + evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS); closed = false; metadataStore.registerSessionListener(this::handleMetadataStoreNotification); @@ -252,23 +252,13 @@ private synchronized void refreshStats() { lastStatTimestamp = now; } - private void cacheEvictionTask() { - double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001); - long waitTimeMillis = (long) (1000 / evictionFrequency); - - while (true) { - try { - doCacheEviction(); - - Thread.sleep(waitTimeMillis); - } catch (InterruptedException e) { - // Factory is shutting down - return; - } catch (Throwable t) { - log.warn("Exception while performing cache eviction: {}", t.getMessage(), t); - } + Runnable cacheEvictionTask = () -> { + try { + doCacheEviction(); + } catch (Throwable t) { + log.warn("Exception while performing cache eviction: {}", t.getMessage(), t); } - } + }; private synchronized void doCacheEviction() { long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 364b2fb4491f8b..c6c5522802b3b9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -256,7 +256,7 @@ public void verifyHitsMisses() throws Exception { ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); config.setMaxCacheSize(7 * 10); config.setCacheEvictionWatermark(0.8); - config.setCacheEvictionFrequency(1); + config.setCacheEvictionIntervalMs(1000); @Cleanup("shutdown") ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc, config); @@ -325,7 +325,7 @@ public void verifyHitsMisses() throws Exception { public void verifyTimeBasedEviction() throws Exception { ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); config.setMaxCacheSize(1000); - config.setCacheEvictionFrequency(100); + config.setCacheEvictionIntervalMs(10); config.setCacheEvictionTimeThresholdMillis(100); @Cleanup("shutdown") diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 982b91486a0ef6..e88e2c63750b1d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2327,7 +2327,7 @@ public void testGetNextValidPosition() throws Exception { @Test public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception { ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig(); - conf.setCacheEvictionFrequency(0.1); + conf.setCacheEvictionIntervalMs(10000); @Cleanup("shutdown") ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 87bdb4d4e317a7..e8d65970310cd3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1685,7 +1685,14 @@ public class ServiceConfiguration implements PulsarConfiguration { private double managedLedgerCacheEvictionWatermark = 0.9; @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s") + @Deprecated private double managedLedgerCacheEvictionFrequency = 100.0; + + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Configure the cache eviction interval for the managed ledger cache. Must >= 1 ms, " + + " and default is 10 ms") + private long managedLedgerCacheEvictionIntervalMs = 10; + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "All entries that have stayed in cache for more than the configured time, will be evicted") private long managedLedgerCacheEvictionTimeThresholdMillis = 1000; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index da45f32863a1a3..6d3fd239a60d32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -59,7 +59,7 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L); managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark()); managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads()); - managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency()); + managedLedgerFactoryConfig.setCacheEvictionIntervalMs(conf.getManagedLedgerCacheEvictionIntervalMs()); managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis( conf.getManagedLedgerCacheEvictionTimeThresholdMillis()); managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 9b154690d7e0dd..95e343acfefcb8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -53,7 +53,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase { @BeforeClass @Override protected void setup() throws Exception { - conf.setManagedLedgerCacheEvictionFrequency(0.1); + conf.setManagedLedgerCacheEvictionIntervalMs(10000); super.internalSetup(); super.producerBaseSetup(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java index da9ece0b4ecf04..13f11c02612961 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java @@ -46,7 +46,7 @@ public Object[][] topicDomainProvider() { @BeforeClass @Override protected void setup() throws Exception { - conf.setManagedLedgerCacheEvictionFrequency(0.1); + conf.setManagedLedgerCacheEvictionIntervalMs(10000); super.internalSetup(); super.producerBaseSetup(); }