Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix] Fix cache maximum size settings not working properly with pluggable caching #16636

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix rollover alias supports restored searchable snapshot index([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Fix permissions error on scripted query against remote snapshot ([#16544](https://github.com/opensearch-project/OpenSearch/pull/16544))
- Fix `doc_values` only (`index:false`) IP field searching for masks ([#16628](https://github.com/opensearch-project/OpenSearch/pull/16628))
- Fix max request cache size settings not working properly with pluggable caching ([#16636](https://github.com/opensearch-project/OpenSearch/pull/16636))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class TieredSpilloverCacheSettings {

/**
* Setting which defines the onHeap cache size to be used within tiered cache.
* This setting overrides size settings from the heap tier implementation.
* For example, if OpenSearchOnHeapCache is the heap tier in the request cache, and
* indices.requests.cache.opensearch_onheap.size is set, that value will be ignored in favor of this setting.
*
* Pattern: {cache_type}.tiered_spillover.onheap.store.size
* Example: indices.request.cache.tiered_spillover.onheap.store.size
Expand All @@ -96,6 +99,7 @@ public class TieredSpilloverCacheSettings {

/**
* Setting which defines the disk cache size to be used within tiered cache.
* Similarly, this setting overrides the size setting from the disk tier implementation.
*/
public static final Setting.AffixSetting<Long> TIERED_SPILLOVER_DISK_STORE_SIZE = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public void close() {

}

long getMaxSize() {
return maxSize;
}

public static class MockDiskCacheFactory implements Factory {

public static final String NAME = "mockDiskCache";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

import static org.opensearch.cache.common.tier.TieredSpilloverCache.ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.MIN_DISK_CACHE_SIZE_IN_BYTES;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
Expand Down Expand Up @@ -2112,6 +2113,139 @@ public void testTieredCacheDefaultSegmentCount() {
assertTrue(VALID_SEGMENT_COUNT_VALUES.contains(tieredSpilloverCache.getNumberOfSegments()));
}

public void testSegmentSizesWhenUsingFactory() {
// The TSC's tier size settings, TIERED_SPILLOVER_ONHEAP_STORE_SIZE and TIERED_SPILLOVER_DISK_STORE_SIZE,
// should always be respected, overriding the individual implementation's size settings if present
long expectedHeapSize = 256L * between(10, 20);
long expectedDiskSize = MIN_DISK_CACHE_SIZE_IN_BYTES + 256L * between(30, 40);
long heapSizeFromImplSetting = 50;
int diskSizeFromImplSetting = 50;
int numSegments = getNumberOfSegments();

int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
// These two size settings should be honored
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
expectedHeapSize + "b"
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
expectedDiskSize
)
// The size setting from the OpenSearchOnHeap implementation should not be honored
.put(
OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
heapSizeFromImplSetting + "b"
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()).getKey(),
numSegments
)
.build();
String storagePath = getStoragePath(settings);

TieredSpilloverCache<String, String> tieredSpilloverCache = (TieredSpilloverCache<
String,
String>) new TieredSpilloverCache.TieredSpilloverCacheFactory().create(
new CacheConfig.Builder<String, String>().setKeyType(String.class)
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
// 20_000_000 ns = 20 ms to compute
.setClusterSettings(clusterSettings)
.setStoragePath(storagePath)
.build(),
CacheType.INDICES_REQUEST_CACHE,
Map.of(
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME,
new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(),
MockDiskCache.MockDiskCacheFactory.NAME,
// The size value passed in here acts as the "implementation setting" for the disk tier, and should also be ignored
new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, false, keyValueSize)
)
);
checkSegmentSizes(tieredSpilloverCache, expectedHeapSize, expectedDiskSize);
}

public void testSegmentSizesWhenNotUsingFactory() {
long expectedHeapSize = 256L * between(10, 20);
long expectedDiskSize = MIN_DISK_CACHE_SIZE_IN_BYTES + 256L * between(30, 40);
int heapSizeFromImplSetting = 50;
int diskSizeFromImplSetting = 50;

Settings settings = Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
// The size setting from the OpenSearchOnHeapCache implementation should not be honored
.put(
OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
heapSizeFromImplSetting + "b"
)
.build();

int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
int numSegments = getNumberOfSegments();
CacheConfig<String, String> cacheConfig = getCacheConfig(1, settings, removalListener, numSegments);
TieredSpilloverCache<String, String> tieredSpilloverCache = getTieredSpilloverCache(
new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(),
new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, true, keyValueSize),
cacheConfig,
null,
removalListener,
numSegments,
expectedHeapSize,
expectedDiskSize
);
checkSegmentSizes(tieredSpilloverCache, expectedHeapSize, expectedDiskSize);
}

private void checkSegmentSizes(TieredSpilloverCache<String, String> cache, long expectedHeapSize, long expectedDiskSize) {
OpenSearchOnHeapCache<String, String> segmentHeapCache = (OpenSearchOnHeapCache<
String,
String>) cache.tieredSpilloverCacheSegments[0].getOnHeapCache();
assertEquals(expectedHeapSize / cache.getNumberOfSegments(), segmentHeapCache.getMaximumWeight());

MockDiskCache<String, String> segmentDiskCache = (MockDiskCache<String, String>) cache.tieredSpilloverCacheSegments[0]
.getDiskCache();
assertEquals(expectedDiskSize / cache.getNumberOfSegments(), segmentDiskCache.getMaxSize());
}

private List<String> getMockDimensions() {
List<String> dims = new ArrayList<>();
for (String dimensionName : dimensionNames) {
Expand Down Expand Up @@ -2401,9 +2535,9 @@ private void verifyComputeIfAbsentThrowsException(
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class EhcacheDiskCacheSettings {

/**
* Disk cache max size setting.
* If this cache is used as a tier in a TieredSpilloverCache, this setting is ignored.
*/
public static final Setting.AffixSetting<Long> DISK_CACHE_MAX_SIZE_IN_BYTES_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_size_in_bytes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,11 @@ private V deserializeValue(ByteArrayWrapper binary) {
return valueSerializer.deserialize(binary.value);
}

// Pkg-private for testing.
long getMaxWeightInBytes() {
return maxWeightInBytes;
}

/**
* Factory to create an ehcache disk cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
Expand Down Expand Up @@ -1201,6 +1203,65 @@ public void testEhcacheCloseWithDestroyCacheMethodThrowingException() throws Exc
ehcacheDiskCache.close();
}

public void testWithCacheConfigSizeSettings() throws Exception {
// The cache should get its size from the config if present, and otherwise should get it from the setting.
long maxSizeFromSetting = between(MINIMUM_MAX_SIZE_IN_BYTES + 1000, MINIMUM_MAX_SIZE_IN_BYTES + 2000);
long maxSizeFromConfig = between(MINIMUM_MAX_SIZE_IN_BYTES + 3000, MINIMUM_MAX_SIZE_IN_BYTES + 4000);

EhcacheDiskCache<String, String> cache = setupMaxSizeTest(maxSizeFromSetting, maxSizeFromConfig, false);
assertEquals(maxSizeFromSetting, cache.getMaxWeightInBytes());

cache = setupMaxSizeTest(maxSizeFromSetting, maxSizeFromConfig, true);
assertEquals(maxSizeFromConfig, cache.getMaxWeightInBytes());
}

// Modified from OpenSearchOnHeapCacheTests. Can't reuse, as we can't add a dependency on the server.test module.
private EhcacheDiskCache<String, String> setupMaxSizeTest(long maxSizeFromSetting, long maxSizeFromConfig, boolean putSizeInConfig)
throws Exception {
MockRemovalListener<String, String> listener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(Settings.builder().build())) {
Settings settings = Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, true)
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_MAX_SIZE_IN_BYTES_KEY)
.getKey(),
maxSizeFromSetting
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_STORAGE_PATH_KEY)
.getKey(),
env.nodePaths()[0].indicesPath.toString() + "/request_cache/" + 0
)
.build();

CacheConfig.Builder<String, String> cacheConfigBuilder = new CacheConfig.Builder<String, String>().setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setWeigher(getWeigher())
.setRemovalListener(listener)
.setSettings(settings)
.setDimensionNames(List.of(dimensionName))
.setStatsTrackingEnabled(true);
if (putSizeInConfig) {
cacheConfigBuilder.setMaxSizeInBytes(maxSizeFromConfig);
}

ICache.Factory cacheFactory = new EhcacheDiskCache.EhcacheDiskCacheFactory();
return (EhcacheDiskCache<String, String>) cacheFactory.create(
cacheConfigBuilder.build(),
CacheType.INDICES_REQUEST_CACHE,
null
);
}
}

static class MockEhcahceDiskCache extends EhcacheDiskCache<String, String> {

public MockEhcahceDiskCache(Builder<String, String> builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public <K, V> ICache<K, V> createCache(CacheConfig<K, V> config, CacheType cache
cacheType.getSettingPrefix()
);
String storeName = cacheSettingForCacheType.get(settings);
if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) {
if (!pluggableCachingEnabled(cacheType, settings)) {
// Condition 1: In case feature flag is off, we default to onHeap.
// Condition 2: In case storeName is not explicitly mentioned, we assume user is looking to use older
// settings, so we again fallback to onHeap to maintain backward compatibility.
Expand All @@ -74,4 +74,15 @@ public NodeCacheStats stats(CommonStatsFlags flags) {
}
return new NodeCacheStats(statsMap, flags);
}

/**
* Check if pluggable caching is on, and if a store type is present for this cache type.
*/
public static boolean pluggableCachingEnabled(CacheType cacheType, Settings settings) {
Setting<String> cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String storeName = cacheSettingForCacheType.get(settings);
return FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) && storeName != null && !storeName.isBlank();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.CacheStatsHolder;
import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;
Expand Down Expand Up @@ -80,8 +81,8 @@ public OpenSearchOnHeapCache(Builder<K, V> builder) {
this.weigher = builder.getWeigher();
}

// package private for testing
long getMaximumWeight() {
// public for testing
public long getMaximumWeight() {
return this.maximumWeight;
}

Expand Down Expand Up @@ -192,8 +193,12 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
);
long maxSizeInBytes = ((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes();

if (config.getMaxSizeInBytes() > 0) { // If this is passed from upstream(like tieredCache), then use this
// instead.
if (config.getMaxSizeInBytes() > 0) {
/*
Use the cache config value if present.
This can be passed down from the TieredSpilloverCache when creating individual segments,
but is not passed in from the IRC if pluggable caching is on.
*/
builder.setMaximumWeightInBytes(config.getMaxSizeInBytes());
} else {
builder.setMaximumWeightInBytes(maxSizeInBytes);
Expand All @@ -204,8 +209,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
builder.setNumberOfSegments(-1); // By default it will use 256 segments.
}

String storeName = cacheSettingForCacheType.get(settings);
if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) {
if (!CacheService.pluggableCachingEnabled(cacheType, settings)) {
// For backward compatibility as the user intent is to use older settings.
builder.setMaximumWeightInBytes(config.getMaxSizeInBytes());
builder.setExpireAfterAccess(config.getExpireAfterAccess());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class OpenSearchOnHeapCacheSettings {

/**
* Setting to define maximum size for the cache as a percentage of heap memory available.
* If this cache is used as a tier in a TieredSpilloverCache, this setting is ignored.
*
* Setting pattern: {cache_type}.opensearch_onheap.size
*/
Expand Down
Loading
Loading