Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x][Tiered caching] Framework changes (#10753) #11971

Merged
merged 2 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- Remove ingest processor supports excluding fields ([#10967](https://github.com/opensearch-project/OpenSearch/pull/10967))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
Expand Down
112 changes: 59 additions & 53 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,68 +424,74 @@ public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionExcept
}
});
if (value == null) {
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
value = compute(key, loader);
}
return value;
}

try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}
private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
long now = now();
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};
try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
completableValue = future.handle(handler);
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
} else {
completableValue = future.handle(handler);
}
V value;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return value;
}
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/common/cache/ICache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Represents a cache interface.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface ICache<K, V> {
V get(K key);

void put(K key, V value);

V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception;

void invalidate(K key);

void invalidateAll();

Iterable<K> keys();

long count();

void refresh();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Extends a cache loader with awareness of whether the data is loaded or not.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public interface LoadAwareCacheLoader<K, V> extends CacheLoader<K, V> {
boolean isLoaded();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
import org.opensearch.common.cache.store.enums.CacheStoreType;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;

/**
* This variant of on-heap cache uses OpenSearch custom cache implementation.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public class OpenSearchOnHeapCache<K, V> implements StoreAwareCache<K, V>, RemovalListener<K, V> {

private final Cache<K, V> cache;

private final StoreAwareCacheEventListener<K, V> eventListener;

public OpenSearchOnHeapCache(Builder<K, V> builder) {
CacheBuilder<K, V> cacheBuilder = CacheBuilder.<K, V>builder()
.setMaximumWeight(builder.getMaxWeightInBytes())
.weigher(builder.getWeigher())
.removalListener(this);
if (builder.getExpireAfterAcess() != null) {
cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess());
}
cache = cacheBuilder.build();
this.eventListener = builder.getEventListener();
}

@Override
public V get(K key) {
V value = cache.get(key);
if (value != null) {
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void put(K key, V value) {
cache.put(key, value);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
V value = cache.computeIfAbsent(key, key1 -> loader.load(key));
if (!loader.isLoaded()) {
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void invalidate(K key) {
cache.invalidate(key);
}

@Override
public void invalidateAll() {
cache.invalidateAll();
}

@Override
public Iterable<K> keys() {
return cache.keys();
}

@Override
public long count() {
return cache.count();
}

@Override
public void refresh() {
cache.refresh();
}

@Override
public CacheStoreType getTierType() {
return CacheStoreType.ON_HEAP;
}

@Override
public void onRemoval(RemovalNotification<K, V> notification) {
eventListener.onRemoval(
new StoreAwareCacheRemovalNotification<>(
notification.getKey(),
notification.getValue(),
notification.getRemovalReason(),
CacheStoreType.ON_HEAP
)
);
}

/**
* Builder object
* @param <K> Type of key
* @param <V> Type of value
*/
public static class Builder<K, V> extends StoreAwareCacheBuilder<K, V> {

@Override
public StoreAwareCache<K, V> build() {
return new OpenSearchOnHeapCache<K, V>(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.store.enums.CacheStoreType;

/**
* Represents a cache with a specific type of store like onHeap, disk etc.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface StoreAwareCache<K, V> extends ICache<K, V> {
CacheStoreType getTierType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.store.enums.CacheStoreType;

/**
* Removal notification for store aware cache.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public class StoreAwareCacheRemovalNotification<K, V> extends RemovalNotification<K, V> {
private final CacheStoreType cacheStoreType;

public StoreAwareCacheRemovalNotification(K key, V value, RemovalReason removalReason, CacheStoreType cacheStoreType) {
super(key, value, removalReason);
this.cacheStoreType = cacheStoreType;
}

public CacheStoreType getCacheStoreType() {
return cacheStoreType;
}
}
Loading
Loading