Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Segmented cache changes #16047

Merged
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
- [Tiered Caching] Segmented cache changes ([#16047](https://github.com/opensearch-project/OpenSearch/pull/16047))
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

public class TieredSpilloverCacheBaseIT extends OpenSearchIntegTestCase {

public Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage, int numberOfSegments) {
return Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
numberOfSegments
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
onHeapCacheSizeInBytesOrPercentage
)
.build();
}

public int getNumberOfSegments() {
return randomFrom(1, 2, 4, 8, 16, 64, 128, 256);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.IndicesRequestCache;
Expand All @@ -43,13 +39,15 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -58,43 +56,15 @@
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {
public class TieredSpilloverCacheIT extends TieredSpilloverCacheBaseIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
}

static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) {
return Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
onHeapCacheSizeInBytesOrPercentage
)
.build();
}

public void testPluginsAreInstalled() {
internalCluster().startNode(Settings.builder().put(defaultSettings("1%")).build());
internalCluster().startNode(Settings.builder().put(defaultSettings("1%", getNumberOfSegments())).build());
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Expand All @@ -111,7 +81,8 @@ public void testPluginsAreInstalled() {
}

public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%")).build());
int numberOfSegments = getNumberOfSegments();
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%", numberOfSegments)).build());
Client client = client();
assertAcked(
client.admin()
Expand Down Expand Up @@ -147,9 +118,97 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
);
}

public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizePerSegmentInBytes = 800; // Per cache entry below is around ~700 bytes, so keeping this
// just a bit higher so that each segment can atleast hold 1 entry.
int onHeapCacheSizeInBytes = onHeapCacheSizePerSegmentInBytes * numberOfSegments;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments)).build());
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
// Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
// to disk. And then hit requests so that few items are cached into cache.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(100, TimeUnit.SECONDS)
)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
int numberOfIndexedItems = numberOfSegments + 1; // Best case if all keys are distributed among different
// segment, atleast one of the segment will have 2 entries and we will see evictions.
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
long perQuerySizeInCacheInBytes = -1;
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
if (perQuerySizeInCacheInBytes == -1) {
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
}
assertSearchResponse(resp);
}
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
// Considering disk cache won't be used due to took time policy having a high value, we expect overall cache
// size to be less than or equal to onHeapCache size.
assertTrue(requestCacheStats.getMemorySizeInBytes() <= onHeapCacheSizeInBytes);
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
// We should atleast one eviction considering disk cache isn't able to hold anything due to policy.
assertTrue(requestCacheStats.getEvictions() > 0);
assertEquals(0, requestCacheStats.getHitCount());
long lastEvictionSeen = requestCacheStats.getEvictions();

// Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
// to cache all entries.
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
for (int iterator = 0; iterator < numberOfIndexedItems * 2; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery(UUID.randomUUID().toString(), UUID.randomUUID().toString()))
.get();
assertSearchResponse(resp);
}

requestCacheStats = getRequestCacheStats(client, "index");
// We shouldn't see any new evictions now.
assertEquals(lastEvictionSeen, requestCacheStats.getEvictions());
}

public void testWithDynamicTookTimePolicy() throws Exception {
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b")).build());
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
Client client = client();
assertAcked(
client.admin()
Expand Down Expand Up @@ -271,9 +330,10 @@ public void testWithDynamicTookTimePolicy() throws Exception {

public void testInvalidationWithIndicesRequestCache() throws Exception {
int onHeapCacheSizeInBytes = 2000;
int numberOfSegments = getNumberOfSegments();
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -354,10 +414,11 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
}

public void testWithExplicitCacheClear() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -426,10 +487,13 @@ public void testWithExplicitCacheClear() throws Exception {
}

public void testWithDynamicDiskCacheSetting() throws Exception {
int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk.
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2); // Keep it low so
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
// that all items are
// cached onto disk.
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -540,6 +604,27 @@ public void testWithDynamicDiskCacheSetting() throws Exception {
assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount());
}

public void testWithInvalidSegmentNumberSetting() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2); // Keep it low so
// that all items are
// cached onto disk.
assertThrows(
String.format(
Locale.ROOT,
INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE,
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
),
IllegalArgumentException.class,
() -> internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b", 300))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
)
);
}

private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
}
Expand All @@ -550,7 +635,7 @@ public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000, false));
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false, 1));
}

@Override
Expand Down
Loading
Loading