diff --git a/conf/broker.conf b/conf/broker.conf index 2cde20736786b5..0072806d6296ea 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -990,8 +990,8 @@ managedLedgerCacheCopyEntries=false # Threshold to which bring down the cache level when eviction is triggered managedLedgerCacheEvictionWatermark=0.9 -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 +# Configure the cache eviction interval in milliseconds for the managed ledger cache +managedLedgerCacheEvictionIntervalMs=10 # All entries that have stayed in cache for more than the configured time, will be evicted managedLedgerCacheEvictionTimeThresholdMillis=1000 @@ -1499,4 +1499,7 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1 # If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in # multiple entries. -persistentUnackedRangesWithMultipleEntriesEnabled=false \ No newline at end of file +persistentUnackedRangesWithMultipleEntriesEnabled=false + +# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead +managedLedgerCacheEvictionFrequency=0 \ No newline at end of file diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index fb4456f0b1dc6a..1ed98cd4e02c88 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -772,8 +772,8 @@ managedLedgerCacheCopyEntries=false # Threshold to which bring down the cache level when eviction is triggered managedLedgerCacheEvictionWatermark=0.9 -# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) -managedLedgerCacheEvictionFrequency=100.0 +# Configure the cache eviction interval in milliseconds for the managed ledger cache +managedLedgerCacheEvictionIntervalMs=10 # All entries that have stayed in cache for more than the configured time, will be evicted managedLedgerCacheEvictionTimeThresholdMillis=1000 @@ -1137,6 +1137,9 @@ brokerServicePurgeInactiveFrequencyInSeconds=60 # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1 +# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead +managedLedgerCacheEvictionFrequency=0 + ### --- Transaction config variables --- ### # Enable transaction coordinator in broker diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index 25fcb377e3e11c..4c7c699d49241a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -44,9 +44,16 @@ public class ManagedLedgerFactoryConfig { /** * Frequency of cache eviction triggering. Default is 100 times per second. + * @Deprecated Use {@link #cacheEvictionIntervalMs} instead. */ + @Deprecated private double cacheEvictionFrequency = 100; + /** + * Interval of cache eviction triggering. Default is 10 ms times. + */ + private long cacheEvictionIntervalMs = 10; + /** * All entries that have stayed in cache for more than the configured time, will be evicted. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 1a12f9da496067..d6734a720886fa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -34,8 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -94,8 +94,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final ManagedLedgerFactoryConfig config; @Getter protected final OrderedScheduler scheduledExecutor; - - private final ExecutorService cacheEvictionExecutor; + private final ScheduledExecutorService cacheEvictionExecutor; @Getter protected final ManagedLedgerFactoryMBeanImpl mbean; @@ -184,7 +183,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, .name("bookkeeper-ml-scheduler") .build(); cacheEvictionExecutor = Executors - .newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction")); + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction")); this.metadataServiceAvailable = true; this.bookkeeperFactory = bookKeeperGroupFactory; this.isBookkeeperManaged = isBookkeeperManaged; @@ -203,8 +202,14 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS .toNanos(config.getCacheEvictionTimeThresholdMillis()); - - cacheEvictionExecutor.execute(this::cacheEvictionTask); + long evictionTaskInterval = config.getCacheEvictionIntervalMs(); + cacheEvictionExecutor.scheduleWithFixedDelay(() -> { + try { + doCacheEviction(); + } catch (Throwable t) { + log.warn("Exception while performing cache eviction: {}", t.getMessage(), t); + } + }, evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS); closed = false; metadataStore.registerSessionListener(this::handleMetadataStoreNotification); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 55f58ecd11c081..a1dcb4ea02ffe3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -260,7 +260,7 @@ public void verifyHitsMisses() throws Exception { ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); config.setMaxCacheSize(7 * 10); config.setCacheEvictionWatermark(0.8); - config.setCacheEvictionFrequency(1); + config.setCacheEvictionIntervalMs(1000); @Cleanup("shutdown") ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc, config); @@ -329,7 +329,7 @@ public void verifyHitsMisses() throws Exception { public void verifyTimeBasedEviction() throws Exception { ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); config.setMaxCacheSize(1000); - config.setCacheEvictionFrequency(100); + config.setCacheEvictionIntervalMs(10); config.setCacheEvictionTimeThresholdMillis(100); @Cleanup("shutdown") diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 04f75dd7232f69..022fa5ed784913 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2604,7 +2604,7 @@ public void testGetNextValidPosition() throws Exception { @Test public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception { ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig(); - conf.setCacheEvictionFrequency(0.1); + conf.setCacheEvictionIntervalMs(10000); @Cleanup("shutdown") ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b6a8ae02cfa4a8..5d7c1190dc40f9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1751,8 +1751,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private double managedLedgerCacheEvictionWatermark = 0.9; @FieldContext(category = CATEGORY_STORAGE_ML, - doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s") - private double managedLedgerCacheEvictionFrequency = 100.0; + doc = "Configure the cache eviction frequency for the managed ledger cache.") + @Deprecated + private double managedLedgerCacheEvictionFrequency = 0; + + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Configure the cache eviction interval in milliseconds for the managed ledger cache, default is 10ms") + private long managedLedgerCacheEvictionIntervalMs = 10; + @FieldContext(category = CATEGORY_STORAGE_ML, dynamic = true, doc = "All entries that have stayed in cache for more than the configured time, will be evicted") @@ -2839,4 +2845,10 @@ public int getMetadataStoreOperationTimeoutSeconds() { public int getMetadataStoreCacheExpirySeconds() { return zooKeeperCacheExpirySeconds > 0 ? zooKeeperCacheExpirySeconds : metadataStoreCacheExpirySeconds; } + + public long getManagedLedgerCacheEvictionIntervalMs() { + return managedLedgerCacheEvictionFrequency > 0 + ? (long) (1000 / Math.max(Math.min(managedLedgerCacheEvictionFrequency, 1000.0), 0.001)) + : Math.min(1000000, managedLedgerCacheEvictionIntervalMs); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index f06679ece78931..947c9342dd99a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -59,7 +59,7 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L); managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark()); managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads()); - managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency()); + managedLedgerFactoryConfig.setCacheEvictionIntervalMs(conf.getManagedLedgerCacheEvictionIntervalMs()); managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis( conf.getManagedLedgerCacheEvictionTimeThresholdMillis()); managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 9b154690d7e0dd..95e343acfefcb8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -53,7 +53,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase { @BeforeClass @Override protected void setup() throws Exception { - conf.setManagedLedgerCacheEvictionFrequency(0.1); + conf.setManagedLedgerCacheEvictionIntervalMs(10000); super.internalSetup(); super.producerBaseSetup(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java index da9ece0b4ecf04..13f11c02612961 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java @@ -46,7 +46,7 @@ public Object[][] topicDomainProvider() { @BeforeClass @Override protected void setup() throws Exception { - conf.setManagedLedgerCacheEvictionFrequency(0.1); + conf.setManagedLedgerCacheEvictionIntervalMs(10000); super.internalSetup(); super.producerBaseSetup(); }