Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Tiered caching] Integrating IndicesRequestCache with CacheService controlled by a feature flag #12600

Merged
merged 3 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix error in RemoteSegmentStoreDirectory when debug logging is enabled ([#12328](https://github.com/opensearch-project/OpenSearch/pull/12328))
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269))
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
Expand Down
4 changes: 4 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,7 @@ ${path.logs}
# Once there is no observed impact on performance, this feature flag can be removed.
#
#opensearch.experimental.optimization.datetime_formatter_caching.enabled: false
#
# Gates the functionality of enabling Opensearch to use pluggable caches with respective store names via setting.
#
#opensearch.experimental.feature.pluggable.caching.enabled: false
2 changes: 2 additions & 0 deletions modules/cache-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
* compatible open source license.
*/

apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'Module for caches which are optional and do not require additional security permission'
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.greaterThan;

public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.build();
}

public void testPluginsAreInstalled() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
.stream()
.flatMap(
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
)
.collect(Collectors.toList());
Assert.assertTrue(
pluginInfos.stream()
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common" + ".tier.TieredSpilloverCachePlugin"))
);
}

public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("f", "type=date")
.setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build())
.get()
);
indexRandom(
true,
client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"),
client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z")
);
ensureSearchable("index");

// This is not a random example: serialization with time zones writes shared strings
// which used to not work well with the query cache because of the handles stream output
// see #9500
final SearchResponse r1 = client.prepareSearch("index")
.setSize(0)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(
dateHistogram("histo").field("f")
.timeZone(ZoneId.of("+01:00"))
.minDocCount(0)
.dateHistogramInterval(DateHistogramInterval.MONTH)
)
.get();
assertSearchResponse(r1);

// The cached is actually used
assertThat(
client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(),
greaterThan(0L)
);
}

public static class MockDiskCachePlugin extends Plugin implements CachePlugin {

public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000));
}

@Override
public String getName() {
return "mock_disk_plugin";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MockDiskCache<K, V> implements ICache<K, V> {

Map<K, V> cache;
int maxSize;
long delay;

public MockDiskCache(int maxSize, long delay) {
this.maxSize = maxSize;
this.delay = delay;
this.cache = new ConcurrentHashMap<K, V>();
}

@Override
public V get(K key) {
V value = cache.get(key);
return value;
}

@Override
public void put(K key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
return;
}
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.cache.put(key, value);
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
V value = cache.computeIfAbsent(key, key1 -> {
try {
return loader.load(key);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return value;
}

@Override
public void invalidate(K key) {
this.cache.remove(key);
}

@Override
public void invalidateAll() {
this.cache.clear();
}

@Override
public Iterable<K> keys() {
return this.cache.keySet();
}

@Override
public long count() {
return this.cache.size();
}

@Override
public void refresh() {}

@Override
public void close() {

}

public static class MockDiskCacheFactory implements Factory {

public static final String NAME = "mockDiskCache";
final long delay;
final int maxSize;

public MockDiskCacheFactory(long delay, int maxSize) {
this.delay = delay;
this.maxSize = maxSize;
}

@Override
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
return new Builder<K, V>().setMaxSize(maxSize).setDeliberateDelay(delay).build();
}

@Override
public String getCacheName() {
return NAME;
}
}

public static class Builder<K, V> extends ICacheBuilder<K, V> {

int maxSize;
long delay;

@Override
public ICache<K, V> build() {
return new MockDiskCache<K, V>(this.maxSize, this.delay);
}

public Builder<K, V> setMaxSize(int maxSize) {
this.maxSize = maxSize;
return this;
}

public Builder<K, V> setDeliberateDelay(long millis) {
this.delay = millis;
return this;
}
}
}
Loading
Loading