Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered caching] Introducing cache plugins and exposing Ehcache as one of the pluggable disk cache option #11874

Merged
merged 25 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3afcb42
[Tiered caching] Integrating ehcache disk cache
sgup432 Jan 12, 2024
00f4545
Adding suppressForbidden for File.io used by ehcache
sgup432 Jan 16, 2024
30ad4e9
Fixing gradle build failure
sgup432 Jan 22, 2024
6bcf2f4
Adding changelog
sgup432 Jan 22, 2024
6c18a82
Merge branch 'main' into ehcache_disk_integ
sgup432 Jan 22, 2024
a5e5afd
Exposing ehcache disk cache variant as a plugin
sgup432 Jan 25, 2024
c74992f
Fixing gradle failures
sgup432 Jan 25, 2024
06386b0
Making ICache extend Closeable
sgup432 Jan 25, 2024
8d48951
Exposing plugin based settings and refactoring
sgup432 Jan 29, 2024
8bb9e58
Fixing gradle failures for server subproject
sgup432 Jan 29, 2024
e846ab6
Fixing server:javadoc failure
sgup432 Jan 30, 2024
aa4069b
Fixing cache-ehcache plugin javadov failures
sgup432 Jan 30, 2024
c2236c9
Addressing comment and additional unit tests
sgup432 Jan 31, 2024
65ff2c2
Moving ehcache version info to the plugin
sgup432 Jan 31, 2024
8f861ee
Moving tieredCache to module and refactoring caches interface and lis…
sgup432 Feb 14, 2024
140c4ba
Merge branch 'main' into ehcache_disk_integ
sgup432 Feb 14, 2024
7f20405
Introducing CacheService to create caches
sgup432 Feb 23, 2024
f625b51
Updating cache common build.gradle
sgup432 Feb 23, 2024
2586fa1
Making TieredCachePlugin constructor public
sgup432 Feb 27, 2024
6a2b374
Fixing CacheService unit test
sgup432 Feb 27, 2024
a8ea4c2
Removing unnecessary comments from Tiered cache test
sgup432 Feb 29, 2024
e6ae90e
Fixing ehcache test concurrentPut
sgup432 Feb 29, 2024
e902526
Merge branch 'main' into ehcache_disk_integ
sgup432 Mar 1, 2024
b866ed2
Fixing flaky ehcache test
sgup432 Mar 1, 2024
081e269
Fixing twice registration of same setting
sgup432 Mar 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Tiered caching] Introducing cache plugins and exposing Ehcache as one of the pluggable disk cache option ([#11874](https://github.com/opensearch-project/OpenSearch/pull/11874))
- Add support for dependencies in plugin descriptor properties with semver range ([#11441](https://github.com/opensearch-project/OpenSearch/pull/11441))
- Add community_id ingest processor ([#12121](https://github.com/opensearch-project/OpenSearch/pull/12121))
- Introduce query level setting `index.query.max_nested_depth` limiting nested queries ([#3268](https://github.com/opensearch-project/OpenSearch/issues/3268)
Expand Down
17 changes: 17 additions & 0 deletions modules/cache-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

opensearchplugin {
description 'Module for caches which are optional and do not require additional security permission'
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin'
}

test {
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
* and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full,
* then items are eventually evicted from it and removed which will result in cache miss.
*
* @param <K> Type of key
* @param <V> Type of value
*
* @opensearch.experimental
*/
@ExperimentalApi
public class TieredSpilloverCache<K, V> implements ICache<K, V> {

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
/**
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
}
removalListener.onRemoval(notification);
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.build(),
builder.cacheType,
builder.cacheFactories

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);
}

// Package private for testing
ICache<K, V> getOnHeapCache() {
return onHeapCache;
}

// Package private for testing
ICache<K, V> getDiskCache() {
return diskCache;
}

@Override
public V get(K key) {
return getValueFromTieredCache().apply(key);
}

@Override
public void put(K key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = null;
try (ReleasableLock ignore = writeLock.acquire()) {
value = onHeapCache.computeIfAbsent(key, loader);
}
return value;
}
return cacheValue;
}

@Override
public void invalidate(K key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidate(key);
}
}
}

@Override
public void invalidateAll() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidateAll();
}
}
}

/**
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
* @return An iterable over (onHeap + disk) keys
*/
@SuppressWarnings("unchecked")
@Override
public Iterable<K> keys() {
return Iterables.concat(onHeapCache.keys(), diskCache.keys());
}

@Override
public long count() {
long count = 0;
for (ICache<K, V> cache : cacheList) {
count += cache.count();
}
return count;
}

@Override
public void refresh() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.refresh();
}
}
}

@Override
public void close() throws IOException {
for (ICache<K, V> cache : cacheList) {
cache.close();
}
}

private Function<K, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
V value = cache.get(key);
if (value != null) {
// update hit stats
return value;
} else {
// update miss stats
}
}
}
return null;
};
}

/**
* Factory to create TieredSpilloverCache objects.
*/
public static class TieredSpilloverCacheFactory implements ICache.Factory {

/**
* Defines cache name
*/
public static final String TIERED_SPILLOVER_CACHE_NAME = "tiered_spillover";

/**
* Default constructor
*/
public TieredSpilloverCacheFactory() {}

@Override
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
Settings settings = config.getSettings();
Setting<String> onHeapSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String onHeapCacheStoreName = onHeapSetting.get(settings);
if (!cacheFactories.containsKey(onHeapCacheStoreName)) {
throw new IllegalArgumentException(
"No associated onHeapCache found for tieredSpilloverCache for " + "cacheType:" + cacheType
);
}
ICache.Factory onHeapCacheFactory = cacheFactories.get(onHeapCacheStoreName);

Setting<String> onDiskSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String diskCacheStoreName = onDiskSetting.get(settings);
if (!cacheFactories.containsKey(diskCacheStoreName)) {
throw new IllegalArgumentException(
"No associated diskCache found for tieredSpilloverCache for " + "cacheType:" + cacheType
);
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);
return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
.setOnHeapCacheFactory(onHeapCacheFactory)
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.build();
}

@Override
public String getCacheName() {
return TIERED_SPILLOVER_CACHE_NAME;
}
}

/**
* Builder object for tiered spillover cache.
* @param <K> Type of key
* @param <V> Type of value
*/
public static class Builder<K, V> {
private ICache.Factory onHeapCacheFactory;
private ICache.Factory diskCacheFactory;
private RemovalListener<K, V> removalListener;
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;

/**
* Default constructor
*/
public Builder() {}

/**
* Set onHeap cache factory
* @param onHeapCacheFactory Factory for onHeap cache.
* @return builder
*/
public Builder<K, V> setOnHeapCacheFactory(ICache.Factory onHeapCacheFactory) {
this.onHeapCacheFactory = onHeapCacheFactory;
return this;
}

/**
* Set disk cache factory
* @param diskCacheFactory Factory for disk cache.
* @return builder
*/
public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) {
this.diskCacheFactory = diskCacheFactory;
return this;
}

/**
* Set removal listener for tiered cache.
* @param removalListener Removal listener
* @return builder
*/
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
this.removalListener = removalListener;
return this;
}

/**
* Set cache config.
* @param cacheConfig cache config.
* @return builder
*/
public Builder<K, V> setCacheConfig(CacheConfig<K, V> cacheConfig) {
this.cacheConfig = cacheConfig;
return this;
}

/**
* Set cache type.
* @param cacheType Cache type
* @return builder
*/
public Builder<K, V> setCacheType(CacheType cacheType) {
this.cacheType = cacheType;
return this;
}

/**
* Set cache factories
* @param cacheFactories cache factories
* @return builder
*/
public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactories) {
this.cacheFactories = cacheFactories;
return this;
}

/**
* Build tiered spillover cache.
* @return TieredSpilloverCache
*/
public TieredSpilloverCache<K, V> build() {
return new TieredSpilloverCache<>(this);
}
}
}
Loading
Loading