diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index 67061a1533475..3364ed773bc84 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Predicate; @@ -649,6 +650,31 @@ public void remove() { }; } + /** + * Performs an action for each cache entry in the cache. While iterating over the cache entries this method is protected from mutations + * that occurs within the same cache segment by acquiring the segment's read lock during all the iteration. As such, the specified + * consumer should not try to modify the cache. Modifications that occur in already traveled segments won't been seen by the consumer + * but modification that occur in non yet traveled segments should be. + * + * @param consumer the {@link Consumer} + */ + public void forEach(BiConsumer consumer) { + for (CacheSegment segment : segments) { + try (ReleasableLock ignored = segment.readLock.acquire()) { + for (CompletableFuture> future : segment.map.values()) { + try { + if (future != null && future.isDone()) { + final Entry entry = future.get(); + consumer.accept(entry.key, entry.value); + } + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + } + } + } + private class CacheIterator implements Iterator> { private Entry current; private Entry next; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index cffc21876a641..1c22aa9b7ef3a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -61,7 +61,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; -import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; @@ -213,6 +212,7 @@ public boolean loadSnapshot(RecoveryState recoveryState, ActionListener pr this.snapshot = snapshotSupplier.get(); this.loaded = true; cleanExistingRegularShardFiles(); + cleanExistingCacheFiles(); this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState; prewarmCache(preWarmListener); } @@ -330,9 +330,6 @@ private static UnsupportedOperationException unsupportedException() { public final void close() { if (closed.compareAndSet(false, true)) { isOpen = false; - // Ideally we could let the cache evict/remove cached files by itself after the - // directory has been closed. - clearCache(); } } @@ -416,6 +413,15 @@ private void cleanExistingRegularShardFiles() { } } + /** + * Evicts all cache files associated to the current searchable snapshot shard in case a + * previous instance of that same shard has been marked as evicted on this node. + */ + private void cleanExistingCacheFiles() { + assert Thread.holdsLock(this); + cacheService.runIfShardMarkedAsEvictedInCache(snapshotId, indexId, shardId, this::clearCache); + } + private void prewarmCache(ActionListener listener) { if (prewarmCache == false) { recoveryState.setPreWarmComplete(); @@ -566,7 +572,6 @@ public static Directory create( final Path cacheDir = CacheService.getShardCachePath(shardPath).resolve(snapshotId.getUUID()); Files.createDirectories(cacheDir); - assert assertCacheIsEmpty(cacheDir); return new InMemoryNoOpCommitDirectory( new SearchableSnapshotDirectory( @@ -587,17 +592,6 @@ public static Directory create( ); } - private static boolean assertCacheIsEmpty(Path cacheDir) { - try (DirectoryStream cacheDirStream = Files.newDirectoryStream(cacheDir)) { - final Set cacheFiles = new HashSet<>(); - cacheDirStream.forEach(cacheFiles::add); - assert cacheFiles.isEmpty() : "should start with empty cache, but found " + cacheFiles; - } catch (IOException e) { - assert false : e; - } - return true; - } - public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) { while (dir != null) { if (dir instanceof SearchableSnapshotDirectory) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index 1ad219f339419..f3f42ff233d7a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -10,8 +10,12 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.action.StepListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexEventListener; @@ -20,16 +24,31 @@ import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.nio.file.Path; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore; public class SearchableSnapshotIndexEventListener implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class); + private final @Nullable CacheService cacheService; + + public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheService cacheService) { + assert cacheService != null || DiscoveryNode.isDataNode(settings) == false; + this.cacheService = cacheService; + } + @Override public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); @@ -78,4 +97,35 @@ private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) { throw new TranslogException(shardId, "failed to associate a new translog", e); } } + + @Override + public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) { + if (cacheService != null && shouldEvictCacheFiles(reason)) { + final IndexSettings indexSettings = indexService.getIndexSettings(); + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) { + for (IndexShard indexShard : indexService) { + final ShardId shardId = indexShard.shardId(); + + logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: {})", shardId, reason); + cacheService.markShardAsEvictedInCache( + new SnapshotId( + SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) + ), + new IndexId( + SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()) + ), + shardId + ); + } + } + } + } + + private static boolean shouldEvictCacheFiles(IndexRemovalReason reason) { + return reason == IndexRemovalReason.DELETED + || reason == IndexRemovalReason.NO_LONGER_ASSIGNED + || reason == IndexRemovalReason.FAILURE; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java new file mode 100644 index 0000000000000..c1da42a9e65be --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.IndexStorePlugin; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; + +import java.nio.file.Path; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; + +/** + * This {@link IndexStorePlugin.IndexFoldersDeletionListener} is called when an index folder or a shard folder is deleted from the disk. If + * the index (or the shard) is a backed by a snapshot this listener notifies the {@link CacheService} that the cache files associated to the + * shard(s) must be evicted. + */ +public class SearchableSnapshotIndexFoldersDeletionListener implements IndexStorePlugin.IndexFoldersDeletionListener { + + private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class); + + private final Supplier cacheService; + + public SearchableSnapshotIndexFoldersDeletionListener(Supplier cacheService) { + this.cacheService = Objects.requireNonNull(cacheService); + } + + @Override + public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) { + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) { + for (int shard = 0; shard < indexSettings.getNumberOfShards(); shard++) { + markShardAsEvictedInCache(new ShardId(index, shard), indexSettings); + } + } + } + + @Override + public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) { + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) { + markShardAsEvictedInCache(shardId, indexSettings); + } + } + + private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSettings) { + final CacheService cacheService = this.cacheService.get(); + assert cacheService != null : "cache service not initialized"; + + logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId); + cacheService.markShardAsEvictedInCache( + new SnapshotId( + SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) + ), + new IndexId( + SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()) + ), + shardId + ); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 9a069cfbdfa7a..880616af6f740 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -249,11 +249,19 @@ public Collection createGuiceModules() { @Override public void onIndexModule(IndexModule indexModule) { if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) { - indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener()); + indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener(settings, cacheService.get())); indexModule.addIndexEventListener(failShardsListener.get()); } } + @Override + public List getIndexFoldersDeletionListeners() { + if (DiscoveryNode.isDataNode(settings)) { + return List.of(new SearchableSnapshotIndexFoldersDeletionListener(cacheService::get)); + } + return List.of(); + } + @Override public Collection getSystemIndexDescriptors(Settings settings) { return org.elasticsearch.common.collect.List.of( diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index b3bdaafd96cc0..b71fd9275e650 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -23,17 +24,25 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.store.cache.CacheFile; import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.SortedSet; @@ -109,6 +118,8 @@ public class CacheService extends AbstractLifecycleComponent { private final ByteSizeValue cacheSize; private final Runnable cacheCleaner; private final ByteSizeValue rangeSize; + private final KeyedLock shardsEvictionLock; + private final Set evictedShards; private volatile int maxCacheFilesToSyncAtOnce; @@ -129,6 +140,8 @@ public CacheService( // are done with reading/writing the cache file .removalListener(notification -> onCacheFileRemoval(notification.getValue())) .build(); + this.shardsEvictionLock = new KeyedLock<>(); + this.evictedShards = ConcurrentCollections.newConcurrentSet(); this.numberOfCacheFilesToSync = new AtomicLong(); this.cacheSyncLock = new ReentrantLock(); this.cacheFilesToSync = new ConcurrentLinkedQueue<>(); @@ -277,6 +290,89 @@ public void removeFromCache(final CacheKey cacheKey) { cache.invalidate(cacheKey); } + /** + * Marks the specified searchable snapshot shard as evicted in cache. Cache files associated with this shard will be evicted from cache. + * + * @param snapshotId the {@link SnapshotId} + * @param indexId the {@link SnapshotId} + * @param shardId the {@link SnapshotId} + */ + public void markShardAsEvictedInCache(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { + final ShardEviction shardEviction = new ShardEviction(snapshotId, indexId, shardId); + if (evictedShards.add(shardEviction)) { + threadPool.generic().submit(new AbstractRunnable() { + @Override + protected void doRun() { + runIfShardMarkedAsEvictedInCache(shardEviction, () -> { + assert shardsEvictionLock.isHeldByCurrentThread(shardEviction); + final Map cacheFilesToEvict = new HashMap<>(); + cache.forEach((cacheKey, cacheFile) -> { + if (shardEviction.matches(cacheKey)) { + cacheFilesToEvict.put(cacheKey, cacheFile); + } + }); + for (Map.Entry cacheFile : cacheFilesToEvict.entrySet()) { + try { + cache.invalidate(cacheFile.getKey(), cacheFile.getValue()); + } catch (RuntimeException e) { + assert false : e; + logger.warn(() -> new ParameterizedMessage("failed to evict cache file {}", cacheFile.getKey()), e); + } + } + }); + } + + @Override + public void onFailure(Exception e) { + assert false : e; + logger.warn( + () -> new ParameterizedMessage("failed to evict cache files associated with evicted shard {}", shardEviction), + e + ); + } + }); + } + } + + /** + * Allows to run the specified {@link Runnable} if the shard represented by the triplet ({@link SnapshotId}, {@link IndexId}, + * {@link SnapshotId}) is still marked as evicted at the time this method is executed. The @link Runnable} will be executed + * while the current thread is holding the lock associated to the shard. + * + * @param snapshotId the snapshot the evicted searchable snapshots shard belongs to + * @param indexId the index in the snapshot the evicted searchable snapshots shard belongs to + * @param shardId the searchable snapshots shard id + * @param runnable a runnable to execute + */ + public void runIfShardMarkedAsEvictedInCache(SnapshotId snapshotId, IndexId indexId, ShardId shardId, Runnable runnable) { + runIfShardMarkedAsEvictedInCache(new ShardEviction(snapshotId, indexId, shardId), runnable); + } + + /** + * Allows to run the specified {@link Runnable} if the shard represented by {@link ShardEviction} is still marked as evicted at the time + * this method is executed. The @link Runnable} will be executed while the current thread is holding the lock associated to the shard. + * + * @param shardEviction a {@link ShardEviction} representing the shard marked as evicted + * @param runnable a runnable to execute + */ + private void runIfShardMarkedAsEvictedInCache(ShardEviction shardEviction, Runnable runnable) { + try (Releasable ignored = shardsEvictionLock.acquire(shardEviction)) { + boolean success = false; + try { + if (evictedShards.remove(shardEviction)) { + runnable.run(); + } + success = true; + } finally { + assert success : "shard eviction should be successful: " + shardEviction; + if (success == false) { + final boolean added = evictedShards.add(shardEviction); + assert added : shardEviction; + } + } + } + } + void setCacheSyncInterval(TimeValue interval) { cacheSyncTask.setInterval(interval); } @@ -344,6 +440,13 @@ protected void synchronizeCache() { final long value = numberOfCacheFilesToSync.decrementAndGet(); assert value >= 0 : value; + + final CacheKey cacheKey = cacheFile.getCacheKey(); + if (evictedShards.contains(new ShardEviction(cacheKey.getSnapshotId(), cacheKey.getIndexId(), cacheKey.getShardId()))) { + logger.debug("cache file belongs to a shard marked as evicted, skipping synchronization for [{}]", cacheKey); + continue; + } + final Path cacheFilePath = cacheFile.getFile(); try { final SortedSet> ranges = cacheFile.fsync(); @@ -410,4 +513,47 @@ public String toString() { return "cache_synchronization_task"; } } + + /** + * Represents the searchable snapshots information of a shard that has been removed from the node. These information are kept around + * to evict the cache files associated to that shard. + */ + private static class ShardEviction { + + private final SnapshotId snapshotId; + private final IndexId indexId; + private final ShardId shardId; + + private ShardEviction(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { + this.snapshotId = snapshotId; + this.indexId = indexId; + this.shardId = shardId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShardEviction that = (ShardEviction) o; + return Objects.equals(snapshotId, that.snapshotId) + && Objects.equals(indexId, that.indexId) + && Objects.equals(shardId, that.shardId); + } + + @Override + public int hashCode() { + return Objects.hash(snapshotId, indexId, shardId); + } + + @Override + public String toString() { + return "[snapshotId=" + snapshotId + ", indexId=" + indexId + ", shardId=" + shardId + ']'; + } + + boolean matches(CacheKey cacheKey) { + return Objects.equals(snapshotId, cacheKey.getSnapshotId()) + && Objects.equals(indexId, cacheKey.getIndexId()) + && Objects.equals(shardId, cacheKey.getShardId()); + } + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java index 7a6828d116302..8a1fc0847df62 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.PathUtils; @@ -30,6 +31,7 @@ import java.util.Locale; import java.util.Map; import java.util.SortedSet; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptySortedSet; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; @@ -207,4 +209,43 @@ public void testPut() throws Exception { } } } + + public void testRunIfShardMarkedAsEvictedInCache() throws Exception { + final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + final IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + final ShardId shardId = new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), 0); + + final CacheService cacheService = defaultCacheService(); + cacheService.setCacheSyncInterval(TimeValue.ZERO); + cacheService.start(); + + cacheService.runIfShardMarkedAsEvictedInCache( + snapshotId, + indexId, + shardId, + () -> { assert false : "should not be called: shard is not marked as evicted yet"; } + ); + + // this future is used to block the cache file eviction submitted by markShardAsEvictedInCache + final PlainActionFuture waitForEviction = PlainActionFuture.newFuture(); + final CacheFile.EvictionListener evictionListener = evicted -> waitForEviction.onResponse(null); + + final CacheFile cacheFile = cacheService.get(new CacheKey(snapshotId, indexId, shardId, "_0.dvd"), 100, createTempDir()); + cacheFile.acquire(evictionListener); + + cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); + if (randomBoolean()) { + cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); // no effect + } + waitForEviction.get(30L, TimeUnit.SECONDS); + cacheFile.release(evictionListener); + + cacheService.runIfShardMarkedAsEvictedInCache( + snapshotId, + indexId, + shardId, + () -> { assert false : "should not be called: shard eviction marker is removed"; } + ); + cacheService.close(); + } }