Skip to content

Commit

Permalink
Fix ShardNotFoundException during request cache clean up (opensearch-…
Browse files Browse the repository at this point in the history
…project#14219)

* Fix for ShardNotFoundException during request cache clean up

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Added changelog

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Fix forbidden gradle check

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

---------

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 authored and Peter Alfonsi committed Sep 3, 2024
1 parent 406ae78 commit f85cd00
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
private final TimeValue expire;
private final ICache<Key, BytesReference> cache;
private final ClusterService clusterService;
private final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final IndicesRequestCacheCleanupManager cacheCleanupManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public IndicesService(
if (indexService == null) {
return Optional.empty();
}
return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id())));
return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id())));
}), cacheService, threadPool, clusterService);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -798,15 +799,9 @@ private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IO

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
return new IndicesRequestCache(settings, (shardId -> {
IndexService indexService = null;
try {
indexService = indicesService.indexServiceSafe(shardId.getIndex());
} catch (IndexNotFoundException ex) {
return Optional.empty();
}
return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
}),
return new IndicesRequestCache(
settings,
indicesService.indicesRequestCache.cacheEntityLookup,
new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(),
threadPool,
ClusterServiceUtils.createClusterService(threadPool)
Expand Down Expand Up @@ -1415,6 +1410,55 @@ public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Excep
IOUtils.close(reader, writer, dir, cache);
}

public void testIndexShardClosedAndVerifyCacheCleanUpWorksSuccessfully() throws Exception {
threadPool = getThreadPool();
String indexName = "test1";
// Create a shard
IndexService indexService = createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
IndexShard indexShard = indexService.getShard(0);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
writer.addDocument(newDoc(1, "hack"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
Loader loader = new Loader(reader, 0);

// Set clean interval to a high value as we will do it manually here.
IndicesRequestCache cache = getIndicesRequestCache(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000))
.build()
);
IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar");

// Cache some values for indexShard
BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());

// Verify response and stats.
assertEquals("foo", value.streamInput().readString());
RequestCacheStats stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(1, cache.count());
assertEquals(1, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0);

// Remove the shard making its cache entries stale
IOUtils.close(reader, writer, dir);
indexService.removeShard(0, "force");

assertBusy(() -> { assertEquals(IndexShardState.CLOSED, indexShard.state()); }, 1, TimeUnit.SECONDS);

// Trigger clean up of cache. Should not throw any exception.
cache.cacheCleanupManager.cleanCache();
// Verify all cleared up.
assertEquals(0, cache.count());
IOUtils.close(cache);
}

public static String generateString(int length) {
String characters = "abcdefghijklmnopqrstuvwxyz";
StringBuilder sb = new StringBuilder(length);
Expand Down

0 comments on commit f85cd00

Please sign in to comment.