Skip to content

Commit

Permalink
Segmented cache changes for TieredCache
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 committed Sep 23, 2024
1 parent 1343367 commit 185cc14
Show file tree
Hide file tree
Showing 18 changed files with 1,573 additions and 551 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
}

static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) {
static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage, int numberOfSegments) {
return Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
Expand All @@ -84,6 +84,12 @@ static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) {
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
numberOfSegments
)
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
Expand All @@ -94,7 +100,7 @@ static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) {
}

public void testPluginsAreInstalled() {
internalCluster().startNode(Settings.builder().put(defaultSettings("1%")).build());
internalCluster().startNode(Settings.builder().put(defaultSettings("1%", getNumberOfSegments())).build());
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Expand All @@ -111,7 +117,8 @@ public void testPluginsAreInstalled() {
}

public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%")).build());
int numberOfSegments = getNumberOfSegments();
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%", numberOfSegments)).build());
Client client = client();
assertAcked(
client.admin()
Expand Down Expand Up @@ -149,7 +156,7 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio

public void testWithDynamicTookTimePolicy() throws Exception {
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b")).build());
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
Client client = client();
assertAcked(
client.admin()
Expand Down Expand Up @@ -271,9 +278,10 @@ public void testWithDynamicTookTimePolicy() throws Exception {

public void testInvalidationWithIndicesRequestCache() throws Exception {
int onHeapCacheSizeInBytes = 2000;
int numberOfSegments = getNumberOfSegments();
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -354,10 +362,11 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
}

public void testWithExplicitCacheClear() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -426,10 +435,13 @@ public void testWithExplicitCacheClear() throws Exception {
}

public void testWithDynamicDiskCacheSetting() throws Exception {
int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk.
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2); // Keep it low so
// that all items are
// cached onto disk.
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -544,13 +556,17 @@ private RequestCacheStats getRequestCacheStats(Client client, String indexName)
return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
}

public static int getNumberOfSegments() {
return randomFrom(1, 2, 4, 8, 16, 64, 128, 256);
}

public static class MockDiskCachePlugin extends Plugin implements CachePlugin {

public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000, false));
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TieredSpilloverCachePlugin.class, TieredSpilloverCacheIT.MockDiskCachePlugin.class);
}

private final String HEAP_CACHE_SIZE_STRING = "10000B";
private final int HEAP_CACHE_SIZE = 10_000;
private final String index1Name = "index1";
private final String index2Name = "index2";
private static final String HEAP_CACHE_SIZE_STRING = "10000B";
private static final int HEAP_CACHE_SIZE = 10_000;
private static final String index1Name = "index1";
private static final String index2Name = "index2";

/**
* Test aggregating by indices
Expand All @@ -63,7 +63,7 @@ public void testIndicesLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING))
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
Expand Down Expand Up @@ -116,7 +116,7 @@ public void testIndicesAndTierLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING))
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
Expand Down Expand Up @@ -196,7 +196,7 @@ public void testTierLevelAggregation() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING))
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
Expand All @@ -205,7 +205,6 @@ public void testTierLevelAggregation() throws Exception {
);
Client client = client();
Map<String, Integer> values = setupCacheForAggregationTests(client);

// Get values for tiers alone and check they add correctly across indices
ImmutableCacheStatsHolder tiersOnlyStatsHolder = getNodeCacheStatsResult(client, List.of(TIER_DIMENSION_NAME));
ImmutableCacheStats totalHeapExpectedStats = returnNullIfAllZero(
Expand Down Expand Up @@ -238,7 +237,7 @@ public void testInvalidLevelsAreIgnored() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING))
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING, TieredSpilloverCacheIT.getNumberOfSegments()))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
Expand Down Expand Up @@ -289,7 +288,7 @@ public void testStatsMatchOldApi() throws Exception {
internalCluster().startNodes(
1,
Settings.builder()
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING))
.put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING, TieredSpilloverCacheIT.getNumberOfSegments()))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
Expand Down Expand Up @@ -342,6 +341,82 @@ public void testStatsMatchOldApi() throws Exception {
assertEquals(oldAPIStats.getMemorySizeInBytes(), totalStats.getSizeInBytes());
}

public void testStatsWithMultipleSegments() throws Exception {
int numberOfSegments = randomFrom(2, 4, 8, 16, 64);
int singleSearchSizeApproxUpperBound = 700; // We know this from other tests and manually verifying
int heap_cache_size_per_segment = singleSearchSizeApproxUpperBound * numberOfSegments; // Worst case if all
// keys land up in same segment, it would still be able to accommodate.
internalCluster().startNodes(
1,
Settings.builder()
.put(TieredSpilloverCacheIT.defaultSettings(heap_cache_size_per_segment * numberOfSegments + "B", numberOfSegments))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.build()
);
Client client = client();
startIndex(client, index1Name);
// First search one time to calculate item size
searchIndex(client, index1Name, 0);
// get total stats
long singleSearchSize = getTotalStats(client).getSizeInBytes();
// Now try to hit queries same as number of segments. All these should be able to reside inside onHeap cache.
for (int i = 1; i < numberOfSegments; i++) {
searchIndex(client, index1Name, i);
}
ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult(
client,
List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME)
);
ImmutableCacheStats index1OnHeapExpectedStats = returnNullIfAllZero(
new ImmutableCacheStats(0, numberOfSegments, 0, singleSearchSize * numberOfSegments, numberOfSegments)
);
assertEquals(
index1OnHeapExpectedStats,
allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_ON_HEAP))
);
ImmutableCacheStats index1DiskCacheExpectedStats = returnNullIfAllZero(new ImmutableCacheStats(0, numberOfSegments, 0, 0, 0));
assertEquals(
index1DiskCacheExpectedStats,
allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_DISK))
);

// Now fire same queries to get some hits
for (int i = 0; i < numberOfSegments; i++) {
searchIndex(client, index1Name, i);
}
allLevelsStatsHolder = getNodeCacheStatsResult(client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME));
index1OnHeapExpectedStats = returnNullIfAllZero(
new ImmutableCacheStats(numberOfSegments, numberOfSegments, 0, singleSearchSize * numberOfSegments, numberOfSegments)
);
assertEquals(
index1OnHeapExpectedStats,
allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_ON_HEAP))
);

// Now try to evict from onheap cache by adding numberOfSegments ^ 2 which will guarantee this.
for (int i = numberOfSegments; i < numberOfSegments + numberOfSegments * numberOfSegments; i++) {
searchIndex(client, index1Name, i);
}
allLevelsStatsHolder = getNodeCacheStatsResult(client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME));
ImmutableCacheStats onHeapCacheStat = allLevelsStatsHolder.getStatsForDimensionValues(
List.of(index1Name, TIER_DIMENSION_VALUE_ON_HEAP)
);
// Jut verifying evictions happened as can't fetch the exact number considering we don't have a way to get
// segment number for queries.
assertTrue(onHeapCacheStat.getEvictions() > 0);
ImmutableCacheStats diskCacheStat = allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_DISK));

// Similarly verify items are present on disk cache now
assertEquals(onHeapCacheStat.getEvictions(), diskCacheStat.getItems());
assertTrue(diskCacheStat.getSizeInBytes() > 0);
assertTrue(diskCacheStat.getMisses() > 0);
assertTrue(diskCacheStat.getHits() == 0);
assertTrue(diskCacheStat.getEvictions() == 0);
}

private void startIndex(Client client, String indexName) throws InterruptedException {
assertAcked(
client.admin()
Expand Down Expand Up @@ -373,6 +448,7 @@ private Map<String, Integer> setupCacheForAggregationTests(Client client) throws
searchIndex(client, index1Name, 0);
// get total stats
long singleSearchSize = getTotalStats(client).getSizeInBytes();

int itemsOnHeapAfterTest = HEAP_CACHE_SIZE / (int) singleSearchSize; // As the heap tier evicts, the items on it after the test will
// be the same as its max capacity
int itemsOnDiskAfterTest = 1 + randomInt(30); // The first one we search (to get the size) always goes to disk
Expand Down Expand Up @@ -416,7 +492,6 @@ private Map<String, Integer> setupCacheForAggregationTests(Client client) throws
for (int i = itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest; i < itemsOnDiskAfterTest + itemsOnHeapAfterTest; i++) {
searchIndex(client, index2Name, i);
}

// Get some hits on all combinations of indices and tiers
for (int i = itemsOnDiskAfterTest; i < itemsOnDiskAfterTest + hitsOnHeapIndex1; i++) {
// heap hits for index 1
Expand Down Expand Up @@ -499,6 +574,7 @@ private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client,
.addMetric(NodesStatsRequest.Metric.CACHE_STATS.metricName())
.setIndices(statsFlags)
.get();

// Can always get the first data node as there's only one in this test suite
assertEquals(1, nodeStatsResponse.getNodes().size());
NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats();
Expand Down
Loading

0 comments on commit 185cc14

Please sign in to comment.