From 58170bf36f076293058ff40dce4177aaadc0367b Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Mon, 11 Mar 2024 14:12:55 -0700 Subject: [PATCH] [Tiered caching] Integrating IndicesRequestCache with CacheService controlled by a feature flag (#12533) * Adding changelog * Fixing gradle build issue * Fixing CacheService test * Adding UT in IndicesRequestCache with feature flag for more coverage * Updating changelog and renaming feature flag setting * Moving feature flag setting handling logic to CacheService by maintaining backward compatibility * Fixing broken UTs --------- Signed-off-by: Sagar Upadhyaya Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> --- distribution/src/config/opensearch.yml | 3 + modules/cache-common/build.gradle | 2 + .../TieredSpilloverCacheIT.java | 150 ++++++++++++++++ .../cache/common/tier/MockDiskCache.java | 133 ++++++++++++++ .../tier/TieredSpilloverCacheTests.java | 166 ++++-------------- .../opensearch/cache/EhcacheDiskCacheIT.java | 48 +++++ .../indices/IndicesRequestCacheIT.java | 9 +- .../common/cache/service/CacheService.java | 13 +- .../common/cache/settings/CacheSettings.java | 2 +- .../cache/store/OpenSearchOnHeapCache.java | 21 ++- .../cache/store/config/CacheConfig.java | 35 ++++ .../OpenSearchOnHeapCacheSettings.java | 19 +- .../common/settings/ClusterSettings.java | 6 +- .../common/settings/FeatureFlagSettings.java | 3 +- .../opensearch/common/util/FeatureFlags.java | 7 + .../indices/IndicesRequestCache.java | 37 ++-- .../opensearch/indices/IndicesService.java | 6 +- .../main/java/org/opensearch/node/Node.java | 3 +- .../cache/service/CacheServiceTests.java | 94 ++++++++-- .../indices/IndicesRequestCacheTests.java | 80 ++++++++- .../snapshots/SnapshotResiliencyTests.java | 5 +- 21 files changed, 666 insertions(+), 176 deletions(-) create mode 100644 modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java create mode 100644 modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java create mode 100644 plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index 1d2cfe7eccae6..1a84ea24c73fc 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -121,3 +121,6 @@ ${path.logs} # index searcher threadpool. # #opensearch.experimental.feature.concurrent_segment_search.enabled: false +# Gates the functionality of enabling Opensearch to use pluggable caches with respective store names via setting. +# +#opensearch.experimental.feature.pluggable.caching.enabled: false diff --git a/modules/cache-common/build.gradle b/modules/cache-common/build.gradle index c7052896e609b..98cdec83b9ad1 100644 --- a/modules/cache-common/build.gradle +++ b/modules/cache-common/build.gradle @@ -6,6 +6,8 @@ * compatible open source license. */ +apply plugin: 'opensearch.internal-cluster-test' + opensearchplugin { description 'Module for caches which are optional and do not require additional security permission' classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin' diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java new file mode 100644 index 0000000000000..568ac4d188c51 --- /dev/null +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchType; +import org.opensearch.client.Client; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.settings.CacheSettings; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.plugins.CachePlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.greaterThan; + +public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + MockDiskCache.MockDiskCacheFactory.NAME + ) + .build(); + } + + public void testPluginsAreInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream() + .anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common" + ".tier.TieredSpilloverCachePlugin")) + ); + } + + public void testSanityChecksWithIndicesRequestCache() throws InterruptedException { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("f", "type=date") + .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build()) + .get() + ); + indexRandom( + true, + client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"), + client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z") + ); + ensureSearchable("index"); + + // This is not a random example: serialization with time zones writes shared strings + // which used to not work well with the query cache because of the handles stream output + // see #9500 + final SearchResponse r1 = client.prepareSearch("index") + .setSize(0) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .addAggregation( + dateHistogram("histo").field("f") + .timeZone(ZoneId.of("+01:00")) + .minDocCount(0) + .dateHistogramInterval(DateHistogramInterval.MONTH) + ) + .get(); + assertSearchResponse(r1); + + // The cached is actually used + assertThat( + client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), + greaterThan(0L) + ); + } + + public static class MockDiskCachePlugin extends Plugin implements CachePlugin { + + public MockDiskCachePlugin() {} + + @Override + public Map getCacheFactoryMap() { + return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000)); + } + + @Override + public String getName() { + return "mock_disk_plugin"; + } + } +} diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java new file mode 100644 index 0000000000000..79b57b80c3aa0 --- /dev/null +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.store.builders.ICacheBuilder; +import org.opensearch.common.cache.store.config.CacheConfig; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class MockDiskCache implements ICache { + + Map cache; + int maxSize; + long delay; + + public MockDiskCache(int maxSize, long delay) { + this.maxSize = maxSize; + this.delay = delay; + this.cache = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + V value = cache.get(key); + return value; + } + + @Override + public void put(K key, V value) { + if (this.cache.size() >= maxSize) { // For simplification + return; + } + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + this.cache.put(key, value); + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) { + V value = cache.computeIfAbsent(key, key1 -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return value; + } + + @Override + public void invalidate(K key) { + this.cache.remove(key); + } + + @Override + public void invalidateAll() { + this.cache.clear(); + } + + @Override + public Iterable keys() { + return this.cache.keySet(); + } + + @Override + public long count() { + return this.cache.size(); + } + + @Override + public void refresh() {} + + @Override + public void close() { + + } + + public static class MockDiskCacheFactory implements Factory { + + public static final String NAME = "mockDiskCache"; + final long delay; + final int maxSize; + + public MockDiskCacheFactory(long delay, int maxSize) { + this.delay = delay; + this.maxSize = maxSize; + } + + @Override + public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { + return new Builder().setMaxSize(maxSize).setDeliberateDelay(delay).build(); + } + + @Override + public String getCacheName() { + return NAME; + } + } + + public static class Builder extends ICacheBuilder { + + int maxSize; + long delay; + + @Override + public ICache build() { + return new MockDiskCache(this.maxSize, this.delay); + } + + public Builder setMaxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public Builder setDeliberateDelay(long millis) { + this.delay = millis; + return this; + } + } +} diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 7c9569f5defe2..2f7938934300e 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -13,19 +13,19 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; -import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -105,7 +105,7 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ).getKey(), - MockOnDiskCache.MockDiskCacheFactory.NAME + MockDiskCache.MockDiskCacheFactory.NAME ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) @@ -126,8 +126,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ); @@ -164,7 +164,7 @@ public void testWithFactoryCreationWithOnHeapCacheNotPresent() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ).getKey(), - MockOnDiskCache.MockDiskCacheFactory.NAME + MockDiskCache.MockDiskCacheFactory.NAME ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) @@ -187,8 +187,8 @@ public void testWithFactoryCreationWithOnHeapCacheNotPresent() { Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ) ); @@ -232,8 +232,8 @@ public void testWithFactoryCreationWithDiskCacheNotPresent() { Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ) ); @@ -256,6 +256,11 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { .setRemovalListener(removalListener) .setSettings( Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -266,7 +271,7 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { ) .build(); - ICache.Factory mockDiskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(0, diskCacheSize); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize); TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() .setOnHeapCacheFactory(onHeapCacheFactory) @@ -444,6 +449,10 @@ public void testPutAndVerifyNewItemsArePresentOnHeapCache() throws Exception { diskCacheSize, removalListener, Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -741,13 +750,18 @@ public void testConcurrencyForEvictionFlow() throws Exception { MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); - ICache.Factory diskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(500, diskCacheSize); + ICache.Factory diskCacheFactory = new MockDiskCache.MockDiskCacheFactory(500, diskCacheSize); CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) .setKeyType(String.class) .setWeigher((k, v) -> 150) .setRemovalListener(removalListener) .setSettings( Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -858,10 +872,19 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .setKeyType(String.class) .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) - .setSettings(settings) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) .build(); - ICache.Factory mockDiskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); return new TieredSpilloverCache.Builder().setCacheType(CacheType.INDICES_REQUEST_CACHE) .setRemovalListener(removalListener) @@ -871,118 +894,3 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .build(); } } - -class MockOnDiskCache implements ICache { - - Map cache; - int maxSize; - long delay; - - MockOnDiskCache(int maxSize, long delay) { - this.maxSize = maxSize; - this.delay = delay; - this.cache = new ConcurrentHashMap(); - } - - @Override - public V get(K key) { - V value = cache.get(key); - return value; - } - - @Override - public void put(K key, V value) { - if (this.cache.size() >= maxSize) { // For simplification - return; - } - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - this.cache.put(key, value); - } - - @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) { - V value = cache.computeIfAbsent(key, key1 -> { - try { - return loader.load(key); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - return value; - } - - @Override - public void invalidate(K key) { - this.cache.remove(key); - } - - @Override - public void invalidateAll() { - this.cache.clear(); - } - - @Override - public Iterable keys() { - return this.cache.keySet(); - } - - @Override - public long count() { - return this.cache.size(); - } - - @Override - public void refresh() {} - - @Override - public void close() { - - } - - public static class MockDiskCacheFactory implements Factory { - - static final String NAME = "mockDiskCache"; - final long delay; - final int maxSize; - - MockDiskCacheFactory(long delay, int maxSize) { - this.delay = delay; - this.maxSize = maxSize; - } - - @Override - public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { - return new Builder().setMaxSize(maxSize).setDeliberateDelay(delay).build(); - } - - @Override - public String getCacheName() { - return NAME; - } - } - - public static class Builder extends ICacheBuilder { - - int maxSize; - long delay; - - @Override - public ICache build() { - return new MockOnDiskCache(this.maxSize, this.delay); - } - - public Builder setMaxSize(int maxSize) { - this.maxSize = maxSize; - return this; - } - - public Builder setDeliberateDelay(long millis) { - this.delay = millis; - return this; - } - } -} diff --git a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java new file mode 100644 index 0000000000000..c68455463ee3d --- /dev/null +++ b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache; + +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(EhcacheCachePlugin.class); + } + + public void testPluginsAreInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.EhcacheCachePlugin")) + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index a1815d9be2daf..3e002f80ee24d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -81,10 +81,17 @@ public static Collection parameters() { new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } ); } + /* + new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() }, + new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() } + */ @Override protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + return Settings.builder() + .put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true") + .put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true") + .build(); } // One of the primary purposes of the query cache is to cache aggs results diff --git a/server/src/main/java/org/opensearch/common/cache/service/CacheService.java b/server/src/main/java/org/opensearch/common/cache/service/CacheService.java index c6e970b58ea08..b6710e5e4b424 100644 --- a/server/src/main/java/org/opensearch/common/cache/service/CacheService.java +++ b/server/src/main/java/org/opensearch/common/cache/service/CacheService.java @@ -8,12 +8,15 @@ package org.opensearch.common.cache.service; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.settings.CacheSettings; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import java.util.HashMap; import java.util.Map; @@ -21,6 +24,7 @@ /** * Service responsible to create caches. */ +@ExperimentalApi public class CacheService { private final Map cacheStoreTypeFactories; @@ -42,8 +46,13 @@ public ICache createCache(CacheConfig config, CacheType cache cacheType.getSettingPrefix() ); String storeName = cacheSettingForCacheType.get(settings); - if (storeName == null || storeName.isBlank()) { - throw new IllegalArgumentException("No configuration exists for cache type: " + cacheType); + if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) { + // Condition 1: In case feature flag is off, we default to onHeap. + // Condition 2: In case storeName is not explicitly mentioned, we assume user is looking to use older + // settings, so we again fallback to onHeap to maintain backward compatibility. + // It is guaranteed that we will have this store name registered, so + // should be safe. + storeName = OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME; } if (!cacheStoreTypeFactories.containsKey(storeName)) { throw new IllegalArgumentException("No store name: [" + storeName + "] is registered for cache type: " + cacheType); diff --git a/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java b/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java index eb4563fda2275..43a047f0f22c6 100644 --- a/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java +++ b/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java @@ -28,7 +28,7 @@ public class CacheSettings { (key) -> Setting.simpleString(key, "", Setting.Property.NodeScope) ); - public static Setting getConcreteSettingForCacheType(CacheType cacheType) { + public static Setting getConcreteStoreNameSettingForCacheType(CacheType cacheType) { return CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()); } } diff --git a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java index d218903de5b6d..c9bec4ba47def 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java +++ b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java @@ -15,15 +15,19 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.unit.ByteSizeValue; import java.util.Map; +import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_KEY; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; /** @@ -111,9 +115,22 @@ public static class OpenSearchOnHeapCacheFactory implements Factory { public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { Map> settingList = OpenSearchOnHeapCacheSettings.getSettingListForCacheType(cacheType); Settings settings = config.getSettings(); - return new Builder().setMaximumWeightInBytes( + ICacheBuilder builder = new Builder().setMaximumWeightInBytes( ((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes() - ).setWeigher(config.getWeigher()).setRemovalListener(config.getRemovalListener()).build(); + ) + .setExpireAfterAccess(((TimeValue) settingList.get(EXPIRE_AFTER_ACCESS_KEY).get(settings))) + .setWeigher(config.getWeigher()) + .setRemovalListener(config.getRemovalListener()); + Setting cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace( + cacheType.getSettingPrefix() + ); + String storeName = cacheSettingForCacheType.get(settings); + if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) { + // For backward compatibility as the user intent is to use older settings. + builder.setMaximumWeightInBytes(config.getMaxSizeInBytes()); + builder.setExpireAfterAccess(config.getExpireAfterAccess()); + } + return builder.build(); } @Override diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index 6fefea6578fb9..fa82e9be72e6e 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -11,6 +11,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import java.util.function.ToLongBiFunction; @@ -41,12 +42,24 @@ public class CacheConfig { private final RemovalListener removalListener; + /** + * Max size in bytes for the cache. This is needed for backward compatibility. + */ + private final long maxSizeInBytes; + + /** + * Defines the expiration time for a cache entry. This is needed for backward compatibility. + */ + private final TimeValue expireAfterAccess; + private CacheConfig(Builder builder) { this.keyType = builder.keyType; this.valueType = builder.valueType; this.settings = builder.settings; this.removalListener = builder.removalListener; this.weigher = builder.weigher; + this.maxSizeInBytes = builder.maxSizeInBytes; + this.expireAfterAccess = builder.expireAfterAccess; } public Class getKeyType() { @@ -69,6 +82,14 @@ public ToLongBiFunction getWeigher() { return weigher; } + public Long getMaxSizeInBytes() { + return maxSizeInBytes; + } + + public TimeValue getExpireAfterAccess() { + return expireAfterAccess; + } + /** * Builder class to build Cache config related parameters. * @param Type of key. @@ -86,6 +107,10 @@ public static class Builder { private ToLongBiFunction weigher; + private long maxSizeInBytes; + + private TimeValue expireAfterAccess; + public Builder() {} public Builder setSettings(Settings settings) { @@ -113,6 +138,16 @@ public Builder setWeigher(ToLongBiFunction weigher) { return this; } + public Builder setMaxSizeInBytes(long sizeInBytes) { + this.maxSizeInBytes = sizeInBytes; + return this; + } + + public Builder setExpireAfterAccess(TimeValue expireAfterAccess) { + this.expireAfterAccess = expireAfterAccess; + return this; + } + public CacheConfig build() { return new CacheConfig<>(this); } diff --git a/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java b/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java index bfd2d937fb430..5a2964ad011bf 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java +++ b/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java @@ -11,6 +11,7 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeValue; import java.util.HashMap; @@ -33,9 +34,25 @@ public class OpenSearchOnHeapCacheSettings { (key) -> Setting.memorySizeSetting(key, "1%", NodeScope) ); + /** + * Setting to define expire after access. + * + * Setting pattern: {cache_type}.opensearch_onheap.expire + */ + public static final Setting.AffixSetting EXPIRE_AFTER_ACCESS_SETTING = Setting.suffixKeySetting( + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ".expire", + (key) -> Setting.positiveTimeSetting(key, TimeValue.MAX_VALUE, Setting.Property.NodeScope) + ); + public static final String MAXIMUM_SIZE_IN_BYTES_KEY = "maximum_size_in_bytes"; + public static final String EXPIRE_AFTER_ACCESS_KEY = "expire_after_access"; - private static final Map> KEY_SETTING_MAP = Map.of(MAXIMUM_SIZE_IN_BYTES_KEY, MAXIMUM_SIZE_IN_BYTES); + private static final Map> KEY_SETTING_MAP = Map.of( + MAXIMUM_SIZE_IN_BYTES_KEY, + MAXIMUM_SIZE_IN_BYTES, + EXPIRE_AFTER_ACCESS_KEY, + EXPIRE_AFTER_ACCESS_SETTING + ); public static final Map>> CACHE_TYPE_MAP = getCacheTypeMap(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 1f12971fe4771..83c70e2063ace 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -80,6 +80,8 @@ import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -697,6 +699,8 @@ public void apply(Settings value, Settings current, Settings previous) { SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING ), List.of(FeatureFlags.TELEMETRY), - List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY) + List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY), + List.of(FeatureFlags.PLUGGABLE_CACHE), + List.of(CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE)) ); } diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 90abc0a0765c1..cfc1634e60fcc 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -39,7 +39,8 @@ protected FeatureFlagSettings( FeatureFlags.EXTENSIONS_SETTING, FeatureFlags.IDENTITY_SETTING, FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING, - FeatureFlags.TELEMETRY_SETTING + FeatureFlags.TELEMETRY_SETTING, + FeatureFlags.PLUGGABLE_CACHE_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index b89d2d0549823..d811262ab127a 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -55,6 +55,12 @@ public class FeatureFlags { */ public static final String TELEMETRY = "opensearch.experimental.feature.telemetry.enabled"; + /** + * Gates the functionality of pluggable cache. + * Enables OpenSearch to use pluggable caches with respective store names via setting. + */ + public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled"; + /** * Should store the settings from opensearch.yml. */ @@ -100,4 +106,5 @@ public static boolean isEnabled(String featureFlagName) { false, Property.NodeScope ); + public static final Setting PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 6d5c23274dbd6..92fb278c946f1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -39,11 +39,13 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.CheckedSupplier; -import org.opensearch.common.cache.Cache; -import org.opensearch.common.cache.CacheBuilder; -import org.opensearch.common.cache.CacheLoader; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.service.CacheService; +import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -69,6 +71,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; +import java.util.function.ToLongBiFunction; /** * The indices request cache allows to cache a shard level request stage responses, helping with improving @@ -116,22 +119,26 @@ public final class IndicesRequestCache implements RemovalListener keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; - private final Cache cache; + private final ICache cache; private final Function> cacheEntityLookup; - IndicesRequestCache(Settings settings, Function> cacheEntityFunction) { + IndicesRequestCache(Settings settings, Function> cacheEntityFunction, CacheService cacheService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); - CacheBuilder cacheBuilder = CacheBuilder.builder() - .setMaximumWeight(sizeInBytes) - .weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) - .removalListener(this); - if (expire != null) { - cacheBuilder.setExpireAfterAccess(expire); - } - cache = cacheBuilder.build(); + ToLongBiFunction weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); this.cacheEntityLookup = cacheEntityFunction; + this.cache = cacheService.createCache( + new CacheConfig.Builder().setSettings(settings) + .setWeigher(weigher) + .setValueType(BytesReference.class) + .setKeyType(Key.class) + .setRemovalListener(this) + .setMaxSizeInBytes(sizeInBytes) // for backward compatibility + .setExpireAfterAccess(expire) // for backward compatibility + .build(), + CacheType.INDICES_REQUEST_CACHE + ); } @Override @@ -204,7 +211,7 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade * * @opensearch.internal */ - private static class Loader implements CacheLoader { + private static class Loader implements LoadAwareCacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -403,7 +410,7 @@ synchronized void cleanCache() { /** * Returns the current size of the cache */ - int count() { + long count() { return cache.count(); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 00f13948709cf..f1f7075b5bfdf 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -63,6 +63,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; +import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -385,7 +386,8 @@ public IndicesService( FileCacheCleaner fileCacheCleaner, SearchRequestStats searchRequestStats, @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - RecoverySettings recoverySettings + RecoverySettings recoverySettings, + CacheService cacheService ) { this.settings = settings; this.threadPool = threadPool; @@ -402,7 +404,7 @@ public IndicesService( return Optional.empty(); } return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), cacheService); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index aaea8bc274f7b..0972c0c8d8cb0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -810,7 +810,8 @@ protected Node( fileCacheCleaner, searchRequestStats, remoteStoreStatsTrackerFactory, - recoverySettings + recoverySettings, + cacheService ); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java index 9d39f8a43ea58..b355161f6f310 100644 --- a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java +++ b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java @@ -10,19 +10,20 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.plugins.CachePlugin; import org.opensearch.test.OpenSearchTestCase; import java.util.List; import java.util.Map; -import static junit.framework.TestCase.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -33,22 +34,72 @@ public class CacheServiceTests extends OpenSearchTestCase { public void testWithCreateCacheForIndicesRequestCacheType() { CachePlugin mockPlugin1 = mock(CachePlugin.class); ICache.Factory factory1 = mock(ICache.Factory.class); - Map factoryMap = Map.of("cache1", factory1); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); - Setting indicesRequestCacheSetting = CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); - - CacheModule cacheModule = new CacheModule( - List.of(mockPlugin1), + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + CacheService cacheService = new CacheService( + factoryMap, Settings.builder().put(indicesRequestCacheSetting.getKey(), "cache1").build() ); CacheConfig config = mock(CacheConfig.class); - ICache onHeapCache = mock(OpenSearchOnHeapCache.class); - when(factory1.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(onHeapCache); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(onHeapCacheFactory.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); + + ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertEquals(mockOnHeapCache, ircCache); + } + + public void testWithCreateCacheForIndicesRequestCacheTypeWithFeatureFlagTrue() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + CacheService cacheService = new CacheService( + factoryMap, + Settings.builder().put(indicesRequestCacheSetting.getKey(), "cache1").put(FeatureFlags.PLUGGABLE_CACHE, "true").build() + ); + CacheConfig config = mock(CacheConfig.class); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(factory1.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); - CacheService cacheService = cacheModule.getCacheService(); ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); - assertEquals(onHeapCache, ircCache); + assertEquals(mockOnHeapCache, ircCache); + } + + public void testWithCreateCacheForIndicesRequestCacheTypeWithFeatureFlagTrueAndStoreNameIsNull() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + CacheService cacheService = new CacheService(factoryMap, Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build()); + CacheConfig config = mock(CacheConfig.class); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(onHeapCacheFactory.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); + + ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertEquals(mockOnHeapCache, ircCache); } public void testWithCreateCacheWithNoStoreNamePresentForCacheType() { @@ -61,12 +112,29 @@ public void testWithCreateCacheWithNoStoreNamePresentForCacheType() { IllegalArgumentException.class, () -> cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE) ); - assertEquals("No configuration exists for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + assertEquals("No store name: [opensearch_onheap] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + } + + public void testWithCreateCacheWithDefaultStoreNameForIRC() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + Map factoryMap = Map.of("cache1", factory1); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + CacheModule cacheModule = new CacheModule(List.of(mockPlugin1), Settings.EMPTY); + CacheConfig config = mock(CacheConfig.class); + when(config.getSettings()).thenReturn(Settings.EMPTY); + when(config.getWeigher()).thenReturn((k, v) -> 100); + when(config.getRemovalListener()).thenReturn(mock(RemovalListener.class)); + + CacheService cacheService = cacheModule.getCacheService(); + ICache iCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertTrue(iCache instanceof OpenSearchOnHeapCache); } public void testWithCreateCacheWithInvalidStoreNameAssociatedForCacheType() { ICache.Factory factory1 = mock(ICache.Factory.class); - Setting indicesRequestCacheSetting = CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); Map factoryMap = Map.of("cache1", factory1); CacheService cacheService = new CacheService( factoryMap, @@ -81,6 +149,6 @@ public void testWithCreateCacheWithInvalidStoreNameAssociatedForCacheType() { IllegalArgumentException.class, () -> cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE) ); - assertEquals("No store name: [cache] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + assertEquals("No store name: [opensearch_onheap] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 73728aec12e51..b9cbbb2c65162 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -46,9 +46,12 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.cache.module.CacheModule; +import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; @@ -67,6 +70,7 @@ import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Optional; import java.util.UUID; @@ -80,7 +84,69 @@ public void testBasicOperationsCache() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() + ); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + + // initial cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); + assertEquals(0, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertFalse(loader.loadedFromCache); + assertEquals(1, cache.count()); + + // cache hit + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + value = cache.getOrCompute(entity, loader, reader, termBytes); + assertEquals("foo", value.streamInput().readString()); + requestCacheStats = indexShard.requestCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(1, cache.count()); + assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); + assertEquals(1, cache.numRegisteredCloseListeners()); + + // Closing the cache doesn't modify an already returned CacheEntity + if (randomBoolean()) { + reader.close(); + } else { + indexShard.close("test", true, true); // closed shard but reader is still open + cache.clear(entity); + } + cache.cleanCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(0, cache.count()); + assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); + + IOUtils.close(reader, writer, dir, cache); + assertEquals(0, cache.numRegisteredCloseListeners()); + } + + public void testBasicOperationsCacheWithFeatureFlag() throws Exception { + IndexShard indexShard = createIndex("test").getShard(0); + CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + cacheService ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -146,7 +212,7 @@ public void testCacheDifferentReaders() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -247,7 +313,8 @@ public void testEviction() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -274,7 +341,8 @@ public void testEviction() throws Exception { IndexShard indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -319,7 +387,7 @@ public void testClearAllEntityIdentity() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -414,7 +482,7 @@ public void testInvalidate() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 97c5d23831965..826f1a4fde152 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -156,6 +156,7 @@ import org.opensearch.common.CheckedConsumer; import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; +import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; @@ -237,6 +238,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -2072,7 +2074,8 @@ public void onFailure(final Exception e) { fileCacheCleaner, null, new RemoteStoreStatsTrackerFactory(clusterService, settings), - DefaultRecoverySettings.INSTANCE + DefaultRecoverySettings.INSTANCE, + new CacheModule(new ArrayList<>(), settings).getCacheService() ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService(