Skip to content

Commit

Permalink
Improve shard evictions in searchable snapshot cache service (elastic#67160)
Browse files Browse the repository at this point in the history

The searchable snapshot's cache service is notified when the cache files
of a specific shard must be evicted. These notifications usually come
from a cluster state applier thread that calls the
CacheService#markShardAsEvictedInCache method.

The markShardAsEvictedInCache method adds the shard to an internal set
of ShardEviction instances and submits the eviction of the shard to the
generic thread pool. Because nothing prevents the cache service (and the
persistent cache service) from being closed before all shard evictions
are processed, invalidating a cache file can fail and trip an assertion
(as happened in many recent test failures: elastic#66958, elastic#66730).

This commit changes the CacheService so that it now waits for the evictions
of shards to complete before closing the cache and persistent cache services.
  • Loading branch information
tlrx committed Jan 14, 2021
1 parent 24f8a79 commit 420de18
Show file tree
Hide file tree
Showing 8 changed files with 401 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ public void testCacheSurviveRestart() throws Exception {
}
assertFalse("no cache files found", cacheFiles.isEmpty());

CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode);
final CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode);
cacheService.synchronizeCache();

PersistentCache persistentCache = cacheService.getPersistentCache();
final PersistentCache persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));

internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
Expand All @@ -142,21 +142,19 @@ public Settings onNodeStopped(String nodeName) {
}
});

cacheService = internalCluster().getInstance(CacheService.class, dataNode);
persistentCache = cacheService.getPersistentCache();
final CacheService cacheServiceAfterRestart = internalCluster().getInstance(CacheService.class, dataNode);
final PersistentCache persistentCacheAfterRestart = cacheServiceAfterRestart.getPersistentCache();
ensureGreen(restoredIndexName);

cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));
assertThat("Cache files should be repopulated in cache", persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));
assertThat("Cache files should be loaded in cache", persistentCacheAfterRestart.getNumDocs(), equalTo((long) cacheFiles.size()));

assertAcked(client().admin().indices().prepareDelete(restoredIndexName));

assertBusy(() -> {
cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)));
assertTrue(internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache().hasDeletions());
cacheServiceAfterRestart.synchronizeCache();
assertThat(persistentCacheAfterRestart.getNumDocs(), equalTo(0L));
});
cacheService.synchronizeCache();

assertThat(persistentCache.getNumDocs(), equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public boolean loadSnapshot(RecoveryState recoveryState, ActionListener<Void> pr
this.snapshot = snapshotSupplier.get();
this.loaded = true;
cleanExistingRegularShardFiles();
cleanExistingCacheFiles();
waitForPendingEvictions();
this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState;
prewarmCache(preWarmListener);
}
Expand Down Expand Up @@ -428,12 +428,12 @@ private void cleanExistingRegularShardFiles() {
}

/**
* Evicts all cache files associated to the current searchable snapshot shard in case a
* Waits for the eviction of cache files associated with the current searchable snapshot shard to be processed in case a
* previous instance of that same shard has been marked as evicted on this node.
*/
private void cleanExistingCacheFiles() {
private void waitForPendingEvictions() {
assert Thread.holdsLock(this);
cacheService.runIfShardMarkedAsEvictedInCache(snapshotId, indexId, shardId, this::clearCache);
cacheService.waitForCacheFilesEvictionIfNeeded(snapshotId.getUUID(), indexId.getName(), shardId);
}

private void prewarmCache(ActionListener<Void> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;

import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;

public class SearchableSnapshotIndexEventListener implements IndexEventListener {
Expand Down Expand Up @@ -116,14 +112,8 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: {})", shardId, reason);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
shardId
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;

/**
* This {@link IndexStorePlugin.IndexFoldersDeletionListener} is called when an index folder or a shard folder is deleted from the disk. If
Expand Down Expand Up @@ -62,14 +58,8 @@ private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSetti

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
shardId
);
}
Expand Down
Loading

0 comments on commit 420de18

Please sign in to comment.