Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered caching] Framework changes #10753

Merged
merged 23 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b196615
[Tiered caching] Framework changes
sgup432 Oct 19, 2023
168076f
Added javadoc for new files/packages
sgup432 Oct 26, 2023
f98abc2
Merge branch 'main' into tiered_caching_framework
sgup432 Oct 26, 2023
8fed7c7
Added changelog
sgup432 Oct 26, 2023
5ace3a7
Fixing javadoc warnings
sgup432 Oct 26, 2023
2e9c478
Addressing comments
sgup432 Nov 22, 2023
c29d346
Addressing additional minor comments
sgup432 Nov 22, 2023
965cbef
Merge branch 'main' into tiered_caching_framework
sgup432 Nov 22, 2023
2e024a2
Moving non null check to builder for OS onHeapCache
sgup432 Nov 22, 2023
51d948f
Adding package-info for new packages
sgup432 Nov 23, 2023
64edcfb
Removing service and adding different cache interfaces along with eve…
sgup432 Dec 7, 2023
d8156af
Merge branch 'main' into tiered_caching_framework
sgup432 Dec 7, 2023
13a03a9
Fixing gradle missingDoc issue
sgup432 Dec 7, 2023
42a111d
Changing listener logic, removing tiered cache integration with IRC
sgup432 Dec 21, 2023
c9cf2c0
Adding opensearch.internal tag for LoadAwareCacheLoader
sgup432 Dec 21, 2023
7318252
Fixing thread safety issue
sgup432 Dec 28, 2023
79dd693
Remove compute function and event listener logic change for TieredCache
sgup432 Dec 28, 2023
7c10adf
Making Cache.compute function private
sgup432 Dec 28, 2023
1832df9
Adding javadoc and more test for cache.put
sgup432 Jan 3, 2024
49aab1f
Adding write locks to refresh API as well
sgup432 Jan 3, 2024
7e72d07
Merge branch 'main' into tiered_caching_framework
sgup432 Jan 3, 2024
fdda280
Removing unwanted EventType class and refactoring one UT
sgup432 Jan 3, 2024
9956abc
Removing TieredCache interface
sgup432 Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753]
(https://github.com/opensearch-project/OpenSearch/pull/10753))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
Expand Down
112 changes: 59 additions & 53 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,68 +424,74 @@
}
});
if (value == null) {
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
value = compute(key, loader);
}
return value;
}

try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}
private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
long now = now();
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};
try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
completableValue = future.handle(handler);
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
} else {
completableValue = future.handle(handler);
}
V value;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");

Check warning on line 491 in server/src/main/java/org/opensearch/common/cache/Cache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/Cache.java#L490-L491

Added lines #L490 - L491 were not covered by tests
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);

Check warning on line 494 in server/src/main/java/org/opensearch/common/cache/Cache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/Cache.java#L493-L494

Added lines #L493 - L494 were not covered by tests
}
return value;
}
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/common/cache/ICache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Represents a cache interface.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface ICache<K, V> {
V get(K key);

void put(K key, V value);

V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception;
sgup432 marked this conversation as resolved.
Show resolved Hide resolved

void invalidate(K key);

void invalidateAll();

Iterable<K> keys();

long count();

void refresh();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Extends a cache loader with awareness of whether the data is loaded or not.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public interface LoadAwareCacheLoader<K, V> extends CacheLoader<K, V> {
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
boolean isLoaded();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
import org.opensearch.common.cache.store.enums.CacheStoreType;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;

/**
* This variant of on-heap cache uses OpenSearch custom cache implementation.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public class OpenSearchOnHeapCache<K, V> implements StoreAwareCache<K, V>, RemovalListener<K, V> {

private final Cache<K, V> cache;

private final StoreAwareCacheEventListener<K, V> eventListener;

public OpenSearchOnHeapCache(Builder<K, V> builder) {
CacheBuilder<K, V> cacheBuilder = CacheBuilder.<K, V>builder()
.setMaximumWeight(builder.getMaxWeightInBytes())
.weigher(builder.getWeigher())
.removalListener(this);
if (builder.getExpireAfterAcess() != null) {
cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess());

Check warning on line 39 in server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java#L39

Added line #L39 was not covered by tests
}
cache = cacheBuilder.build();
this.eventListener = builder.getEventListener();
}

@Override
public V get(K key) {
V value = cache.get(key);
if (value != null) {
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void put(K key, V value) {
cache.put(key, value);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
V value = cache.computeIfAbsent(key, key1 -> loader.load(key));
if (!loader.isLoaded()) {
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void invalidate(K key) {
cache.invalidate(key);
}

@Override
public void invalidateAll() {
cache.invalidateAll();
}

@Override
public Iterable<K> keys() {
return cache.keys();
}

@Override
public long count() {
return cache.count();
}

@Override
public void refresh() {
cache.refresh();
}

@Override
public CacheStoreType getTierType() {
return CacheStoreType.ON_HEAP;
}

@Override
public void onRemoval(RemovalNotification<K, V> notification) {
eventListener.onRemoval(
new StoreAwareCacheRemovalNotification<>(
notification.getKey(),
notification.getValue(),
notification.getRemovalReason(),
CacheStoreType.ON_HEAP
)
);
}

/**
* Builder object
* @param <K> Type of key
* @param <V> Type of value
*/
public static class Builder<K, V> extends StoreAwareCacheBuilder<K, V> {

@Override
public StoreAwareCache<K, V> build() {
return new OpenSearchOnHeapCache<K, V>(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.store.enums.CacheStoreType;

/**
* Represents a cache with a specific type of store like onHeap, disk etc.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface StoreAwareCache<K, V> extends ICache<K, V> {
CacheStoreType getTierType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.store.enums.CacheStoreType;

/**
* Removal notification for store aware cache.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public class StoreAwareCacheRemovalNotification<K, V> extends RemovalNotification<K, V> {
private final CacheStoreType cacheStoreType;

public StoreAwareCacheRemovalNotification(K key, V value, RemovalReason removalReason, CacheStoreType cacheStoreType) {
super(key, value, removalReason);
this.cacheStoreType = cacheStoreType;
}

public CacheStoreType getCacheStoreType() {
return cacheStoreType;
}
}
Loading
Loading