Skip to content

Commit

Permalink
[enh][broker] Add metrics for entry cache insertion, eviction (#17248)
Browse files Browse the repository at this point in the history
Fixes #16584

### Motivation

With the `RangeCache`, it is hard to reason about its behavior other than cache hits/misses or the cache's size hitting the limit and triggering a size based eviction. This PR adds 3 new metrics to help provide additional insight into the cache's behavior. It adds `pulsar_ml_cache_inserted_entries_total`, `pulsar_ml_cache_evicted_entries_total`, and `pulsar_ml_cache_entries`.

### Modifications

* Add new metrics for cache insertion, eviction, and current number of entries.
* Add new methods to the `ManagedLedgerFactoryMXBean` interface.
* Update several method return values in the `RangeCache`.
* Update tests.

### Verifying this change

This change is covered by modified tests that already existed.

### Does this pull request potentially affect one of the following parts:

There is a breaking change to the `RangeCache` class for the `clear` and the `evictLEntriesBeforeTimestamp` methods. The previous result was a `long`, and now it is a `Pair<Integer, Long>`. The new result matches the same style as `evictLeastAccessedEntries`. Given that this class is only meant for use within the broker, I think it is reasonable to break these methods. I will send a note to the mailing list.

### Documentation
  
- [x] `doc`
  • Loading branch information
michaeljmarshall authored and Technoboy- committed Oct 13, 2022
1 parent 44d45af commit df33f59
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,19 @@ public interface ManagedLedgerFactoryMXBean {
* Get the number of cache evictions during the last minute.
*/
long getNumberOfCacheEvictions();

/**
* Cumulative number of entries inserted into the cache.
*/
long getCacheInsertedEntriesCount();

/**
* Cumulative number of entries evicted from the cache.
*/
long getCacheEvictedEntriesCount();

/**
* Current number of entries in the cache.
*/
long getCacheEntriesCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
import org.apache.pulsar.common.stats.Rate;

Expand All @@ -31,6 +32,10 @@ public class ManagedLedgerFactoryMBeanImpl implements ManagedLedgerFactoryMXBean
final Rate cacheMisses = new Rate();
final Rate cacheEvictions = new Rate();

private final LongAdder insertedEntryCount = new LongAdder();
private final LongAdder evictedEntryCount = new LongAdder();
private final LongAdder cacheEntryCount = new LongAdder();

public ManagedLedgerFactoryMBeanImpl(ManagedLedgerFactoryImpl factory) throws Exception {
this.factory = factory;
}
Expand Down Expand Up @@ -64,6 +69,16 @@ public void recordCacheEviction() {
cacheEvictions.recordEvent();
}

public void recordCacheInsertion() {
insertedEntryCount.increment();
cacheEntryCount.increment();
}

public void recordNumberOfCacheEntriesEvicted(int count) {
evictedEntryCount.add(count);
cacheEntryCount.add(-count);
}

@Override
public int getNumberOfManagedLedgers() {
return factory.ledgers.size();
Expand Down Expand Up @@ -104,4 +119,16 @@ public long getNumberOfCacheEvictions() {
return cacheEvictions.getCount();
}

public long getCacheInsertedEntriesCount() {
return insertedEntryCount.sum();
}

public long getCacheEvictedEntriesCount() {
return evictedEntryCount.sum();
}

public long getCacheEntriesCount() {
return cacheEntryCount.sum();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void invalidateEntries(final PositionImpl lastPosition) {
lastPosition, entriesRemoved, sizeRemoved);
}

manager.entriesRemoved(sizeRemoved);
manager.entriesRemoved(sizeRemoved, entriesRemoved);
}

@Override
Expand All @@ -184,7 +184,7 @@ public void invalidateAllEntries(long ledgerId) {
ml.getName(), ledgerId, entriesRemoved, sizeRemoved);
}

manager.entriesRemoved(sizeRemoved);
manager.entriesRemoved(sizeRemoved, entriesRemoved);
}

@Override
Expand Down Expand Up @@ -338,8 +338,8 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo

@Override
public void clear() {
long removedSize = entries.clear();
manager.entriesRemoved(removedSize);
Pair<Integer, Long> removedPair = entries.clear();
manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
}

@Override
Expand All @@ -364,14 +364,14 @@ public Pair<Integer, Long> evictEntries(long sizeToFree) {
+ " -- Current Size: {} Mb",
ml.getName(), sizeToFree / MB, evictedEntries, evictedSize / MB, entries.getSize() / MB);
}
manager.entriesRemoved(evictedSize);
manager.entriesRemoved(evictedSize, evictedEntries);
return evicted;
}

@Override
public void invalidateEntriesBeforeTimestamp(long timestamp) {
long evictedSize = entries.evictLEntriesBeforeTimestamp(timestamp);
manager.entriesRemoved(evictedSize);
Pair<Integer, Long> evictedPair = entries.evictLEntriesBeforeTimestamp(timestamp);
manager.entriesRemoved(evictedPair.getRight(), evictedPair.getLeft());
}

private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ boolean hasSpaceInCache() {
}

void entryAdded(long size) {
mlFactoryMBean.recordCacheInsertion();
currentSize.addAndGet(size);
}

void entriesRemoved(long size) {
void entriesRemoved(long size, int count) {
mlFactoryMBean.recordNumberOfCacheEntriesEvicted(count);
currentSize.addAndGet(-size);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
* @param maxTimestamp the max timestamp of the entries to be evicted
* @return the tota
*/
public long evictLEntriesBeforeTimestamp(long maxTimestamp) {
public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
long removedSize = 0;
int removedCount = 0;

while (true) {
Map.Entry<Key, Value> entry = entries.firstEntry();
Expand All @@ -203,11 +204,12 @@ public long evictLEntriesBeforeTimestamp(long maxTimestamp) {

Value value = entry.getValue();
removedSize += weighter.getSize(value);
removedCount++;
value.release();
}

size.addAndGet(-removedSize);
return removedSize;
return Pair.of(removedCount, removedSize);
}

/**
Expand All @@ -226,8 +228,9 @@ public long getSize() {
*
* @return size of removed entries
*/
public synchronized long clear() {
public synchronized Pair<Integer, Long> clear() {
long removedSize = 0;
int removedCount = 0;

while (true) {
Map.Entry<Key, Value> entry = entries.pollFirstEntry();
Expand All @@ -236,12 +239,13 @@ public synchronized long clear() {
}
Value value = entry.getValue();
removedSize += weighter.getSize(value);
removedCount++;
value.release();
}

entries.clear();
size.getAndAdd(-removedSize);
return removedSize;
return Pair.of(removedCount, removedSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public void simple() throws Exception {
assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 2);
assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);

cache2.insert(EntryImpl.create(2, 0, new byte[1]));
cache2.insert(EntryImpl.create(2, 1, new byte[1]));
Expand Down Expand Up @@ -129,6 +132,9 @@ public void simple() throws Exception {
assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 1);
assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 5);
assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);
assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 3);
}

@Test
Expand All @@ -153,6 +159,9 @@ public void doubleInsert() throws Exception {

assertEquals(cache1.getSize(), 7);
assertEquals(cacheManager.getSize(), 7);
assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 2);
assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);
assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
}

@Test
Expand Down Expand Up @@ -185,6 +194,9 @@ public void cacheSizeUpdate() throws Exception {

cacheManager.removeEntryCache(ml1.getName());
assertTrue(cacheManager.getSize() > 0);
assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 20);
assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 20);
}


Expand Down Expand Up @@ -217,6 +229,9 @@ public void cacheDisabled() throws Exception {
assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 0);
assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);

cache2.insert(EntryImpl.create(2, 0, new byte[1]));
cache2.insert(EntryImpl.create(2, 1, new byte[1]));
Expand Down Expand Up @@ -253,6 +268,9 @@ public void verifyNoCacheIfNoConsumer() throws Exception {
assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 0);
assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ public void customTimeExtraction() {
assertEquals(cache.getSize(), 10);
assertEquals(cache.getNumberOfEntries(), 4);

long evictedSize = cache.evictLEntriesBeforeTimestamp(3);
assertEquals(evictedSize, 6);
Pair<Integer, Long> evictedSize = cache.evictLEntriesBeforeTimestamp(3);
assertEquals(evictedSize.getRight().longValue(), 6);
assertEquals(evictedSize.getLeft().longValue(), 3);

assertEquals(cache.getSize(), 4);
assertEquals(cache.getNumberOfEntries(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public synchronized List<Metrics> generate() {

m.put("brk_ml_count", mlCacheStats.getNumberOfManagedLedgers());
m.put("brk_ml_cache_used_size", mlCacheStats.getCacheUsedSize());
m.put("brk_ml_cache_inserted_entries_total", mlCacheStats.getCacheInsertedEntriesCount());
m.put("brk_ml_cache_evicted_entries_total", mlCacheStats.getCacheEvictedEntriesCount());
m.put("brk_ml_cache_entries", mlCacheStats.getCacheEntriesCount());
m.put("brk_ml_cache_evictions", mlCacheStats.getNumberOfCacheEvictions());
m.put("brk_ml_cache_hits_rate", mlCacheStats.getCacheHitsRate());
m.put("brk_ml_cache_misses_rate", mlCacheStats.getCacheMissesRate());
Expand Down
3 changes: 3 additions & 0 deletions site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ All the broker metrics are labelled with the following labels:
| Name | Type | Description |
|---|---|---|
| pulsar_ml_cache_evictions | Gauge | The number of cache evictions during the last minute. |
| pulsar_ml_cache_inserted_entries_total | Counter | The number of entries inserted into the entry cache. |
| pulsar_ml_cache_evicted_entries_total | Counter | The number of entries evicted from the entry cache. |
| pulsar_ml_cache_entries | Gauge | The number of entries in the entry cache. |
| pulsar_ml_cache_hits_rate | Gauge | The number of cache hits per second on the broker side. |
| pulsar_ml_cache_hits_throughput | Gauge | The amount of data (byte per second) retrieved from the cache on the broker side. |
| pulsar_ml_cache_misses_rate | Gauge | The number of cache missed per second on the broker side. |
Expand Down

0 comments on commit df33f59

Please sign in to comment.