Skip to content

Commit

Permalink
Make searchable snapshots cache persistent (#65725)
Browse files Browse the repository at this point in the history
The searchable snapshots cache implemented in 7.10 is 
not persisted across node restarts, forcing data nodes to 
download files from the snapshot repository again once 
the node is restarted.

This commit introduces a new Lucene index that is used 
to store information about cache files. The information 
about cache files is periodically updated and committed 
to this index as part of the cache synchronization task 
added in #64696. When the data node starts, the Lucene 
index is used to load the cache file information into memory; 
this information is then used to repopulate the searchable 
snapshots cache with the cache files that exist on disk.

Since data nodes can have one or more data paths, this 
change introduces one Lucene index per data path. Information 
about cache files is updated in the Lucene index located 
on the same data path as the cache files.
  • Loading branch information
tlrx authored Dec 14, 2020
1 parent 1ced913 commit 672972c
Show file tree
Hide file tree
Showing 14 changed files with 1,014 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,88 +4,41 @@
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;
package org.elasticsearch.xpack.searchablesnapshots.cache;

import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.apache.lucene.document.Document;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.PathUtilsForTesting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;

public class SearchableSnapshotsCacheClearingIntegTests extends BaseSearchableSnapshotsIntegTestCase {

private static DeleteBlockingFileSystemProvider deleteBlockingFileSystemProvider;

/**
 * Wraps the default file system in a {@link DeleteBlockingFileSystemProvider} and installs the
 * resulting file system as the mock used by {@link PathUtilsForTesting}. The provider is kept in a
 * static field so that individual tests can toggle delete blocking on and off.
 */
@BeforeClass
public static void installDeleteBlockingFileSystemProvider() {
    deleteBlockingFileSystemProvider = new DeleteBlockingFileSystemProvider(PathUtils.getDefaultFileSystem());
    PathUtilsForTesting.installMock(deleteBlockingFileSystemProvider.getFileSystem(null));
}

/**
 * Tears down the mock file system through {@link PathUtilsForTesting#teardown()} once the test class has finished.
 */
@AfterClass
public static void removeDeleteBlockingFileSystemProvider() {
PathUtilsForTesting.teardown();
}

/**
 * Turns on failure injection: while enabled, {@code deleteIfExists} on the installed provider throws an {@link IOException}.
 */
void startBlockingDeletes() {
deleteBlockingFileSystemProvider.injectFailures.set(true);
}

/**
 * Turns off failure injection so that {@code deleteIfExists} on the installed provider delegates to the wrapped provider again.
 */
void stopBlockingDeletes() {
deleteBlockingFileSystemProvider.injectFailures.set(false);
}

/**
 * A {@link FilterFileSystemProvider} whose {@link #deleteIfExists(Path)} can be made to fail on demand.
 * Only {@code deleteIfExists} is intercepted here; every other operation is inherited unchanged.
 */
private static class DeleteBlockingFileSystemProvider extends FilterFileSystemProvider {

    /** When {@code true}, calls to {@link #deleteIfExists(Path)} throw instead of deleting. */
    AtomicBoolean injectFailures = new AtomicBoolean();

    /**
     * @param inner the file system to wrap
     */
    DeleteBlockingFileSystemProvider(FileSystem inner) {
        super("deleteblocking://", inner);
    }

    /**
     * Deletes the given path through the wrapped provider unless failure injection is enabled.
     *
     * @param path the path to delete
     * @return the result of the delegated {@code deleteIfExists} call
     * @throws IOException if failure injection is enabled, or if the delegated call fails
     */
    @Override
    public boolean deleteIfExists(Path path) throws IOException {
        final boolean blocked = injectFailures.get();
        if (blocked == false) {
            return super.deleteIfExists(path);
        }
        throw new IOException("blocked deletion of " + path);
    }
}
public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
Expand All @@ -96,7 +49,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

public void testCacheDirectoriesRemovedOnStartup() throws Exception {
public void testCacheSurviveRestart() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand All @@ -117,18 +70,13 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
final DiscoveryNodes discoveryNodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes();
final String dataNode = randomFrom(discoveryNodes.getDataNodes().values().toArray(DiscoveryNode.class)).getName();

final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
restoredIndexName,
mountSnapshot(
fsRepoName,
snapshotName,
indexName,
Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build(),
Strings.EMPTY_ARRAY,
true
restoredIndexName,
Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build()
);

final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen(restoredIndexName);

final Index restoredIndex = client().admin()
Expand All @@ -143,7 +91,9 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
.getIndex();

final IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNode).indexService(restoredIndex);
final Path shardCachePath = CacheService.getShardCachePath(indexService.getShard(0).shardPath());
final ShardPath shardPath = indexService.getShard(0).shardPath();
final Path shardCachePath = CacheService.getShardCachePath(shardPath);

assertTrue(Files.isDirectory(shardCachePath));
final Set<Path> cacheFiles = new HashSet<>();
try (DirectoryStream<Path> snapshotCacheStream = Files.newDirectoryStream(shardCachePath)) {
Expand All @@ -159,25 +109,49 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
}
assertFalse("no cache files found", cacheFiles.isEmpty());

startBlockingDeletes();
CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode);
cacheService.synchronizeCache();

PersistentCache persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));

internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
assertTrue(Files.isDirectory(shardCachePath));
for (Path cacheFile : cacheFiles) {
assertTrue(cacheFile + " should not have been cleaned up yet", Files.isRegularFile(cacheFile));
try {
assertTrue(Files.isDirectory(shardCachePath));

final Path persistentCacheIndexDir = resolveCacheIndexFolder(shardPath.getRootDataPath());
assertTrue(Files.isDirectory(persistentCacheIndexDir));

final Map<String, Document> documents = PersistentCache.loadDocuments(persistentCacheIndexDir);
assertThat(documents.size(), equalTo(cacheFiles.size()));

for (Path cacheFile : cacheFiles) {
final String cacheFileName = cacheFile.getFileName().toString();
assertTrue(cacheFileName + " should exist on disk", Files.isRegularFile(cacheFile));
assertThat(cacheFileName + " should exist in persistent cache index", documents.get(cacheFileName), notNullValue());
}
} catch (IOException e) {
throw new AssertionError(e);
}
stopBlockingDeletes();
return Settings.EMPTY;
}
});

persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));
ensureGreen(restoredIndexName);

for (Path cacheFile : cacheFiles) {
assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile));
}
cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));

assertAcked(client().admin().indices().prepareDelete(restoredIndexName));

assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))));
cacheService = internalCluster().getInstance(CacheService.class, dataNode);
cacheService.synchronizeCache();

persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ FileChannel getChannel() {
return reference == null ? null : reference.fileChannel;
}

/**
 * Returns the completed ranges as reported by the underlying {@code tracker}. Only used in tests.
 * NOTE(review): each tuple presumably holds the (start, end) positions of a range already written to the cache file;
 * confirm against the tracker implementation.
 */
SortedSet<Tuple<Long, Long>> getCompletedRanges() {
return tracker.getCompletedRanges();
}

public void acquire(final EvictionListener listener) throws IOException {
assert listener != null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.nio.file.Path;

import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
Expand All @@ -49,6 +50,13 @@ public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheSe
this.cacheService = cacheService;
}

/**
* Called before a searchable snapshot {@link IndexShard} starts to recover. This event is used to trigger the loading of the shard
* snapshot information that contains the list of the shard's Lucene files.
*
* @param indexShard the shard that is about to recover
* @param indexSettings the shard's index settings
*/
@Override
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
Expand All @@ -57,7 +65,7 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS
}

private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory());
final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory());
assert directory != null;
final StepListener<Void> preWarmListener = new StepListener<>();
final boolean success = directory.loadSnapshot(indexShard.recoveryState(), preWarmListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.NodeEnvironmentCacheCleaner;
import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;
Expand Down Expand Up @@ -210,12 +210,7 @@ public Collection<Object> createComponents(
this.threadPool.set(threadPool);
this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService()));
if (DiscoveryNode.isDataNode(settings)) {
final CacheService cacheService = new CacheService(
settings,
clusterService,
threadPool,
new NodeEnvironmentCacheCleaner(nodeEnvironment)
);
final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment));
this.cacheService.set(cacheService);
components.add(cacheService);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
Expand All @@ -226,6 +221,8 @@ public Collection<Object> createComponents(
);
this.blobStoreCacheService.set(blobStoreCacheService);
components.add(blobStoreCacheService);
} else {
PersistentCache.cleanUp(settings, nodeEnvironment);
}
return Collections.unmodifiableList(components);
}
Expand Down
Loading

0 comments on commit 672972c

Please sign in to comment.