Skip to content

Commit

Permalink
SearchableSnapshotDirectory should not evict cache files when closed (e…
Browse files Browse the repository at this point in the history
…lastic#66173)

This commit changes the SearchableSnapshotDirectory so
that it does not evict all its cache files at closing time, but
instead delegates this work to the CacheService.

This change is motivated by the fact that Lucene directories
are closed as the consequence of applying a new cluster
state and as such the closing is executed within the cluster
state applier thread; and we want to minimize disk IO
operations in such thread (like deleting a lot of evicted
cache files). It is also motivated by the future of the
searchable snapshot cache which should become persistent.

This change is built on top of the existing
SearchableSnapshotIndexEventListener and a new
 SearchableSnapshotIndexFoldersDeletionListener
(see elastic#65926) that are used to detect when a searchable
snapshot index (or searchable snapshot shard) is
removed from a data node.

When such a thing happens, the listeners notify the
 CacheService that maintains an internal list of removed
shards. This list is used to evict the cache files associated
to these shards as soon as possible (but not in the cluster
state applier thread) or right before the same searchable
snapshot shard is being built again on the same node.

In other situations like opening/closing a searchable
snapshot shard then the cache files are not evicted
anymore and should be reused.
  • Loading branch information
tlrx committed Dec 14, 2020
1 parent f0e2e08 commit 1bf2274
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 17 deletions.
26 changes: 26 additions & 0 deletions server/src/main/java/org/elasticsearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -649,6 +650,31 @@ public void remove() {
};
}

/**
* Performs an action for each cache entry in the cache. While iterating over the cache entries this method is protected from mutations
* that occurs within the same cache segment by acquiring the segment's read lock during all the iteration. As such, the specified
* consumer should not try to modify the cache. Modifications that occur in already traveled segments won't been seen by the consumer
* but modification that occur in non yet traveled segments should be.
*
* @param consumer the {@link Consumer}
*/
public void forEach(BiConsumer<K, V> consumer) {
for (CacheSegment<K, V> segment : segments) {
try (ReleasableLock ignored = segment.readLock.acquire()) {
for (CompletableFuture<Entry<K, V>> future : segment.map.values()) {
try {
if (future != null && future.isDone()) {
final Entry<K, V> entry = future.get();
consumer.accept(entry.key, entry.value);
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
}
}

private class CacheIterator implements Iterator<Entry<K, V>> {
private Entry<K, V> current;
private Entry<K, V> next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
Expand Down Expand Up @@ -213,6 +212,7 @@ public boolean loadSnapshot(RecoveryState recoveryState, ActionListener<Void> pr
this.snapshot = snapshotSupplier.get();
this.loaded = true;
cleanExistingRegularShardFiles();
cleanExistingCacheFiles();
this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState;
prewarmCache(preWarmListener);
}
Expand Down Expand Up @@ -330,9 +330,6 @@ private static UnsupportedOperationException unsupportedException() {
public final void close() {
if (closed.compareAndSet(false, true)) {
isOpen = false;
// Ideally we could let the cache evict/remove cached files by itself after the
// directory has been closed.
clearCache();
}
}

Expand Down Expand Up @@ -416,6 +413,15 @@ private void cleanExistingRegularShardFiles() {
}
}

/**
* Evicts all cache files associated to the current searchable snapshot shard in case a
* previous instance of that same shard has been marked as evicted on this node.
*/
private void cleanExistingCacheFiles() {
assert Thread.holdsLock(this);
cacheService.runIfShardMarkedAsEvictedInCache(snapshotId, indexId, shardId, this::clearCache);
}

private void prewarmCache(ActionListener<Void> listener) {
if (prewarmCache == false) {
recoveryState.setPreWarmComplete();
Expand Down Expand Up @@ -566,7 +572,6 @@ public static Directory create(

final Path cacheDir = CacheService.getShardCachePath(shardPath).resolve(snapshotId.getUUID());
Files.createDirectories(cacheDir);
assert assertCacheIsEmpty(cacheDir);

return new InMemoryNoOpCommitDirectory(
new SearchableSnapshotDirectory(
Expand All @@ -587,17 +592,6 @@ public static Directory create(
);
}

private static boolean assertCacheIsEmpty(Path cacheDir) {
try (DirectoryStream<Path> cacheDirStream = Files.newDirectoryStream(cacheDir)) {
final Set<Path> cacheFiles = new HashSet<>();
cacheDirStream.forEach(cacheFiles::add);
assert cacheFiles.isEmpty() : "should start with empty cache, but found " + cacheFiles;
} catch (IOException e) {
assert false : e;
}
return true;
}

public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) {
while (dir != null) {
if (dir instanceof SearchableSnapshotDirectory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
Expand All @@ -20,16 +24,31 @@
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;

public class SearchableSnapshotIndexEventListener implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class);

private final @Nullable CacheService cacheService;

public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheService cacheService) {
assert cacheService != null || DiscoveryNode.isDataNode(settings) == false;
this.cacheService = cacheService;
}

@Override
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
Expand Down Expand Up @@ -78,4 +97,35 @@ private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {
throw new TranslogException(shardId, "failed to associate a new translog", e);
}
}

@Override
public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) {
if (cacheService != null && shouldEvictCacheFiles(reason)) {
final IndexSettings indexSettings = indexService.getIndexSettings();
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
for (IndexShard indexShard : indexService) {
final ShardId shardId = indexShard.shardId();

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: {})", shardId, reason);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
shardId
);
}
}
}
}

private static boolean shouldEvictCacheFiles(IndexRemovalReason reason) {
return reason == IndexRemovalReason.DELETED
|| reason == IndexRemovalReason.NO_LONGER_ASSIGNED
|| reason == IndexRemovalReason.FAILURE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;

/**
* This {@link IndexStorePlugin.IndexFoldersDeletionListener} is called when an index folder or a shard folder is deleted from the disk. If
* the index (or the shard) is a backed by a snapshot this listener notifies the {@link CacheService} that the cache files associated to the
* shard(s) must be evicted.
*/
public class SearchableSnapshotIndexFoldersDeletionListener implements IndexStorePlugin.IndexFoldersDeletionListener {

private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class);

private final Supplier<CacheService> cacheService;

public SearchableSnapshotIndexFoldersDeletionListener(Supplier<CacheService> cacheService) {
this.cacheService = Objects.requireNonNull(cacheService);
}

@Override
public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
for (int shard = 0; shard < indexSettings.getNumberOfShards(); shard++) {
markShardAsEvictedInCache(new ShardId(index, shard), indexSettings);
}
}
}

@Override
public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings())) {
markShardAsEvictedInCache(shardId, indexSettings);
}
}

private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSettings) {
final CacheService cacheService = this.cacheService.get();
assert cacheService != null : "cache service not initialized";

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId);
cacheService.markShardAsEvictedInCache(
new SnapshotId(
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings())
),
new IndexId(
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()),
SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings())
),
shardId
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,19 @@ public Collection<Module> createGuiceModules() {
@Override
public void onIndexModule(IndexModule indexModule) {
if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) {
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener());
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener(settings, cacheService.get()));
indexModule.addIndexEventListener(failShardsListener.get());
}
}

@Override
public List<IndexFoldersDeletionListener> getIndexFoldersDeletionListeners() {
if (DiscoveryNode.isDataNode(settings)) {
return List.of(new SearchableSnapshotIndexFoldersDeletionListener(cacheService::get));
}
return List.of();
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return org.elasticsearch.common.collect.List.of(
Expand Down
Loading

0 comments on commit 1bf2274

Please sign in to comment.