This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Commit

SOLR-15555 Improved caching on FilterQuery (apache#2572)
Create a new cache mode for CaffeineCache where we can optionally use an
async cache instead of the synchronous implementation. This is useful for
cases (especially FilterQuery) where many identical requests arrive at
nearly the same time and would otherwise race to fill the same cache slot.
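
For illustration, here is a minimal, self-contained sketch of the reserve-then-compute
idea behind the async mode (plain JDK types only; the type and method names below are
invented for this example and are not the Solr or Caffeine API):

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Throwing loader, standing in for the IOFunction used by the real cache. */
interface IOLoader<K, V> {
  V load(K key) throws IOException;
}

/** Sketch: concurrent callers for the same key share one computation instead of racing. */
final class AsyncSlotSketch<K, V> {
  private final ConcurrentMap<K, CompletableFuture<V>> slots = new ConcurrentHashMap<>();

  V computeIfAbsent(K key, IOLoader<K, V> loader) throws IOException {
    CompletableFuture<V> mine = new CompletableFuture<>();
    CompletableFuture<V> existing = slots.putIfAbsent(key, mine);
    if (existing != null) {
      // Another thread reserved this slot first; wait for its result instead of recomputing.
      // (join() wraps failures in CompletionException; the real CaffeineCache unwraps IOExceptions.)
      return existing.join();
    }
    try {
      V value = loader.load(key);    // we reserved the slot, so we do the work
      mine.complete(value);
      return value;
    } catch (IOException | RuntimeException | Error e) {
      mine.completeExceptionally(e); // waiters observe the failure
      slots.remove(key, mine);       // let a later request retry
      throw e;
    }
  }
}

In the actual change this bookkeeping is layered on Caffeine's AsyncCache (see
CaffeineCache.computeAsync in the diff below), which also keeps the hit/insert/lookup
counters and RAM accounting consistent.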

CaffeineCache computeIfAbsent now accepts an IOFunction instead of the
non-throwing java.util.function.Function interface.
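
The caller-side effect can be seen in the spatial value-source hunk further down this
diff; roughly (cache, key, and targetFuncValues are fields of that class, reproduced
here only for illustration):

// Before: java.util.function.Function cannot throw, so the IOException had to be swallowed.
Shape shape = cache.computeIfAbsent(key, k -> {
  try {
    return targetFuncValues.value();
  } catch (IOException e) {
    return null; // the error silently became a cache miss
  }
});

// After: the IOFunction signature lets the checked exception propagate to the caller.
Shape shape = cache.computeIfAbsent(key, k -> targetFuncValues.value());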

This required upgrading the Caffeine dependency to 2.9, which adds an
optimistic get to putIfAbsent.

Also incidentally fixes a rare bug where cache ramBytesUsed would be
incorrectly reported under heavy cache contention/eviction loads.

Makes Async Caches the default.

(cherry picked from commit ac5df22)
madrob authored and magibney committed Jul 27, 2022
1 parent fec2ac2 commit d94a2a9
Showing 24 changed files with 360 additions and 237 deletions.
lucene/ivy-versions.properties (2 changes: 1 addition & 1 deletion)
@@ -18,7 +18,7 @@ com.fasterxml.jackson.core.version = 2.11.2
/com.fasterxml.jackson.core/jackson-databind = ${com.fasterxml.jackson.core.version}
/com.fasterxml.jackson.dataformat/jackson-dataformat-smile = ${com.fasterxml.jackson.core.version}

/com.github.ben-manes.caffeine/caffeine = 2.8.4
/com.github.ben-manes.caffeine/caffeine = 2.9.2
/com.github.virtuald/curvesapi = 1.06

/com.github.zafarkhaja/java-semver = 0.9.0
solr/core/src/java/org/apache/solr/query/SolrRangeQuery.java (10 changes: 7 additions & 3 deletions)
@@ -59,6 +59,7 @@
import org.apache.solr.search.ExtendedQueryBase;
import org.apache.solr.search.Filter;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.TestInjection;

/** @lucene.experimental */
public final class SolrRangeQuery extends ExtendedQueryBase implements DocSetProducer {
@@ -168,6 +169,7 @@ public DocSet createDocSet(SolrIndexSearcher searcher) throws IOException {
}

private DocSet createDocSet(SolrIndexSearcher searcher, long cost) throws IOException {
assert TestInjection.injectDocSetDelay();
int maxDoc = searcher.maxDoc();
BitDocSet liveDocs = searcher.getLiveDocSet();
FixedBitSet liveBits = liveDocs.size() == maxDoc ? null : liveDocs.getBits();
@@ -393,23 +395,24 @@ private SegState getSegState(LeafReaderContext context) throws IOException {
// first time, check our filter cache
boolean doCheck = !checkedFilterCache && context.ord == 0;
checkedFilterCache = true;
SolrIndexSearcher solrSearcher = null;
final SolrIndexSearcher solrSearcher;
if (doCheck && searcher instanceof SolrIndexSearcher) {
solrSearcher = (SolrIndexSearcher)searcher;
solrSearcher = (SolrIndexSearcher) searcher;
if (solrSearcher.getFilterCache() == null) {
doCheck = false;
} else {
solrSearcher = (SolrIndexSearcher)searcher;
DocSet answer = solrSearcher.getFilterCache().get(SolrRangeQuery.this);
if (answer != null) {
filter = answer.getTopFilter();
}
}
} else {
doCheck = false;
solrSearcher = null;
}

if (filter != null) {
// I'm not sure this ever happens
return segStates[context.ord] = new SegState(filter.getDocIdSet(context, null));
}

@@ -440,6 +443,7 @@ private SegState getSegState(LeafReaderContext context) throws IOException {

if (doCheck) {
DocSet answer = createDocSet(solrSearcher, count);
// This can be a naked put because the cache usually gets checked in SolrIndexSearcher
solrSearcher.getFilterCache().put(SolrRangeQuery.this, answer);
filter = answer.getTopFilter();
return segStates[context.ord] = new SegState(filter.getDocIdSet(context, null));
(additional changed file; file name not captured in this view)
@@ -163,13 +163,7 @@ public Shape value() throws IOException {
throw new IllegalStateException("Leaf " + readerContext.reader() + " is not suited for caching");
}
PerSegCacheKey key = new PerSegCacheKey(cacheHelper.getKey(), docId);
Shape shape = cache.computeIfAbsent(key, k -> {
try {
return targetFuncValues.value();
} catch (IOException e) {
return null;
}
});
Shape shape = cache.computeIfAbsent(key, k -> targetFuncValues.value());
if (shape != null) {
//optimize shape on a cache hit if possible. This must be thread-safe and it is.
if (shape instanceof JtsGeometry) {
solr/core/src/java/org/apache/solr/search/CaffeineCache.java (148 changes: 116 additions & 32 deletions)
@@ -17,6 +17,7 @@
package org.apache.solr.search;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Collections;
@@ -25,21 +26,24 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.solr.common.SolrException;
import org.apache.solr.metrics.MetricsMap;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.util.IOFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -72,20 +76,28 @@ public class CaffeineCache<K, V> extends SolrCacheBase implements SolrCache<K, V
+ RamUsageEstimator.shallowSizeOfInstance(CacheStats.class)
+ 2 * RamUsageEstimator.shallowSizeOfInstance(LongAdder.class);

private static final long RAM_BYTES_PER_FUTURE = RamUsageEstimator.shallowSizeOfInstance(CompletableFuture.class);

private Executor executor;

private CacheStats priorStats;
private long priorHits;
private long priorInserts;
private long priorLookups;

private String description = "Caffeine Cache";
private LongAdder hits;
private LongAdder inserts;
private LongAdder lookups;
private Cache<K,V> cache;
private AsyncCache<K,V> asyncCache;
private long warmupTime;
private int maxSize;
private long maxRamBytes;
private int initialSize;
private int maxIdleTimeSec;
private boolean cleanupThread;
private boolean async;

private Set<String> metricNames = ConcurrentHashMap.newKeySet();
private MetricsMap cacheMap;
@@ -104,7 +116,7 @@ public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
super.init(args, regenerator);
String str = (String) args.get(SIZE_PARAM);
maxSize = (str == null) ? 1024 : Integer.parseInt(str);
str = (String) args.get("initialSize");
str = (String) args.get(INITIAL_SIZE_PARAM);
initialSize = Math.min((str == null) ? 1024 : Integer.parseInt(str), maxSize);
str = (String) args.get(MAX_IDLE_TIME_PARAM);
if (str == null) {
@@ -115,9 +127,12 @@ public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
str = (String) args.get(MAX_RAM_MB_PARAM);
int maxRamMB = str == null ? -1 : Double.valueOf(str).intValue();
maxRamBytes = maxRamMB < 0 ? Long.MAX_VALUE : maxRamMB * 1024L * 1024L;
str = (String) args.get(CLEANUP_THREAD_PARAM);
cleanupThread = str != null && Boolean.parseBoolean(str);
if (cleanupThread) {
cleanupThread = Boolean.parseBoolean((String) args.get(CLEANUP_THREAD_PARAM));
async = Boolean.parseBoolean((String) args.getOrDefault(ASYNC_PARAM, "true"));
if (async) {
// We record futures in the map to decrease bucket-lock contention, but need computation handled in same thread
executor = Runnable::run;
} else if (cleanupThread) {
executor = ForkJoinPool.commonPool();
} else {
executor = Runnable::run;
@@ -126,7 +141,9 @@ public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
description = generateDescription(maxSize, initialSize);

cache = buildCache(null);
hits = new LongAdder();
inserts = new LongAdder();
lookups = new LongAdder();

initialRamBytes =
RamUsageEstimator.shallowSizeOfInstance(cache.getClass()) +
@@ -153,7 +170,13 @@ private Cache<K, V> buildCache(Cache<K, V> prev) {
} else {
builder.maximumSize(maxSize);
}
Cache<K, V> newCache = builder.build();
Cache<K, V> newCache;
if (async) {
asyncCache = builder.buildAsync();
newCache = asyncCache.synchronous();
} else {
newCache = builder.build();
}
if (prev != null) {
newCache.putAll(prev.asMap());
}
@@ -179,33 +202,82 @@ public V get(K key) {
return cache.getIfPresent(key);
}

@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
return cache.get(key, k -> {
inserts.increment();
V value = mappingFunction.apply(k);
if (value == null) {
return null;
private V computeAsync(K key, IOFunction<? super K, ? extends V> mappingFunction) throws IOException {
CompletableFuture<V> future = new CompletableFuture<>();
CompletableFuture<V> result = asyncCache.asMap().putIfAbsent(key, future);
lookups.increment();
if (result != null) {
try {
// Another thread is already working on this computation, wait for them to finish
V value = result.join();
hits.increment();
return value;
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
// Computation had an IOException, likely index problems, so fail this result too
throw (IOException) cause;
}
throw e;
}
ramBytes.add(RamUsageEstimator.sizeOfObject(key, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED) +
RamUsageEstimator.sizeOfObject(value, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
ramBytes.add(RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY);
}
try {
// We reserved the slot, so we do the work
V value = mappingFunction.apply(key);
future.complete(value); // This will update the weight and expiration
recordRamBytes(key, null, value);
inserts.increment();
return value;
});
} catch (Error | RuntimeException | IOException e) {
// TimeExceeded exception is runtime and will bubble up from here
future.completeExceptionally(e);
throw e;
}
}

@Override
public V computeIfAbsent(K key, IOFunction<? super K, ? extends V> mappingFunction) throws IOException {
if (async) {
return computeAsync(key, mappingFunction);
}

try {
return cache.get(key, k -> {
V value;
try {
value = mappingFunction.apply(k);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
if (value == null) {
return null;
}
recordRamBytes(key, null, value);
inserts.increment();
return value;
});
} catch (UncheckedIOException e) {
throw e.getCause();
}
}

@Override
public V put(K key, V val) {
inserts.increment();
V old = cache.asMap().put(key, val);
recordRamBytes(key, old, val);
return old;
}

private void recordRamBytes(K key, V oldValue, V newValue) {
ramBytes.add(RamUsageEstimator.sizeOfObject(key, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED) +
RamUsageEstimator.sizeOfObject(val, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
if (old != null) {
ramBytes.add(- RamUsageEstimator.sizeOfObject(old, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
} else {
RamUsageEstimator.sizeOfObject(newValue, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
if (oldValue == null) {
ramBytes.add(RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY);
if (async) ramBytes.add(RAM_BYTES_PER_FUTURE);
} else {
ramBytes.add(- RamUsageEstimator.sizeOfObject(oldValue, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
}
return old;
}

@Override
@@ -303,9 +375,8 @@ public void warm(SolrIndexSearcher searcher, SolrCache<K,V> old) {

// warm entries
if (isAutowarmingOn()) {
Eviction<K, V> policy = other.cache.policy().eviction().get();
int size = autowarm.getWarmCount(other.cache.asMap().size());
hottest = policy.hottest(size);
hottest = other.cache.policy().eviction().map(p -> p.hottest(size)).orElse(Collections.emptyMap());
}

for (Entry<K, V> entry : hottest.entrySet()) {
@@ -321,15 +392,20 @@ public void warm(SolrIndexSearcher searcher, SolrCache<K,V> old) {
}
}

hits.reset();
inserts.reset();
priorStats = other.cache.stats().plus(other.priorStats);
lookups.reset();
CacheStats oldStats = other.cache.stats();
priorStats = oldStats.plus(other.priorStats);
priorHits = oldStats.hitCount() + other.hits.sum() + other.priorHits;
priorInserts = other.inserts.sum() + other.priorInserts;
priorLookups = oldStats.requestCount() + other.lookups.sum() + other.priorLookups;
warmupTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - warmingStartTime, TimeUnit.NANOSECONDS);
}

/** Returns the description of this cache. */
private String generateDescription(int limit, int initialSize) {
return String.format(Locale.ROOT, "TinyLfu Cache(maxSize=%d, initialSize=%d%s)",
return String.format(Locale.ROOT, "Caffeine Cache(maxSize=%d, initialSize=%d%s)",
limit, initialSize, isAutowarmingOn() ? (", " + getAutowarmDescription()) : "");
}

@@ -376,11 +452,13 @@ public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
cacheMap = new MetricsMap(map -> {
if (cache != null) {
CacheStats stats = cache.stats();
long hitCount = stats.hitCount() + hits.sum();
long insertCount = inserts.sum();
long lookupCount = stats.requestCount() + lookups.sum();

map.put(LOOKUPS_PARAM, stats.requestCount());
map.put(HITS_PARAM, stats.hitCount());
map.put(HIT_RATIO_PARAM, stats.hitRate());
map.put(LOOKUPS_PARAM, lookupCount);
map.put(HITS_PARAM, hitCount);
map.put(HIT_RATIO_PARAM, hitRate(hitCount, lookupCount));
map.put(INSERTS_PARAM, insertCount);
map.put(EVICTIONS_PARAM, stats.evictionCount());
map.put(SIZE_PARAM, cache.asMap().size());
Expand All @@ -389,13 +467,19 @@ public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
map.put(MAX_RAM_MB_PARAM, getMaxRamMB());

CacheStats cumulativeStats = priorStats.plus(stats);
map.put("cumulative_lookups", cumulativeStats.requestCount());
map.put("cumulative_hits", cumulativeStats.hitCount());
map.put("cumulative_hitratio", cumulativeStats.hitRate());
long cumLookups = priorLookups + lookupCount;
long cumHits = priorHits + hitCount;
map.put("cumulative_lookups", cumLookups);
map.put("cumulative_hits", cumHits);
map.put("cumulative_hitratio", hitRate(cumHits, cumLookups));
map.put("cumulative_inserts", priorInserts + insertCount);
map.put("cumulative_evictions", cumulativeStats.evictionCount());
}
});
solrMetricsContext.gauge(this, cacheMap, true, scope, getCategory().toString());
}

private static double hitRate(long hitCount, long lookupCount) {
return lookupCount == 0 ? 1.0 : (double) hitCount / lookupCount;
}
}
(diffs for the remaining changed files not shown)
