From 8cf6dad3891917d5e8007702dbf0fcbfd36515ae Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 14 Jan 2021 15:38:39 +0100 Subject: [PATCH] Improve shards evictions in searchable snapshot cache service (#67160) The searchable snapshot's cache service is notified when cache files of a specific shard must be evicted. The notifications are usually done in a cluster state applier thread that calls the CacheService# markShardAsEvictedInCache method. The markShardAsEvictedInCache adds the shard to an internal set of ShardEviction and submits the eviction of the shard to the generic thread pool. Because there's nothing preventing the cache service (and persistent cache service) to be closed before all shared evictions are processed, it is possible that invalidating a cache file fails and trips an assertion (as it happened in many tests failures recently #66958, #66730). This commit changes the CacheService so that it now waits for the evictions of shards to complete before closing the cache and persistent cache services. --- ...bleSnapshotsPersistentCacheIntegTests.java | 16 +- .../store/SearchableSnapshotDirectory.java | 8 +- .../SearchableSnapshotIndexEventListener.java | 14 +- ...eSnapshotIndexFoldersDeletionListener.java | 14 +- .../cache/CacheService.java | 240 ++++++++++++------ .../AbstractSearchableSnapshotsTestCase.java | 60 +++++ .../cache/CacheServiceTests.java | 190 +++++++++++--- .../cache/PersistentCacheTests.java | 80 +----- 8 files changed, 401 insertions(+), 221 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java index 7cf678cda3977..5ffeb66ba1ce8 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java @@ -112,10 +112,10 @@ public void testCacheSurviveRestart() throws Exception { } assertFalse("no cache files found", cacheFiles.isEmpty()); - CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode); + final CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode); cacheService.synchronizeCache(); - PersistentCache persistentCache = cacheService.getPersistentCache(); + final PersistentCache persistentCache = cacheService.getPersistentCache(); assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() { @@ -142,21 +142,19 @@ public Settings onNodeStopped(String nodeName) { } }); - cacheService = internalCluster().getInstance(CacheService.class, dataNode); - persistentCache = cacheService.getPersistentCache(); + final CacheService cacheServiceAfterRestart = internalCluster().getInstance(CacheService.class, dataNode); + final PersistentCache persistentCacheAfterRestart = cacheServiceAfterRestart.getPersistentCache(); ensureGreen(restoredIndexName); cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile))); - assertThat("Cache files should be repopulated in cache", persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); + assertThat("Cache files should be loaded in cache", persistentCacheAfterRestart.getNumDocs(), equalTo((long) cacheFiles.size())); assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); assertBusy(() -> { cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))); - assertTrue(internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache().hasDeletions()); + cacheServiceAfterRestart.synchronizeCache(); + assertThat(persistentCacheAfterRestart.getNumDocs(), equalTo(0L)); }); - cacheService.synchronizeCache(); - - assertThat(persistentCache.getNumDocs(), equalTo(0L)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 5b54bfd67d24c..e648fc4209e13 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -212,7 +212,7 @@ public boolean loadSnapshot(RecoveryState recoveryState, ActionListener pr this.snapshot = snapshotSupplier.get(); this.loaded = true; cleanExistingRegularShardFiles(); - cleanExistingCacheFiles(); + waitForPendingEvictions(); this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState; prewarmCache(preWarmListener); } @@ -428,12 +428,12 @@ private void cleanExistingRegularShardFiles() { } /** - * Evicts all cache files associated to the current searchable snapshot shard in case a + * Waits for the eviction of cache files associated with the current searchable snapshot shard to be processed in case a * previous instance of that same shard has been marked as evicted on this node. */ - private void cleanExistingCacheFiles() { + private void waitForPendingEvictions() { assert Thread.holdsLock(this); - cacheService.runIfShardMarkedAsEvictedInCache(snapshotId, indexId, shardId, this::clearCache); + cacheService.waitForCacheFilesEvictionIfNeeded(snapshotId.getUUID(), indexId.getName(), shardId); } private void prewarmCache(ActionListener listener) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index f3ede77a0c58d..c293e2e8c81bd 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -25,18 +25,14 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.nio.file.Path; import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore; public class SearchableSnapshotIndexEventListener implements IndexEventListener { @@ -116,14 +112,8 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: {})", shardId, reason); cacheService.markShardAsEvictedInCache( - new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) - ), - new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()) - ), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), shardId ); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java index c1da42a9e65be..c1d88c5fcc52d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexFoldersDeletionListener.java @@ -12,18 +12,14 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.plugins.IndexStorePlugin; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.nio.file.Path; import java.util.Objects; import java.util.function.Supplier; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; /** * This {@link IndexStorePlugin.IndexFoldersDeletionListener} is called when an index folder or a shard folder is deleted from the disk. If @@ -62,14 +58,8 @@ private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSetti logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId); cacheService.markShardAsEvictedInCache( - new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) - ), - new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()) - ), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), shardId ); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index 2d561c5847095..97c7ef847d87f 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; @@ -16,7 +17,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -25,8 +25,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.KeyedLock; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -40,15 +39,21 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; @@ -140,10 +145,12 @@ public class CacheService extends AbstractLifecycleComponent { private final ByteSizeValue cacheSize; private final ByteSizeValue rangeSize; private final ByteSizeValue recoveryRangeSize; - private final KeyedLock shardsEvictionLock; - private final Set evictedShards; + private final Map> pendingShardsEvictions; + private final ReadWriteLock shardsEvictionsLock; + private final Object shardsEvictionsMutex; private volatile int maxCacheFilesToSyncAtOnce; + private boolean allowShardsEvictions; public CacheService( final Settings settings, @@ -163,8 +170,6 @@ public CacheService( .removalListener(notification -> onCacheFileRemoval(notification.getValue())) .build(); this.persistentCache = Objects.requireNonNull(persistentCache); - this.shardsEvictionLock = new KeyedLock<>(); - this.evictedShards = ConcurrentCollections.newConcurrentSet(); this.numberOfCacheFilesToSync = new AtomicLong(); this.cacheSyncLock = new ReentrantLock(); this.cacheFilesToSync = new ConcurrentLinkedQueue<>(); @@ -174,6 +179,10 @@ public CacheService( this.cacheSyncTask = new CacheSynchronizationTask(threadPool, SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING, this::setCacheSyncInterval); this.cacheSyncStopTimeout = SNAPSHOT_CACHE_SYNC_SHUTDOWN_TIMEOUT.get(settings); + this.shardsEvictionsLock = new ReentrantReadWriteLock(); + this.pendingShardsEvictions = new HashMap<>(); + this.shardsEvictionsMutex = new Object(); + this.allowShardsEvictions = true; } public static Path getShardCachePath(ShardPath shardPath) { @@ -206,12 +215,16 @@ protected void doStop() { cacheSyncTask.close(); } finally { try { - persistentCache.close(); - } catch (Exception e) { - logger.warn("failed to close persistent cache", e); + processAllPendingShardsEvictions(); } finally { - if (acquired) { - cacheSyncLock.unlock(); + try { + persistentCache.close(); + } catch (Exception e) { + logger.warn("failed to close persistent cache", e); + } finally { + if (acquired) { + cacheSyncLock.unlock(); + } } } } @@ -338,82 +351,138 @@ public void removeFromCache(final CacheKey cacheKey) { /** * Marks the specified searchable snapshot shard as evicted in cache. Cache files associated with this shard will be evicted from cache. * - * @param snapshotId the {@link SnapshotId} - * @param indexId the {@link SnapshotId} - * @param shardId the {@link SnapshotId} + * @param snapshotUUID the snapshot's unique identifier + * @param snapshotIndexName the name of the index in the snapshot + * @param shardId the {@link ShardId} */ - public void markShardAsEvictedInCache(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { - final ShardEviction shardEviction = new ShardEviction(snapshotId.getUUID(), indexId.getName(), shardId); - if (evictedShards.add(shardEviction)) { - threadPool.generic().submit(new AbstractRunnable() { - @Override - protected void doRun() { - runIfShardMarkedAsEvictedInCache(shardEviction, () -> { - assert shardsEvictionLock.isHeldByCurrentThread(shardEviction); - final Map cacheFilesToEvict = new HashMap<>(); - cache.forEach((cacheKey, cacheFile) -> { - if (shardEviction.matches(cacheKey)) { - cacheFilesToEvict.put(cacheKey, cacheFile); - } - }); - for (Map.Entry cacheFile : cacheFilesToEvict.entrySet()) { - try { - cache.invalidate(cacheFile.getKey(), cacheFile.getValue()); - } catch (RuntimeException e) { - assert false : e; - logger.warn(() -> new ParameterizedMessage("failed to evict cache file {}", cacheFile.getKey()), e); - } - } - }); - } + public void markShardAsEvictedInCache(String snapshotUUID, String snapshotIndexName, ShardId shardId) { + synchronized (shardsEvictionsMutex) { + if (allowShardsEvictions) { + final ShardEviction shardEviction = new ShardEviction(snapshotUUID, snapshotIndexName, shardId); + pendingShardsEvictions.computeIfAbsent(shardEviction, shard -> threadPool.generic().submit(new AbstractRunnable() { + @Override + protected void doRun() { + processShardEviction(shardEviction); + } - @Override - public void onFailure(Exception e) { - assert false : e; - logger.warn( - () -> new ParameterizedMessage("failed to evict cache files associated with evicted shard {}", shardEviction), - e - ); - } - }); + @Override + public void onFailure(Exception e) { + logger.warn( + () -> new ParameterizedMessage("failed to evict cache files associated with shard {}", shardEviction), + e + ); + assert false : e; + } + })); + } } } /** - * Allows to run the specified {@link Runnable} if the shard represented by the triplet ({@link SnapshotId}, {@link IndexId}, - * {@link SnapshotId}) is still marked as evicted at the time this method is executed. The @link Runnable} will be executed - * while the current thread is holding the lock associated to the shard. + * Waits for the cache files associated with the shard represented by ({@link SnapshotId}, {@link IndexId}, {@link SnapshotId}) to be + * evicted if the shard is marked as evicted in cache at the time this method is executed. * - * @param snapshotId the snapshot the evicted searchable snapshots shard belongs to - * @param indexId the index in the snapshot the evicted searchable snapshots shard belongs to - * @param shardId the searchable snapshots shard id - * @param runnable a runnable to execute + * @param snapshotUUID the snapshot's unique identifier + * @param snapshotIndexName the name of the index in the snapshot + * @param shardId the {@link ShardId} */ - public void runIfShardMarkedAsEvictedInCache(SnapshotId snapshotId, IndexId indexId, ShardId shardId, Runnable runnable) { - runIfShardMarkedAsEvictedInCache(new ShardEviction(snapshotId.getUUID(), indexId.getName(), shardId), runnable); + public void waitForCacheFilesEvictionIfNeeded(String snapshotUUID, String snapshotIndexName, ShardId shardId) { + assert assertGenericThreadPool(); + final Future future; + synchronized (shardsEvictionsMutex) { + if (allowShardsEvictions == false) { + throw new AlreadyClosedException("Cannot wait for shard eviction to be processed, cache is stopping"); + } + future = pendingShardsEvictions.get(new ShardEviction(snapshotUUID, snapshotIndexName, shardId)); + if (future == null) { + return; + } + } + FutureUtils.get(future); } /** - * Allows to run the specified {@link Runnable} if the shard represented by {@link ShardEviction} is still marked as evicted at the time - * this method is executed. The @link Runnable} will be executed while the current thread is holding the lock associated to the shard. + * Evicts the cache files associated to the specified {@link ShardEviction}. * - * @param shardEviction a {@link ShardEviction} representing the shard marked as evicted - * @param runnable a runnable to execute + * @param shardEviction the shard eviction to process */ - private void runIfShardMarkedAsEvictedInCache(ShardEviction shardEviction, Runnable runnable) { - try (Releasable ignored = shardsEvictionLock.acquire(shardEviction)) { - boolean success = false; + private void processShardEviction(ShardEviction shardEviction) { + assert isPendingShardEviction(shardEviction) : "shard is not marked as evicted: " + shardEviction; + assert assertGenericThreadPool(); + + shardsEvictionsLock.readLock().lock(); + try { try { - if (evictedShards.remove(shardEviction)) { - runnable.run(); + final boolean canEvict; + synchronized (shardsEvictionsMutex) { + canEvict = allowShardsEvictions; + } + if (canEvict) { + final List cacheFilesToEvict = new ArrayList<>(); + cache.forEach((cacheKey, cacheFile) -> { + if (shardEviction.matches(cacheKey)) { + cacheFilesToEvict.add(cacheFile); + } + }); + for (CacheFile cacheFile : cacheFilesToEvict) { + try { + cache.invalidate(cacheFile.getCacheKey(), cacheFile); + } catch (RuntimeException e) { + logger.warn(() -> new ParameterizedMessage("failed to evict cache file {}", cacheFile.getCacheKey()), e); + assert false : e; + } + } + logger.debug( + "shard eviction [{}] processed with [{}] cache files invalidated", + shardEviction, + cacheFilesToEvict.size() + ); } - success = true; } finally { - if (success == false) { - final boolean added = evictedShards.add(shardEviction); - assert added : shardEviction; + synchronized (shardsEvictionsMutex) { + final Future removedFuture = pendingShardsEvictions.remove(shardEviction); + assert removedFuture != null; } } + } finally { + shardsEvictionsLock.readLock().unlock(); + } + } + + /** + * Processes and waits for all pending shard evictions to complete. + */ + private void processAllPendingShardsEvictions() { + synchronized (shardsEvictionsMutex) { + allowShardsEvictions = false; + } + boolean success = false; + try { + if (shardsEvictionsLock.writeLock().tryLock(10L, TimeUnit.SECONDS) == false) { + logger.warn("waiting for shards evictions to be processed"); + shardsEvictionsLock.writeLock().lock(); // wait indefinitely + } + success = true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("interrupted while waiting shards evictions to be processed", e); + } finally { + if (success) { + shardsEvictionsLock.writeLock().unlock(); + } + } + } + + boolean isPendingShardEviction(ShardEviction shardEviction) { + synchronized (shardsEvictionsMutex) { + return pendingShardsEvictions.get(shardEviction) != null; + } + } + + // used in tests + Map> pendingShardsEvictions() { + synchronized (shardsEvictionsMutex) { + return org.elasticsearch.common.collect.Map.copyOf(pendingShardsEvictions); } } @@ -497,14 +566,6 @@ public void synchronizeCache() { final long value = numberOfCacheFilesToSync.decrementAndGet(); assert value >= 0 : value; - final CacheKey cacheKey = cacheFile.getCacheKey(); - if (evictedShards.contains( - new ShardEviction(cacheKey.getSnapshotUUID(), cacheKey.getSnapshotIndexName(), cacheKey.getShardId()) - )) { - logger.debug("cache file belongs to a shard marked as evicted, skipping synchronization for [{}]", cacheKey); - continue; - } - final Path cacheFilePath = cacheFile.getFile(); try { final SortedSet> ranges = cacheFile.fsync(); @@ -589,18 +650,30 @@ public String toString() { * Represents the searchable snapshots information of a shard that has been removed from the node. These information are kept around * to evict the cache files associated to that shard. */ - private static class ShardEviction { + static class ShardEviction { private final String snapshotUUID; private final String snapshotIndexName; private final ShardId shardId; - private ShardEviction(String snapshotUUID, String snapshotIndexName, ShardId shardId) { + ShardEviction(String snapshotUUID, String snapshotIndexName, ShardId shardId) { this.snapshotUUID = snapshotUUID; this.snapshotIndexName = snapshotIndexName; this.shardId = shardId; } + public String getSnapshotUUID() { + return snapshotUUID; + } + + public String getSnapshotIndexName() { + return snapshotIndexName; + } + + public ShardId getShardId() { + return shardId; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -627,4 +700,11 @@ boolean matches(CacheKey cacheKey) { && Objects.equals(shardId, cacheKey.getShardId()); } } + + private static boolean assertGenericThreadPool() { + final String threadName = Thread.currentThread().getName(); + assert threadName.contains('[' + ThreadPool.Names.GENERIC + ']') + || threadName.startsWith("TEST-") : "expected generic thread pool but got " + threadName; + return true; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index 7475709dd067e..2185d3c4b0e57 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Set; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; import org.elasticsearch.common.settings.ClusterSettings; @@ -25,6 +26,8 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; import org.elasticsearch.repositories.IndexId; @@ -40,10 +43,20 @@ import org.junit.After; import org.junit.Before; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.SortedSet; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; + public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTestCase { private static final ClusterSettings CLUSTER_SETTINGS = new ClusterSettings( @@ -190,4 +203,51 @@ protected static void assertThreadPoolNotBusy(ThreadPool threadPool) throws Exce } }, 30L, TimeUnit.SECONDS); } + + /** + * Generates one or more cache files using the specified {@link CacheService}. Each cache files have been written at least once. + */ + protected List randomCacheFiles(CacheService cacheService) throws Exception { + final byte[] buffer = new byte[1024]; + Arrays.fill(buffer, (byte) 0xff); + + final List cacheFiles = new ArrayList<>(); + for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { + final String snapshotUUID = UUIDs.randomBase64UUID(random()); + for (int indices = 0; indices < between(1, 2); indices++) { + IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + for (int shards = 0; shards < between(1, 2); shards++) { + ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards); + + final Path cacheDir = Files.createDirectories( + CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotUUID) + ); + + for (int files = 0; files < between(1, 2); files++) { + final CacheKey cacheKey = new CacheKey(snapshotUUID, indexId.getName(), shardId, "file_" + files); + final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(100L, buffer.length), cacheDir); + + final CacheFile.EvictionListener listener = evictedCacheFile -> {}; + cacheFile.acquire(listener); + try { + SortedSet> ranges = Collections.emptySortedSet(); + while (ranges.isEmpty()) { + ranges = randomPopulateAndReads(cacheFile, (channel, from, to) -> { + try { + channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to))); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } + cacheFiles.add(cacheFile); + } finally { + cacheFile.release(listener); + } + } + } + } + } + return cacheFiles; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java index 4139cc17594a5..0177a13c09c91 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -8,21 +8,21 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.LuceneTestCase; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.cache.CacheFile; import org.elasticsearch.index.store.cache.CacheKey; import org.elasticsearch.index.store.cache.TestUtils.FSyncTrackingFileSystemProvider; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.ShardEviction; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -30,15 +30,20 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.SortedSet; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.stream.Collectors; import static java.util.Collections.emptySortedSet; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.index.store.cache.TestUtils.randomRanges; import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; +import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -221,44 +226,161 @@ public void testPut() throws Exception { } } - public void testRunIfShardMarkedAsEvictedInCache() throws Exception { - final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); - final IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); - final ShardId shardId = new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), 0); - final Path cacheDir = Files.createDirectories(resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID())); - + public void testMarkShardAsEvictedInCache() throws Exception { final CacheService cacheService = defaultCacheService(); - cacheService.setCacheSyncInterval(TimeValue.ZERO); cacheService.start(); - cacheService.runIfShardMarkedAsEvictedInCache( - snapshotId, - indexId, - shardId, - () -> { assert false : "should not be called: shard is not marked as evicted yet"; } - ); + final List randomCacheFiles = randomCacheFiles(cacheService); + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0)); - // this future is used to block the cache file eviction submitted by markShardAsEvictedInCache - final PlainActionFuture waitForEviction = PlainActionFuture.newFuture(); - final CacheFile.EvictionListener evictionListener = evicted -> waitForEviction.onResponse(null); + final ShardEviction shard = randomShardEvictionFrom(randomCacheFiles); + final List cacheFilesAssociatedWithShard = filterByShard(shard, randomCacheFiles); + cacheFilesAssociatedWithShard.forEach(cacheFile -> assertTrue(Files.exists(cacheFile.getFile()))); - final CacheKey cacheKey = new CacheKey(snapshotId.getUUID(), indexId.getName(), shardId, "_0.dvd"); - final CacheFile cacheFile = cacheService.get(cacheKey, 100, cacheDir); - cacheFile.acquire(evictionListener); + final BlockingEvictionListener blockingListener = new BlockingEvictionListener(); + final CacheFile randomCacheFile = randomFrom(cacheFilesAssociatedWithShard); + assertTrue(Files.exists(randomCacheFile.getFile())); + randomCacheFile.acquire(blockingListener); + + final List randomEvictedCacheFiles = randomSubsetOf(randomCacheFiles); + for (CacheFile randomEvictedCacheFile : randomEvictedCacheFiles) { + if (randomEvictedCacheFile != randomCacheFile) { + cacheService.removeFromCache(randomEvictedCacheFile.getCacheKey()); + } + } + + for (int i = 0; i < between(1, 3); i++) { + cacheService.markShardAsEvictedInCache(shard.getSnapshotUUID(), shard.getSnapshotIndexName(), shard.getShardId()); + } + + blockingListener.waitForBlock(); + + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(1)); + assertTrue(cacheService.isPendingShardEviction(shard)); + + blockingListener.unblock(); + + assertBusy(() -> assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0))); + + for (CacheFile cacheFile : randomCacheFiles) { + final boolean evicted = cacheFilesAssociatedWithShard.contains(cacheFile) || randomEvictedCacheFiles.contains(cacheFile); + assertThat( + "Cache file [" + cacheFile + "] should " + (evicted ? "be deleted" : "exist"), + Files.notExists(cacheFile.getFile()), + equalTo(evicted) + ); + } + cacheService.close(); - cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); if (randomBoolean()) { - cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); // no effect + // mark shard as evicted after cache service is stopped should have no effect + cacheService.markShardAsEvictedInCache(shard.getSnapshotUUID(), shard.getSnapshotIndexName(), shard.getShardId()); + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0)); } - waitForEviction.get(30L, TimeUnit.SECONDS); - cacheFile.release(evictionListener); - - cacheService.runIfShardMarkedAsEvictedInCache( - snapshotId, - indexId, - shardId, - () -> { assert false : "should not be called: shard eviction marker is removed"; } + } + + public void testProcessShardEviction() throws Exception { + final CacheService cacheService = defaultCacheService(); + cacheService.start(); + + final List randomCacheFiles = randomCacheFiles(cacheService); + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0)); + + final ShardEviction shard = randomShardEvictionFrom(randomCacheFiles); + final List cacheFilesAssociatedWithShard = filterByShard(shard, randomCacheFiles); + cacheFilesAssociatedWithShard.forEach(cacheFile -> assertTrue(Files.exists(cacheFile.getFile()))); + + final BlockingEvictionListener blockingListener = new BlockingEvictionListener(); + final CacheFile randomCacheFile = randomFrom(cacheFilesAssociatedWithShard); + assertTrue(Files.exists(randomCacheFile.getFile())); + randomCacheFile.acquire(blockingListener); + + cacheService.markShardAsEvictedInCache(shard.getSnapshotUUID(), shard.getSnapshotIndexName(), shard.getShardId()); + + final Map afterShardRecoveryCacheFiles = ConcurrentCollections.newConcurrentMap(); + final Future waitForShardEvictionFuture = threadPool.generic().submit(() -> { + cacheService.waitForCacheFilesEvictionIfNeeded(shard.getSnapshotUUID(), shard.getSnapshotIndexName(), shard.getShardId()); + for (CacheFile cacheFile : cacheFilesAssociatedWithShard) { + afterShardRecoveryCacheFiles.put(cacheFile, Files.exists(cacheFile.getFile())); + } + }); + + blockingListener.waitForBlock(); + + final Map> pendingShardsEvictions = cacheService.pendingShardsEvictions(); + assertTrue(cacheService.isPendingShardEviction(shard)); + assertThat(pendingShardsEvictions, aMapWithSize(1)); + + final Future pendingShardEvictionFuture = pendingShardsEvictions.get(shard); + assertTrue(Files.exists(randomCacheFile.getFile())); + assertThat(pendingShardEvictionFuture, notNullValue()); + assertFalse(pendingShardEvictionFuture.isDone()); + + blockingListener.unblock(); + FutureUtils.get(waitForShardEvictionFuture); + + assertTrue(pendingShardEvictionFuture.isDone()); + FutureUtils.get(pendingShardEvictionFuture); + + cacheFilesAssociatedWithShard.forEach( + cacheFile -> assertFalse("Cache file should be evicted: " + cacheFile, Files.exists(cacheFile.getFile())) ); - cacheService.close(); + afterShardRecoveryCacheFiles.forEach( + (cacheFile, exists) -> assertFalse("Cache file should have been evicted after shard recovery: " + cacheFile, exists) + ); + assertThat(cacheService.pendingShardsEvictions(), aMapWithSize(0)); + + cacheService.stop(); + } + + private static class BlockingEvictionListener implements CacheFile.EvictionListener { + + private final CountDownLatch evictionLatch = new CountDownLatch(1); + private final CountDownLatch releaseLatch = new CountDownLatch(1); + + @Override + public void onEviction(CacheFile evictedCacheFile) { + try { + evictionLatch.countDown(); + releaseLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } finally { + evictedCacheFile.release(this); + } + } + + public void waitForBlock() { + try { + evictionLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + + public void unblock() { + releaseLatch.countDown(); + } + } + + /** + * Picks up a random searchable snapshot shard from a list of existing cache files and builds a {@link ShardEviction} object from it. + * + * @param cacheFiles a list of existing cache files + * @return a random {@link ShardEviction} object + */ + private static ShardEviction randomShardEvictionFrom(List cacheFiles) { + return randomFrom(listOfShardEvictions(cacheFiles)); + } + + private static Set listOfShardEvictions(List cacheFiles) { + return cacheFiles.stream() + .map(CacheFile::getCacheKey) + .map(cacheKey -> new ShardEviction(cacheKey.getSnapshotUUID(), cacheKey.getSnapshotIndexName(), cacheKey.getShardId())) + .collect(Collectors.toSet()); + } + + private List filterByShard(ShardEviction shard, List cacheFiles) { + return cacheFiles.stream().filter(cacheFile -> shard.matches(cacheFile.getCacheKey())).collect(Collectors.toList()); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java index 454149ec3851c..cafcd77b2c6a6 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -9,38 +9,26 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.cache.CacheFile; -import org.elasticsearch.index.store.cache.CacheKey; -import org.elasticsearch.repositories.IndexId; import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase; -import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.SortedSet; import java.util.stream.Collectors; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE; import static org.elasticsearch.index.store.cache.TestUtils.assertCacheFileEquals; -import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.createCacheIndexWriter; import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; @@ -140,20 +128,18 @@ public void testRepopulateCache() throws Exception { cacheService.setCacheSyncInterval(TimeValue.ZERO); cacheService.start(); - final List cacheFiles = generateRandomCacheFiles(cacheService); + final List cacheFiles = randomCacheFiles(cacheService); cacheService.synchronizeCache(); - if (cacheFiles.isEmpty() == false) { - final List removedCacheFiles = randomSubsetOf(cacheFiles); - for (CacheFile removedCacheFile : removedCacheFiles) { - if (randomBoolean()) { - // evict cache file from the cache - cacheService.removeFromCache(removedCacheFile.getCacheKey()); - } else { - IOUtils.rm(removedCacheFile.getFile()); - } - cacheFiles.remove(removedCacheFile); + final List removedCacheFiles = randomSubsetOf(cacheFiles); + for (CacheFile removedCacheFile : removedCacheFiles) { + if (randomBoolean()) { + // evict cache file from the cache + cacheService.removeFromCache(removedCacheFile.getCacheKey()); + } else { + IOUtils.rm(removedCacheFile.getFile()); } + cacheFiles.remove(removedCacheFile); } cacheService.stop(); @@ -172,7 +158,7 @@ public void testCleanUp() throws Exception { final List cacheFiles; try (CacheService cacheService = defaultCacheService()) { cacheService.start(); - cacheFiles = generateRandomCacheFiles(cacheService).stream().map(CacheFile::getFile).collect(Collectors.toList()); + cacheFiles = randomCacheFiles(cacheService).stream().map(CacheFile::getFile).collect(Collectors.toList()); if (randomBoolean()) { cacheService.synchronizeCache(); } @@ -186,50 +172,4 @@ public void testCleanUp() throws Exception { PersistentCache.cleanUp(nodeSettings, nodeEnvironment); assertTrue(cacheFiles.stream().noneMatch(Files::exists)); } - - /** - * Generates 1 or more cache files using the specified {@link CacheService}. - */ - private List generateRandomCacheFiles(CacheService cacheService) throws Exception { - final byte[] buffer = new byte[1024]; - Arrays.fill(buffer, (byte) 0xff); - - final List cacheFiles = new ArrayList<>(); - for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { - final String snapshotUUID = UUIDs.randomBase64UUID(random()); - for (int indices = 0; indices < between(1, 2); indices++) { - IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); - for (int shards = 0; shards < between(1, 2); shards++) { - ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards); - - final Path cacheDir = Files.createDirectories( - CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotUUID) - ); - - for (int files = 0; files < between(1, 2); files++) { - final CacheKey cacheKey = new CacheKey(snapshotUUID, indexId.getName(), shardId, "file_" + files); - final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(0L, buffer.length), cacheDir); - - final CacheFile.EvictionListener listener = evictedCacheFile -> {}; - cacheFile.acquire(listener); - try { - SortedSet> ranges = randomPopulateAndReads(cacheFile, (channel, from, to) -> { - try { - channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to))); - } catch (IOException e) { - throw new AssertionError(e); - } - }); - if (ranges.isEmpty() == false) { - cacheFiles.add(cacheFile); - } - } finally { - cacheFile.release(listener); - } - } - } - } - } - return cacheFiles; - } }