Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SearchableSnapshotDirectory should not evict cache files when closed #66173

26 changes: 26 additions & 0 deletions server/src/main/java/org/elasticsearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -650,6 +651,31 @@ public void remove() {
};
}

/**
* Performs an action for each cache entry in the cache. While iterating over the cache entries this method is protected from mutations
* that occurs within the same cache segment by acquiring the segment's read lock during all the iteration. As such, the specified
* consumer should not try to modify the cache. Modifications that occur in already traveled segments won't been seen by the consumer
* but modification that occur in non yet traveled segments should be.
*
* @param consumer the {@link Consumer}
*/
public void forEach(BiConsumer<K, V> consumer) {
for (CacheSegment<K, V> segment : segments) {
try (ReleasableLock ignored = segment.readLock.acquire()) {
for (CompletableFuture<Entry<K, V>> future : segment.map.values()) {
try {
if (future != null && future.isDone()) {
final Entry<K, V> entry = future.get();
consumer.accept(entry.key, entry.value);
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
}
}

private class CacheIterator implements Iterator<Entry<K, V>> {
private Entry<K, V> current;
private Entry<K, V> next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.indices.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -31,7 +29,6 @@

public class CompositeIndexFoldersDeletionListener implements IndexStorePlugin.IndexFoldersDeletionListener {

private static final Logger logger = LogManager.getLogger(CompositeIndexFoldersDeletionListener.class);
private final List<IndexStorePlugin.IndexFoldersDeletionListener> listeners;

public CompositeIndexFoldersDeletionListener(List<IndexStorePlugin.IndexFoldersDeletionListener> listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
Expand Down Expand Up @@ -213,6 +212,7 @@ public boolean loadSnapshot(RecoveryState recoveryState, ActionListener<Void> pr
this.snapshot = snapshotSupplier.get();
this.loaded = true;
cleanExistingRegularShardFiles();
cleanExistingCacheFiles();
this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState;
prewarmCache(preWarmListener);
}
Expand Down Expand Up @@ -330,9 +330,6 @@ private static UnsupportedOperationException unsupportedException() {
public final void close() {
if (closed.compareAndSet(false, true)) {
isOpen = false;
// Ideally we could let the cache evict/remove cached files by itself after the
// directory has been closed.
clearCache();
}
}

Expand Down Expand Up @@ -416,6 +413,15 @@ private void cleanExistingRegularShardFiles() {
}
}

/**
* Evicts all cache files associated to the current searchable snapshot shard in case a
* previous instance of that same shard has been marked as evicted on this node.
*/
private void cleanExistingCacheFiles() {
assert Thread.holdsLock(this);
cacheService.runIfShardMarkedAsEvictedInCache(snapshotId, indexId, shardId, this::clearCache);
}

private void prewarmCache(ActionListener<Void> listener) {
if (prewarmCache == false) {
recoveryState.setPreWarmComplete();
Expand Down Expand Up @@ -566,7 +572,6 @@ public static Directory create(

final Path cacheDir = CacheService.getShardCachePath(shardPath).resolve(snapshotId.getUUID());
Files.createDirectories(cacheDir);
assert assertCacheIsEmpty(cacheDir);

return new InMemoryNoOpCommitDirectory(
new SearchableSnapshotDirectory(
Expand All @@ -587,17 +592,6 @@ public static Directory create(
);
}

private static boolean assertCacheIsEmpty(Path cacheDir) {
try (DirectoryStream<Path> cacheDirStream = Files.newDirectoryStream(cacheDir)) {
final Set<Path> cacheFiles = new HashSet<>();
cacheDirStream.forEach(cacheFiles::add);
assert cacheFiles.isEmpty() : "should start with empty cache, but found " + cacheFiles;
} catch (IOException e) {
assert false : e;
}
return true;
}

public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) {
while (dir != null) {
if (dir instanceof SearchableSnapshotDirectory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
Expand All @@ -20,16 +24,31 @@
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;

public class SearchableSnapshotIndexEventListener implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class);

private final @Nullable CacheService cacheService;

public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheService cacheService) {
assert cacheService != null || DiscoveryNode.isDataNode(settings) == false;
this.cacheService = cacheService;
}

@Override
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
Expand Down Expand Up @@ -78,4 +97,35 @@ private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {
throw new TranslogException(shardId, "failed to associate a new translog", e);
}
}

@Override
public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) {
if (cacheService != null && shouldEvictCacheFiles(reason)) {
final IndexSettings indexSettings = indexService.getIndexSettings();
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
for (IndexShard indexShard : indexService) {
final ShardId shardId = indexShard.shardId();

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: {})", shardId, reason);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
shardId
);
}
}
}
}

private static boolean shouldEvictCacheFiles(IndexRemovalReason reason) {
return reason == IndexRemovalReason.DELETED
|| reason == IndexRemovalReason.NO_LONGER_ASSIGNED
|| reason == IndexRemovalReason.FAILURE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;

/**
* This {@link IndexStorePlugin.IndexFoldersDeletionListener} is called when an index folder or a shard folder is deleted from the disk. If
* the index (or the shard) is a backed by a snapshot this listener notifies the {@link CacheService} that the cache files associated to the
* shard(s) must be evicted.
*/
public class SearchableSnapshotIndexFoldersDeletionListener implements IndexStorePlugin.IndexFoldersDeletionListener {

private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class);

private final Supplier<CacheService> cacheService;

public SearchableSnapshotIndexFoldersDeletionListener(Supplier<CacheService> cacheService) {
this.cacheService = Objects.requireNonNull(cacheService);
}

@Override
public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
for (int shard = 0; shard < indexSettings.getNumberOfShards(); shard++) {
markShardAsEvictedInCache(new ShardId(index, shard), indexSettings);
}
}
}

@Override
public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
markShardAsEvictedInCache(shardId, indexSettings);
}
}

private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSettings) {
final CacheService cacheService = this.cacheService.get();
assert cacheService != null : "cache service not initialized";

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
shardId
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,19 @@ public Collection<Object> createComponents(
@Override
public void onIndexModule(IndexModule indexModule) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) {
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener());
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener(settings, cacheService.get()));
indexModule.addIndexEventListener(failShardsListener.get());
}
}

@Override
public List<IndexFoldersDeletionListener> getIndexFoldersDeletionListeners() {
if (DiscoveryNode.isDataNode(settings)) {
return List.of(new SearchableSnapshotIndexFoldersDeletionListener(cacheService::get));
}
return List.of();
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return List.of(new SystemIndexDescriptor(SNAPSHOT_BLOB_CACHE_INDEX, "Contains cached data of blob store repositories"));
Expand Down
Loading