Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Bug fix for IndicesRequestCache StaleKey management #13070

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
2911c25
Update IndicesRequestCache.java
kiranprakash154 Apr 3, 2024
05610d3
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 3, 2024
917df93
Update CHANGELOG.md
kiranprakash154 Apr 3, 2024
ac1464f
revert
kiranprakash154 Apr 4, 2024
f2d72f2
revert
kiranprakash154 Apr 4, 2024
148ff74
Update IndicesRequestCache.java
kiranprakash154 Apr 4, 2024
d8451c2
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 4, 2024
ad88936
Update IndicesRequestCache.java
kiranprakash154 Apr 4, 2024
87e5c46
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 4, 2024
6b36ee6
Update IndicesRequestCache.java
kiranprakash154 Apr 8, 2024
bebfc3a
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
4179f46
Merge branch 'main' into kp/stalekey-status-check
kiranprakash154 Apr 8, 2024
f42ebb8
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
56f4e1f
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
6630cfc
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
5de80d0
Update IndicesRequestCache.java
kiranprakash154 Apr 8, 2024
139cbfc
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
eaeba38
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 9, 2024
6122aa2
Update IndicesRequestCache.java
kiranprakash154 Apr 9, 2024
e870a98
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 9, 2024
f658de6
Merge branch 'main' into kp/stalekey-status-check
kiranprakash154 Apr 9, 2024
bed1121
Update IndicesRequestCache.java
kiranprakash154 Apr 9, 2024
e178ea7
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 9, 2024
e14a54c
Merge branch 'kp/stalekey-status-check' of https://github.com/kiranpr…
kiranprakash154 Apr 9, 2024
3a2e852
code comments only
kiranprakash154 Apr 10, 2024
5989e1d
docs changes
kiranprakash154 Apr 10, 2024
68b949b
Update CHANGELOG.md
kiranprakash154 Apr 10, 2024
dd91613
revert catching AlreadyClosedException
kiranprakash154 Apr 11, 2024
707f8bd
assert
kiranprakash154 Apr 11, 2024
00dee8d
Merge branch 'main' into kp/stalekey-status-check
kiranprakash154 Apr 12, 2024
70cc990
conflicts
kiranprakash154 Apr 12, 2024
b6b5f2b
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 12, 2024
4c69187
Update IndicesRequestCache.java
kiranprakash154 Apr 12, 2024
31d9c54
address comments
kiranprakash154 Apr 12, 2024
9aeb82d
Update IndicesRequestCache.java
kiranprakash154 Apr 12, 2024
8bfa6c4
Update IndicesRequestCache.java
kiranprakash154 Apr 12, 2024
e37282f
Merge branch 'main' into kp/stalekey-status-check
kiranprakash154 Apr 23, 2024
d5cdf50
address conflicts
kiranprakash154 Apr 23, 2024
9068227
spotless apply
kiranprakash154 Apr 23, 2024
5a15c87
address comments
kiranprakash154 Apr 23, 2024
430e57c
update code comments
kiranprakash154 Apr 23, 2024
581ea2a
address bug & add tests
kiranprakash154 Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix from and size parameter can be negative when searching ([#13047](https://github.com/opensearch-project/OpenSearch/pull/13047))
- Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054))
- Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098))
- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070)]
- Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812))
- Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012))
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.service.CacheService;
Expand Down Expand Up @@ -216,19 +217,16 @@ void clear(CacheEntity entity) {
public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notification) {
// In case this event happens for an old shard, we can safely ignore this as we don't keep track for old
// shards as part of request cache.

// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
Key key = notification.getKey().key;
RemovalNotification<Key, BytesReference> newNotification = new RemovalNotification<>(
key,
notification.getValue(),
notification.getRemovalReason()
);

cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(newNotification));
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction(
new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
);
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, newNotification);
}

private ICacheKey<Key> getICacheKey(Key key) {
Expand Down Expand Up @@ -272,10 +270,11 @@ BytesReference getOrCompute(
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
}
}
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey);
cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey);
} else {
cacheEntity.onHit();
}

return value;
}

Expand Down Expand Up @@ -508,7 +507,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
*
* @param cleanupKey the CleanupKey to be updated in the map
*/
private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
return;
}
Expand All @@ -524,8 +523,29 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
}

private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
/**
* Handles the eviction of a cache entry.
*
* <p>This method is called when an entry is evicted from the cache.
* We consider all removal notifications except with the reason Replaced
* {@link #incrementStaleKeysCount} would have removed the entries from the map and increment the {@link #staleKeysCount}
* Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map.
* Skip decrementing staleKeysCount if we find the shardId or readerCacheKeyId in the map since it would have not been accounted for in the staleKeysCount in
*
* @param cleanupKey the CleanupKey that has been evicted from the cache
* @param notification RemovalNotification of the cache entry evicted
*/
private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotification<Key, BytesReference> notification) {
if (notification.getRemovalReason() == RemovalReason.REPLACED) {
// The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
// does not affect the staleness count, we skip such notifications.
return;
}
if (cleanupKey.entity == null) {
// entity will only be null when the shard is closed/deleted
sohami marked this conversation as resolved.
Show resolved Hide resolved
// we would have accounted this in staleKeysCount when the closing/deletion of shard would have closed the associated
// readers
staleKeysCount.decrementAndGet();
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand All @@ -535,23 +555,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
}
ShardId shardId = indexShard.shardId();

cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> {
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> {
// decrement the stale key count
cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> {
if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) {
// If ShardId is not present or readerCacheKeyId is not present
// it should have already been accounted for and hence been removed from this map
// so decrement staleKeysCount
staleKeysCount.decrementAndGet();
int newValue = currentValue - 1;
// Remove the key if the new value is zero by returning null; otherwise, update with the new value.
return newValue == 0 ? null : newValue;
});
return keyCountMap;
// Return the current map
return readerCacheKeyMap;
} else {
// If it is in the map, it is not stale yet.
// Proceed to adjust the count for the readerCacheKeyId in the map
// but do not decrement the staleKeysCount
Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId);
// this should never be null
assert (count != null && count >= 0);
// Reduce the count by 1
int newCount = count - 1;
if (newCount > 0) {
// Update the map with the new count
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
} else {
// Remove the readerCacheKeyId entry if new count is zero
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
}
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
}
});
}

/**
* Updates the count of stale keys in the cache.
* This method is called when a CleanupKey is added to the keysToClean set.
*
* It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* <p>It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
*
Expand All @@ -569,7 +607,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
ShardId shardId = indexShard.shardId();

// Using computeIfPresent to atomically operate on the countMap for a given shardId
cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> {
cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> {
if (cleanupKey.readerCacheKeyId == null) {
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
Expand All @@ -578,18 +616,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
return null;
} else {
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> {
staleKeysCount.addAndGet(v);
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> {
staleKeysCount.addAndGet(count);
// Return null to remove the key after updating staleKeysCount
return null;
});

// Check if countMap is empty after removal to decide if we need to remove the shardId entry
if (countMap.isEmpty()) {
return null; // Returning null removes the entry for shardId
// Returning null removes the entry for shardId
return null;
}
}
return countMap; // Return the modified countMap to keep the mapping
// Return the modified countMap to retain updates
return countMap;
});
}

Expand Down Expand Up @@ -715,6 +754,11 @@ public void close() {
this.cacheCleaner.close();
}

// for testing
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
kiranprakash154 marked this conversation as resolved.
Show resolved Hide resolved
}

private final class IndicesRequestCacheCleaner implements Runnable, Releasable {

private final IndicesRequestCacheCleanupManager cacheCleanupManager;
Expand Down
Loading
Loading