From 4082fe87bd906560edc55e68a10a036f8b081cdb Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Mon, 5 Feb 2024 14:22:47 -0800 Subject: [PATCH] [Tiered caching] Framework changes (#10753) (#11971) * Added javadoc for new files/packages * Added changelog * Fixing javadoc warnings * Addressing comments * Addressing additional minor comments * Moving non null check to builder for OS onHeapCache * Adding package-info for new packages * Removing service and adding different cache interfaces along with event listener support * Fixing gradle missingDoc issue * Changing listener logic, removing tiered cache integration with IRC * Adding opensearch.internal tag for LoadAwareCacheLoader * Fixing thread safety issue * Remove compute function and event listener logic change for TieredCache * Making Cache.compute function private * Adding javadoc and more test for cache.put * Adding write locks to refresh API as well * Removing unwanted EventType class and refactoring one UT * Removing TieredCache interface --------- (cherry picked from commit ebda9639f4ca98a2181bdcb2c43f82224b1d2a34) Signed-off-by: Sagar Upadhyaya Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- CHANGELOG.md | 1 + .../org/opensearch/common/cache/Cache.java | 112 +-- .../org/opensearch/common/cache/ICache.java | 34 + .../common/cache/LoadAwareCacheLoader.java | 20 + .../cache/store/OpenSearchOnHeapCache.java | 128 +++ .../common/cache/store/StoreAwareCache.java | 23 + .../StoreAwareCacheRemovalNotification.java | 33 + .../cache/store/StoreAwareCacheValue.java | 35 + .../builders/StoreAwareCacheBuilder.java | 73 ++ .../cache/store/builders/package-info.java | 12 + .../cache/store/enums/CacheStoreType.java | 20 + .../cache/store/enums/package-info.java | 10 + .../StoreAwareCacheEventListener.java | 30 + .../cache/store/listeners/package-info.java | 10 + .../common/cache/store/package-info.java | 10 + .../cache/tier/TieredSpilloverCache.java | 268 ++++++ .../common/cache/tier/package-info.java | 10 + .../indices/IndicesRequestCache.java | 6 +- .../cache/tier/TieredSpilloverCacheTests.java | 786 ++++++++++++++++++ 19 files changed, 1566 insertions(+), 55 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/cache/ICache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/LoadAwareCacheLoader.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/StoreAwareCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheRemovalNotification.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/builders/StoreAwareCacheBuilder.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/builders/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/enums/CacheStoreType.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/enums/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/listeners/StoreAwareCacheEventListener.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/listeners/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/tier/package-info.java create mode 100644 server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 43d38d16e6d5b..dffd56e7ce8ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add search query categorizor ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) +- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753)) - Remove ingest processor supports excluding fields ([#10967](https://github.com/opensearch-project/OpenSearch/pull/10967), [#11983](https://github.com/opensearch-project/OpenSearch/pull/11983)) - [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853)) - Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247)) diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 30e7c014a2ec0..d8aa4e93735e6 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -424,68 +424,74 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept } }); if (value == null) { - // we need to synchronize loading of a value for a given key; however, holding the segment lock while - // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we - // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding - // the segment lock; to do this, we atomically put a future in the map that can load the value, and then - // get the value from this future on the thread that won the race to place the future into the segment map - CacheSegment segment = getCacheSegment(key); - CompletableFuture> future; - CompletableFuture> completableFuture = new CompletableFuture<>(); + value = compute(key, loader); + } + return value; + } - try (ReleasableLock ignored = segment.writeLock.acquire()) { - future = segment.map.putIfAbsent(key, completableFuture); - } + private V compute(K key, CacheLoader loader) throws ExecutionException { + long now = now(); + // we need to synchronize loading of a value for a given key; however, holding the segment lock while + // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we + // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding + // the segment lock; to do this, we atomically put a future in the map that can load the value, and then + // get the value from this future on the thread that won the race to place the future into the segment map + CacheSegment segment = getCacheSegment(key); + CompletableFuture> future; + CompletableFuture> completableFuture = new CompletableFuture<>(); - BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { - if (ok != null) { - try (ReleasableLock ignored = lruLock.acquire()) { - promote(ok, now); - } - return ok.value; - } else { - try (ReleasableLock ignored = segment.writeLock.acquire()) { - CompletableFuture> sanity = segment.map.get(key); - if (sanity != null && sanity.isCompletedExceptionally()) { - segment.map.remove(key); - } - } - return null; - } - }; + try (ReleasableLock ignored = segment.writeLock.acquire()) { + future = segment.map.putIfAbsent(key, completableFuture); + } - CompletableFuture completableValue; - if (future == null) { - future = completableFuture; - completableValue = future.handle(handler); - V loaded; - try { - loaded = loader.load(key); - } catch (Exception e) { - future.completeExceptionally(e); - throw new ExecutionException(e); - } - if (loaded == null) { - NullPointerException npe = new NullPointerException("loader returned a null value"); - future.completeExceptionally(npe); - throw new ExecutionException(npe); - } else { - future.complete(new Entry<>(key, loaded, now)); + BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { + if (ok != null) { + try (ReleasableLock ignored = lruLock.acquire()) { + promote(ok, now); } + return ok.value; } else { - completableValue = future.handle(handler); + try (ReleasableLock ignored = segment.writeLock.acquire()) { + CompletableFuture> sanity = segment.map.get(key); + if (sanity != null && sanity.isCompletedExceptionally()) { + segment.map.remove(key); + } + } + return null; } + }; + CompletableFuture completableValue; + if (future == null) { + future = completableFuture; + completableValue = future.handle(handler); + V loaded; try { - value = completableValue.get(); - // check to ensure the future hasn't been completed with an exception - if (future.isCompletedExceptionally()) { - future.get(); // call get to force the exception to be thrown for other concurrent callers - throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); + loaded = loader.load(key); + } catch (Exception e) { + future.completeExceptionally(e); + throw new ExecutionException(e); } + if (loaded == null) { + NullPointerException npe = new NullPointerException("loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Entry<>(key, loaded, now)); + } + } else { + completableValue = future.handle(handler); + } + V value; + try { + value = completableValue.get(); + // check to ensure the future hasn't been completed with an exception + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); } return value; } diff --git a/server/src/main/java/org/opensearch/common/cache/ICache.java b/server/src/main/java/org/opensearch/common/cache/ICache.java new file mode 100644 index 0000000000000..c6ea5fca1a8fe --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/ICache.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache; + +/** + * Represents a cache interface. + * @param Type of key. + * @param Type of value. + * + * @opensearch.experimental + */ +public interface ICache { + V get(K key); + + void put(K key, V value); + + V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception; + + void invalidate(K key); + + void invalidateAll(); + + Iterable keys(); + + long count(); + + void refresh(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/LoadAwareCacheLoader.java b/server/src/main/java/org/opensearch/common/cache/LoadAwareCacheLoader.java new file mode 100644 index 0000000000000..57aa4aa39c782 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/LoadAwareCacheLoader.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache; + +/** + * Extends a cache loader with awareness of whether the data is loaded or not. + * @param Type of key. + * @param Type of value. + * + * @opensearch.internal + */ +public interface LoadAwareCacheLoader extends CacheLoader { + boolean isLoaded(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java new file mode 100644 index 0000000000000..c497c8dbb7ea9 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder; +import org.opensearch.common.cache.store.enums.CacheStoreType; +import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; + +/** + * This variant of on-heap cache uses OpenSearch custom cache implementation. + * @param Type of key. + * @param Type of value. + * + * @opensearch.experimental + */ +public class OpenSearchOnHeapCache implements StoreAwareCache, RemovalListener { + + private final Cache cache; + + private final StoreAwareCacheEventListener eventListener; + + public OpenSearchOnHeapCache(Builder builder) { + CacheBuilder cacheBuilder = CacheBuilder.builder() + .setMaximumWeight(builder.getMaxWeightInBytes()) + .weigher(builder.getWeigher()) + .removalListener(this); + if (builder.getExpireAfterAcess() != null) { + cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess()); + } + cache = cacheBuilder.build(); + this.eventListener = builder.getEventListener(); + } + + @Override + public V get(K key) { + V value = cache.get(key); + if (value != null) { + eventListener.onHit(key, value, CacheStoreType.ON_HEAP); + } else { + eventListener.onMiss(key, CacheStoreType.ON_HEAP); + } + return value; + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + eventListener.onCached(key, value, CacheStoreType.ON_HEAP); + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + V value = cache.computeIfAbsent(key, key1 -> loader.load(key)); + if (!loader.isLoaded()) { + eventListener.onHit(key, value, CacheStoreType.ON_HEAP); + } else { + eventListener.onMiss(key, CacheStoreType.ON_HEAP); + eventListener.onCached(key, value, CacheStoreType.ON_HEAP); + } + return value; + } + + @Override + public void invalidate(K key) { + cache.invalidate(key); + } + + @Override + public void invalidateAll() { + cache.invalidateAll(); + } + + @Override + public Iterable keys() { + return cache.keys(); + } + + @Override + public long count() { + return cache.count(); + } + + @Override + public void refresh() { + cache.refresh(); + } + + @Override + public CacheStoreType getTierType() { + return CacheStoreType.ON_HEAP; + } + + @Override + public void onRemoval(RemovalNotification notification) { + eventListener.onRemoval( + new StoreAwareCacheRemovalNotification<>( + notification.getKey(), + notification.getValue(), + notification.getRemovalReason(), + CacheStoreType.ON_HEAP + ) + ); + } + + /** + * Builder object + * @param Type of key + * @param Type of value + */ + public static class Builder extends StoreAwareCacheBuilder { + + @Override + public StoreAwareCache build() { + return new OpenSearchOnHeapCache(this); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCache.java b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCache.java new file mode 100644 index 0000000000000..45ca48d94c140 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCache.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.store.enums.CacheStoreType; + +/** + * Represents a cache with a specific type of store like onHeap, disk etc. + * @param Type of key. + * @param Type of value. + * + * @opensearch.experimental + */ +public interface StoreAwareCache extends ICache { + CacheStoreType getTierType(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheRemovalNotification.java b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheRemovalNotification.java new file mode 100644 index 0000000000000..492dbff3532a1 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheRemovalNotification.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.store.enums.CacheStoreType; + +/** + * Removal notification for store aware cache. + * @param Type of key. + * @param Type of value. + * + * @opensearch.internal + */ +public class StoreAwareCacheRemovalNotification extends RemovalNotification { + private final CacheStoreType cacheStoreType; + + public StoreAwareCacheRemovalNotification(K key, V value, RemovalReason removalReason, CacheStoreType cacheStoreType) { + super(key, value, removalReason); + this.cacheStoreType = cacheStoreType; + } + + public CacheStoreType getCacheStoreType() { + return cacheStoreType; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java new file mode 100644 index 0000000000000..4fbbbbfebfaa7 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.store.enums.CacheStoreType; + +/** + * Represents a store aware cache value. + * @param Type of value. + * + * @opensearch.internal + */ +public class StoreAwareCacheValue { + private final V value; + private final CacheStoreType source; + + public StoreAwareCacheValue(V value, CacheStoreType source) { + this.value = value; + this.source = source; + } + + public V getValue() { + return value; + } + + public CacheStoreType getCacheStoreType() { + return source; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/builders/StoreAwareCacheBuilder.java b/server/src/main/java/org/opensearch/common/cache/store/builders/StoreAwareCacheBuilder.java new file mode 100644 index 0000000000000..fc5aa48aae90f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/builders/StoreAwareCacheBuilder.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store.builders; + +import org.opensearch.common.cache.store.StoreAwareCache; +import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; +import org.opensearch.common.unit.TimeValue; + +import java.util.function.ToLongBiFunction; + +/** + * Builder for store aware cache. + * @param Type of key. + * @param Type of value. + * + * @opensearch.internal + */ +public abstract class StoreAwareCacheBuilder { + + private long maxWeightInBytes; + + private ToLongBiFunction weigher; + + private TimeValue expireAfterAcess; + + private StoreAwareCacheEventListener eventListener; + + public StoreAwareCacheBuilder() {} + + public StoreAwareCacheBuilder setMaximumWeightInBytes(long sizeInBytes) { + this.maxWeightInBytes = sizeInBytes; + return this; + } + + public StoreAwareCacheBuilder setWeigher(ToLongBiFunction weigher) { + this.weigher = weigher; + return this; + } + + public StoreAwareCacheBuilder setExpireAfterAccess(TimeValue expireAfterAcess) { + this.expireAfterAcess = expireAfterAcess; + return this; + } + + public StoreAwareCacheBuilder setEventListener(StoreAwareCacheEventListener eventListener) { + this.eventListener = eventListener; + return this; + } + + public long getMaxWeightInBytes() { + return maxWeightInBytes; + } + + public TimeValue getExpireAfterAcess() { + return expireAfterAcess; + } + + public ToLongBiFunction getWeigher() { + return weigher; + } + + public StoreAwareCacheEventListener getEventListener() { + return eventListener; + } + + public abstract StoreAwareCache build(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/builders/package-info.java b/server/src/main/java/org/opensearch/common/cache/store/builders/package-info.java new file mode 100644 index 0000000000000..ac4590ae3bff7 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/builders/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base package for builders. + */ +package org.opensearch.common.cache.store.builders; diff --git a/server/src/main/java/org/opensearch/common/cache/store/enums/CacheStoreType.java b/server/src/main/java/org/opensearch/common/cache/store/enums/CacheStoreType.java new file mode 100644 index 0000000000000..04c0825787b66 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/enums/CacheStoreType.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store.enums; + +/** + * Cache store types in tiered cache. + * + * @opensearch.internal + */ +public enum CacheStoreType { + + ON_HEAP, + DISK; +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/enums/package-info.java b/server/src/main/java/org/opensearch/common/cache/store/enums/package-info.java new file mode 100644 index 0000000000000..7a4e0fa7201fd --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/enums/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Package related to tiered cache enums */ +package org.opensearch.common.cache.store.enums; diff --git a/server/src/main/java/org/opensearch/common/cache/store/listeners/StoreAwareCacheEventListener.java b/server/src/main/java/org/opensearch/common/cache/store/listeners/StoreAwareCacheEventListener.java new file mode 100644 index 0000000000000..6d7e4b39aaf9f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/listeners/StoreAwareCacheEventListener.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store.listeners; + +import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; +import org.opensearch.common.cache.store.enums.CacheStoreType; + +/** + * This can be used to listen to tiered caching events + * @param Type of key + * @param Type of value + * + * @opensearch.internal + */ +public interface StoreAwareCacheEventListener { + + void onMiss(K key, CacheStoreType cacheStoreType); + + void onRemoval(StoreAwareCacheRemovalNotification notification); + + void onHit(K key, V value, CacheStoreType cacheStoreType); + + void onCached(K key, V value, CacheStoreType cacheStoreType); +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/listeners/package-info.java b/server/src/main/java/org/opensearch/common/cache/store/listeners/package-info.java new file mode 100644 index 0000000000000..c3222ca3ffb62 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/listeners/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Package related to tiered cache listeners */ +package org.opensearch.common.cache.store.listeners; diff --git a/server/src/main/java/org/opensearch/common/cache/store/package-info.java b/server/src/main/java/org/opensearch/common/cache/store/package-info.java new file mode 100644 index 0000000000000..edc1ecd7d5e7a --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Base package for store aware caches. */ +package org.opensearch.common.cache.store; diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java new file mode 100644 index 0000000000000..8b432c9484aed --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java @@ -0,0 +1,268 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.store.StoreAwareCache; +import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; +import org.opensearch.common.cache.store.StoreAwareCacheValue; +import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder; +import org.opensearch.common.cache.store.enums.CacheStoreType; +import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.common.util.iterable.Iterables; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; + +/** + * This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap + * and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full, + * then items are eventually evicted from it and removed which will result in cache miss. + * + * @param Type of key + * @param Type of value + * + * @opensearch.experimental + */ +public class TieredSpilloverCache implements ICache, StoreAwareCacheEventListener { + + // TODO: Remove optional when diskCache implementation is integrated. + private final Optional> onDiskCache; + private final StoreAwareCache onHeapCache; + private final StoreAwareCacheEventListener listener; + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock()); + ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); + + /** + * Maintains caching tiers in ascending order of cache latency. + */ + private final List> cacheList; + + TieredSpilloverCache(Builder builder) { + Objects.requireNonNull(builder.onHeapCacheBuilder, "onHeap cache builder can't be null"); + this.onHeapCache = builder.onHeapCacheBuilder.setEventListener(this).build(); + if (builder.onDiskCacheBuilder != null) { + this.onDiskCache = Optional.of(builder.onDiskCacheBuilder.setEventListener(this).build()); + } else { + this.onDiskCache = Optional.empty(); + } + this.listener = builder.listener; + this.cacheList = this.onDiskCache.map(diskTier -> Arrays.asList(this.onHeapCache, diskTier)).orElse(List.of(this.onHeapCache)); + } + + // Package private for testing + StoreAwareCache getOnHeapCache() { + return onHeapCache; + } + + // Package private for testing + Optional> getOnDiskCache() { + return onDiskCache; + } + + @Override + public V get(K key) { + StoreAwareCacheValue cacheValue = getValueFromTieredCache(true).apply(key); + if (cacheValue == null) { + return null; + } + return cacheValue.getValue(); + } + + @Override + public void put(K key, V value) { + try (ReleasableLock ignore = writeLock.acquire()) { + onHeapCache.put(key, value); + listener.onCached(key, value, CacheStoreType.ON_HEAP); + } + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + // We are skipping calling event listeners at this step as we do another get inside below computeIfAbsent. + // Where we might end up calling onMiss twice for a key not present in onHeap cache. + // Similary we might end up calling both onMiss and onHit for a key, in case we are receiving concurrent + // requests for the same key which requires loading only once. + StoreAwareCacheValue cacheValue = getValueFromTieredCache(false).apply(key); + if (cacheValue == null) { + // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. + // This is needed as there can be many requests for the same key at the same time and we only want to load + // the value once. + V value = null; + try (ReleasableLock ignore = writeLock.acquire()) { + value = onHeapCache.computeIfAbsent(key, loader); + } + if (loader.isLoaded()) { + listener.onMiss(key, CacheStoreType.ON_HEAP); + onDiskCache.ifPresent(diskTier -> listener.onMiss(key, CacheStoreType.DISK)); + listener.onCached(key, value, CacheStoreType.ON_HEAP); + } else { + listener.onHit(key, value, CacheStoreType.ON_HEAP); + } + return value; + } + listener.onHit(key, cacheValue.getValue(), cacheValue.getCacheStoreType()); + if (cacheValue.getCacheStoreType().equals(CacheStoreType.DISK)) { + listener.onMiss(key, CacheStoreType.ON_HEAP); + } + return cacheValue.getValue(); + } + + @Override + public void invalidate(K key) { + // We are trying to invalidate the key from all caches though it would be present in only of them. + // Doing this as we don't know where it is located. We could do a get from both and check that, but what will + // also trigger a hit/miss listener event, so ignoring it for now. + try (ReleasableLock ignore = writeLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + storeAwareCache.invalidate(key); + } + } + } + + @Override + public void invalidateAll() { + try (ReleasableLock ignore = writeLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + storeAwareCache.invalidateAll(); + } + } + } + + /** + * Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache. + * @return An iterable over (onHeap + disk) keys + */ + @Override + public Iterable keys() { + Iterable onDiskKeysIterable; + if (onDiskCache.isPresent()) { + onDiskKeysIterable = onDiskCache.get().keys(); + } else { + onDiskKeysIterable = Collections::emptyIterator; + } + return Iterables.concat(onHeapCache.keys(), onDiskKeysIterable); + } + + @Override + public long count() { + long totalCount = 0; + for (StoreAwareCache storeAwareCache : cacheList) { + totalCount += storeAwareCache.count(); + } + return totalCount; + } + + @Override + public void refresh() { + try (ReleasableLock ignore = writeLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + storeAwareCache.refresh(); + } + } + } + + @Override + public void onMiss(K key, CacheStoreType cacheStoreType) { + // Misses for tiered cache are tracked here itself. + } + + @Override + public void onRemoval(StoreAwareCacheRemovalNotification notification) { + if (RemovalReason.EVICTED.equals(notification.getRemovalReason()) + || RemovalReason.CAPACITY.equals(notification.getRemovalReason())) { + switch (notification.getCacheStoreType()) { + case ON_HEAP: + try (ReleasableLock ignore = writeLock.acquire()) { + onDiskCache.ifPresent(diskTier -> { diskTier.put(notification.getKey(), notification.getValue()); }); + } + onDiskCache.ifPresent( + diskTier -> listener.onCached(notification.getKey(), notification.getValue(), CacheStoreType.DISK) + ); + break; + default: + break; + } + } + listener.onRemoval(notification); + } + + @Override + public void onHit(K key, V value, CacheStoreType cacheStoreType) { + // Hits for tiered cache are tracked here itself. + } + + @Override + public void onCached(K key, V value, CacheStoreType cacheStoreType) { + // onCached events for tiered cache are tracked here itself. + } + + private Function> getValueFromTieredCache(boolean triggerEventListener) { + return key -> { + try (ReleasableLock ignore = readLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + V value = storeAwareCache.get(key); + if (value != null) { + if (triggerEventListener) { + listener.onHit(key, value, storeAwareCache.getTierType()); + } + return new StoreAwareCacheValue<>(value, storeAwareCache.getTierType()); + } else { + if (triggerEventListener) { + listener.onMiss(key, storeAwareCache.getTierType()); + } + } + } + } + return null; + }; + } + + /** + * Builder object for tiered spillover cache. + * @param Type of key + * @param Type of value + */ + public static class Builder { + private StoreAwareCacheBuilder onHeapCacheBuilder; + private StoreAwareCacheBuilder onDiskCacheBuilder; + private StoreAwareCacheEventListener listener; + + public Builder() {} + + public Builder setOnHeapCacheBuilder(StoreAwareCacheBuilder onHeapCacheBuilder) { + this.onHeapCacheBuilder = onHeapCacheBuilder; + return this; + } + + public Builder setOnDiskCacheBuilder(StoreAwareCacheBuilder onDiskCacheBuilder) { + this.onDiskCacheBuilder = onDiskCacheBuilder; + return this; + } + + public Builder setListener(StoreAwareCacheEventListener listener) { + this.listener = listener; + return this; + } + + public TieredSpilloverCache build() { + return new TieredSpilloverCache<>(this); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/package-info.java b/server/src/main/java/org/opensearch/common/cache/tier/package-info.java new file mode 100644 index 0000000000000..7ad81dbe3073c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Base package for cache tier support. */ +package org.opensearch.common.cache.tier; diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 629cea102a8b2..4a19f8eb8714d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -247,7 +247,7 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - static class Key implements Accountable { + public static class Key implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality @@ -328,6 +328,9 @@ public int hashCode() { } } + /** + * Logic to clean up in-memory cache. + */ synchronized void cleanCache() { final Set currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); @@ -355,7 +358,6 @@ synchronized void cleanCache() { } } } - cache.refresh(); } diff --git a/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java b/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java new file mode 100644 index 0000000000000..eb75244c6f8b1 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java @@ -0,0 +1,786 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; +import org.opensearch.common.cache.store.StoreAwareCache; +import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; +import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder; +import org.opensearch.common.cache.store.enums.CacheStoreType; +import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class TieredSpilloverCacheTests extends OpenSearchTestCase { + + public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockCacheEventListener eventListener = new MockCacheEventListener(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + randomIntBetween(1, 4), + eventListener, + 0 + ); + int numOfItems1 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + List keys = new ArrayList<>(); + // Put values in cache. + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + keys.add(key); + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).hitCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + int cacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { + // Hit cache with stored key + cacheHit++; + int index = randomIntBetween(0, keys.size() - 1); + tieredSpilloverCache.computeIfAbsent(keys.get(index), getLoadAwareCacheLoader()); + } else { + // Hit cache with randomized key which is expected to miss cache always. + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), getLoadAwareCacheLoader()); + cacheMiss++; + } + } + assertEquals(cacheHit, eventListener.enumMap.get(CacheStoreType.ON_HEAP).hitCount.count()); + assertEquals(numOfItems1 + cacheMiss, eventListener.enumMap.get(CacheStoreType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count()); + } + + public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + MockCacheEventListener eventListener = new MockCacheEventListener(); + StoreAwareCacheBuilder cacheBuilder = new OpenSearchOnHeapCache.Builder().setMaximumWeightInBytes( + onHeapCacheSize * 50 + ).setWeigher((k, v) -> 50); // Will support onHeapCacheSize entries. + + StoreAwareCacheBuilder diskCacheBuilder = new MockOnDiskCache.Builder().setMaxSize(diskCacheSize) + .setDeliberateDelay(0); + + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() + .setOnHeapCacheBuilder(cacheBuilder) + .setOnDiskCacheBuilder(diskCacheBuilder) + .setListener(eventListener) + .build(); + + // Put values in cache more than it's size and cause evictions from onHeap. + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader); + } + long actualDiskCacheSize = tieredSpilloverCache.getOnDiskCache().get().count(); + assertEquals(numOfItems1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).hitCount.count()); + assertEquals(actualDiskCacheSize, eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count()); + + assertEquals( + eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count(), + eventListener.enumMap.get(CacheStoreType.DISK).cachedCount.count() + ); + assertEquals(actualDiskCacheSize, eventListener.enumMap.get(CacheStoreType.DISK).cachedCount.count()); + + tieredSpilloverCache.getOnHeapCache().keys().forEach(onHeapKeys::add); + tieredSpilloverCache.getOnDiskCache().get().keys().forEach(diskTierKeys::add); + + assertEquals(tieredSpilloverCache.getOnHeapCache().count(), onHeapKeys.size()); + assertEquals(tieredSpilloverCache.getOnDiskCache().get().count(), diskTierKeys.size()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(50, 200); + int onHeapCacheHit = 0; + int diskCacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { // Hit cache with key stored in onHeap cache. + onHeapCacheHit++; + int index = randomIntBetween(0, onHeapKeys.size() - 1); + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(onHeapKeys.get(index), loadAwareCacheLoader); + assertFalse(loadAwareCacheLoader.isLoaded()); + } else { // Hit cache with key stored in disk cache. + diskCacheHit++; + int index = randomIntBetween(0, diskTierKeys.size() - 1); + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(diskTierKeys.get(index), loadAwareCacheLoader); + assertFalse(loadAwareCacheLoader.isLoaded()); + } + } + for (int iter = 0; iter < randomIntBetween(50, 200); iter++) { + // Hit cache with randomized key which is expected to miss cache always. + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + cacheMiss++; + } + // On heap cache misses would also include diskCacheHits as it means it missed onHeap cache. + assertEquals(numOfItems1 + cacheMiss + diskCacheHit, eventListener.enumMap.get(CacheStoreType.ON_HEAP).missCount.count()); + assertEquals(onHeapCacheHit, eventListener.enumMap.get(CacheStoreType.ON_HEAP).hitCount.count()); + assertEquals(cacheMiss + numOfItems1, eventListener.enumMap.get(CacheStoreType.DISK).missCount.count()); + assertEquals(diskCacheHit, eventListener.enumMap.get(CacheStoreType.DISK).hitCount.count()); + } + + public void testComputeIfAbsentWithEvictionsFromBothTier() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheEventListener eventListener = new MockCacheEventListener(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + int numOfItems = randomIntBetween(totalSize + 1, totalSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + } + assertTrue(eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count() > 0); + assertTrue(eventListener.enumMap.get(CacheStoreType.DISK).evictionsMetric.count() > 0); + } + + public void testGetAndCount() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheEventListener eventListener = new MockCacheEventListener(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } + + for (int iter = 0; iter < numOfItems1; iter++) { + if (randomBoolean()) { + if (randomBoolean()) { + int index = randomIntBetween(0, onHeapKeys.size() - 1); + assertNotNull(tieredSpilloverCache.get(onHeapKeys.get(index))); + } else { + int index = randomIntBetween(0, diskTierKeys.size() - 1); + assertNotNull(tieredSpilloverCache.get(diskTierKeys.get(index))); + } + } else { + assertNull(tieredSpilloverCache.get(UUID.randomUUID().toString())); + } + } + assertEquals(numOfItems1, tieredSpilloverCache.count()); + } + + public void testWithDiskTierNull() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockCacheEventListener eventListener = new MockCacheEventListener(); + + StoreAwareCacheBuilder onHeapCacheBuilder = new OpenSearchOnHeapCache.Builder() + .setMaximumWeightInBytes(onHeapCacheSize * 20) + .setWeigher((k, v) -> 20); // Will support upto onHeapCacheSize entries + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() + .setOnHeapCacheBuilder(onHeapCacheBuilder) + .setListener(eventListener) + .build(); + + int numOfItems = randomIntBetween(onHeapCacheSize + 1, onHeapCacheSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), loadAwareCacheLoader); + } + assertTrue(eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count() > 0); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.DISK).cachedCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.DISK).evictionsMetric.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.DISK).missCount.count()); + } + + public void testPut() { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + String key = UUID.randomUUID().toString(); + String value = UUID.randomUUID().toString(); + tieredSpilloverCache.put(key, value); + assertEquals(1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).cachedCount.count()); + assertEquals(1, tieredSpilloverCache.count()); + } + + public void testPutAndVerifyNewItemsArePresentOnHeapCache() throws Exception { + int onHeapCacheSize = randomIntBetween(200, 400); + int diskCacheSize = randomIntBetween(450, 800); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + for (int i = 0; i < onHeapCacheSize; i++) { + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(String key) throws Exception { + return UUID.randomUUID().toString(); + } + }); + } + + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count()); + assertEquals(0, tieredSpilloverCache.getOnDiskCache().get().count()); + + // Again try to put OnHeap cache capacity amount of new items. + List newKeyList = new ArrayList<>(); + for (int i = 0; i < onHeapCacheSize; i++) { + newKeyList.add(UUID.randomUUID().toString()); + } + + for (int i = 0; i < newKeyList.size(); i++) { + tieredSpilloverCache.computeIfAbsent(newKeyList.get(i), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(String key) { + return UUID.randomUUID().toString(); + } + }); + } + + // Verify that new items are part of onHeap cache. + List actualOnHeapCacheKeys = new ArrayList<>(); + tieredSpilloverCache.getOnHeapCache().keys().forEach(actualOnHeapCacheKeys::add); + + assertEquals(newKeyList.size(), actualOnHeapCacheKeys.size()); + for (int i = 0; i < actualOnHeapCacheKeys.size(); i++) { + assertTrue(newKeyList.contains(actualOnHeapCacheKeys.get(i))); + } + + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count()); + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnDiskCache().get().count()); + } + + public void testInvalidate() { + int onHeapCacheSize = 1; + int diskCacheSize = 10; + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + String key = UUID.randomUUID().toString(); + String value = UUID.randomUUID().toString(); + // First try to invalidate without the key present in cache. + tieredSpilloverCache.invalidate(key); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).invalidationMetric.count()); + + // Now try to invalidate with the key present in onHeap cache. + tieredSpilloverCache.put(key, value); + tieredSpilloverCache.invalidate(key); + assertEquals(1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).invalidationMetric.count()); + assertEquals(0, tieredSpilloverCache.count()); + + tieredSpilloverCache.put(key, value); + // Put another key/value so that one of the item is evicted to disk cache. + String key2 = UUID.randomUUID().toString(); + tieredSpilloverCache.put(key2, UUID.randomUUID().toString()); + assertEquals(2, tieredSpilloverCache.count()); + // Again invalidate older key + tieredSpilloverCache.invalidate(key); + assertEquals(1, eventListener.enumMap.get(CacheStoreType.DISK).invalidationMetric.count()); + assertEquals(1, tieredSpilloverCache.count()); + } + + public void testCacheKeys() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + // During first round add onHeapCacheSize entries. Will go to onHeap cache initially. + for (int i = 0; i < onHeapCacheSize; i++) { + String key = UUID.randomUUID().toString(); + diskTierKeys.add(key); + tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader()); + } + // In another round, add another onHeapCacheSize entries. These will go to onHeap and above ones will be + // evicted to onDisk cache. + for (int i = 0; i < onHeapCacheSize; i++) { + String key = UUID.randomUUID().toString(); + onHeapKeys.add(key); + tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader()); + } + + List actualOnHeapKeys = new ArrayList<>(); + List actualOnDiskKeys = new ArrayList<>(); + Iterable onHeapiterable = tieredSpilloverCache.getOnHeapCache().keys(); + Iterable onDiskiterable = tieredSpilloverCache.getOnDiskCache().get().keys(); + onHeapiterable.iterator().forEachRemaining(actualOnHeapKeys::add); + onDiskiterable.iterator().forEachRemaining(actualOnDiskKeys::add); + for (String onHeapKey : onHeapKeys) { + assertTrue(actualOnHeapKeys.contains(onHeapKey)); + } + for (String onDiskKey : actualOnDiskKeys) { + assertTrue(actualOnDiskKeys.contains(onDiskKey)); + } + + // Testing keys() which returns all keys. + List actualMergedKeys = new ArrayList<>(); + List expectedMergedKeys = new ArrayList<>(); + expectedMergedKeys.addAll(onHeapKeys); + expectedMergedKeys.addAll(diskTierKeys); + + Iterable mergedIterable = tieredSpilloverCache.keys(); + mergedIterable.iterator().forEachRemaining(actualMergedKeys::add); + + assertEquals(expectedMergedKeys.size(), actualMergedKeys.size()); + for (String key : expectedMergedKeys) { + assertTrue(actualMergedKeys.contains(key)); + } + } + + public void testRefresh() { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + tieredSpilloverCache.refresh(); + } + + public void testInvalidateAll() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + // Put values in cache more than it's size and cause evictions from onHeap. + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, tieredSpilloverCache.count()); + tieredSpilloverCache.invalidateAll(); + assertEquals(0, tieredSpilloverCache.count()); + } + + public void testComputeIfAbsentConcurrently() throws Exception { + int onHeapCacheSize = randomIntBetween(100, 300); + int diskCacheSize = randomIntBetween(200, 400); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); + String key = UUID.randomUUID().toString(); + String value = UUID.randomUUID().toString(); + + Thread[] threads = new Thread[numberOfSameKeys]; + Phaser phaser = new Phaser(numberOfSameKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + + List> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + + for (int i = 0; i < numberOfSameKeys; i++) { + threads[i] = new Thread(() -> { + try { + LoadAwareCacheLoader loadAwareCacheLoader = new LoadAwareCacheLoader() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public Object load(Object key) throws Exception { + isLoaded = true; + return value; + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + phaser.arriveAndAwaitAdvance(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } catch (Exception e) { + throw new RuntimeException(e); + } + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); // Wait for rest of tasks to be cancelled. + int numberOfTimesKeyLoaded = 0; + assertEquals(numberOfSameKeys, loadAwareCacheLoaderList.size()); + for (int i = 0; i < loadAwareCacheLoaderList.size(); i++) { + LoadAwareCacheLoader loader = loadAwareCacheLoaderList.get(i); + if (loader.isLoaded()) { + numberOfTimesKeyLoaded++; + } + } + assertEquals(1, numberOfTimesKeyLoaded); // It should be loaded only once. + } + + public void testConcurrencyForEvictionFlow() throws Exception { + int diskCacheSize = randomIntBetween(450, 800); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + + StoreAwareCacheBuilder cacheBuilder = new OpenSearchOnHeapCache.Builder().setMaximumWeightInBytes( + 200 + ).setWeigher((k, v) -> 150); + + StoreAwareCacheBuilder diskCacheBuilder = new MockOnDiskCache.Builder().setMaxSize(diskCacheSize) + .setDeliberateDelay(500); + + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() + .setOnHeapCacheBuilder(cacheBuilder) + .setOnDiskCacheBuilder(diskCacheBuilder) + .setListener(eventListener) + .build(); + + String keyToBeEvicted = "key1"; + String secondKey = "key2"; + + // Put first key on tiered cache. Will go into onHeap cache. + tieredSpilloverCache.computeIfAbsent(keyToBeEvicted, new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(String key) { + return UUID.randomUUID().toString(); + } + }); + CountDownLatch countDownLatch = new CountDownLatch(1); + CountDownLatch countDownLatch1 = new CountDownLatch(1); + // Put second key on tiered cache. Will cause eviction of first key from onHeap cache and should go into + // disk cache. + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + Thread thread = new Thread(() -> { + try { + tieredSpilloverCache.computeIfAbsent(secondKey, loadAwareCacheLoader); + countDownLatch1.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + thread.start(); + assertBusy(() -> { assertTrue(loadAwareCacheLoader.isLoaded()); }, 100, TimeUnit.MILLISECONDS); // We wait for new key to be loaded + // after which it eviction flow is + // guaranteed to occur. + StoreAwareCache onDiskCache = tieredSpilloverCache.getOnDiskCache().get(); + + // Now on a different thread, try to get key(above one which got evicted) from tiered cache. We expect this + // should return not null value as it should be present on diskCache. + AtomicReference actualValue = new AtomicReference<>(); + Thread thread1 = new Thread(() -> { + try { + actualValue.set(tieredSpilloverCache.get(keyToBeEvicted)); + } catch (Exception e) { + throw new RuntimeException(e); + } + countDownLatch.countDown(); + }); + thread1.start(); + countDownLatch.await(); + assertNotNull(actualValue.get()); + countDownLatch1.await(); + assertEquals(1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count()); + assertEquals(1, tieredSpilloverCache.getOnHeapCache().count()); + assertEquals(1, onDiskCache.count()); + assertNotNull(onDiskCache.get(keyToBeEvicted)); + } + + class MockCacheEventListener implements StoreAwareCacheEventListener { + + EnumMap enumMap = new EnumMap<>(CacheStoreType.class); + + MockCacheEventListener() { + for (CacheStoreType cacheStoreType : CacheStoreType.values()) { + enumMap.put(cacheStoreType, new TestStatsHolder()); + } + } + + @Override + public void onMiss(K key, CacheStoreType cacheStoreType) { + enumMap.get(cacheStoreType).missCount.inc(); + } + + @Override + public void onRemoval(StoreAwareCacheRemovalNotification notification) { + if (notification.getRemovalReason().equals(RemovalReason.EVICTED)) { + enumMap.get(notification.getCacheStoreType()).evictionsMetric.inc(); + } else if (notification.getRemovalReason().equals(RemovalReason.INVALIDATED)) { + enumMap.get(notification.getCacheStoreType()).invalidationMetric.inc(); + } + } + + @Override + public void onHit(K key, V value, CacheStoreType cacheStoreType) { + enumMap.get(cacheStoreType).hitCount.inc(); + } + + @Override + public void onCached(K key, V value, CacheStoreType cacheStoreType) { + enumMap.get(cacheStoreType).cachedCount.inc(); + } + + class TestStatsHolder { + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric hitCount = new CounterMetric(); + final CounterMetric missCount = new CounterMetric(); + final CounterMetric cachedCount = new CounterMetric(); + final CounterMetric invalidationMetric = new CounterMetric(); + } + } + + private LoadAwareCacheLoader getLoadAwareCacheLoader() { + return new LoadAwareCacheLoader() { + boolean isLoaded = false; + + @Override + public String load(String key) { + isLoaded = true; + return UUID.randomUUID().toString(); + } + + @Override + public boolean isLoaded() { + return isLoaded; + } + }; + } + + private TieredSpilloverCache intializeTieredSpilloverCache( + int onHeapCacheSize, + int diksCacheSize, + StoreAwareCacheEventListener eventListener, + long diskDeliberateDelay + ) { + StoreAwareCacheBuilder diskCacheBuilder = new MockOnDiskCache.Builder().setMaxSize(diksCacheSize) + .setDeliberateDelay(diskDeliberateDelay); + StoreAwareCacheBuilder onHeapCacheBuilder = new OpenSearchOnHeapCache.Builder() + .setMaximumWeightInBytes(onHeapCacheSize * 20) + .setWeigher((k, v) -> 20); // Will support upto onHeapCacheSize entries + return new TieredSpilloverCache.Builder().setOnHeapCacheBuilder(onHeapCacheBuilder) + .setOnDiskCacheBuilder(diskCacheBuilder) + .setListener(eventListener) + .build(); + } +} + +class MockOnDiskCache implements StoreAwareCache { + + Map cache; + int maxSize; + + long delay; + StoreAwareCacheEventListener eventListener; + + MockOnDiskCache(int maxSize, StoreAwareCacheEventListener eventListener, long delay) { + this.maxSize = maxSize; + this.eventListener = eventListener; + this.delay = delay; + this.cache = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + V value = cache.get(key); + if (value != null) { + eventListener.onHit(key, value, CacheStoreType.DISK); + } else { + eventListener.onMiss(key, CacheStoreType.DISK); + } + return value; + } + + @Override + public void put(K key, V value) { + if (this.cache.size() >= maxSize) { // For simplification + eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, value, RemovalReason.EVICTED, CacheStoreType.DISK)); + return; + } + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + this.cache.put(key, value); + eventListener.onCached(key, value, CacheStoreType.DISK); + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + V value = cache.computeIfAbsent(key, key1 -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + if (!loader.isLoaded()) { + eventListener.onHit(key, value, CacheStoreType.DISK); + } else { + eventListener.onMiss(key, CacheStoreType.DISK); + eventListener.onCached(key, value, CacheStoreType.DISK); + } + return value; + } + + @Override + public void invalidate(K key) { + if (this.cache.containsKey(key)) { + eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, null, RemovalReason.INVALIDATED, CacheStoreType.DISK)); + } + this.cache.remove(key); + } + + @Override + public void invalidateAll() { + this.cache.clear(); + } + + @Override + public Iterable keys() { + return this.cache.keySet(); + } + + @Override + public long count() { + return this.cache.size(); + } + + @Override + public void refresh() {} + + @Override + public CacheStoreType getTierType() { + return CacheStoreType.DISK; + } + + public static class Builder extends StoreAwareCacheBuilder { + + int maxSize; + long delay; + + @Override + public StoreAwareCache build() { + return new MockOnDiskCache(maxSize, this.getEventListener(), delay); + } + + public Builder setMaxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public Builder setDeliberateDelay(long millis) { + this.delay = millis; + return this; + } + } +}