Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix IndicesRequestCache clean up logic #13597

Merged
merged 1 commit into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -410,7 +411,8 @@ static class Key implements Accountable, Writeable {
this.shardId = in.readOptionalWriteable(ShardId::new);
this.readerCacheKeyId = in.readOptionalString();
this.value = in.readBytesReference();
this.indexShardHashCode = in.readInt();
this.indexShardHashCode = in.readInt(); // We are serializing/de-serializing this as we need to store the
// key as part of tiered/disk cache. The key is not passed between nodes at this point.
}

@Override
Expand Down Expand Up @@ -450,7 +452,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(shardId);
out.writeOptionalString(readerCacheKeyId);
out.writeBytesReference(value);
out.writeInt(indexShardHashCode);
out.writeInt(indexShardHashCode); // We are serializing/de-serializing this as we need to store the
// key as part of tiered/disk cache. The key is not passed between nodes at this point.
}
}

Expand Down Expand Up @@ -713,15 +716,16 @@ private synchronized void cleanCache(double stalenessThreshold) {
// Contains CleanupKey objects with open shard but invalidated readerCacheKeyId.
final Set<CleanupKey> cleanupKeysFromOutdatedReaders = new HashSet<>();
// Contains CleanupKey objects of a closed shard.
final Set<Object> cleanupKeysFromClosedShards = new HashSet<>();
final Set<Tuple<ShardId, Integer>> cleanupKeysFromClosedShards = new HashSet<>();

for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext();) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) {
// null indicates full cleanup, as does a closed shard
ShardId shardId = ((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId();
cleanupKeysFromClosedShards.add(shardId);
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
// Add both shardId and indexShardHashCode to uniquely identify an indexShard.
cleanupKeysFromClosedShards.add(new Tuple<>(indexShard.shardId(), indexShard.hashCode()));
} else {
cleanupKeysFromOutdatedReaders.add(cleanupKey);
}
Expand All @@ -735,14 +739,22 @@ private synchronized void cleanCache(double stalenessThreshold) {

for (Iterator<ICacheKey<Key>> iterator = cache.keys().iterator(); iterator.hasNext();) {
ICacheKey<Key> key = iterator.next();
if (cleanupKeysFromClosedShards.contains(key.key.shardId)) {
Key delegatingKey = key.key;
if (cleanupKeysFromClosedShards.contains(new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode))) {
// Since the shard is closed, the cache should drop stats for this shard.
dimensionListsToDrop.add(key.dimensions);
iterator.remove();
} else {
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.key.shardId).orElse(null), key.key.readerCacheKeyId);
if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) {
CacheEntity cacheEntity = cacheEntityLookup.apply(delegatingKey.shardId).orElse(null);
if (cacheEntity == null) {
// If cache entity is null, it means that index or shard got deleted/closed meanwhile.
// So we will delete this key.
iterator.remove();
} else {
CleanupKey cleanupKey = new CleanupKey(cacheEntity, delegatingKey.readerCacheKeyId);
if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) {
iterator.remove();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void testSerializer() throws Exception {
Random rand = Randomness.get();
for (int valueLength : valueLengths) {
for (int i = 0; i < NUM_KEYS; i++) {
IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId());
IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId(), System.identityHashCode(indexShard));
byte[] serialized = ser.serialize(key);
assertTrue(ser.equals(key, serialized));
IndicesRequestCache.Key deserialized = ser.deserialize(serialized);
Expand All @@ -39,13 +39,13 @@ public void testSerializer() throws Exception {
}
}

private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard) {
private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard, int indexShardHashCode) {
byte[] value = new byte[valueLength];
for (int i = 0; i < valueLength; i++) {
value[i] = (byte) (random.nextInt(126 - 32) + 32);
}
BytesReference keyValue = new BytesArray(value);
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), shard.hashCode()); // same UUID
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), indexShardHashCode); // same UUID
// source as used in real key
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingHelper;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.RemovalNotification;
Expand All @@ -55,12 +62,14 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.bytes.AbstractBytesReference;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentHelper;
Expand All @@ -69,9 +78,12 @@
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.cache.request.ShardRequestCache;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.Node;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
Expand All @@ -95,6 +107,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.indices.IndicesRequestCache.INDEX_DIMENSION_NAME;
import static org.opensearch.indices.IndicesRequestCache.INDICES_CACHE_QUERY_SIZE;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
Expand Down Expand Up @@ -1298,6 +1312,113 @@ public void testGetOrComputeConcurrentlyWithMultipleIndices() throws Exception {
executorService.shutdownNow();
}

public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Exception {
threadPool = getThreadPool();
String indexName = "test1";
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
// Create a shard
IndexService indexService = createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
Index idx = resolveIndex(indexName);
ShardRouting shardRouting = indicesService.indexService(idx).getShard(0).routingEntry();
IndexShard indexShard = indexService.getShard(0);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
writer.addDocument(newDoc(1, "hack"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
Loader loader = new Loader(reader, 0);

// Set clean interval to a high value as we will do it manually here.
IndicesRequestCache cache = getIndicesRequestCache(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000))
.build()
);
IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar");
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);

// Cache some values for indexShard
BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());

// Verify response and stats.
assertEquals("foo", value.streamInput().readString());
RequestCacheStats stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(1, cache.count());
assertEquals(1, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0);

// Remove the shard making its cache entries stale
IOUtils.close(reader, writer, dir);
indexService.removeShard(0, "force");

// We again try to create a shard with same ShardId
ShardRouting newRouting = shardRouting;
String nodeId = newRouting.currentNodeId();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom");
newRouting = newRouting.moveToUnassigned(unassignedInfo)
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
indexShard = indexService.createShard(
newRouting,
s -> {},
RetentionLeaseSyncer.EMPTY,
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null,
localNode,
null,
DiscoveryNodes.builder().add(localNode).build()
);

// Verify that the new shard requestStats entries are empty.
stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(1, cache.count()); // Still contains the old indexShard stale entry
assertEquals(0, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() == 0);
IndexShardTestCase.updateRoutingEntry(indexShard, newRouting);

// Now we cache again with new IndexShard(same shardId as older one).
dir = newDirectory();
writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
writer.addDocument(newDoc(1, "hack"));
reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
loader = new Loader(reader, 0);
cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
termQuery = new TermQueryBuilder("id", "bar");
termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());

// Assert response and stats. We verify that cache now has 2 entries, one for older/removed shard and other
// for the current shard.
assertEquals("foo", value.streamInput().readString());
stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(2, cache.count()); // One entry for older shard and other for the current shard.
assertEquals(1, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0);

// Trigger clean up of cache.
cache.cacheCleanupManager.cleanCache();
// Verify that cache still has entries for current shard and only removed older shards entries.
assertEquals(1, cache.count());

// Now make current indexShard entries stale as well.
reader.close();
// Trigger clean up of cache and verify that cache has no entries now.
cache.cacheCleanupManager.cleanCache();
assertEquals(0, cache.count());

IOUtils.close(reader, writer, dir, cache);
}

public static String generateString(int length) {
String characters = "abcdefghijklmnopqrstuvwxyz";
StringBuilder sb = new StringBuilder(length);
Expand Down
Loading