Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] refactor ManagedLedger cacheEvictionTask implement #14488

Merged
merged 4 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1014,8 +1014,8 @@ managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0
# Configure the cache eviction interval in milliseconds for the managed ledger cache
managedLedgerCacheEvictionIntervalMs=10
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved

# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
Expand Down Expand Up @@ -1572,4 +1572,7 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in
# multiple entries.
persistentUnackedRangesWithMultipleEntriesEnabled=false
persistentUnackedRangesWithMultipleEntriesEnabled=false

# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
managedLedgerCacheEvictionFrequency=0
7 changes: 5 additions & 2 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -772,8 +772,8 @@ managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0
# Configure the cache eviction interval in milliseconds for the managed ledger cache
managedLedgerCacheEvictionIntervalMs=10

# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
Expand Down Expand Up @@ -1137,6 +1137,9 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
managedLedgerCacheEvictionFrequency=0

### --- Transaction config variables --- ###

# Enable transaction coordinator in broker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public class ManagedLedgerFactoryConfig {
private int numManagedLedgerSchedulerThreads = Runtime.getRuntime().availableProcessors();

/**
* Frequency of cache eviction triggering. Default is 100 times per second.
* Interval of cache eviction triggering. Default is 10 ms times.
*/
private double cacheEvictionFrequency = 100;
private long cacheEvictionIntervalMs = 10;

/**
* All entries that have stayed in cache for more than the configured time, will be evicted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand All @@ -94,8 +95,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ManagedLedgerFactoryConfig config;
@Getter
protected final OrderedScheduler scheduledExecutor;

private final ExecutorService cacheEvictionExecutor;
private final ScheduledExecutorService cacheEvictionExecutor;

@Getter
protected final ManagedLedgerFactoryMBeanImpl mbean;
Expand Down Expand Up @@ -184,7 +184,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
.name("bookkeeper-ml-scheduler")
.build();
cacheEvictionExecutor = Executors
.newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
this.metadataServiceAvailable = true;
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
Expand All @@ -203,8 +203,9 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
.toNanos(config.getCacheEvictionTimeThresholdMillis());


cacheEvictionExecutor.execute(this::cacheEvictionTask);
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
long evictionTaskInterval = config.getCacheEvictionIntervalMs();
cacheEvictionExecutor.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(this::doCacheEviction),
evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS);
closed = false;

metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
Expand Down Expand Up @@ -258,24 +259,6 @@ private synchronized void refreshStats() {
lastStatTimestamp = now;
}

private void cacheEvictionTask() {
double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
long waitTimeMillis = (long) (1000 / evictionFrequency);

while (!closed) {
try {
doCacheEviction();

Thread.sleep(waitTimeMillis);
} catch (InterruptedException e) {
// Factory is shutting down
return;
} catch (Throwable t) {
log.warn("Exception while performing cache eviction: {}", t.getMessage(), t);
}
}
}

private synchronized void doCacheEviction() {
long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void verifyHitsMisses() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(7 * 10);
config.setCacheEvictionWatermark(0.8);
config.setCacheEvictionFrequency(1);
config.setCacheEvictionIntervalMs(1000);

@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
Expand Down Expand Up @@ -329,7 +329,7 @@ public void verifyHitsMisses() throws Exception {
public void verifyTimeBasedEviction() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(1000);
config.setCacheEvictionFrequency(100);
config.setCacheEvictionIntervalMs(10);
config.setCacheEvictionTimeThresholdMillis(100);

@Cleanup("shutdown")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2668,7 +2668,7 @@ public void testGetNextValidPosition() throws Exception {
@Test
public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception {
ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
conf.setCacheEvictionFrequency(0.1);
conf.setCacheEvictionIntervalMs(10000);

@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
@Category
private static final String CATEGORY_PLUGIN = "Broker Plugin";

private static final double MIN_ML_CACHE_EVICTION_FREQUENCY = 0.001;
private static final double MAX_ML_CACHE_EVICTION_FREQUENCY = 1000.0;
private static final long MAX_ML_CACHE_EVICTION_INTERVAL_MS = 1000000L;

/***** --- pulsar configuration. --- ****/
@FieldContext(
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -1807,8 +1811,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private double managedLedgerCacheEvictionWatermark = 0.9;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
doc = "Configure the cache eviction frequency for the managed ledger cache.")
@Deprecated
private double managedLedgerCacheEvictionFrequency = 0;
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved

@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction interval in milliseconds for the managed ledger cache, default is 10ms")
private long managedLedgerCacheEvictionIntervalMs = 10;

@FieldContext(category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
Expand Down Expand Up @@ -3018,4 +3028,12 @@ public int getMetadataStoreOperationTimeoutSeconds() {
public int getMetadataStoreCacheExpirySeconds() {
return zooKeeperCacheExpirySeconds > 0 ? zooKeeperCacheExpirySeconds : metadataStoreCacheExpirySeconds;
}

public long getManagedLedgerCacheEvictionIntervalMs() {
return managedLedgerCacheEvictionFrequency > 0
? (long) (1000 / Math.max(
Math.min(managedLedgerCacheEvictionFrequency, MAX_ML_CACHE_EVICTION_FREQUENCY),
MIN_ML_CACHE_EVICTION_FREQUENCY))
: Math.min(MAX_ML_CACHE_EVICTION_INTERVAL_MS, managedLedgerCacheEvictionIntervalMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
managedLedgerFactoryConfig.setCacheEvictionIntervalMs(conf.getManagedLedgerCacheEvictionIntervalMs());
managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(
conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setManagedLedgerCacheEvictionFrequency(0.1);
conf.setManagedLedgerCacheEvictionIntervalMs(10000);
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
super.internalSetup();
super.producerBaseSetup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Object[][] topicDomainProvider() {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setManagedLedgerCacheEvictionFrequency(0.1);
conf.setManagedLedgerCacheEvictionIntervalMs(10000);
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
super.internalSetup();
super.producerBaseSetup();
}
Expand Down